Sink to Neo4j from Kafka filing

hi hi all,

ok, this is slightly strange, posted a couple of other messages, including on the discord server.

I’m going to post working examples and then the failing example.

The 2 inbound topics are adults and children as per adults.json.txt and children.json.txt.

children.json.txt (601 Bytes)

adults.json.txt (1.7 KB)

the adult nodes and the as per:

MERGE (t:Adults {nationalid: event.nationalid}) 
ON CREATE SET t += {
  nationalid:     event.nationalid, 
  _id:            event._id, 
  name:           event.name, 
  surname:        event.surname, 
  gender:         event.gender, 
  dob:            event.dob, 
  marital_status: event.marital_status, 
  partner:        event.partner, 
  status:         event.status, 
  family_id:      event.family_id,
  parcel_id:      event.address.parcel_id,
  createdAt:      timestamp()
}
ON MATCH SET t += { 
  _id:            event._id, 
  name:           event.name, 
  surname:        event.surname, 
  gender:         event.gender, 
  dob:            event.dob, 
  marital_status: event.marital_status, 
  partner:        event.partner, 
  status:         event.status, 
  family_id:      event.family_id,
  parcel_id:      event.address.parcel_id,
  updatedAt:      timestamp()
}

called by:

{
  "name": "neo4j-adults-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "adults",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.adults": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

called by

#!/bin/bash

echo "Creating 'Adults' nodes sink..."
export NEO4J_CYPHER=$(cat create_adults_node_sink.cypher)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_adults_node_sink.json


echo "Adults sink status:"
curl -s http://localhost:8083/connectors/neo4j-adults-node-sink/status | jq '.'

The above works.

Adult and Children address follow the same process, extracting the address from adults.json.txt and children.json.txt using the below example for adult work.

MERGE (add:Address {parcel_id: event.address.parcel_id}) 
ON CREATE SET add += {
  parcel_id:      event.address.parcel_id, 
  street_1:       event.address.street_1, 
  street_2:       event.address.street_2, 
  town:           event.address.town, 
  county:         event.address.county, 
  province:       event.address.province, 
  country:        event.address.country, 
  postal_code:    event.address.postal_code, 
  country_code:   event.address.country_code, 
  neighbourhood:  event.address.neighbourhood,
  createdAt:      timestamp()
} 
ON MATCH SET add += {
  street_1:       event.address.street_1, 
  street_2:       event.address.street_2, 
  town:           event.address.town, 
  county:         event.address.county, 
  province:       event.address.province, 
  country:        event.address.country, 
  postal_code:    event.address.postal_code, 
  country_code:   event.address.country_code, 
  neighbourhood:  event.address.neighbourhood,
  updatedAt:      timestamp()
}

called by :

{
  "name": "neo4j-children-address-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

called by:

#!/bin/bash

# =============================================================================
echo "Creating 'Children' Address nodes sink..."
export NEO4J_CYPHER=$(cat create_children_address_node_sink.cypher)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_address_node_sink.json


echo "Children Address sink status:"
curl -s http://localhost:8083/connectors/neo4j-children-address-node-sink/status | jq '.'

As you can see allot or repetition.

The following is failing.

MERGE (t:Children {nationalid: event.nationalid}) 
ON CREATE SET t += {
  nationalid:         event.nationalid, 
  _id:                event._id, 
  name:               event.name, 
  surname:            event.surname, 
  gender:             event.gender, 
  dob:                event.dob, 
  family_id:          event.family_id, 
  father_nationalid:  event.father_nationalid, 
  mother_nationalid:  event.mother_nationalid, 
  parcel_id:          event.address.parcel_id,
  createdAt:          timestamp()
}
ON MATCH SET t += { 
  _id:                event._id, 
  name:               event.name, 
  surname:            event.surname, 
  gender:             event.gender, 
  dob:                event.dob, 
  family_id:          event.family_id, 
  father_nationalid:  event.father_nationalid, 
  mother_nationalid:  event.mother_nationalid, 
  parcel_id:          event.address.parcel_id,
  updatedAt:          timestamp()
}

called by:

{
  "name": "neo4j-children-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

called by

#!/bin/bash

echo "Creating 'Children' nodes sink..."
export NEO4J_CYPHER=$(cat create_children_node_sink.cypher)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_node_sink.json

echo "Children sink status:"
curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'

this is the “complex” one… where i’m not sure about the logic,

WITH event as data
UNWIND data.account AS account
MERGE (acc:Account {fspiAgentAccountId: account.fspiAgentAccountId})
ON CREATE SET acc += {
  nationalid:   data.nationalid,
  accountId:    account.accountId,
  fspiId:       account.fspiId,
  fspiAgentId:  account.fspiAgentId,
  accountType:  account.accountType,
  memberName:   account.memberName,
  cardHolder:   account.cardHolder,
  cardNumber:   account.cardNumber,
  expDate:      account.expDate,
  cardNetwork:  account.cardNetwork,  
  issuingBank:  account.issuingBank,
  createdAt:    timestamp()
}
ON MATCH SET acc += {
  accountId:    account.accountId,
  fspiId:       account.fspiId,
  fspiAgentId:  account.fspiAgentId,
  accountType:  account.accountType,
  memberName:   account.memberName,
  cardHolder:   account.cardHolder,
  cardNumber:   account.cardNumber,
  expDate:      account.expDate,
  cardNetwork:  account.cardNetwork,
  issuingBank:  account.issuingBank,
  updatedAt:    timestamp()
}

called by:

{
  "name": "neo4j-account-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "adults",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.adults": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

called by

#!/bin/bash

echo "Creating 'Account' nodes sink..."
export NEO4J_CYPHER=$(cat create_account_node_sink.cypher)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_account_node_sink.json

echo "Accounts sink status:"
curl -s http://localhost:8083/connectors/neo4j-account-node-sink/status | jq '.'

Error

curl -s ``http://localhost:8083/connectors/neo4j-children-node-sink/status`` | jq '.'
{
  "name": "neo4j-children-node-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n                                                                                                                                                                                           ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "connect:8083"
    }
  ],
  "type": "sink"
}

what happens if you remove the $ ?

{
  "name": "neo4j-children-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "{envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

Wont work.

The problem is not with the sink definition, it’s being loaded, its’ failing on the job execution side,

see other examples using similar $”{envVarProvider:NEO4J_CYPHER}” definition.

G

The error message says you have an extra “$” somewhere :slight_smile:

ye and i can’t figure out where, why i’m sharing all the files being loaded. hoping someone can see where…
G

I think that’s the only one :slight_smile:

the ‘create_children_node_sink.json‘ is working, the ‘create_children_address_node_sink.json‘ is not.

only different is the ‘name‘ value right at the top.

All other differences sits in the cypher file

G

Your error:

Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n      

with $ removed

curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'{"name": "neo4j-children-node-sink","connector": {"state": "RUNNING","worker_id": "connect:8083"},"tasks": [{"id": 0,"state": "FAILED","worker_id": "connect:8083","trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '{': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * {envVarProvider:NEO4J_CYPHER}} RETURN NULL"\n                                                                                                                                                                                           ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"},{"id": 1,"state": "FAILED","worker_id": "connect:8083","trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '{': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * {envVarProvider:NEO4J_CYPHER}} RETURN NULL"\n                                                                                                                                                                                           ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"}],"type": "sink"}