hunter
(Sai)
March 5, 2020, 11:02am
1
We are using Neo4j version 3.5.4.
We have nearly 200 nodes and relationships, and we want to capture all node and relationship data for all CRUD operations through Kafka.
How do I need to configure neo4j.conf so that all nodes and relationships are published to a topic?
streams.source.topic.nodes.=
streams.source.topic.relationships.=
Can you please provide your inputs?
Thanks
Hi @hunter ,
just a little premise: if you are still using Neo4j 3.5, I recommend using the latest available Neo4j Streams plugin version for 3.5 (currently 3.5.12: Release Neo4j Streams 3.5.12 · neo4j-contrib/neo4j-streams · GitHub).
One more premise: with the Neo4j Streams CDC module you can only capture create/update/delete operations, not read operations.
That said, if you want to capture all created/updated/deleted nodes and relationships, you simply have to enable the CDC module:
streams.source.enabled=true
Doing so, the plugin will create a default topic named "neo4j" which will contain all the create/update/delete events.
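If you instead want dedicated topics per label or relationship type, here is a minimal neo4j.conf sketch (the topic names "people" and "knows" are just examples; the label/property pattern syntax is the same one used in the person example later in this thread):

streams.source.enabled=true
# example: route Person nodes (all properties) to the "people" topic
streams.source.topic.nodes.people=Person{*}
# example: route KNOWS relationships (all properties) to the "knows" topic
streams.source.topic.relationships.knows=KNOWS{*}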
Hope this helps.
Mauro
tinleo
(Tinleo)
June 11, 2021, 3:08am
3
Hi @mroiter-larus ,
I have used Neo4j Streams 3.5.12, but I got the error 'Timed out waiting for a node assignment.'
This is the configuration that I put in neo4j.conf:
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
or
streams.source.enabled=true
streams.source.topic.nodes.person=Person{name}
or
streams.sink.enabled=true
streams.sink.topic.cypher.person=CYPHER-QUERY
klug
(f )
November 4, 2021, 9:59am
4
I always receive the same error message when starting the Neo4j Kafka connector example. Does anybody know a fix for this?
Hi @tinleo ,
It seems that Neo4j can't connect to Kafka. Are you sure the Zookeeper URL and the bootstrap server URL are correct? Are you sure Zookeeper and Kafka are correctly up and running?
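A quick way to check that from the machine where Neo4j runs (a sketch, assuming the localhost addresses from your configuration; adjust hosts/ports as needed):

# list topics; this also times out if the broker is unreachable
kafka-topics --bootstrap-server localhost:9092 --list
# Zookeeper should answer "imok" to the four-letter-word command
echo ruok | nc localhost 2181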
Mauro
Hi @klug ,
Which example are you referring to? Could you please share the connector JSON configuration?
Which version of the connector are you using?
Mauro
klug
(f )
January 27, 2022, 6:11am
7
Hi! I was using the configuration from the documentation:
---
version: "2"
services:
  neo4j:
    image: neo4j:4.1.9
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: PLAINTEXT://broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  schema_registry:
    image: confluentinc/cp-schema-registry:latest
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: zookeeper:2181

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: schema_registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: schema_registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
      # - bash
      # - -c
      # - |
      #   confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.6
      /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:9093
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
Hi @klug ,
it seems there are some inconsistencies in the documentation. I'm going to fix them ASAP.
In the meantime, you can find here a working docker-compose file and a step-by-step guide to quickstart with the Neo4j Kafka Connector.
1. Download the docker-compose.yml and the sink_config.json files and place them in your desired folder. Inside this folder, also create a plugins folder.
2. Download the latest connector version (you can find it here). Unzip the downloaded package into the plugins folder created in the previous step.
3. Open a terminal window, cd to the folder where the docker-compose file is, and run docker-compose up -d.
4. Once the environment is up and running, create, for example, the sink instance by running the following command: curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @sink_config.json. If the installation process completes successfully, you should see the JSON file's content as output. You can also verify the connect container logs.
5. Connect to the Kafka container: docker exec -it broker /bin/bash
6. Create a kafka-console-producer on the topic my-topic: kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
7. Send a JSON event like the following (a quick verification query is sketched right after this list): {"name": "Foo", "surname": "Bar"}
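If everything worked, the Cypher template in the sink_config.json below will have turned that event into graph data. A quick check from Neo4j Browser or cypher-shell (a sketch, assuming the example event above was sent):

// should return one Person connected to one Family
MATCH (p:Person {name: 'Foo', surname: 'Bar'})-[:BELONGS_TO]->(f:Family {name: 'Bar'})
RETURN p, f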
Here is the docker-compose.yml:
---
version: "2"
services:
  neo4j:
    image: neo4j:4.1.9
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: PLAINTEXT://broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"

  connect:
    image: confluentinc/cp-kafka-connect:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:9093
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:latest
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: broker:9093
      CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONTROL_CENTER_CONNECT_CLUSTER: connect:8083
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
and the sink_config.json:
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}
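As a quick sanity check, the standard Kafka Connect REST endpoints can confirm the setup (the connector name below matches the config above):

# the plugin list should include streams.kafka.connect.sink.Neo4jSinkConnector
curl http://localhost:8083/connector-plugins
# after the POST above, the connector and its task should report state RUNNING
curl http://localhost:8083/connectors/Neo4jSinkConnector/status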
Hope this helps.
Mauro
Hi @klug ,
Have you had the chance to test the provided configurations?
Regards,
Mauro
klug
(f )
February 11, 2022, 9:20am
10
Hi @mroiter-larus ,
the docker-compose works fine, thank you so much for this!
I am currently trying to get steps 6 and 7 to run, and I need to research the commands for that.
Best regards
Fabi
Hi @klug ,
about step 6, I've already provided the command. You just need to connect to the broker container and run it. It will open a console where you just copy and paste the event from step 7 and press Enter to send it to the topic.
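Put together, the whole sequence looks roughly like this (container name, broker address, and topic as in the compose file above):

# open a shell inside the broker container
docker exec -it broker /bin/bash
# start a console producer on my-topic
kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
# then paste the event and press Enter:
{"name": "Foo", "surname": "Bar"}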
Hope this helps.
Mauro
klug
(f )
February 23, 2022, 11:12am
12
Hi @mroiter-larus,
Okay, I will check if it works in my case. The last time I tried, I got error messages when I ran "docker exec -it broker /bin/bash", but I solved them. In the next step, is the command kafka-console-producer --bootstrap-server broker:9093 --topic my-topic?
Do you also have a source configuration for Neo4j?
Best regards
Fabi
Hi @klug ,
the command to create a console producer is just:
kafka-console-producer --bootstrap-server broker:9093 --topic my-topic
About the source configuration, you can take the following as an example:
{
  "name": "Neo4jSourceConnector",
  "config": {
    "topic": "topic-source",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.database": "testdb",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "NOW",
    "neo4j.enforce.schema": false,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
  }
}
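With this configuration, the connector polls every 5 seconds and publishes any TestSource node whose timestamp is newer than the last check. To trigger it manually, a Cypher sketch matching the source query above (run it against the configured testdb database; the name value is arbitrary):

// timestamp() returns the current time in milliseconds since epoch
CREATE (:TestSource {name: 'my-test-event', timestamp: timestamp()})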
Please refer to the official documentation for further details on the source connector.
Regards,
Mauro
klug
(f )
March 8, 2022, 12:22pm
14
Hi @mroiter-larus ,
Thank you for your help. I was able to send the event to Kafka over the console, which is a good step for me.
Now I need to automate/trigger events from the database to Kafka. I will take a closer look at the documentation.
Thanks again!
Best regards
Fabi