Hi all,
This is slightly strange; I've posted a couple of other messages about it, including on the Discord server. I'm going to post the working examples first, then the failing one.
The two inbound topics are adults and children, with sample payloads in adults.json.txt and children.json.txt.
children.json.txt (601 Bytes)
adults.json.txt (1.7 KB)
The Adult nodes are created as per:
MERGE (t:Adults {nationalid: event.nationalid})
ON CREATE SET t += {
  nationalid: event.nationalid,
  _id: event._id,
  name: event.name,
  surname: event.surname,
  gender: event.gender,
  dob: event.dob,
  marital_status: event.marital_status,
  partner: event.partner,
  status: event.status,
  family_id: event.family_id,
  parcel_id: event.address.parcel_id,
  createdAt: timestamp()
}
ON MATCH SET t += {
  _id: event._id,
  name: event.name,
  surname: event.surname,
  gender: event.gender,
  dob: event.dob,
  marital_status: event.marital_status,
  partner: event.partner,
  status: event.status,
  family_id: event.family_id,
  parcel_id: event.address.parcel_id,
  updatedAt: timestamp()
}
used in this connector config:
{
  "name": "neo4j-adults-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "adults",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.adults": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}
which is registered by this script:
#!/bin/bash
echo "Creating 'Adults' nodes sink..."
export NEO4J_CYPHER="$(cat create_adults_node_sink.cypher)"
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_adults_node_sink.json
echo "Adults sink status:"
curl -s http://localhost:8083/connectors/neo4j-adults-node-sink/status | jq '.'
The above works.
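One small thing I noticed while writing these scripts up: the status check runs immediately after the POST, so it can race connector startup. A tiny retry wrapper (my own sketch; the function name and 1-second delay are made up) avoids that:

```shell
#!/bin/bash
# Sketch of a retry wrapper for the post-registration status check.
retry() {
  local attempts="$1"; shift
  local i
  for ((i = 1; i <= attempts; i++)); do
    "$@" && return 0   # succeed as soon as the command does
    sleep 1            # back off briefly before the next attempt
  done
  return 1             # all attempts failed
}

# e.g. retry 5 curl -sf http://localhost:8083/connectors/neo4j-adults-node-sink/status
```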
The Adult and Children Address sinks follow the same process, extracting the address object from adults.json.txt and children.json.txt. The children version is shown below (the adults one is identical apart from the names), and both work.
MERGE (add:Address {parcel_id: event.address.parcel_id})
ON CREATE SET add += {
  parcel_id: event.address.parcel_id,
  street_1: event.address.street_1,
  street_2: event.address.street_2,
  town: event.address.town,
  county: event.address.county,
  province: event.address.province,
  country: event.address.country,
  postal_code: event.address.postal_code,
  country_code: event.address.country_code,
  neighbourhood: event.address.neighbourhood,
  createdAt: timestamp()
}
ON MATCH SET add += {
  street_1: event.address.street_1,
  street_2: event.address.street_2,
  town: event.address.town,
  county: event.address.county,
  province: event.address.province,
  country: event.address.country,
  postal_code: event.address.postal_code,
  country_code: event.address.country_code,
  neighbourhood: event.address.neighbourhood,
  updatedAt: timestamp()
}
used in this connector config:
{
  "name": "neo4j-children-address-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}
which is registered by this script:
#!/bin/bash
# =============================================================================
echo "Creating 'Children' Address nodes sink..."
export NEO4J_CYPHER="$(cat create_children_address_node_sink.cypher)"
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_address_node_sink.json
echo "Children Address sink status:"
curl -s http://localhost:8083/connectors/neo4j-children-address-node-sink/status | jq '.'
As you can see, there is a lot of repetition.
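On that repetition: the four configs differ only in the connector name, the topic, and the cypher key, so something like the sketch below could generate them from two parameters. The function name is made up; the fixed values are copied verbatim from the configs in this post.

```shell
#!/bin/bash
# Sketch: generate the near-identical connector configs from two parameters.
make_sink_config() {
  local name="$1" topic="$2"
  cat <<EOF
{
  "name": "${name}",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "${topic}",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.${topic}": "\${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}
EOF
}

# e.g. make_sink_config neo4j-adults-node-sink adults > create_adults_node_sink.json
```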
The following is failing.
MERGE (t:Children {nationalid: event.nationalid})
ON CREATE SET t += {
  nationalid: event.nationalid,
  _id: event._id,
  name: event.name,
  surname: event.surname,
  gender: event.gender,
  dob: event.dob,
  family_id: event.family_id,
  father_nationalid: event.father_nationalid,
  mother_nationalid: event.mother_nationalid,
  parcel_id: event.address.parcel_id,
  createdAt: timestamp()
}
ON MATCH SET t += {
  _id: event._id,
  name: event.name,
  surname: event.surname,
  gender: event.gender,
  dob: event.dob,
  family_id: event.family_id,
  father_nationalid: event.father_nationalid,
  mother_nationalid: event.mother_nationalid,
  parcel_id: event.address.parcel_id,
  updatedAt: timestamp()
}
used in this connector config:
{
  "name": "neo4j-children-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "children",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.children": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}
which is registered by this script:
#!/bin/bash
echo "Creating 'Children' nodes sink..."
export NEO4J_CYPHER="$(cat create_children_node_sink.cypher)"
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_children_node_sink.json
echo "Children sink status:"
curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'
This is the “complex” one, where I’m not sure about the logic:
WITH event AS data
UNWIND data.account AS account
MERGE (acc:Account {fspiAgentAccountId: account.fspiAgentAccountId})
ON CREATE SET acc += {
  nationalid: data.nationalid,
  accountId: account.accountId,
  fspiId: account.fspiId,
  fspiAgentId: account.fspiAgentId,
  accountType: account.accountType,
  memberName: account.memberName,
  cardHolder: account.cardHolder,
  cardNumber: account.cardNumber,
  expDate: account.expDate,
  cardNetwork: account.cardNetwork,
  issuingBank: account.issuingBank,
  createdAt: timestamp()
}
ON MATCH SET acc += {
  accountId: account.accountId,
  fspiId: account.fspiId,
  fspiAgentId: account.fspiAgentId,
  accountType: account.accountType,
  memberName: account.memberName,
  cardHolder: account.cardHolder,
  cardNumber: account.cardNumber,
  expDate: account.expDate,
  cardNetwork: account.cardNetwork,
  issuingBank: account.issuingBank,
  updatedAt: timestamp()
}
used in this connector config:
{
  "name": "neo4j-account-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "adults",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.adults": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}
which is registered by this script:
#!/bin/bash
echo "Creating 'Account' nodes sink..."
export NEO4J_CYPHER="$(cat create_account_node_sink.cypher)"
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_account_node_sink.json
echo "Accounts sink status:"
curl -s http://localhost:8083/connectors/neo4j-account-node-sink/status | jq '.'
The error (note the literal ${envVarProvider:NEO4J_CYPHER} in the query the connector sends to Neo4j in the trace below):
curl -s http://localhost:8083/connectors/neo4j-children-node-sink/status | jq '.'
{
"name": "neo4j-children-node-sink",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n ^\n\tat org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat 
org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "connect:8083"
}
],
"type": "sink"
}
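What jumps out at me in the trace is that the raw ${envVarProvider:NEO4J_CYPHER} placeholder reaches Neo4j unresolved, instead of the Cypher from the file. A quick check I use for that condition (the helper name is my own) against JSON fetched from the Connect REST API, e.g. curl -s http://localhost:8083/connectors/neo4j-children-node-sink/config:

```shell
#!/bin/bash
# Sketch: flag a connector config/status JSON that still contains the raw,
# unresolved envVarProvider placeholder.
placeholder_unresolved() {
  # succeeds (exit 0) when the literal placeholder is present in the argument
  grep -qF 'envVarProvider:NEO4J_CYPHER' <<<"$1"
}
```

As far as I understand, seeing the placeholder in the *stored* config is expected (the worker resolves providers when tasks start), but seeing it in the task trace, as above, means the worker never substituted the variable.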