Procedure parameter passing

I have a Neo4j procedure like this:

@Procedure(
        name = "com.comeon.procedures.checkAndAddProcedure",
        mode = Mode.WRITE
)
@Description("Find and check if user needs to be added to a segmentation node")
public void triggerProcedure(@Name("inPlayerId") Number inPlayerId,@Name("inActivatorsData") String inActivatorsData) {
    log.info("fire checkAndAddProcedure entered for node player : " + inPlayerId);
    Result result = tx.execute(SOME CPHYPER)
}

I know it works because I can call it manually from the UI, e.g. CALL com.comeon.procedures.checkAndAddProcedure(2345,"test"),
and see 2345 in the logs.

I've created a trigger :

CALL apoc.trigger.add('segment_3', '
UNWIND $createdNodes AS accountAction
CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, "deposit")
    ', {phase:'after'})

so that any time an AccountAction node is created, accountAction.playerId is sent to the procedure
along with a string. I know the playerId is populated, as I can see it on the node, but the
server logs always show: fire checkAndAddProcedure entered for node player : null

I believe the trigger may be fired on the creation of any node, hence the null,
so I amended it to:

CALL apoc.trigger.add('segment_3', '
UNWIND $createdNodes AS accountAction
WITH accountAction
WHERE "AccountAction" IN labels(accountAction)
CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, "deposit")
', {phase:'after'})

but that just gave an io.netty.channel.StacklessClosedChannelException.

Am I sending the parameter incorrectly, or is there something missing?
I'm currently using Neo4j 5.15.0.

You can also use WHERE accountAction:AccountAction

I don't see anything obvious. What is the 'SOME CPHYPER'?

Hi again Gary,

I don't think the procedure is the problem; I think it's the trigger:

CALL apoc.trigger.add('segment_3', '
UNWIND $createdNodes AS accountAction
CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, "deposit")
', {phase:'after'})

I suspect the above calls the procedure whenever any node is created, so it gives null because a lot of
those nodes don't have certain properties.

this:

CALL apoc.trigger.add('segment_3', '
WITH $createdNode AS accountAction
WHERE "AccountAction" IN labels(accountAction)
CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, "deposit")
', {phase:'after'})

or this:

CALL apoc.trigger.add('segment_3',"UNWIND apoc.trigger.nodesByLabel($assignedLabels,'AccountAction') AS accountAction
CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, 'deposit')", {phase:'before'})

work fine if I create the nodes through the Neo4j UI,

but when the identical query is run from Java code on a remote machine, both produce the same error:

org.neo4j.bolt.protocol.error.streaming.BoltStreamingWriteException: Failed to transmit operation result: Response write failure
at org.neo4j.bolt.protocol.common.fsm.response.NetworkResponseHandler.onSuccess(NetworkResponseHandler.java:184) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.lambda$interrupt$6(AtomicSchedulingConnection.java:462) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJob(AtomicSchedulingConnection.java:336) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.doExecuteJobs(AtomicSchedulingConnection.java:315) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJobs(AtomicSchedulingConnection.java:221) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) [?:?]
at java.lang.Thread.run(Thread.java:1589) [?:?]
Caused by: io.netty.channel.StacklessClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source) ~[netty-transport-4.1.100.Final.jar:4.1.100.Final]

From the error it looks like a network issue, but all other nodes are created fine. It only occurs when the trigger is fired.

You are exactly correct about the trigger executing for every node created, as well as for other events. As such, you need to filter the $createdNodes list down to just the nodes you want to process. You were heading in the right direction by inserting a WITH/WHERE pair so that only nodes with the relevant label are passed on. After that, you can assume the nodes all have that label. apoc.trigger.nodesByLabel is also a good option.

Your observation that the value is null because the property is not defined on nodes other than AccountAction seems a reasonable explanation for the null values in the log.
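As an extra safety net, the procedure itself could ignore calls where the id is missing. A minimal sketch, assuming a helper class named PlayerIdGuard (a hypothetical name, not part of your code or of Neo4j) that the procedure would consult before doing any work:

```java
import java.util.Optional;

// Hypothetical helper: normalises the Number passed to the procedure.
final class PlayerIdGuard {

    private PlayerIdGuard() {
    }

    // Returns an empty Optional when the trigger passed null, for example
    // because the node that fired the trigger has no playerId property.
    static Optional<Long> toPlayerId(Number inPlayerId) {
        if (inPlayerId == null) {
            return Optional.empty();
        }
        return Optional.of(inPlayerId.longValue());
    }
}
```

Inside triggerProcedure you could then return early when toPlayerId(inPlayerId) is not present, so unrelated nodes never reach the Cypher in tx.execute. This does not replace the label filter in the trigger; it only keeps the procedure quiet if something slips through.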

Now that you have solved that, let's look at the new error. You state this only happens when calling from the Java code. Can you provide the Java code here?

Just a note: I don't understand why you are using a custom procedure in this case. Your custom procedure does not leverage the Java API at all, except for creating a relationship between two nodes. Its logic is almost entirely in an 'execute' call. The Cypher query in that call, and the post-processing of the results, can be done directly in a Cypher query via a driver. I could see where this may be justified if your Cypher query returned thousands of nodes to create relationships to; doing this on the server would eliminate sending them to the client to process.

What was your thought process behind using a custom procedure? I have used them in my application to traverse graphs and accumulate metrics. This is something I cannot do efficiently in Cypher.

Hi Gary
Thanks for the reply. No, I'm not 100% sure a custom stored procedure is the way to go; at the moment it's a proof-of-concept system.
The idea is that each group node has a property "query" that holds a query matching the criteria for being part of its group,
such as:

MATCH (aca:AccountAction)<-[:PLAYER_ACCOUNT_ACTION]-(aaha:AccountActionHourlyAggregate)<-[paat:PLAYER_ACCOUNT_ACTION_TIME]-(aada:AccountActionDailyAggregate)<-[paad:PLAYER_ACCOUNT_ACTION_DATE]-(player:Player)
WHERE date(paad.date) >= date("2024-02-01") AND date(paad.date) <= date("2024-02-15")
WITH player, COLLECT({action: aca, amount: aca.amount}) AS actions
WITH player, REDUCE(total = 0, action IN actions | total + action.amount) AS totalAmount
WHERE totalAmount = 100

When events occur on the db, such as an AccountAction creation, a trigger sends the playerId and the action type to the procedure. The procedure then runs every group node's query against the player to see whether he needs to be added to or removed from a group. A cron job would not be able to do this, as it needs to be in real time (give or take a few minutes). I think I'm dealing with about 500,000 hits every 5 seconds. It's my first time with Neo4j, but I was told it would be a good fit (as it would be used for other bits).

As for the problem, I'm convinced it must be how the code interacts with the transaction, as it's a long query to create the AccountAction:

MERGE (aaha:AccountActionHourlyAggregate {id:2024020209})
ON CREATE SET aaha.id = 2024020209
,aaha.playerId = 353453
MERGE (aada:AccountActionDailyAggregate {id: 20240202})
ON CREATE SET aada.id = 20240202
,aada.playerId = 353453
MERGE (player:Player {playerId: 353453})
MERGE (aaha)<-[rel1:PLAYER_ACCOUNT_ACTION_TIME]-(aada)
ON CREATE SET rel1.time = datetime("2024-02-02T09:00:43Z")
MERGE (aada)<-[rel2:PLAYER_ACCOUNT_ACTION_DATE]-(player)
ON CREATE SET rel2.date = date("2024-02-02")
WITH aaha
MERGE (aca:AccountAction {accountActionId:74494 })
ON CREATE
SET aca.accountActionId = 74494,
aca.accHistId= 200007303839,
aca.accountActionId= 74494,
aca.actionId= 22,
aca.playerId= 353453,
aca.amount= 50,
aca.amountInOriginalCurrency= 51.0,
aca.transactionCount= 1,
aca.odds= 0,
aca.bets= 0,
aca.internalRef= 'ST-2436535743454-200',
aca.externalRef= 'd-cypAutoTrx2023-11-01T10:55:43.069Z_57_70',
aca.extCode= 'cypAutoRound2023-11-01T10:55:43.069Z_57_70',
aca.text= '',
aca.paymentMethodId= 0,
aca.gameId= 15775,
aca.currencyId= 1,
aca.accStatusId= 11,
aca.paymentProviderId= 0,
aca.time= '2024-02-02T09:00:43.000Z'
WITH aaha,aca
WITH aca,aaha
MATCH (n:AccountActionType)
WHERE n.accountActionTypeId = 22
MERGE (aca)-[:ACCOUNT_ACTION_TYPE]->(n)
WITH aca,aaha
MATCH (n:AccountStatus)
WHERE n.accountStatusId = 11
MERGE (aca)-[:ACCOUNT_STATUS]->(n)
WITH aca,aaha
MATCH (n:Currency)
WHERE n.currencyId = 1
MERGE (aca)-[:CURRENCY]->(n)
WITH aca,aaha
MATCH (n:PaymentMethod)
WHERE n.paymentMethodId = 0
MERGE (aca)-[:PAYMENT_METHOD]->(n)
WITH aca,aaha
MATCH (n:PaymentGroup)
WHERE n.paymentGroupId = 0
MERGE (aca)-[:PAYMENT_PROVIDER]->(n)
WITH aca,aaha
MERGE (aaha)-[:PLAYER_ACCOUNT_ACTION]->(aca)
RETURN aca

I'll try stripping it back to "CREATE (accountAction:AccountAction {playerId: 2345})" in a test to see what happens. As for the code:

public String saveAccountDepositActionAndRelationships(String inAccountActionCreateStr) {
    try (Session session = sessionDriver.session()) {
        Transaction tx = session.beginTransaction();
        var result = tx.run(inAccountActionCreateStr);
        tx.commit();
    }
    return "ok";
}

The input is that very long query, passed as a single string.

Hi Gary
Just a quick update: I've written a quick test and stripped the AccountAction node back.
During the test I used apoc.trigger.add and was able to see the printout in the logs,
so the procedure is called.

String theTriggerTemplate2 = """
             CALL apoc.trigger.add('segment_3',"UNWIND apoc.trigger.nodesByLabel($assignedLabels,'AccountAction') AS accountAction
             CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, 'deposit')", {phase:'afterAsync'})
             """;

After moving to the new API call, apoc.trigger.install, there is no printout:
the procedure is not called.

String theTriggerTemplateNew2 = """
             USE system
             CALL apoc.trigger.install('neo4j','segment_3',"UNWIND apoc.trigger.nodesByLabel($assignedLabels,'AccountAction') AS accountAction
             CALL com.comeon.procedures.checkAndAddProcedure(accountAction.playerId, 'deposit')", {phase:'afterAsync'})
             """;

One thing you changed is the phase of the trigger. You are now using 'afterAsync' instead of 'after'. The documentation states this is the preferred phase. Could that be the cause of it suddenly working?

Did you wait for the new trigger to take effect before testing with the 'install' method? There is something like a 30 second delay when creating a trigger. The delay is configurable.

There are a lot of parts here, so it is difficult for me to see the entire picture. I would be glad to try to troubleshoot a little if you could provide the code. If that is possible, do you have an IntelliJ project or something similar that has this code isolated?

As a note, I don't use manual transactions as you have in your driver code with 'beginTransaction' and 'commit'. I use 'executeRead' and 'executeWrite', which are managed transactions that have retry logic built in. They take a TransactionCallback that defines your query logic. A benefit is that you can issue multiple 'run' statements in the same TransactionCallback in case you have complicated logic.
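To illustrate only the retry idea behind managed transactions, here is a plain-Java sketch with no driver dependency. RetrySketch and runWithRetry are made-up names, and this is not the driver's implementation; as far as I know the real driver retries only on errors it classifies as retryable and waits between attempts.

```java
import java.util.function.Supplier;

// Hypothetical illustration of "re-run the whole unit of work on failure".
final class RetrySketch {

    private RetrySketch() {
    }

    // Runs the unit of work and re-runs it from the start if it throws,
    // giving up after maxAttempts tries and rethrowing the last failure.
    static <T> T runWithRetry(Supplier<T> work, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the whole callback again
            }
        }
        throw last;
    }
}
```

The point for your code is that the callback may run more than once, so everything that belongs to the transaction should live inside it and should be safe to repeat.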

One thing you may want to try while troubleshooting is to change your trigger to just log the contents of $createdNodes and the other trigger parameters you use, to see what is happening.

Hi Gary
You're absolutely correct: using afterAsync works, but still only with the old API, i.e. CALL apoc.trigger.add, not with CALL apoc.trigger.install.
With the latter I now get the log but also receive:

2024-02-26 01:22:09.614+0000 WARN [o.n.b.p.c.c.c.AtomicSchedulingConnection] [bolt-6] Terminating connection due to network error
org.neo4j.bolt.protocol.error.streaming.BoltStreamingWriteException: Failed to transmit operation result: Response write failure
at org.neo4j.bolt.protocol.common.fsm.response.NetworkResponseHandler.onSuccess(NetworkResponseHandler.java:184) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.lambda$interrupt$6(AtomicSchedulingConnection.java:462) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJob(AtomicSchedulingConnection.java:336) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.doExecuteJobs(AtomicSchedulingConnection.java:315) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at org.neo4j.bolt.protocol.common.connector.connection.AtomicSchedulingConnection.executeJobs(AtomicSchedulingConnection.java:221) ~[neo4j-bolt-5.15.0.jar:5.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
at java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: io.netty.channel.StacklessClosedChannelException

Hi Gary
A quick question: will the trigger below go through all AccountAction nodes in the db?
If so, is there any way I can pass just the AccountAction node that fired the trigger?

CALL apoc.trigger.add('0bd17529-036d-4707-be71-bb363f80a2cc',"UNWIND apoc.trigger.nodesByLabel($assignedLabels,'AccountAction') AS accountAction
CALL com.comeon.procedures.checkAndRemoveProcedure(accountAction.playerId)",
{phase:'afterAsync'})

The trigger will be called for every action supported by triggers, i.e. node created, relationship created, node deleted, relationship deleted, property changes, etc. The changes are provided to the trigger's Cypher statement via the parameters $createdNodes, $deletedNodes, etc. These parameters will only contain the entities impacted.

Since your trigger action is restricted to AccountAction nodes, you will need a precondition to start your query to filter out all other node labels.

If you only want to process created nodes, then you need to process the nodes in $createdNodes, and only the AccountAction nodes among them. apoc.trigger.nodesByLabel($assignedLabels,'AccountAction') AS accountAction returns the nodes that had the label AccountAction added to them. This is not what you want if you want to process newly created AccountAction nodes. You want to do something like the following:

CALL apoc.trigger.add('0bd17529-036d-4707-be71-bb363f80a2cc',"UNWIND $createdNodes as createdNode
With createdNode
Where createdNode:AccountAction
With createdNode
Call com.comeon.procedures.checkAndRemoveProcedure(createdNode.playerId)",
{phase:'afterAsync'})
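If you build the trigger text from Java, as in your earlier theTriggerTemplate2 string, the same filter could be assembled like this. It is a sketch only: TriggerStatements and createdNodeTrigger are hypothetical names, and it assumes the trigger name, label and procedure call come from trusted code rather than user input.

```java
// Hypothetical helper that assembles the apoc.trigger.add call shown above.
final class TriggerStatements {

    private TriggerStatements() {
    }

    // Builds a trigger that unwinds $createdNodes, keeps only nodes carrying
    // the given label, and then runs the given procedure call for each one.
    static String createdNodeTrigger(String triggerName, String label, String procedureCall) {
        // The label is concatenated into Cypher, so accept simple identifiers only.
        if (!label.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Unexpected label: " + label);
        }
        // The name is wrapped in single quotes and the statement in double quotes.
        if (triggerName.contains("'") || procedureCall.contains("\"")) {
            throw new IllegalArgumentException("Quote characters would break the statement");
        }
        String statement = String.join("\n",
                "UNWIND $createdNodes AS createdNode",
                "WITH createdNode",
                "WHERE createdNode:" + label,
                "CALL " + procedureCall);
        return "CALL apoc.trigger.add('" + triggerName + "', \"" + statement
                + "\", {phase:'afterAsync'})";
    }
}
```

For example, createdNodeTrigger("segment_3", "AccountAction", "com.comeon.procedures.checkAndAddProcedure(createdNode.playerId, 'deposit')") yields a statement equivalent to the one above, with the label check applied before the procedure is called.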