Hi guys
I have a outbound account1:
{
"fullName": "Dom Silva Blues",
"accountEntityId": "JJIVESZA-15412754683",
"accountId": "15412754683",
"tenantId": "JJIVESZA",
"accountAgentId": "JJIVESZA"
}
I have a inbound account2:
{
"fullName": "Martie Smartie",
"accountEntityId": "CABLZAJJ-1148157091",
"accountId": "1148157091",
"tenantId": "Z",
"accountAgentId": "Z"
}
I now have a outbound transaction -> topic ob_txn_edge
{
"transactionId": "1bbaf5de-737a-49d3-8ad1-f404daa0fb78",
"tenantId": "JJIVESZA",
"accountEntityId": "JJIVESZA-15412754683",
"counterpartyEntityId": "CABLZAJJ-1148157091",
"eventTime": "2025-06-20T14:59:59+02:00",
"baseValue": 2000,
"currency": "ZAR"
}
- and Inbound transaction -> topic ib_txn_edge
{
"transactionId": "1bbaf5de-737a-49d3-8ad1-f404daa0fb89",
"tenantId": "CABLZAJJ",
"accountEntityId": "CABLZAJJ-1148157091",
"counterpartyEntityId": "JJIVESZA-15412754683",
"eventTime": "2025-06-20T14:59:59+02:00",
"baseValue": 2000,
"currency": "ZAR"
}
My thinking to link the 3 (ah1, txn, ah2) together is the below, I am however not getting a relation created.
I'd like to create the 2 edge/links, as per below.
this is:
- not creating either of the links.
- and if it did, not sure how these values would be pulled into the link as properties ?
Please help
Sink definition
{
"name": "neo4j-ah1-txn-ah2-edges-sink",
"config": {
"connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
"topics": "ob_txn_edge,ib_txn_edge",
"neo4j.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "dbpassword",
"neo4j.cypher.topic.ob_txn_edge": "MATCH (a1:AccountHolder {accountEntityId: event.accountEntityId}) MATCH (a2:AccountHolder {counterpartyEntityId: event.counterpartyEntityId}) MERGE (ah1)-[r:PAID_FUNDS_TO]->(ah2)",
"neo4j.cypher.topic.ib_txn_edge": "MATCH (a1:AccountHolder {accountEntityId: event.accountEntityId}) MATCH (a2:AccountHolder {counterpartyEntityId: event.counterpartyEntityId}) MERGE (ah1)-[r:RECIEVED_FUNDS_FROM]->(ah2)",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "2",
"neo4j.batch.size": "1000",
"neo4j.batch.timeout.msecs": "5000",
"neo4j.retry.backoff.msecs": "3000",
"neo4j.retry.max.attemps": "5"
}
}