Unpacking event.account, which is an array of records

Hi all.

Trying to get a Kafka → Neo4j sink to work.

Going to include my shell script, the Kafka sink JSON, the “base” Cypher script, and an example of the message.

I’m trying to extract the accounts from event.account and create them as individual nodes.

G

Shell script

#!/bin/bash

echo "Creating 'Account' nodes sink..."
export NEO4J_CYPHER=$(cat create_account_node_sink.cypher)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @create_account_node_sink.json


echo "Accounts sink status:"
curl -s http://localhost:8083/connectors/neo4j-accounts-node-sink/status | jq '.'

Kafka Sink.

{
  "name": "neo4j-accounts-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "adults",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.adults": "${envVarProvider:NEO4J_CYPHER}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "2",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

Sink/Cypher

MERGE (t:Accounts {fspiAgentAccountId: event.fspiAgentAccountId}) 
ON CREATE SET t += {
  fspiAgentAccountId: event.fspiAgentAccountId, 
  accountId: event.accountId, 
  fspiId: event.fspiId, 
  fspiAgentId: event.fspiAgentId, 
  accountType: event.accountType, 
  memberName: event.memberName
} 
ON MATCH SET t += {
  accountId: event.accountId, 
  fspiId: event.fspiId, 
  fspiAgentId: event.fspiAgentId, 
  accountType: event.accountType, 
  memberName: event.memberName
}

Message Example

{
  "_id": "152b85fb-a83b-4d6b-8006-76cbd94850e1",
  "name": "Fergal",
  "surname": "Freeley",
  "nationalid": "2023852B",
  "gender": "M",
  "dob": "05/10/16",
  "marital_status": "Married",
  "partner": "7733541K",
  "status": "Living",
  "account": [
    {
      "fspiAgentAccountId": "BARCIE2D-32370059",
      "accountId": "32370059",
      "fspiId": "BARCIE2D",
      "fspiAgentId": "BARCIE2D",
      "accountType": "Current Accounts",
      "memberName": "Barclays Bank Ireland plc"
    },
    {
      "fspiAgentAccountId": "BOFIIE2D-68831794",
      "accountId": "68831794",
      "fspiId": "BOFIIE2D",
      "fspiAgentId": "BOFIIE2D",
      "accountType": "Savings/Deposit",
      "memberName": "Bank of Ireland plc"
    },
    {
      "fspiAgentAccountId": "BARCIE2D-71834892",
      "accountId": "71834892",
      "fspiId": "BARCIE2D",
      "fspiAgentId": "BARCIE2D",
      "accountType": "Savings/Deposit",
      "memberName": "Barclays Bank Ireland plc"
    },
    {
      "fspiAgentAccountId": "AIBKGB2-31430193",
      "accountId": "31430193",
      "fspiId": "AIBKGB2",
      "fspiAgentId": "AIBKGB2",
      "accountType": "Savings/Deposit",
      "memberName": "Allied Irish Banks plc"
    },
    {
      "cardHolder": "F Freeley",
      "cardNumber": "4420159224478543",
      "expDate": "10/25",
      "cardNetwork": "Visa",
      "issuingBank": "Allied Irish Banks plc"
    }
  ],
  "address": {
    "street_1": "90 Gilgunn Street Street",
    "street_2": "",
    "neighbourhood": "Carrick Road",
    "town": "Boyle",
    "county": "Roscommon",
    "province": "Connacht",
    "country": "Ireland",
    "country_code": "IE",
    "postal_code": "F52 T3N8",
    "parcel_id": "F52 T3N8-16688"
  }
}

Since one event contains a list of accounts, I think it is as simple as:

UNWIND event.account AS account
MERGE (t:Accounts {fspiAgentAccountId: account.fspiAgentAccountId}) 
ON CREATE SET t += {
  fspiAgentAccountId: account.fspiAgentAccountId, 
  accountId: account.accountId, 
  fspiId: account.fspiId, 
  fspiAgentId: account.fspiAgentId, 
  accountType: account.accountType, 
  memberName: account.memberName
} 
ON MATCH SET t += {
  accountId: account.accountId, 
  fspiId: account.fspiId, 
  fspiAgentId: account.fspiAgentId, 
  accountType: account.accountType, 
  memberName: account.memberName
}

Remark 1: Do you really want the label to be Accounts rather than Account?
Remark 2: FOREACH can also be used instead of UNWIND, depending on what the rest of the query looks like once (or if) you also handle the Person and Address information.
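
A minimal sketch of the FOREACH variant from Remark 2, assuming the label is changed to Account as per Remark 1. FOREACH only accepts updating clauses (MERGE, CREATE, SET, DELETE, REMOVE, nested FOREACH), and variables bound inside it are not visible afterwards, so it fits best when nothing later in the query needs the account nodes. The list comprehension that skips entries without fspiAgentAccountId is an addition: MERGE rejects a null property value, and the last record in the example message (the card) has no such field.

```cypher
// Sketch only: FOREACH instead of UNWIND for the account list.
// Entries without fspiAgentAccountId (the card record) are skipped,
// because MERGE fails on a null property value.
FOREACH (account IN [a IN event.account WHERE a.fspiAgentAccountId IS NOT NULL] |
  MERGE (t:Account {fspiAgentAccountId: account.fspiAgentAccountId})
  SET t += {
    accountId:   account.accountId,
    fspiId:      account.fspiId,
    fspiAgentId: account.fspiAgentId,
    accountType: account.accountType,
    memberName:  account.memberName
  }
)
```

Because FOREACH does not change the number of rows, a later MERGE for the Person or Address node in the same query still runs once per event rather than once per account.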

Hi hi

Not stuck on Accounts… it should definitely be Account.

Mind sharing a FOREACH example? The address I pull out into an Address node.

G

Curious…
Can we do something like add a field, Acc_Type, that is either “Account” or “Card”, depending on whether the account information or the card information is populated?

You will see the last account record is a CC type.
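
One way this could look, as a sketch: a CASE expression derives Acc_Type from which fields are present. It assumes a record counts as a card when cardNumber is populated. The accKey property is a made-up name, not part of the original schema; it is there because the card record has no fspiAgentAccountId and MERGE needs a non-null key.

```cypher
UNWIND event.account AS account
WITH account,
     // Assumption: a populated cardNumber means the record is a card.
     CASE WHEN account.cardNumber IS NOT NULL THEN 'Card' ELSE 'Account' END AS accType,
     // Hypothetical merge key: account id for accounts, card number for cards.
     coalesce(account.fspiAgentAccountId, account.cardNumber) AS accKey
WHERE accKey IS NOT NULL
MERGE (acc:Account {accKey: accKey})
// Copy every field of the record onto the node, then add the derived type.
SET acc += account,
    acc.Acc_Type = accType
```

If cards should be their own kind of node, the same CASE result could instead drive two separate MERGE statements, one per label.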

error time…

need to see about my children topic sink also… somehow it was working, now suddenly not…

Wondering: how would the code handle an empty or null field?

G
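
On the null question, as far as Cypher itself goes: reading a key that is missing from a map returns null, and SET with += drops null entries instead of storing them, so optional fields such as cardHolder simply end up absent on the node. An empty string (like street_2 in the message) is an ordinary value and is stored as is. The one place null is a problem is the MERGE pattern, which fails on a null property value. A small standalone sketch that can be pasted into Neo4j Browser against a scratch database; the inline map stands in for one element of event.account:

```cypher
// Stand-in for one element of event.account; it has no card fields.
WITH {fspiAgentAccountId: 'BARCIE2D-32370059', accountId: '32370059'} AS account
MERGE (acc:Account {fspiAgentAccountId: account.fspiAgentAccountId})
SET acc += {
  accountId:  account.accountId,
  cardHolder: account.cardHolder   // missing key -> null -> property is not stored
}
RETURN properties(acc) AS props
```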

err.txt (14.2 KB)

Weird… the children_address and adult_address sinks are working…

but the simpler children sink is failing…

Never mind, I still need to look at accounts first.

rabbit holes.

G

So, trying various structures.

Below is the newest Cypher, with the error after it.

WITH event as data
UNWIND data.account AS account
MERGE (acc:Account {fspiAgentAccountId: account.fspiAgentAccountId})
ON CREATE SET acc += {
  nationalid:   data.nationalid,
  accountId:    account.accountId,
  fspiId:       account.fspiId,
  fspiAgentId:  account.fspiAgentId,
  accountType:  account.accountType,
  memberName:   account.memberName,
  cardHolder:   account.cardHolder,
  cardNumber:   account.cardNumber,
  expDate:      account.expDate,
  cardNetwork:  account.cardNetwork,  
  issuingBank:  account.issuingBank,
  createdAt:    timestamp()
}
ON MATCH SET acc += {
  accountId:    account.accountId,
  fspiId:       account.fspiId,
  fspiAgentId:  account.fspiAgentId,
  accountType:  account.accountType,
  memberName:   account.memberName,
  cardHolder:   account.cardHolder,
  cardNumber:   account.cardNumber,
  expDate:      account.expDate,
  cardNetwork:  account.cardNetwork,
  issuingBank:  account.issuingBank,
  updatedAt:    timestamp()
}

error:

curl -s http://localhost:8083/connectors/neo4j-account-node-sink/status | jq '.'
{
  "name": "neo4j-account-node-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n                                                                                                                                                                                           ^\n\tat 
org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 
11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
    },
    {
      "id": 1,
      "state": "FAILED",
      "worker_id": "connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: org.neo4j.driver.exceptions.ClientException: Invalid input '$': expected ',', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FINISH', 'FOREACH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WHERE', 'WITH' or '}' (line 1, column 187 (offset: 186))\n\"CYPHER 5 UNWIND $events AS message WITH message.value AS event, message.timestamp AS __timestamp, message.header AS __header, message.key AS __key, message.value AS __value CALL {WITH * ${envVarProvider:NEO4J_CYPHER}} RETURN NULL\"\n                                                                                                                                                                                           ^\n\tat 
org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:111)\n\tat org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages$lambda$8$lambda$6$lambda$5(Neo4jSinkTask.kt:60)\n\tat org.neo4j.driver.internal.InternalSession.lambda$transaction$4(InternalSession.java:137)\n\tat org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic.retry(ExponentialBackoffRetryLogic.java:106)\n\tat org.neo4j.driver.internal.InternalSession.transaction(InternalSession.java:134)\n\tat org.neo4j.driver.internal.InternalSession.writeTransaction(InternalSession.java:113)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:59)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.processMessages(Neo4jSinkTask.kt:71)\n\tat org.neo4j.connectors.kafka.sink.Neo4jSinkTask.put(Neo4jSinkTask.kt:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 
11 more\n\tSuppressed: org.neo4j.driver.internal.util.ErrorUtil$InternalExceptionCause\n\t\tat org.neo4j.driver.internal.util.ErrorUtil.newNeo4jError(ErrorUtil.java:76)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher.handleFailureMessage(InboundMessageDispatcher.java:107)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.unpackFailureMessage(CommonMessageReader.java:75)\n\t\tat org.neo4j.driver.internal.messaging.common.CommonMessageReader.read(CommonMessageReader.java:53)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:81)\n\t\tat org.neo4j.driver.internal.async.inbound.InboundMessageHandler.channelRead0(InboundMessageHandler.java:37)\n\t\tat io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat org.neo4j.driver.internal.async.inbound.MessageDecoder.channelRead(MessageDecoder.java:42)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\t... 1 more\n"
    }
  ],
  "type": "sink"
}
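
Reading the trace: the statement that reached Neo4j still contains the literal text ${envVarProvider:NEO4J_CYPHER}, so the placeholder was never replaced with the contents of the .cypher file, and the parser stops at the '$' (column 187). Placeholders of that form are resolved by the Kafka Connect worker through its configured config providers, and the export in the shell script only affects the shell that runs curl, not the worker process at connect:8083. That is an inference from the trace, not something verified against this setup. For comparison, this is roughly what the generated statement would look like if the query text had been substituted (wrapper copied from the trace, body shortened to two properties):

```cypher
CYPHER 5
UNWIND $events AS message
WITH message.value AS event, message.timestamp AS __timestamp,
     message.header AS __header, message.key AS __key, message.value AS __value
CALL {
  WITH *
  // ---- substituted query text would start here ----
  WITH event AS data
  UNWIND data.account AS account
  MERGE (acc:Account {fspiAgentAccountId: account.fspiAgentAccountId})
  SET acc += {nationalid: data.nationalid, accountId: account.accountId}
  // ---- substituted query text would end here ----
}
RETURN NULL
```

Pasting the query text straight into "neo4j.cypher.topic.adults" in the connector JSON, as a single JSON string, is one way to rule the placeholder out before looking further.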