Hi, this is Aditya. I am new to Neo4j and Kafka, and I am trying to load a graph into Neo4j from a Kafka topic.
The problem: when I call the REST API to load the data, the task starts but then fails with the following error.
org.apache.kafka.common.config.ConfigException: Topic 'nyc_taxicab_data' is not assigned a sink strategy
	at org.neo4j.connectors.kafka.sink.SinkStrategyHandler$Companion.createForTopic(SinkStrategy.kt:229)
	at org.neo4j.connectors.kafka.sink.SinkStrategyHandler$Companion.createFrom(SinkStrategy.kt:134)
	at org.neo4j.connectors.kafka.sink.SinkConfiguration.topicHandlers_delegate$lambda$3(SinkConfiguration.kt:93)
	at kotlin.SynchronizedLazyImpl.getValue(LazyJVM.kt:83)
	at org.neo4j.connectors.kafka.sink.SinkConfiguration.getTopicHandlers(SinkConfiguration.kt:92)
	at org.neo4j.connectors.kafka.sink.SinkConfiguration.validateAllTopics(SinkConfiguration.kt:101)
	at org.neo4j.connectors.kafka.sink.SinkConfiguration.<init>(SinkConfiguration.kt:50)
	at org.neo4j.connectors.kafka.sink.SinkConfiguration.<init>(SinkConfiguration.kt:41)
	at org.neo4j.connectors.kafka.sink.Neo4jSinkTask.start(Neo4jSinkTask.kt:37)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:313)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
}
],
"type": "sink"
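For anyone who wants to read such a trace more easily: the Kafka Connect REST API reports a failed task's stack trace as a single JSON string inside the connector status document, with the line breaks escaped. Below is a minimal Python sketch that pulls the traces out of an already-fetched status document and prints them one frame per line. The helper name `task_traces` is hypothetical, and the `sample` document is a shortened stand-in that I assume has the same shape as the real response (the `FAILED` state and the task `id` are assumptions, not copied from my output).

```python
import json


def task_traces(status: dict) -> list[str]:
    """Return the stack trace of every task in a status document that has one."""
    return [task["trace"] for task in status.get("tasks", []) if "trace" in task]


# Shortened, assumed example of a connector status document.
# The raw string keeps the \n and \t sequences as JSON escapes,
# so json.loads turns them into real newlines and tabs.
sample = json.loads(r'''
{
  "name": "Neo4jSinkConnectorJSONString",
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "trace": "org.apache.kafka.common.config.ConfigException: Topic 'nyc_taxicab_data' is not assigned a sink strategy\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.start(Neo4jSinkTask.kt:37)\n"
    }
  ],
  "type": "sink"
}
''')

for trace in task_traces(sample):
    # Printing the decoded string shows one stack frame per line.
    print(trace)
```

The first printed line is the exception message, which is usually the part worth searching for.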
This is my sink.neo4j.yaml file (the content is JSON):
{
  "name": "Neo4jSinkConnectorJSONString",
  "config": {
    "topics": "nyc_taxicab_data",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "bolt://neo4j-service:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.topic.strategy": "topic",
    "neo4j.topic.cypher.nyc_taxicab_data": "MERGE (p:Location {name: toInteger(event.PULocationID)}) MERGE (d:Location {name: toInteger(event.DOLocationID)}) MERGE (p)-[:TRIP {distance: toFloat(event.trip_distance), fare: toFloat(event.fare_amount)}]->(d)"
  }
}
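Since the error says the topic has no sink strategy assigned, one thing worth checking is which per-topic keys the config actually contains. Here is a rough Python sketch that lists, for each entry in `topics`, the config keys whose name ends with that topic, so they can be compared against the documentation for the installed connector version. The function name `keys_per_topic` is hypothetical, the Cypher value is a placeholder, and the sketch does not know which key prefix a given connector version expects.

```python
import json


def keys_per_topic(connector: dict) -> dict[str, list[str]]:
    """Map each topic listed in `topics` to the config keys ending in '.<topic>'."""
    config = connector["config"]
    topics = [t.strip() for t in config.get("topics", "").split(",") if t.strip()]
    return {
        topic: sorted(key for key in config if key.endswith("." + topic))
        for topic in topics
    }


# Trimmed copy of the connector config; the Cypher value is a placeholder.
connector = json.loads("""
{
  "name": "Neo4jSinkConnectorJSONString",
  "config": {
    "topics": "nyc_taxicab_data",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "neo4j.topic.strategy": "topic",
    "neo4j.topic.cypher.nyc_taxicab_data": "RETURN 1"
  }
}
""")

for topic, keys in keys_per_topic(connector).items():
    # An empty list means no key in the config is tied to this topic by name.
    print(topic, "->", keys if keys else "no per-topic key found")
```

If a topic shows a key here but the connector still reports no strategy, the key name itself (prefix and word order) is the next thing to compare with the documentation.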
I have tried a few changes: changing the connector class name, setting the strategy to node, and a couple of other smaller changes, but none of them worked.
I have gone through the online connector forum and applied a few of the suggestions there, but I was not able to solve this issue.
If additional information is needed, please let me know. I can share my zookeeper.yaml and kafka-setup.yaml as well.