Conditional include field in Kafka Connect Sink

Hi all

I'm trying to stitch the following Cypher together...

It will be used as the inline Cypher text for a Kafka Connect sink job into Neo4j (see the connector config at the bottom).

MERGE 
    (a:Person {accountEntityId: event.accountEntityId}) 
ON CREATE SET  {a.accountEntityType = event.accountEntityType, a.tenantId = event.tenantId, a.fullName = event.fullName }
ON MATCH SET += {a.idnumber = event.idnumber, a.address = event.address, a.dob = event.dob, a.reg_id = event.reg_id, a.home_phone = event.home_phone}
}

which will become a one-liner similar to this:

"neo4j.cypher.topic.person": "MERGE (a:Person {accountEntityId: event.accountEntityId}) ON CREATE SET  {a.accountEntityType = event.accountEntityType, a.tenantId = event.tenantId, a.fullName = event.fullName, a.dob = event.dob, a.reg_id = event.reg_id } ON MATCH SET += {a.dob = event.dob, a.reg_id = event.reg_id}"

I only want the ON MATCH components (i.e. dob, reg_id) to be updated, or added if the node doesn't have them yet, when a value for them is present in the message on the topic. How do I do that?

{
  "name": "neo4j-person-node-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "person",
    "neo4j.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "dbpassword",
    "neo4j.cypher.topic.person": "MERGE (a:Person {accountEntityId: event.accountEntityId}) ON CREATE SET  {a.accountEntityType = event.accountEntityType, a.tenantId = event.tenantId, a.fullName = event.fullName} ON MATCH SET += {a.dob = event.dob, a.reg_id = event.reg_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "tasks.max": "1",
    "neo4j.batch.size": "1000",
    "neo4j.batch.timeout.msecs": "5000",
    "neo4j.retry.backoff.msecs": "3000",
    "neo4j.retry.max.attemps": "5"
  }
}

Sorry, I don't understand. Can you explain what is not working with your Cypher?

I saw some Cypher somewhere with an inline if statement (if a field on event is defined, then set the property) and was wondering whether something like that is needed here.

If one of these fields does not exist in my payload, what will Cypher do?

For example, address might not be in the source message. How will the query above behave in my Connect sink?

G

Let me say it this way...

the ON MATCH SET += { <which fields are present here is not fixed; it varies with the inbound message payload> }

G

If a field is missing from the message, event.x evaluates to null, and setting a property to null removes it. So an existing property is deleted, and a property the node doesn't have yet is simply not added.
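A toy Python model of that behaviour may make it concrete. It treats the node's properties and the Kafka message as plain dicts (an assumption for illustration; this is not the Neo4j driver API, and `event_get`/`set_property` are hypothetical names): a key missing from `event` reads as null, and assigning null removes the property.

```python
from typing import Any, Dict, Optional

# Toy model: a node's properties as a plain dict (NOT the Neo4j driver API).
Node = Dict[str, Any]


def event_get(event: Dict[str, Any], key: str) -> Optional[Any]:
    # Like `event.key` in Cypher: a key missing from the map evaluates to null (None).
    return event.get(key)


def set_property(node: Node, key: str, value: Optional[Any]) -> None:
    # Like `SET a.key = value`: assigning null removes the property,
    # or leaves it absent if the node never had it.
    if value is None:
        node.pop(key, None)
    else:
        node[key] = value


if __name__ == "__main__":
    node = {"accountEntityId": "42", "address": "1 Main St", "dob": "1990-01-01"}
    event = {"accountEntityId": "42", "dob": "1990-02-02"}  # no "address" key
    for key in ("address", "dob"):
        set_property(node, key, event_get(event, key))
    print(node)  # address has been removed, dob has been updated
```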

Your syntax is a little off. I typically set the individual properties, and only use a map if I already have one. Here are the two correct syntaxes. Note that in the map form you still need the += operator in the ON CREATE clause: SET a = {...} would replace all of the node's properties, including the accountEntityId set by the MERGE when the node is created.

Using individual properties:

MERGE (a:Person {accountEntityId: event.accountEntityId}) 
ON CREATE SET a.accountEntityType = event.accountEntityType, a.tenantId = event.tenantId, a.fullName = event.fullName
ON MATCH SET a.idnumber = event.idnumber, a.address = event.address, a.dob = event.dob, a.reg_id = event.reg_id, a.home_phone = event.home_phone

Using a map of properties:

MERGE (a:Person {accountEntityId: event.accountEntityId}) 
ON CREATE SET a += {accountEntityType: event.accountEntityType, tenantId: event.tenantId, fullName: event.fullName}
ON MATCH SET a += {idnumber: event.idnumber, address: event.address, dob: event.dob, reg_id: event.reg_id, home_phone: event.home_phone}
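The difference between `SET a = {...}` and `SET a += {...}` can be sketched with a toy dict model (the helper names `set_replace` and `set_merge` are hypothetical, not a Neo4j API): `=` replaces every property, so the accountEntityId written by MERGE would be lost, while `+=` only touches the keys in the map.

```python
from typing import Any, Dict

# Toy model: a node's properties as a plain dict (NOT the Neo4j driver API).
Props = Dict[str, Any]


def set_replace(node: Props, props: Props) -> None:
    # Like `SET a = {...}`: all existing properties are replaced by the map's
    # non-null entries, so keys not in the map (e.g. accountEntityId) are lost.
    node.clear()
    node.update({k: v for k, v in props.items() if v is not None})


def set_merge(node: Props, props: Props) -> None:
    # Like `SET a += {...}`: only keys in the map are touched; a null value
    # removes that one property and every other property is kept.
    for k, v in props.items():
        if v is None:
            node.pop(k, None)
        else:
            node[k] = v


if __name__ == "__main__":
    created = {"accountEntityId": "42"}  # state right after MERGE creates the node
    set_replace(created, {"tenantId": "t1", "fullName": "Ann"})
    print(created)  # accountEntityId is gone

    created = {"accountEntityId": "42"}
    set_merge(created, {"tenantId": "t1", "fullName": "Ann"})
    print(created)  # accountEntityId is kept
```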

I used a map because I push this as a single line into the Kafka Connect sink config, but I'll try your individual-property method.
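If the query has to end up on a single line in the connector JSON, one option (an assumption, not something the connector requires) is to keep a readable multi-line query and generate the config with Python's `json` module, which handles the commas and quote escaping. The helper names `one_line` and `build_connector` are hypothetical; the connector keys and values are copied from the config in the question.

```python
import json

# Readable multi-line query using the individual-property SET form.
QUERY = """
MERGE (a:Person {accountEntityId: event.accountEntityId})
ON CREATE SET
    a.accountEntityType = event.accountEntityType,
    a.tenantId = event.tenantId,
    a.fullName = event.fullName
ON MATCH SET
    a.dob = event.dob,
    a.reg_id = event.reg_id
"""


def one_line(cypher: str) -> str:
    # Collapse every run of whitespace (newlines, indentation) into one space.
    # Assumption: the query has no string literals whose inner spacing matters.
    return " ".join(cypher.split())


def build_connector(topic: str, cypher: str) -> dict:
    # Mirrors the connector settings from the question; only the query is generated.
    return {
        "name": "neo4j-person-node-sink",
        "config": {
            "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
            "topics": topic,
            "neo4j.uri": "bolt://neo4j:7687",
            "neo4j.authentication.basic.username": "neo4j",
            "neo4j.authentication.basic.password": "dbpassword",
            f"neo4j.cypher.topic.{topic}": one_line(cypher),
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "tasks.max": "1",
            "neo4j.batch.size": "1000",
            "neo4j.batch.timeout.msecs": "5000",
            "neo4j.retry.backoff.msecs": "3000",
            "neo4j.retry.max.attemps": "5",
        },
    }


if __name__ == "__main__":
    # json.dumps emits valid JSON: commas between entries, quotes escaped.
    print(json.dumps(build_connector("person", QUERY), indent=2))
```

Collapsing whitespace is safe here because the query contains no quoted text; a query with string literals whose spacing matters would need a more careful approach.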

Still not 100% clear... what will happen if the address field is not in the event, given that we reference it as address: event.address? My source only sends what changed, so on the Cypher side I need to check whether a field is present. If it isn't, I don't have a value to pass through; if it is, it's included in the message on the topic being consumed. I can't send null, as that would remove the value I already have...

Basically, I want this: if a field is specified, do the assignment; if it is not specified, leave the property as it is.

G

Now I understand. You could try this. During ON MATCH, each property is set to the new value if the event provides one, kept at its existing value if not, and stays null (i.e. absent) if neither the event nor the node has a value.

MERGE (a:Person {accountEntityId: event.accountEntityId}) 
ON CREATE SET 
    a.accountEntityType = event.accountEntityType, 
    a.tenantId = event.tenantId, 
    a.fullName = event.fullName
ON MATCH SET 
    a.idnumber = coalesce(event.idnumber, a.idnumber),
    a.address = coalesce(event.address, a.address),
    a.dob = coalesce(event.dob, a.dob),
    a.reg_id = coalesce(event.reg_id, a.reg_id),
    a.home_phone = coalesce(event.home_phone, a.home_phone)
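A minimal Python model of this ON MATCH clause, assuming the node and the message are plain dicts (the helper names `coalesce` and `on_match` are hypothetical stand-ins, not a Neo4j API), shows that fields missing from the message leave the existing values untouched.

```python
from typing import Any, Dict, Iterable

# Toy model: a node's properties as a plain dict (NOT the Neo4j driver API).
Props = Dict[str, Any]

# Fields updated by the ON MATCH clause in the query above.
MATCH_FIELDS = ("idnumber", "address", "dob", "reg_id", "home_phone")


def coalesce(*values: Any) -> Any:
    # Mirrors Cypher coalesce(): first non-null argument, or null if all are null.
    for v in values:
        if v is not None:
            return v
    return None


def on_match(node: Props, event: Props, fields: Iterable[str] = MATCH_FIELDS) -> None:
    # Models `SET a.f = coalesce(event.f, a.f)` for each field f.
    for f in fields:
        value = coalesce(event.get(f), node.get(f))
        if value is None:
            node.pop(f, None)  # null in both places: property stays absent
        else:
            node[f] = value


if __name__ == "__main__":
    node = {"accountEntityId": "42", "address": "1 Main St", "dob": "1990-01-01"}
    on_match(node, {"accountEntityId": "42", "dob": "1990-02-02", "reg_id": "R1"})
    print(node)  # address kept, dob updated, reg_id added
```

One consequence of this design: because coalesce() skips nulls, the source can no longer clear a property by sending null; the old value is kept. If deletions are ever needed, they would have to be signalled some other way.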