Neo4j Sink Connector: CDC ingestion with schema strategy not working properly

Hi guys,

I'm running a Docker setup with multiple Neo4j instances and Kafka Connect.
In one Neo4j instance I want to ingest CDC events coming from another Neo4j instance via a Neo4j Sink Connector.

I want to use the schema strategy, which merges nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in the graph model.

But the events are ingested with the sourceId strategy instead: nodes only get the default label (:SourceEvent) and the other labels are discarded.

I set up the schema strategy with this REST API call to Kafka Connect
(since I want to ingest CDC events from the kg-cdc topic, I added "neo4j.topic.cdc.schema": "kg-cdc"):

{
  "name": "sink-cdc",
  "config": {
    "topics": "kg-cdc",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "error-kg2app",
    "errors.deadletterqueue.topic.replication.factor":"1",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neoapp:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "pass",
    "neo4j.encryption.enabled": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    
    "neo4j.topic.cdc.schema":"kg-cdc"	
  }
}
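
(For completeness: the JSON above is POSTed to the Kafka Connect REST API roughly like this; the host/port and the file name are illustrative and depend on your setup.)

# assumes the payload above is saved as sink-cdc.json and the Connect REST endpoint is exposed on localhost:8083
curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type: application/json' -H 'Accept: application/json' \
  -d @sink-cdc.json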

and got the following Neo4jSinkConnectorConfig values in Kafka Connect:

INFO Neo4jSinkConnectorConfig values: 
neo4j.authentication.basic.username = neo4j
neo4j.authentication.kerberos.ticket = [hidden]
neo4j.authentication.type = BASIC
neo4j.batch.size = 1000
neo4j.batch.timeout.msecs = 30000
neo4j.connection.acquisition.timeout.msecs = 60000
neo4j.connection.liveness.check.timeout.msecs = 60000
neo4j.connection.max.lifetime.msecs = 3600000
neo4j.connection.max.pool.size = 100
neo4j.encryption.ca.certificate.path =
neo4j.encryption.enabled = false
neo4j.encryption.trust.strategy = TRUST_ALL_CERTIFICATES
neo4j.load.balance.strategy = LEAST_CONNECTED
neo4j.retry.backoff.msecs = 30000
neo4j.retry.max.attemps = 5
neo4j.server.uri = bolt://neoapp:7687
neo4j.topic.cdc.schema = kg-cdc
neo4j.topic.cdc.sourceId =
neo4j.topic.cdc.sourceId.idName = sourceId
neo4j.topic.cdc.sourceId.labelName = SourceEvent
(streams.kafka.connect.sink.Neo4jSinkConnectorConfig)

On the other hand, when I use the Neo4j Streams plugin on the sink instance, the schema strategy works.
Does anybody know what I might have done wrong, or has anyone run into the same problem?
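
(For comparison, enabling the schema strategy on the plugin side only needs a couple of neo4j.conf properties on the sink instance, plus the usual kafka.* connection settings; shown here just for illustration, kg-cdc being my topic:)

# enable the Streams sink and route the kg-cdc topic through the schema strategy
streams.sink.enabled=true
streams.sink.topic.cdc.schema=kg-cdc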

Thank you in advance :grin:


Hi LampicJ15,

here you will find a working docker-compose file which prepares an environment with:

  • a Neo4j instance that acts as source
  • a Neo4j instance that acts as sink
  • a Kafka broker
  • a Kafka Sink Connector

docker-compose.yml

version: '3'

services:
  neo4j-source:
    image: neo4j:3.5
    hostname: neo4j-source
    container_name: neo4j-source
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8474:7474"
      - "8687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/source
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_source_topic_nodes_neo4jnodes: Movie{*};Person{*}
      NEO4J_streams_source_topic_relationships_neo4jrelationships: DIRECTED{*};ACTED_IN{*}
      NEO4J_streams_source_schema_polling_interval: 10000

  neo4j-sink:
    image: neo4j:3.5
    hostname: neo4j-sink
    container_name: neo4j-sink
    depends_on:
      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      
  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command: 
      - bash 
      - -c 
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.2
        /etc/confluent/docker/run

and a JSON file which configures a Kafka sink connector:

contrib.sink.string-json.neo4j.json

{
  "name": "Neo4jSinkConnectorJSON",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "neo4jnodes,neo4jrelationships",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j-sink:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "sink",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cdc.schema": "neo4jnodes;neo4jrelationships"
  }
}

In this example configuration I've used the Movie dataset.
Let's follow these steps:

  1. execute the command docker-compose up -d
  2. configure the Kafka sink connector:
curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @contrib.sink.string-json.neo4j.json
  3. go to localhost:8474 (the Neo4j source instance) from your browser and log in with the credentials provided in the docker-compose file
  4. create the constraints for the Movie dataset in order to speed up the graph creation process:
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Movie) ASSERT p.title IS UNIQUE;
  5. create the Movie graph by typing :play movie-graph and following the second step of the wizard
  6. if you access the Neo4j sink instance (localhost:7474) you can see that the nodes and relationships created in the Neo4j source instance have been replicated (a quick check is shown below)
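
(For example, you can verify the replication on the sink instance with a couple of simple Cypher queries; these are only illustrative checks:)

// count nodes per label on the sink
MATCH (n) RETURN labels(n) AS labels, count(*) AS nodes ORDER BY nodes DESC;
// count relationships per type on the sink
MATCH ()-[r]->() RETURN type(r) AS relType, count(*) AS rels ORDER BY rels DESC;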

Let me know if this helps.

Regards,
Mauro


For completeness, the thing that makes it work is the property NEO4J_streams_source_schema_polling_interval. In this case its value is 10000 ms, which means schema changes are polled every 10 seconds; in other words, it controls how quickly the database picks up new indexes/schema changes.
Please note that if you don't specify a custom value, the default is 300000 milliseconds, which means you have to wait 5 minutes before the Streams plugin polls the DB to retrieve schema changes and store them.
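
For reference, the value can be set either as a Docker environment variable (as in the compose file above) or directly in neo4j.conf:

# docker-compose environment variable
NEO4J_streams_source_schema_polling_interval: 10000
# equivalent neo4j.conf property
streams.source.schema.polling.interval=10000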
You can find more details about this property in the Neo4j Streams documentation.

Hey @mauro.roiter thank you for the quick reply.
I have tried your example with the Movie dataset and the accompanying docker-compose.yml, but it still does not work.

The interesting part, though: when I modified your contrib.sink.string-json.neo4j.json to work for my example, the nodes and relationships were replicated, but again with the sourceId strategy instead of the schema strategy.
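
(You can tell it is the sourceId strategy because, per the defaults shown in the connector config above, the replicated nodes carry the SourceEvent label and a sourceId property; a quick illustrative check on the sink:)

// with the sourceId strategy, nodes are merged on the default sourceId property under the SourceEvent label
MATCH (n:SourceEvent) RETURN n.sourceId AS sourceId, labels(n) AS labels LIMIT 5;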

Hi @LampicJ15,
could you please share your CDC configuration? Which versions of the Neo4j Streams plugin and the Kafka Connect sink plugin are you using?

Hi @mauro.roiter,

this is the neo4j.conf configuration for the source instance
(where I am using the neo4j-streams-3.5.4 plugin):

dbms.security.procedures.unrestricted=algo.*,apoc.*,sc.*
dbms.security.procedures.whitelist=algo.*,apoc.*,hekovnik.*
apoc.trigger.enabled=true

#neo4j streams CDC source config
kafka.zookeeper.connect=zk:2181
kafka.bootstrap.servers=kafka:9092

streams.source.topic.nodes.kg-cdc=Movie{*};Person{*}
streams.source.topic.relationships.kg-cdc=DIRECTED{*};ACTED_IN{*}

streams.source.enabled=true
streams.source.schema.polling.interval=10000
streams.procedures.enabled=true

As for the Kafka Connect sink plugin, I am using kafka-connect-neo4j:1.0.2.

Hi @LampicJ15,
there is a bug in the Kafka Connect sink plugin in the way it resolves the ingestion strategy:

I created this issue to track it:

I also worked on a fix; can you please try to download this package:

And replace the connect service provided by Mauro with this one:

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

please note that you must place the unzipped downloaded package into the plugins directory
located alongside the docker-compose.yml file.
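
For example, roughly like this from the directory containing the docker-compose.yml (the zip file name is only a placeholder for the package linked above):

# <package>.zip stands for the downloaded archive; the actual file name will differ
mkdir -p plugins
unzip <package>.zip -d plugins/
# recreate the connect service so it picks up the plugin from /tmp/connect-plugins
docker-compose up -d connect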

I look forward to your feedback.
Thanks a lot!
Andrea


Hey @conker84 thank you for the quick response, but the sink connector with the schema ingestion strategy is still not working properly: it sinks nodes but not relationships. I found the bug in SchemaIngestionStrategy.kt, in the function prepareRelationshipEvents,
when it gets startNodeConstraints and endNodeConstraints: the function getNodeConstraints always returns an empty list. That happens because there are no node constraints defined in the CDC format for relationships, and getNodeConstraints does not handle relationship CDC records.


@LampicJ15 can you provide a simple dataset with related constraints that is not correctly replicated?

A simple example is with the movie dataset:

In the source Neo4j I set these constraints:

CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Movie) ASSERT p.title IS UNIQUE;

And then I load the Movie graph dataset with :play movie-graph (second step).

After that I create a sink connector, which only sinks the nodes:
I get all 171 nodes but no relationships.

I suspect that the problem is with the getNodeConstraints function:

private fun getNodeConstraints(event: StreamsTransactionEvent,
                               filter: (Constraint) -> Boolean): List<Constraint> =
        event.schema.constraints.filter { filter(it) }

which tries to read node constraints from the relationship payload, but no constraints are defined in the relationship payload. The CDC record for the relationship in this example is the following:

{"meta": {"timestamp":1574859738518,"username":"neo4j","txId":34,"txEventId":366,"txEventsCount":424,"operation":"created","source":{"hostname":"neo"}}, "payload": {"id":"33", "start":{"id":"23","labels":["Person"],"ids":{"name":"Kevin Pollak"}}, "end":{"id":"15","labels":["Movie"],"ids":{"title":"A Few Good Men"}}, "before":null, "after": {"properties":{"roles":["Lt. Sam Weinberg"]}},"label":"ACTED_IN","type":"relationship"}, "schema":{"properties":{"roles":"String[]"}, "constraints":[]}}

As you can see, the schema.constraints field at the end is empty.
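
For this event I would expect the schema strategy to merge on the constrained keys of the start and end nodes, conceptually something like this (illustrative Cypher only, not the statement the connector actually generates):

// merge the nodes on their unique keys, then merge the relationship between them
MERGE (start:Person {name: "Kevin Pollak"})
MERGE (end:Movie {title: "A Few Good Men"})
MERGE (start)-[r:ACTED_IN]->(end)
SET r.roles = ["Lt. Sam Weinberg"]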

Oh I see, we already fixed that:

It's in the master branch but we haven't released it yet; I compiled a version that contains the fix for you:

I look forward to your feedback.

Wrong link, please use this one:


@conker84 thank you for the quick response, the source and sink now work properly.
