cancel
Showing results for 
Search instead for 
Did you mean: 

Neo4j Sink not recieving Kafka Topic events

Hi,

I have used docker compose to set up an instance of Neo4j with the streams plug in and the apoc procedures. Code is below:

neo4j-sink:
    image: neo4j:3.5
    hostname: neo4j-sink
    container_name: neo4j-sink
#    depends_on:
#      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
       - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: 0.0.0.0:2181
      NEO4J_kafka_bootstrap_servers: 0.0.0.0:9092
#      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max_size: 2G
      NEO4J_kafka_max_poll_records: 10000
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_kafka_group_id: "neo4j_sink_1"
      NEO4J_enable_auto_commit: "true"
      NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "WITH event.value as payload MERGE (a:Article {id: payload.ID}) ON CREATE set a.descr = payload.og_description"

This has led to the sink initialising and listening to the topic:

neo4j-sink    | 2019-09-14 09:45:54.962+0000 INFO  Starting the Kafka Sink
neo4j-sink    | 2019-09-14 09:45:55.473+0000 INFO  Creating Sink daemon Job
neo4j-sink    | 2019-09-14 09:45:55.478+0000 DEBUG Subscribed topics with Cypher queries: {ARTICLECOMMIT=WITH event.value as payload MERGE (a:Article {id: payload.ID}) ON CREATE set a.descr = payload.og_description}
neo4j-sink    | 2019-09-14 09:45:55.479+0000 DEBUG Subscribed topics with CDC configuration: {CDC_SOURCE_ID=[], CDC_SCHEMA=[]}
neo4j-sink    | 2019-09-14 09:45:55.480+0000 INFO  Kafka Sink started
neo4j-sink    | 2019-09-14 09:45:55.480+0000 INFO  Streams Sink module initialised

however from the command line producer OR when I send a message through my stream, it does not register in neo4J. I can confirm that the topic is recieving messages from my streams; I checked through the Kafka-console-consumer and listening in the stream. Please advise if there is anything I am doing wrong.

i tried this:

$ kafka-console-producer --broker-list 0.0.0.0:9092 --topic ARTICLECOMMIT
>{"ID":12345,"og_descrpition":"12312312"}

still did not work

4 REPLIES 4

conker84
Graph Voyager

Hi,
there is an error into the documentation please try with this:
NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "MERGE (a:Article {id: event.ID}) ON CREATE set a.descr = event.og_description"

Hi Conker84, unfortunately it is still not working!

Can you share the full compose file?

hello Conker84

version: '3'

services:

  neo4j-sink:
    image: neo4j:3.5
    hostname: neo4j-sink
    container_name: neo4j-sink
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
       - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: 0.0.0.0:2181
      NEO4J_kafka_bootstrap_servers: 0.0.0.0:9092

      NEO4J_dbms_memory_heap_max_size: 2G
      NEO4J_kafka_max_poll_records: 10000
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_kafka_group_id: "neo4j_sink_1"
      NEO4J_enable_auto_commit: "true"
      NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "MERGE (a:Article {id: event.ID}) ON CREATE set a.descr = event.og_description"

I can confirm that zookeeper is active at that port and bootstrap server is at 9092

and there are messages being published to that particular topic through a java producer