Kafka Sink -> Cypher failing

Pretty sure I’m looking straight over the error.

shell script

#!/bin/bash

echo "Creating 'Children' nodes sink..."
export NEO4J_CYPHER=$(cat create_children_node_sink.cypher)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_node_sink.json

echo ""
echo "Checking connector status..."
echo "=========================="

echo "Children sink status:"
curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'

Kafka sink.

{
  "name": "neo4j-children-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

cypher:

MERGE (t:Children {nationalid: event.nationalid}) 
ON CREATE SET t += {
  nationalid:         event.nationalid, 
  _id:                event._id, 
  name:               event.name, 
  surname:            event.surname, 
  gender:             event.gender, 
  dob:                event.dob, 
  family_id:          event.family_id, 
  father_nationalid:  event.father_nationalid, 
  mother_nationalid:  event.mother_nationalid, 
  parcel_id:          event.address.parcel_id,
  createdAt:          timestamp()
}
ON MATCH SET t += { 
  _id:                event._id, 
  name:               event.name, 
  surname:            event.surname, 
  gender:             event.gender, 
  dob:                event.dob, 
  family_id:          event.family_id, 
  father_nationalid:  event.father_nationalid, 
  mother_nationalid:  event.mother_nationalid, 
  parcel_id:          event.address.parcel_id,
  updatedAt:          timestamp()
}

Example Payload:

{
  "_id": "96d35eb2-dc7f-40df-8128-30c58b250692",
  "dob": "19/03/28",
  "name": "Shaun",
  "gender": "Male",
  "address": {
    "town": "Galway City",
    "county": "Galway",
    "country": "Ireland",
    "province": "Connacht",
    "street_1": "99 Fresh Street Street",
    "street_2": "",
    "parcel_id": "H91 Y9P7-22470",
    "postal_code": "H91 Y9P7",
    "country_code": "IE",
    "neighbourhood": "Salthill"
  },
  "surname": "Doudigan",
  "nationalid": "0002003P",
  "family_id": "4e6f3e02-91ac-42f9-a518-408e780a7c7b",
  "father_nationalid": "7934317B",
  "mother_nationalid": "0181947G"
}

error:

curl -s ``http://localhost:8083/connectors/neo4j-children-node-sink/status`` | jq '.'
{
  "name": "neo4j-children-node-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n                                                                                                                                                                                           ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}

This is the end of your error message:

Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n

Which has no relationship with your queries above.

hmmm,

not sure how we can say have no relationship, the status is failed and the error message above is the output from:

curl -s ``http://localhost:8083/connectors/neo4j-children-node-sink/status`` | jq '.'

which was executed after the above scripts were executed to create the sink/merge job into Neo4J.

the same structure is used to sink the adults and it works there. the columns i double checked, as per the example payload, except if i’m blind,

cant see that i missed a comma anywhere..

The curious bit is I have the same error on my account unwind query, which i got in another thread…