How to configure Neo4j sink instances with Kafka and neo4j-streams?

I'm setting up two Neo4j instances (source and sink) with the neo4j-streams plugin and Kafka between them.

I've read the official documentation and an article on Medium about how to set up such a cluster, but I cannot get the sink instance to work.

I'm using the following docker-compose file:

version: "3"
services:
  # Source Neo4j instance: authenticates as neo4j/source and is pointed at the
  # kafka and zookeeper services defined in this file.
  neo4j-source:
    container_name: neo4j-source
    hostname: neo4j-source
    image: "neo4j:3.4"
    depends_on:
      - kafka
    # Container ports 7474 and 7687 are published on the same host ports.
    ports:
      - '7474:7474'
      - '7687:7687'
    # Local ./plugins directory is mounted at /plugins inside the container
    # (presumably where the neo4j-streams jar lives -- verify locally).
    volumes:
      - './plugins:/plugins'
    environment:
      NEO4J_kafka_bootstrap_servers: 'kafka:19092'
      NEO4J_kafka_zookeeper_connect: 'zookeeper:12181'
      NEO4J_dbms_logs_debug_level: 'DEBUG'
      NEO4J_AUTH: 'neo4j/source'

  # Sink Neo4j instance: consumes events from Kafka and applies them with the
  # Cypher template configured below.
  neo4j-sink:
    image: neo4j:3.4
    hostname: neo4j-sink
    container_name: neo4j-sink
    # Host ports 7475/7688 are used so they do not collide with the 7474/7687
    # mappings already taken by neo4j-source.
    ports:
      - "7475:7474"
      - "7688:7687"
    depends_on:
      - kafka
    volumes:
      - ./plugins:/plugins
    environment:
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_zookeeper_connect: zookeeper:12181
      NEO4J_kafka_bootstrap_servers: kafka:19092
      # Cypher applied to events read from the `neo4j` topic. The sink's debug
      # log shows this text being run after an `UNWIND {events} AS event`
      # prefix, so `event` is a single consumed message.
      # Each FOREACH below is a conditional: the CASE yields [1] (run once) when
      # the event matches the given type/label, or [] (skip) otherwise.
      # NOTE(review): the events logged by the sink carry `payload` and `meta`
      # at the top level, with no `value` wrapper. With `event.value.payload` /
      # `event.value.meta` the query ran but reported "<Nothing happened>";
      # reading `event.payload` / `event.meta` instead makes the guards match.
      # NOTE(review): `ON CREATE SET` copies properties only when MERGE creates
      # the node; an existing node with the same neo_id is left unchanged.
      NEO4J_streams_sink_topic_cypher_neo4j:  "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

  # ZooKeeper node that the kafka service (and both Neo4j instances) connect to.
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: 'confluentinc/cp-zookeeper:latest'
    environment:
      # Must match the port used in every *_ZOOKEEPER_CONNECT / zookeeper_connect
      # setting in this file (zookeeper:12181).
      ZOOKEEPER_CLIENT_PORT: 12181
    ports:
      - '12181:12181'

  # Single Kafka broker sitting between the source and sink Neo4j instances.
  kafka:
    container_name: kafka
    hostname: kafka
    image: 'confluentinc/cp-kafka:latest'
    depends_on:
      - zookeeper
    environment:
      # Advertised as kafka:19092, the same address both Neo4j services use
      # for their bootstrap servers.
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:19092'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:12181'
      # Only one broker is defined in this file, so the offsets topic is kept
      # at a replication factor of 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - '19092:19092'

When I create a new User node in the source Neo4j instance:

CREATE (n:User{name: "John Smith"})

I see the message in Kafka:

{"payload":{"id":"0","before":null,"after":{"properties":{"name":"John Smith"},"labels":["User"]},"type":"node"},"meta":{"timestamp":1554075819827,"username":"neo4j","txId":4,"txEventId":0,"txEventsCount":1,"operation":"created","source":{"hostname":"neo4j-source"}},"schema":{"properties":[],"constraints":null}}

And these are the logs from the Docker containers:

neo4j-source    | 2019-03-31 23:43:39.876+0000 DEBUG Trying to send a transaction event with txId 4 and txEventId 0 to kafka
neo4j-source    | 2019-03-31 23:43:40.167+0000 DEBUG Sent record in partition 0 offset 0 data neo4j key size 3
neo4j-sink      | 2019-03-31 23:43:40.228+0000 DEBUG Reading data from topic neo4j, with data [{payload={id=0, before=null, after={properties={name=John Smith}, labels=[User]}, type=node}, meta={timestamp=1554075819827, username=neo4j, txId=4, txEventId=0, txEventsCount=1, operation=created, source={hostname=neo4j-source}}, schema={properties=[], constraints=null}}]
neo4j-sink      | 2019-03-31 23:43:40.236+0000 DEBUG Processing 1 events, for topic neo4j with query: UNWIND {events} AS event WITH event.value.payload AS payload, event.value.meta AS meta FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END | MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE SET n += payload.after.properties ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END | MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE SET n += payload.after.properties ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END | MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE SET n += payload.after.properties ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END | MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE SET n += payload.after.properties ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END | MERGE (s:Answer{neo_id: toInteger(payload.start.id)}) MERGE (e:Question{neo_id: toInteger(payload.end.id)}) CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e) ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END | MERGE (s:Question{neo_id: toInteger(payload.start.id)}) MERGE (e:Tag{neo_id: toInteger(payload.end.id)}) CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e) ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END | MERGE (s:User{neo_id: toInteger(payload.start.id)}) MERGE (e:Answer{neo_id: 
toInteger(payload.end.id)}) CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e) ) FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END | MERGE (s:User{neo_id: toInteger(payload.start.id)}) MERGE (e:Question{neo_id: toInteger(payload.end.id)}) CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e) )
neo4j-sink      | 2019-03-31 23:43:41.975+0000 DEBUG Query statistics:
neo4j-sink      | <Nothing happened>

And, as you can see in the logs, nothing is created in the sink.

I've tried simpler queries, but I still cannot get any node delivered to the sink Neo4j instance. How should the sink's streams.sink.topic.cypher query be changed? Thanks!

P.S. I originally posted this question on StackOverflow; I'm reposting it here in the hope that someone can help me :slight_smile:

I've found the problem: it seems the documentation contains an error in the consumer config. Events from the Kafka topic have no `value` field in the JSON.

Instead of

      ...
      NEO4J_streams_sink_topic_cypher_neo4j:  "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
      ...

it should be

      ...
      NEO4J_streams_sink_topic_cypher_neo4j:  "WITH event.payload AS payload, event.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
      ...

With that change it works fine; the logs now show:

neo4j-sink      | 2019-04-01 21:21:09.263+0000 DEBUG Query statistics:
neo4j-sink      | Nodes created: 1
neo4j-sink      | Properties set: 2
neo4j-sink      | Labels added: 1