Kafka Integration: How to configure streams.source.topic.nodes and streams.source.topic.relationships to capture all nodes and relationships?

We are using Neo4j version 3.5.4.

We have nearly 200 nodes and relationships. We want to capture the data of all nodes and relationships, for all CRUD operations, through Kafka.

How do I need to configure the topics in neo4j.conf so that they cover all nodes and relationships?

streams.source.topic.nodes.=
streams.source.topic.relationships.=

Could you please provide your input?

Thanks

Hi @hunter ,

Just a little premise: if you are still using Neo4j 3.5, I recommend using the latest Neo4j Streams plugin version available for 3.5, which is 3.5.12 (Release Neo4j Streams 3.5.12 · neo4j-contrib/neo4j-streams · GitHub).

Another premise: with the Neo4j Streams CDC module you can capture only create/update/delete operations (not read operations).

That said, if you want to capture all created/updated/deleted nodes and relationships, you simply have to enable the CDC module:

streams.source.enabled=true

Doing so, the plugin will create a default topic named "neo4j" which will contain all the create/update/delete events.
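
If you later prefer one topic per label or relationship type instead of the single default topic, the neo4j.conf lines can be generated rather than typed by hand. The following is only a sketch: the helper name source_topic_lines, the label list, the topic naming scheme (lower-cased label) and the use of the {*} pattern for "all properties" are assumptions for illustration, so please check them against the Neo4j Streams documentation for your plugin version.

```python
def source_topic_lines(node_labels, rel_types):
    """Build neo4j.conf lines that route each label/type to its own topic."""
    lines = ["streams.source.enabled=true"]
    for label in node_labels:
        # Hypothetical naming scheme: topic name = lower-cased label.
        lines.append(f"streams.source.topic.nodes.{label.lower()}={label}{{*}}")
    for rel_type in rel_types:
        # Same scheme for relationship types.
        lines.append(
            f"streams.source.topic.relationships.{rel_type.lower()}={rel_type}{{*}}"
        )
    return lines


# Example labels only; replace them with the ones from your own graph.
print("\n".join(source_topic_lines(["Person", "Family"], ["BELONGS_TO"])))
```

The printed lines can be pasted into neo4j.conf next to the kafka.* settings.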

Hope this helps.

Mauro

Hi @mroiter-larus ,
I have used Neo4j Streams 3.5.12, but I got the error 'Timed out waiting for a node assignment.'


These are the configurations that I put in neo4j.conf:

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092

or

streams.source.enabled=true
streams.source.topic.nodes.person=Person{name}

or

streams.sink.enabled=true
streams.sink.topic.cypher.person=CYPHER-QUERY

I always receive the same error message when starting the neo4j kafka connector example. Does anybody know a fix for this?

Hi @tinleo,

It seems that Neo4j can't connect to Kafka. Are you sure the zookeeper url and the bootstrap server url are correct? Are you sure Zookeeper and Kafka are correctly up and running?
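
A quick way to rule out the most basic problem is to test whether the two ports accept TCP connections from the machine where Neo4j runs. This is a minimal sketch using only the Python standard library; the helper name can_connect is hypothetical, and the addresses you pass should be the ones from your own neo4j.conf.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolved host names.
        return False
```

For example, can_connect("localhost", 2181) and can_connect("localhost", 9092) correspond to the settings quoted above. Keep in mind that an open port only shows that something is listening there; a wrong advertised listener on the Kafka side can still cause timeouts.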

Mauro

Hi @klug,

Which example are you referring to? Could you please share the connector JSON configuration?
Which version of the connector are you using?

Mauro

Hi! I was using the configuration from the documentation:

---
version: "2"
services:
  neo4j:
    image: neo4j:4.1.9
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: PLAINTEXT://broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  schema_registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: zookeeper:2181

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: schema_registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: schema_registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
      #      - bash
      #      - -c
      #      - |
      #        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.6
      /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:9093
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

Hi @klug,

It seems there are some inconsistencies in the documentation. I'm going to fix them asap.
In the meantime, here is a working docker-compose file and a step-by-step guide to get started quickly with the Neo4j Kafka Connector.

  1. Download the docker-compose.yml and sink_config.json files and place them in a folder of your choice. Inside this folder, also create a plugins folder.
  2. Download the latest connector version (you can find it here). Unzip the downloaded package into the plugins folder created in the previous step.
  3. Open a terminal window and cd to the folder containing the docker-compose file. Run docker-compose up -d
  4. Once the environment is up and running, create the sink instance, for example, by running the following command: curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @sink_config.json. If the request succeeds, you should see the content of the JSON file as output. You can also check the connect container logs.
  5. Now connect to the Kafka container: docker exec -it broker /bin/bash
  6. Start a kafka-console-producer on the topic my-topic: kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
  7. Send a JSON event like the following: {"name": "Foo", "surname": "Bar"}
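
If you prefer to script step 4 instead of using curl, the same POST request can be built with the Python standard library. This is only a sketch under a few assumptions: the helper names build_connector_request and register_connector are hypothetical, the URL is the one from the curl command above, and the connect container must already be running when register_connector is called.

```python
import json
import urllib.request

# Same endpoint as in the curl command of step 4.
CONNECT_URL = "http://localhost:8083/connectors"


def build_connector_request(config, connect_url=CONNECT_URL):
    """Build the POST request that registers a connector with Kafka Connect."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        connect_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


def register_connector(config_path, connect_url=CONNECT_URL):
    """Read a connector JSON file and send it; returns (status, response body)."""
    with open(config_path, encoding="utf-8") as f:
        request = build_connector_request(json.load(f), connect_url)
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")
```

Calling register_connector("sink_config.json") from the folder created in step 1 should then have the same effect as the curl command.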

Here is the docker-compose.yml:

---
version: "2"

services:
  neo4j:
    image: neo4j:4.1.9
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: PLAINTEXT://broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:9093
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

and the sink_config.json:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}

Hope this helps.

Mauro

Hi @klug,

Have you had the chance to test the provided configurations?

Regards,

Mauro

Hi @mroiter-larus,
the docker-compose works fine, thank you so much for this!

I am currently trying to get steps 6 and 7 to run, and I need to research the commands for that.


Best regards
Fabi

Hi @klug ,

Regarding step 6, I've already provided the command. You just need to connect to the broker container and run it. It will open a prompt where you can paste the event from step 7 and then press Enter to send it to the topic.

Hope this helps.

Mauro

Hi @mroiter-larus.
Okay, I will check if it works in my case. The last time I tried, I got error messages when I ran "docker exec -it broker /bin/bash", but I solved them. For the next step, is the command my-topic : kafka-console-producer --bootstrap-server broker:9093 --topic my-topic ?
Do you also have a source configuration for Neo4j?

Best regards
Fabi

Hi @klug,

The command to start a console producer is just:

kafka-console-producer --bootstrap-server broker:9093 --topic my-topic

As for the source configuration, you can take the following as an example:

{
  "name": "Neo4jSourceConnector",
  "config": {
    "topic": "topic-source",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.database": "testdb",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "NOW",
    "neo4j.enforce.schema": false,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
  }
}

Please refer to the official documentation for further details on the source connector.
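
To make the role of $lastCheck in neo4j.source.query a bit more concrete, here is a plain-Python simulation of timestamp-based polling. It does not use the connector or Neo4j at all: the in-memory rows and the function name poll_once are hypothetical, and the connector's real bookkeeping may differ from this simplified picture.

```python
def poll_once(rows, last_check):
    """Return the rows newer than last_check and the updated checkpoint.

    Mirrors the "WHERE ts.timestamp > $lastCheck" filter of the source query.
    """
    fresh = [row for row in rows if row["timestamp"] > last_check]
    if fresh:
        # Advance the checkpoint to the newest timestamp seen in this poll.
        last_check = max(row["timestamp"] for row in fresh)
    return fresh, last_check


# Made-up rows standing in for TestSource nodes.
rows = [{"name": "a", "timestamp": 100}, {"name": "b", "timestamp": 200}]
events, checkpoint = poll_once(rows, last_check=150)
print(events, checkpoint)
```

The point of the sketch is that only nodes whose timestamp property is greater than the previous checkpoint are returned on each poll, so the nodes you create need a timestamp value that keeps increasing.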

Regards,

Mauro

Hi @mroiter-larus,
Thank you for your help. I was able to send the event to Kafka from the console, which is a good step for me.
Now I need to automate/trigger events from the database to Kafka. I will take a closer look at the documentation.
Thanks again!


Best regards
Fabi