Kafka Consumer - Error Expected space separating root-level values

I'm trying out Kafka + Neo4j and I'm just at the quickstart step, trying to get things to work. I have the plugin installed, and I have Kafka running, producing to and consuming from topics. When Neo4j consumes from Kafka, I get this in my logs:

2020-01-23 05:48:31.357+0000 INFO  ======== Neo4j 3.5.14 ========
2020-01-23 05:48:31.367+0000 INFO  Starting...
2020-01-23 05:48:33.128+0000 INFO  Initiating metrics...
2020-01-23 05:48:34.180+0000 INFO  Initialising the Streams Source module
2020-01-23 05:48:34.223+0000 INFO  Initializing Kafka Connector
2020-01-23 05:48:35.678+0000 INFO  Kafka Connector started
2020-01-23 05:48:35.682+0000 INFO  Streams Source module initialised
2020-01-23 05:48:36.045+0000 INFO  Initialising the Streams Sink module
2020-01-23 05:48:37.558+0000 INFO  Starting the Kafka Sink
2020-01-23 05:48:37.844+0000 INFO  Creating Sink daemon Job
2020-01-23 05:48:37.873+0000 INFO  Subscribed topics: [first-topic]
2020-01-23 05:48:37.874+0000 INFO  Kafka Sink started
2020-01-23 05:48:45.539+0000 INFO  Sending metrics to CSV file at C:\Users\miker\.Neo4jDesktop\neo4jDatabases\database-8c2cb7bc-a2cf-4264-926b-eeb11df003a1\installation-3.5.14\metrics
2020-01-23 05:48:47.098+0000 INFO  Bolt enabled on 0.0.0.0:7687.
2020-01-23 05:48:48.000+0000 INFO  Started.
2020-01-23 05:48:48.132+0000 INFO  Mounted REST API at: /db/manage
2020-01-23 05:48:48.223+0000 INFO  Server thread metrics have been registered successfully
2020-01-23 05:48:49.142+0000 INFO  Remote interface available at http://localhost:7474/
2020-01-23 05:49:11.052+0000 ERROR Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
 at [Source: (String)"10:49"; line: 1, column: 4], key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer), key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer) Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
 at [Source: (String)"10:49"; line: 1, column: 4], key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer), key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=streams.service.errors.ProcessingError: Error processing 1 messages
ErrorData(originalTopic=first-topic, timestamp=1579758549174, partition=1, offset=3, exception=com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): Expected space separating root-level values
 at [Source: (String)"10:49"; line: 1, column: 4], key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer), key=null, value=10:49, executingClass=class streams.kafka.KafkaAutoCommitEventConsumer)
	at streams.service.errors.KafkaErrorService.report(KafkaErrorService.kt:37)
	at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:95)
	at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:85)
	at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:132)
	at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:95)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
	at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
	at kotlinx.coroutines.AbstractContinuation.run(AbstractContinuation.kt:19)
	at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)

It looks like an error while parsing the event object. I'm running Kafka v2.4.0 and producing from the command line by typing simple strings such as hello world. These are my neo4j.conf settings:

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092

streams.sink.enabled=true
streams.sink.topic.cypher.first-topic=CREATE (n:Label) SET n.data = event

Any pointers where to go looking for the cause of the error?

Hi @mike.r.black,

Your Sink (consumer) configuration is correct.

The Neo4j Streams plugin supports two types of deserializers:

  • org.apache.kafka.common.serialization.ByteArrayDeserializer: if you want to manage JSON messages
  • io.confluent.kafka.serializers.KafkaAvroDeserializer: if you want to manage AVRO messages

That said, all you have to do to make your Sink work is send the message in JSON format. For example:

{"message": "Hello world!"}

Please check out the Consumer section for further details.
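
To illustrate why a value like 10:49 or hello world is rejected, here is a minimal sketch in Python using only the standard json module. It is not the plugin's code (the plugin uses Jackson on the JVM), and the helper names is_valid_json and to_json_message are made up for this example. It only shows that a bare string is not a valid JSON document, and that wrapping it in an object, as in the message above, makes it parseable.

```python
import json


def is_valid_json(payload: str) -> bool:
    """Return True if the payload parses as a complete JSON document."""
    try:
        json.loads(payload)
        return True
    except json.JSONDecodeError:
        return False


def to_json_message(text: str) -> str:
    """Wrap a plain string in a JSON object (hypothetical helper).

    The "message" key mirrors the example above; any key name would do.
    """
    return json.dumps({"message": text})


if __name__ == "__main__":
    for raw in ["10:49", "hello world"]:
        wrapped = to_json_message(raw)
        print(f"raw={raw!r} valid={is_valid_json(raw)}")
        print(f"wrapped={wrapped!r} valid={is_valid_json(wrapped)}")
```

In the 10:49 case the parser reads 10 as a number and then hits the colon, which matches the "Unexpected character (':' (code 58))" at column 4 in the log. Sending the wrapped form from the producer should avoid that.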

Hope this helps!

Regards,

Mauro

Success! Thanks for the help, I didn't realize how strictly the messages had to be formatted.
