Neo4J Streams

Hi,

I am testing data from Kafka to Neo4J using Neo4J Streams sink process. After sending the data from Kafka producer, I dont see it got updated to Neo4J. I checked the logs and I dont see any error/issue.

kafka.zookeeper.connect=
kafka.bootstrap.servers=
streams.sink.topic.cypher.streaminp=
streams.sink.enabled=true

How to debug this issue. I use latest streams jar which is 4.0.1.

Hi @chasrini ,

could you please set the log level to DEBUG, restart Neo4j, retry sending an event via Kafka producer and then share the neo4j.log?
In order to set the log level to DEBUG, in the neo4j.conf file you have to set the following property:

dbms.logs.debug.level=DEBUG

Could you also please ensure that the cypher template you have used matches the event structure? If you could also share the Cypher template and a sample event it would be great.

Regards,

Mauro

Need help as well!!! I am also facing the same issue. Streams seems to work, these are my loks from neo4j.logs, the debug file had no outputs after queuing the data in kafka. I have tried about everything I can, I really want to be able to use just the streams plugin and record cud data. I can provide additional data as well, but neo4j streams does't even seem to open a listener on the kafka broker side. I am sending basic data in the form of json {name:"", name1:""....} to kafka streams using confluent-kafka for python. Thanks for anyone that can help!

This is my neo4j.conf sink configuration

kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

streams.sink.topic.cypher.friends=MERGE (p1:Person { name: event.initiated }) MERGE (p2:Person { name: event.accepted }) CREATE (p1)-[:FRIENDS { when: event.date }]->(p2)
streams.sink.enabled=true

streams.check.apoc.timeout=-1 # <ms to await for APOC being loaded, default -1 skip the wait>
streams.check.apoc.interval=1000 # <ms interval awaiting for APOC being loaded, default 1000>

This is my configuration in debug trace

2021-05-14 07:43:51.278+0000 INFO  Starting...
2021-05-14 07:43:54.047+0000 INFO  ======== Neo4j 4.2.1 ========
2021-05-14 07:43:56.353+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.353+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:57.337+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:57.344+0000 DEBUG [neo4j/2c0e4668] Waiting for the Neo4j instance to be ready...
2021-05-14 07:43:57.806+0000 INFO  [neo4j/2c0e4668] Starting the connector lifecycle listener
2021-05-14 07:43:57.807+0000 INFO  [neo4j/2c0e4668] StreamsConfig started
2021-05-14 07:43:57.808+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:58.588+0000 INFO  Called db.clearQueryCaches(): Query caches successfully cleared of 1 queries.
2021-05-14 07:43:56.353+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.353+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:57.337+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:57.344+0000 DEBUG [neo4j/2c0e4668] Waiting for the Neo4j instance to be ready...
2021-05-14 07:43:57.806+0000 INFO  [neo4j/2c0e4668] Starting the connector lifecycle listener
2021-05-14 07:43:57.807+0000 INFO  [neo4j/2c0e4668] StreamsConfig started
2021-05-14 07:43:57.808+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:58.588+0000 INFO  Called db.clearQueryCaches(): Query caches successfully cleared of 1 queries.
2021-05-14 07:43:56.353+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.353+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 DEBUG [neo4j/2c0e4668] Adding listener for event: CONFIGURATION_INITIALIZED
2021-05-14 07:43:56.359+0000 INFO  [neo4j/2c0e4668] Adding listener for event type CONFIGURATION_INITIALIZED
2021-05-14 07:43:57.337+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:57.344+0000 DEBUG [neo4j/2c0e4668] Waiting for the Neo4j instance to be ready...
2021-05-14 07:43:57.806+0000 INFO  [neo4j/2c0e4668] Starting the connector lifecycle listener
2021-05-14 07:43:57.807+0000 INFO  [neo4j/2c0e4668] StreamsConfig started
2021-05-14 07:43:57.808+0000 DEBUG [neo4j/2c0e4668] Starting StreamsConfig
2021-05-14 07:43:58.588+0000 INFO  Called db.clearQueryCaches(): Query caches successfully cleared of 1 queries.
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] Detected new event change in configuration CONFIGURATION_INITIALIZED
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The event tree is [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The listenerMap contains the following listeners: [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Sink] Configuration is empty
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Source] Configuration is empty
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] Detected new event change in configuration CONFIGURATION_INITIALIZED
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The event tree is [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The listenerMap contains the following listeners: [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Sink] Configuration is empty
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Source] Configuration is empty
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] Detected new event change in configuration CONFIGURATION_INITIALIZED
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The event tree is [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.811+0000 DEBUG [neo4j/2c0e4668] The listenerMap contains the following listeners: [CONFIGURATION_INITIALIZED]
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Sink] Configuration is empty
2021-05-14 07:44:07.812+0000 DEBUG [neo4j/2c0e4668] [Source] Configuration is empty
2021-05-14 07:44:09.797+0000 DEBUG Reading users from /Users/***/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/data/dbms/auth.ini
2021-05-14 07:44:09.814+0000 INFO  Sending metrics to CSV file at /Users/***/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/metrics
2021-05-14 07:44:09.828+0000 INFO  Bolt enabled on localhost:7687.
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDatabaseService] at [/db]
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDbmsService] at [/dbms]
2021-05-14 07:44:09.851+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyClusteringRedirectService] at [/db/manage]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter, class org.neo4j.server.http.cypher.format.output.eventsource.EventSourceMessageBodyWriter] at [/db]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-05-14 07:44:09.931+0000 DEBUG Mounting servlet at [/dbms]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/manage]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/data]
2021-05-14 07:44:10.020+0000 DEBUG Mounting servlet at [/db]
2021-05-14 07:44:10.043+0000 DEBUG Mounting servlet at [/]
2021-05-14 07:44:10.790+0000 INFO  Remote interface available at http://localhost:7474/
2021-05-14 07:44:10.790+0000 INFO  Started.
2021-05-14 07:44:09.797+0000 DEBUG Reading users from /Users/***/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/data/dbms/auth.ini
2021-05-14 07:44:09.814+0000 INFO  Sending metrics to CSV file at /Users/****/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/metrics
2021-05-14 07:44:09.828+0000 INFO  Bolt enabled on localhost:7687.
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDatabaseService] at [/db]
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDbmsService] at [/dbms]
2021-05-14 07:44:09.851+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyClusteringRedirectService] at [/db/manage]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter, class org.neo4j.server.http.cypher.format.output.eventsource.EventSourceMessageBodyWriter] at [/db]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-05-14 07:44:09.931+0000 DEBUG Mounting servlet at [/dbms]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/manage]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/data]
2021-05-14 07:44:10.020+0000 DEBUG Mounting servlet at [/db]
2021-05-14 07:44:10.043+0000 DEBUG Mounting servlet at [/]
2021-05-14 07:44:10.790+0000 INFO  Remote interface available at http://localhost:7474/
2021-05-14 07:44:10.790+0000 INFO  Started.
2021-05-14 07:44:09.797+0000 DEBUG Reading users from /Users/****/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/data/dbms/auth.ini
2021-05-14 07:44:09.814+0000 INFO  Sending metrics to CSV file at /Users/****/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-9ceb05f0-7575-4c16-967e-a7b7943a5a1d/metrics
2021-05-14 07:44:09.828+0000 INFO  Bolt enabled on localhost:7687.
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDatabaseService] at [/db]
2021-05-14 07:44:09.850+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.ClusteringDbmsService] at [/dbms]
2021-05-14 07:44:09.851+0000 DEBUG Adding JAXRS classes [class com.neo4j.server.rest.causalclustering.LegacyClusteringRedirectService] at [/db/manage]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.rest.discovery.DiscoveryService] at [/]
2021-05-14 07:44:09.857+0000 DEBUG Adding JAXRS classes [class com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider] at [/]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.CypherResource, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter, class org.neo4j.server.http.cypher.format.output.eventsource.EventSourceMessageBodyWriter] at [/db]
2021-05-14 07:44:09.860+0000 DEBUG Adding JAXRS classes [class org.neo4j.server.http.cypher.LegacyTransactionService, class org.neo4j.server.http.cypher.format.input.json.JsonMessageBodyReader, class org.neo4j.server.http.cypher.format.output.json.JsonMessageBodyWriter] at [/db/data]
2021-05-14 07:44:09.931+0000 DEBUG Mounting servlet at [/dbms]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/manage]
2021-05-14 07:44:10.019+0000 DEBUG Mounting servlet at [/db/data]
2021-05-14 07:44:10.020+0000 DEBUG Mounting servlet at [/db]
2021-05-14 07:44:10.043+0000 DEBUG Mounting servlet at [/]
2021-05-14 07:44:10.790+0000 INFO  Remote interface available at http://localhost:7474/
2021-05-14 07:44:10.790+0000 INFO  Started.

Hi @IISuperLuminaLII,

i tested the latest version of the plugin and it works as expected. I used a simple kafka-console-producer to send some sample events formatted as follow:

{"initiated": "Foo", "accepted": "Bar", "date": "2022-01-26T16:54:58.737000000"}

Which version of the plugin are you using?

Mauro