Missing nodes in Neo4j with "Neo4j Kafka Connector Sink"

I have set up my Kafka connector by following the Neo4j documentation, and I am testing it by publishing messages to Kafka and expecting each message to be transformed into a Neo4j node. Here is my sink configuration:

{
  "name": "SomeName",
  "config": {
    "topics": "SomeTopicName",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.database": "someDb",
    "neo4j.server.uri": "neo4j://X.X.X.X:XXXX",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.batch.parallelize": false,
    "neo4j.connection.max.pool.size": 500,
    "neo4j.topic.cypher.SomeTopicName": "query"
  }
}

Basic ingestion works fine: publishing 10k messages produces 10k nodes in Neo4j. But when I publish 500k messages, I see only 350k nodes in Neo4j. I don't see any errors in the connector logs or the Neo4j logs. If I publish the same set of 500k messages again, some of the missing nodes are added, so I have to repeat the run until all 500k nodes appear in Neo4j.

Expected Behavior

I want to see all nodes in Neo4j, or an error for the missing nodes.

Actual Behavior

Some of the streamed Kafka messages are missing from the Neo4j database.

How to Reproduce the Problem

I used a basic Cypher query:

MERGE (person:Person {uri: 'http://company#Person/' + event.personid})
	ON MATCH SET person.hasdescription = event.hasdescription, person.hascount = event.hascount
	ON CREATE SET person.hasdescription = event.hasdescription, person.hascount = event.hascount
RETURN count(*) AS count

Steps

  1. Create the connector
  2. Publish messages to the topic
  3. Check the node count in Neo4j
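
For step 3, comparing only totals hides which messages never became nodes. Below is a minimal Python sketch of a reconciliation check. It assumes you have the list of `personid` values that were published and the `uri` values read back from Neo4j (for example with `MATCH (p:Person) RETURN p.uri`). The function name `reconcile` and the shape of its result are hypothetical, chosen for illustration only.

```python
from collections import Counter

# Same prefix as in the MERGE query above.
URI_PREFIX = "http://company#Person/"


def reconcile(sent_person_ids, neo4j_uris):
    """Compare published person ids with the uris found in Neo4j.

    Hypothetical helper: both inputs are assumed to be collected by the caller.
    """
    counts = Counter(str(pid) for pid in sent_person_ids)
    expected = {URI_PREFIX + pid for pid in counts}
    found = set(neo4j_uris)
    return {
        "messages_sent": sum(counts.values()),
        # MERGE creates one node per distinct uri, so this is the node
        # count to expect, not the raw message count.
        "unique_ids_sent": len(counts),
        "duplicate_ids": sorted(pid for pid, n in counts.items() if n > 1),
        "missing_uris": sorted(expected - found),
    }


if __name__ == "__main__":
    report = reconcile(
        [1, 2, 2, 3],
        ["http://company#Person/1", "http://company#Person/2"],
    )
    print(report)
```

Because the query uses MERGE on `uri`, repeated `personid` values collapse into one node, so the expected node count is the number of distinct ids rather than the number of messages.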

Versions

  • OS: CentOS
  • neo4j:5-enterprise
  • confluentinc/cp-enterprise-control-center:7.3.0
  • cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
  • confluentinc/cp-schema-registry:7.3.0
  • confluentinc/cp-server:7.3.0
  • confluentinc/cp-zookeeper:7.3.0

The problem was on the publishing side, not in the connector. Please close this issue.
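
The report does not say what exactly went wrong on the publishing side. One way to rule out silent producer-side loss is to count delivery reports and flush before exiting. The sketch below assumes the producer is a `confluent_kafka.Producer` (only its `produce`, `poll`, and `flush` methods are used) and that each event is a dict with a `personid` field. `DeliveryTracker` and `publish_all` are hypothetical names, not part of any library.

```python
import json


class DeliveryTracker:
    """Delivery callback that counts successes and records failures."""

    def __init__(self):
        self.delivered = 0
        self.failed = []  # list of (key, error text) pairs

    def __call__(self, err, msg):
        if err is not None:
            self.failed.append((msg.key(), str(err)))
        else:
            self.delivered += 1


def publish_all(producer, topic, events, tracker):
    """Publish every event and wait until each one is acknowledged or failed."""
    for event in events:
        # JSON-encode key and value so the sink's JsonConverter can parse them.
        key = json.dumps(event["personid"]).encode("utf-8")
        value = json.dumps(event).encode("utf-8")
        while True:
            try:
                producer.produce(topic, key=key, value=value, on_delivery=tracker)
                break
            except BufferError:
                # Local queue is full: serve callbacks to free space, then retry.
                producer.poll(1.0)
        producer.poll(0)  # serve delivery callbacks without blocking
    producer.flush()  # block until all outstanding messages are reported
    return tracker
```

After `publish_all` returns, `tracker.delivered + len(tracker.failed)` should equal the number of events. If it does not, or if `tracker.failed` is non-empty, the loss happened before the connector ever saw the messages.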