Error while trying to publish a message to a Kafka topic: "Timed out waiting for a node assignment"

shruthi_j
Node

I am using Neo4j version 3.5.23 with the plugin neo4j-streams-3.5.12.jar.

I am trying to publish messages to a Kafka cluster that has authentication and authorization enabled, using a service account over SASL. The service account does not (and cannot) have admin privileges on the cluster.

I have added all the required Kafka-related parameters to neo4j.conf.
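For anyone comparing their own setup, here is a minimal sketch of how the Kafka settings in neo4j.conf could be sanity-checked. It assumes the plugin forwards every `kafka.`-prefixed key to the Kafka clients with the prefix removed. The helper names and all sample values (broker host, account name, password, the PLAIN mechanism) are hypothetical placeholders, not the configuration used in this post.

```python
# Hypothetical helper: scans neo4j.conf-style text for the Kafka client
# settings that a SASL connection usually relies on.
# Assumption: "kafka."-prefixed keys are handed to the Kafka clients
# with the prefix stripped.

# sasl.jaas.config can also be supplied through a JAAS file, so a
# "missing" entry here is a hint to double-check, not proof of an error.
REQUIRED_SASL_KEYS = (
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
)


def kafka_client_settings(conf_text, prefix="kafka."):
    """Return {client_key: value} for every line whose key starts with prefix."""
    settings = {}
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split only once: JAAS values themselves contain '=' characters.
        key, value = line.split("=", 1)
        key = key.strip()
        if key.startswith(prefix):
            settings[key[len(prefix):]] = value.strip()
    return settings


def missing_sasl_keys(settings):
    """List the SASL-related client keys that are absent or empty."""
    return [k for k in REQUIRED_SASL_KEYS if not settings.get(k)]


# Placeholder values only.
SAMPLE_CONF = """
# neo4j.conf excerpt
streams.source.enabled=true
kafka.bootstrap.servers=broker1.example.com:9093
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="svc-account" password="changeme";
"""

if __name__ == "__main__":
    found = kafka_client_settings(SAMPLE_CONF)
    print("missing:", missing_sasl_keys(found))
```

A check like this only confirms that the settings are present in the file. It does not tell you whether the broker accepts the credentials or whether the account is allowed to run admin operations.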

I get the exception below when trying to publish a message from Neo4j to a Kafka topic (streams source module).

WARN Cannot retrieve valid topics because the following exception,
next attempt is in 300000 ms: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at streams.kafka.KafkaAdminService$start$1.invokeSuspend(KafkaAdminService.kt:29)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
at kotlinx.coroutines.DispatchedContinuation.run(Dispatched.kt:81)
at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

After looking at the neo4j-streams source code on GitHub, we assume this error comes from listTopics(), describeCluster() and the other admin client operations that the plugin runs against the Kafka cluster (the stack trace points at KafkaAdminService).

Please confirm whether this understanding is correct.

Also, is there a workaround for publishing messages to the secured Kafka cluster with an account that only has publish/consume privileges (no admin privileges)?

Please suggest.

2 REPLIES

conker84
Graph Voyager

@shruthi_j can you please try with the latest streams version?

I am having the same issue. I am using Neo4j Desktop 1.4.5 with a 4.2.5 database, version 4.0.7 of the plugin, and Confluent Kafka, all running locally on a Windows 10 laptop.

I have confirmed that I can publish to a topic via a Kafka producer console. I am trying to learn the ins and outs of publishing and ingesting data between neo4j and Kafka. I would like to use the CDC processor to just publish Creates, Updates and Deletes.