Hello,
We are running Kafka Connect with the Neo4j sink connector (latest version).
The process itself is very slow: we have a lag of around 2M messages between Kafka and Neo4j.
The sink topic has 40 partitions.
The main warning we see in the logs is this:
connect-br-cloud | [2023-06-06 05:45:25,512] WARN WorkerSinkTask{id=neo4j-br-cloud-1} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect-br-cloud | [2023-06-06 05:45:30,502] WARN WorkerSinkTask{id=neo4j-br-cloud-14} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect-br-cloud | [2023-06-06 05:45:43,139] WARN WorkerSinkTask{id=neo4j-br-cloud-17} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect-br-cloud | [2023-06-06 05:45:51,230] WARN WorkerSinkTask{id=neo4j-br-cloud-12} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect-br-cloud | [2023-06-06 05:45:51,728] WARN WorkerSinkTask{id=neo4j-br-cloud-10} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect-br-cloud | [2023-06-06 05:45:54,292] WARN WorkerSinkTask{id=neo4j-br-cloud-11} Commit of offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
The Neo4j connector configuration is:
"tasks.max": 20,
"topics": "sink.neo4j.hierarchy.v0",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"errors.retry.timeout": "0",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "none",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.deadletterqueue.topic.name": "sink.neo4j.hierarchy.v0.dlq",
"errors.deadletterqueue.context.headers.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "neo4j+s://someurl:7687",
"neo4j.authentication.basic.username": "someuser",
"neo4j.authentication.basic.password": "somepass",
"neo4j.topic.cud": "sink.neo4j.hierarchy.v0",
"neo4j.connection.max.pool.size": "100",
"neo4j.batch.size": "300",
"max.poll.records": "300",
"neo4j.batch.parallelize": true
Any idea what could be causing this?
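
For context, here is a rough back-of-the-envelope breakdown of the numbers above. It is only a sketch. It assumes partitions are spread evenly across the 20 tasks, that the lag is spread evenly across the 40 partitions, and that each poll really delivers up to 300 records. None of this has been verified on our cluster.

```python
# Figures taken from this post; everything derived from them is an estimate.
partitions = 40          # partitions on sink.neo4j.hierarchy.v0
tasks_max = 20           # "tasks.max" in the connector config
total_lag = 2_000_000    # approximate lag reported above
batch_size = 300         # "neo4j.batch.size" and "max.poll.records"

# Assumption: partitions are distributed evenly across tasks.
partitions_per_task = partitions // tasks_max

# Assumption: lag is distributed evenly across partitions.
lag_per_partition = total_lag // partitions
lag_per_task = lag_per_partition * partitions_per_task

# Batches of 300 records each task would have to write to catch up
# (ceiling division, so a partial final batch counts as one batch).
batches_per_task = -(-lag_per_task // batch_size)

print(partitions_per_task, lag_per_partition, lag_per_task, batches_per_task)
# prints: 2 50000 100000 334
```

So under these assumptions each task owns 2 partitions and is about 100,000 messages behind, which is roughly 334 batches per task at the current batch size.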