I set up my Kafka connector by following the Neo4j documentation. I am testing it by publishing messages to Kafka and expecting each message to be transformed into a Neo4j node. Here is my sink configuration:
{
  "name": "SomeName",
  "config": {
    "topics": "SomeTopicName",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.database": "someDb",
    "neo4j.server.uri": "neo4j://X.X.X.X:XXXX",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.batch.parallelize": false,
    "neo4j.connection.max.pool.size": 500,
    "neo4j.topic.cypher.SomeTopicName": "query"
  }
}
Basic ingestion works fine: publishing 10k messages produces 10k nodes in Neo4j. But when I publish 500k messages, I see only 350k nodes in Neo4j. I don't see any errors in the connector logs or the Neo4j logs. If I publish the same set of 500k messages again, the missing nodes are added, so I have to repeat the run until all 500k nodes are present in Neo4j.
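To narrow down which messages are being dropped, it may help to diff the ids that were published against the `uri` values that actually exist in Neo4j. Below is a minimal Python sketch, assuming the URIs have already been exported into a list (for example from a `MATCH (p:Person) RETURN p.uri` query). The function name `missing_person_ids` is hypothetical, and the prefix is copied from the MERGE query further down.

```python
# Prefix copied from the MERGE query; everything else here is illustrative.
PREFIX = "http://company#Person/"


def missing_person_ids(sent_ids, uris_in_neo4j):
    """Return the sent ids (in original order) that have no matching Person.uri."""
    # Strip the prefix so the remaining part can be compared to the sent ids.
    present = {
        uri[len(PREFIX):]
        for uri in uris_in_neo4j
        if uri.startswith(PREFIX)
    }
    # Compare as strings, since the uri is built by string concatenation.
    return [pid for pid in sent_ids if str(pid) not in present]
```

If the missing ids cluster in contiguous ranges, that would hint at whole batches being skipped rather than individual records.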
Expected Behavior
I want to see all nodes in Neo4j, or an error for the missing nodes.
Actual Behavior
Some of the streamed Kafka messages are missing from the Neo4j database.
How to Reproduce the Problem
I used a basic Cypher query:
MERGE (person:Person {uri: 'http://company#Person/' + event.personid})
ON MATCH SET person.hasdescription = event.hasdescription, person.hascount = event.hascount
ON CREATE SET person.hasdescription = event.hasdescription, person.hascount = event.hascount
RETURN count(*) AS count
Steps
- Create the connector with the configuration above
- Publish messages to the topic
- Check the number of nodes in Neo4j
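For anyone trying to reproduce this, the test messages can be generated with something like the following Python sketch. The field names (`personid`, `hasdescription`, `hascount`) come from the Cypher query above; the value types, the helper name `build_events`, and the sample values are assumptions for illustration only. It emits one JSON document per line, which could then be fed to whatever producer is in use.

```python
import json


def build_events(n, start=1):
    """Yield n JSON strings, each with a unique personid (assumed to be a string)."""
    for i in range(start, start + n):
        yield json.dumps({
            "personid": str(i),               # unique per message, used in the MERGE key
            "hasdescription": f"person {i}",  # placeholder description
            "hascount": i,                    # placeholder count
        })


if __name__ == "__main__":
    # Print a small sample; raise the count to 500000 to match the failing run.
    for line in build_events(10):
        print(line)
```

Because every `personid` is unique, the expected number of `Person` nodes after a run equals the number of messages published, which makes a shortfall easy to spot.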
Versions
- OS: CentOS
- neo4j:5-enterprise
- confluentinc/cp-enterprise-control-center:7.3.0
- cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
- confluentinc/cp-schema-registry:7.3.0
- confluentinc/cp-server:7.3.0
- confluentinc/cp-zookeeper:7.3.0