Hi guys,
I'm running a Docker container with multiple Neo4j instances and Kafka Connect.
In one Neo4j instance I want to ingest CDC events coming from another Neo4j instance using a Neo4j Sink Connector.
I want to use the schema strategy, which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in the graph model.
But the events are ingested with the sourceId strategy instead and get only the default label (:SourceEvent); all other labels are discarded.
I set up the schema strategy with this REST API call to Kafka Connect (since I want to ingest CDC events from the kg-cdc topic, I added "neo4j.topic.cdc.schema":"kg-cdc"):
{
  "name": "sink-cdc",
  "config": {
    "topics": "kg-cdc",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "error-kg2app",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neoapp:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "pass",
    "neo4j.encryption.enabled": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "neo4j.topic.cdc.schema": "kg-cdc"
  }
}
and get the following Neo4jSinkConnectorConfig values in Kafka Connect:
INFO Neo4jSinkConnectorConfig values:
neo4j.authentication.basic.username = neo4j
neo4j.authentication.kerberos.ticket = [hidden]
neo4j.authentication.type = BASIC
neo4j.batch.size = 1000
neo4j.batch.timeout.msecs = 30000
neo4j.connection.acquisition.timeout.msecs = 60000
neo4j.connection.liveness.check.timeout.msecs = 60000
neo4j.connection.max.lifetime.msecs = 3600000
neo4j.connection.max.pool.size = 100
neo4j.encryption.ca.certificate.path =
neo4j.encryption.enabled = false
neo4j.encryption.trust.strategy = TRUST_ALL_CERTIFICATES
neo4j.load.balance.strategy = LEAST_CONNECTED
neo4j.retry.backoff.msecs = 30000
neo4j.retry.max.attemps = 5
neo4j.server.uri = bolt://neoapp:7687
neo4j.topic.cdc.schema = kg-cdc
neo4j.topic.cdc.sourceId =
neo4j.topic.cdc.sourceId.idName = sourceId
neo4j.topic.cdc.sourceId.labelName = SourceEvent
(streams.kafka.connect.sink.Neo4jSinkConnectorConfig)
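
To make the relevant part of that dump easier to see, here is a minimal Python sketch that parses the logged "key = value" lines and reports which topics are assigned to the two CDC strategies. The function names parse_config_log and cdc_strategy_topics are made up for this post, and the parser assumes every setting is logged on one line in "key = value" form. It only reads the log text and does not talk to Kafka Connect.

```python
# Hypothetical helper: reads a pasted config dump, not a live connector.
STRATEGY_KEYS = ("neo4j.topic.cdc.schema", "neo4j.topic.cdc.sourceId")

# Excerpt copied from the Neo4jSinkConnectorConfig values above.
LOG_EXCERPT = """\
neo4j.server.uri = bolt://neoapp:7687
neo4j.topic.cdc.schema = kg-cdc
neo4j.topic.cdc.sourceId =
neo4j.topic.cdc.sourceId.idName = sourceId
neo4j.topic.cdc.sourceId.labelName = SourceEvent
"""


def parse_config_log(log_text):
    """Turn 'key = value' lines into a dict, skipping anything else."""
    settings = {}
    for line in log_text.splitlines():
        key, sep, value = line.partition("=")
        key = key.strip()
        # Assumption: only lines whose key starts with 'neo4j.' are settings.
        if sep and key.startswith("neo4j."):
            settings[key] = value.strip()
    return settings


def cdc_strategy_topics(settings):
    """Return the topic list configured for each CDC strategy ('' if unset)."""
    return {key: settings.get(key, "") for key in STRATEGY_KEYS}


if __name__ == "__main__":
    topics = cdc_strategy_topics(parse_config_log(LOG_EXCERPT))
    for key, value in topics.items():
        print(f"{key}: {value or '(no topics)'}")
```

Run against the values I pasted, this reports kg-cdc for neo4j.topic.cdc.schema and no topics for neo4j.topic.cdc.sourceId, so as far as I can tell the connector did receive the schema setting.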
On the other hand, when I use the Neo4j Streams plugin inside Neo4j instead of the Kafka Connect sink, the schema strategy works.
Does anybody know what I might have done wrong, or has anyone run into the same problem?
Thank you in advance.