
Moving changed records from Neo4j to Kafka


I want to move changed records from Neo4j to Kafka.

I can do this well with the Neo4j Streams plugin, but it has been announced that the plugin will be deprecated after version 5.

So, it is recommended to use the Kafka Connect Neo4j Connector for this task. According to the official tutorial, I need to create a connector on the Kafka side with a configuration similar to the following. Based on my experiments, the connector really does check the timestamp property of the TestSource node. If I set the timestamp property when I update the node, the change is moved from Neo4j to Kafka. However, if I don't set the timestamp property when I update the node, nothing is moved to Kafka. Neo4j does not provide an auto-filled last-update property like the ones available in relational databases.



{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema_registry:8081",
    "value.converter.schema.registry.url": "http://schema_registry:8081",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "NOW",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
  }
}

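To make the behaviour I observed concrete, here is a small Python model of the polling query. It is only a sketch: `Node`, `poll` and `run_cycle` are names I made up, and the way the cursor advances (to the largest timestamp returned) is my assumption about how the connector behaves, not something taken from its source code.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class Node:
    """Hypothetical stand-in for a TestSource node."""
    name: str
    timestamp: Optional[int] = None  # None models a node without the property


def poll(nodes: List[Node], last_check: int) -> List[Dict[str, object]]:
    """Model of: MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ..."""
    # A node whose timestamp is missing or not newer than the cursor
    # never satisfies the WHERE clause, so it is not returned.
    return [
        {"name": n.name, "timestamp": n.timestamp}
        for n in nodes
        if n.timestamp is not None and n.timestamp > last_check
    ]


def run_cycle(nodes: List[Node], last_check: int) -> Tuple[List[Dict[str, object]], int]:
    """Run one poll and advance the cursor (assumed: max timestamp seen)."""
    rows = poll(nodes, last_check)
    new_check = max((r["timestamp"] for r in rows), default=last_check)
    return rows, new_check


if __name__ == "__main__":
    # "a" was updated with a fresh timestamp, "b" was updated without
    # touching its timestamp, "c" never had the property at all.
    nodes = [Node("a", 200), Node("b", 100), Node("c")]
    rows, cursor = run_cycle(nodes, last_check=100)
    print(rows, cursor)
```

In this model only "a" is returned, which matches what I see: an update that leaves the timestamp untouched is invisible to the query.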


My question is: can't I move changed records from Neo4j to Kafka with the Kafka Connect Neo4j Connector if my nodes do not have a last-update property?



I plan to use a trigger to set this timestamp property whenever the update statement does not set it. That way, the Kafka Connect Neo4j Connector will be able to track changes.

CALL apoc.trigger.add('setLastUpdate', "
UNWIND keys($assignedNodeProperties) AS k
UNWIND $assignedNodeProperties[k] AS map
WITH map.node AS node
MATCH (n)
WHERE id(n) = id(node) AND NOT 'timestamp' IN keys($assignedNodeProperties)
SET n.timestamp = timestamp()
", {phase: 'afterAsync'})

This trigger adds the timestamp property to nodes of every type. You can restrict it to a specific node type by changing line 5 to MATCH (n:SpecificNode).
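One thing worth checking is the guard in the trigger above: it looks at the property names assigned anywhere in the transaction, not per node. The Python sketch below models that. It assumes `$assignedNodeProperties` is a map from property name to a list of entries with `key`, `old`, `new` and `node` fields, which is my reading of the APOC documentation; the integer node ids and both function names are hypothetical. The second function is a possible per-node variant, not something I have tested in a real trigger.

```python
from typing import Any, Dict, List, Set

# Assumed shape of $assignedNodeProperties:
# property name -> list of {"key", "old", "new", "node"} entries.
Assigned = Dict[str, List[Dict[str, Any]]]


def nodes_to_stamp(assigned: Assigned, prop: str = "timestamp") -> Set[int]:
    """Model of the trigger as written: transaction-wide guard."""
    # If any node in the transaction had `prop` assigned, the guard
    # NOT 'timestamp' IN keys($assignedNodeProperties) is false for all rows.
    if prop in assigned:
        return set()
    return {entry["node"] for entries in assigned.values() for entry in entries}


def nodes_to_stamp_per_node(assigned: Assigned, prop: str = "timestamp") -> Set[int]:
    """Variant: stamp every touched node that did not itself set `prop`."""
    keys_by_node: Dict[int, Set[str]] = {}
    for key, entries in assigned.items():
        for entry in entries:
            keys_by_node.setdefault(entry["node"], set()).add(key)
    return {node for node, keys in keys_by_node.items() if prop not in keys}


if __name__ == "__main__":
    # One transaction: node 1 changes its name, node 2 sets its own timestamp.
    tx = {
        "name": [{"key": "name", "old": "a", "new": "b", "node": 1}],
        "timestamp": [{"key": "timestamp", "old": 100, "new": 200, "node": 2}],
    }
    print(nodes_to_stamp(tx), nodes_to_stamp_per_node(tx))
```

In this model, node 1 is skipped by the transaction-wide guard when another node in the same transaction sets its timestamp, while the per-node variant still stamps it.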

Still, if you have an alternative approach, I would appreciate it if you shared it here.