I'm struggling with how to properly use the call apoc.periodic.iterate syntax with my query


(Paul Drangeid) #1

It seems no matter how I'm carving this up I get errors typically 'Query cannot conclude with CALL'

I'm not sure exactly what in this cypher would be properly segmented into returning items and action on each iteration, as it is a bit hybrid. The contents are about 100,000 rows, and it hangs when trying to import, so I'm trying to see how I can batch it. Do I somehow need to break this up into a smaller cypher to work within periodic.iterate, or can I somehow batch this entire procedure?

here's the cypher:

MATCH (cdl:Crmdataload {name:'CommitCRM'})
OPTIONAL MATCH (lc:Charge)
WITH max(lc.createdon) as lastnewcharge,timestamp() AS howsoonisnow
WITH apoc.date.format(coalesce(lastnewcharge,0),'ms','yyyy-MM-dd HH:mm:ss.sss','CST') AS lastnewcharge,howsoonisnow
CALL apoc.load.driver('com.extendedsystems.jdbc.advantage.ADSDriver')
call apoc.load.jdbc('jdbc:extendedsystems:advantage://commitcrm:6262;catalog=//commitcrm/CommitCRM/Db','SELECT RECID,SLIPDATE,WORKERID,CARDID,TICKETID,ITEMID,DESCRIPTION
,HOURSAMOUNT,QUANTITY,PRICE,TOTAL,CREATEDATE,UPDATEDATE,UPDATEUSER,CREATEUSER,USER1,CONTACTID,BCRECID,BCCODE,FROMTIME,TOTIME from slips WHERE CREATEDATE > \''+lastnewcharge+'\'') YIELD row
MERGE (c:Charge {recid:row.RECID})
SET c.date=row.SLIPDATE,c.description=row.DESCRIPTION,c.hours=row.HOURSAMOUNT,c.quantity=row.QUANTITY,c.price=row.PRICE,c.createdon=row.CREATEDATE,c.updatedon=row.UPDATEDATE,c.fromtime=row.FROMTIME,c.totime=row.TOTIME
WITH row,c,howsoonisnow
OPTIONAL MATCH (e:Crmemployee {acctrecid:row.WORKERID})
//OPTIONAL MATCH (a:Company {acctrecid:row.CARDID})
OPTIONAL MATCH (t:Ticket {recid:row.TICKETID})
OPTIONAL MATCH (i:Crmitem {recid:row.ITEMID})
OPTIONAL MATCH (uu:Crmemployee {acctrecid:row.UPDATEUSER})
OPTIONAL MATCH (cu:Crmemployee {acctrecid:row.CREATEUSER})
OPTIONAL MATCH (ct:Contact {acctrecid:row.CONTACTID})
OPTIONAL MATCH (bc:Contract {recid:row.BCRECID})
FOREACH (ignoreMe in CASE WHEN not(exists(e.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:ENTRY_FOR]->(e))
//FOREACH (ignoreMe in CASE WHEN not(exists(e.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:ENTRY_FOR_ACCOUNT]->(a))
FOREACH (ignoreMe in CASE WHEN not(exists(t.recid)) THEN [1] ELSE [] END | MERGE (c)-[:PART_OF_TICKET]->(t))
FOREACH (ignoreMe in CASE WHEN not(exists(i.recid)) THEN [1] ELSE [] END | MERGE (c)-[:INCLUDES_ITEM]->(i))
FOREACH (ignoreMe in CASE WHEN not(exists(uu.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:LAST_UPDATED_BY]->(uu))
FOREACH (ignoreMe in CASE WHEN not(exists(cu.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:CREATED_BY]->(cu))
FOREACH (ignoreMe in CASE WHEN not(exists(ct.recid)) THEN [1] ELSE [] END | MERGE (c)-[:CLIENT_CONTACT]->(ct))
FOREACH (ignoreMe in CASE WHEN not(exists(bc.recid)) THEN [1] ELSE [] END | MERGE (c)-[:UNDER_CONTRACT]->(bc))
WITH howsoonisnow,c
MATCH (cdl:Crmdataload {name:'CommitCRM'})
// SET the timestamp for when this :Charge update procedure last ran
SET cdl.lastnewcharge=howsoonisnow
RETURN count(*)

(M. David Allen) #2

I can't provide much advice on specifically how to break this up because it depends on data volumes and stuff. But with periodic.iterate, what you want is a RETURN clause at the end of the chunk you're breaking up. This creates a stream of results, which is what you're going to be batching/chunking. Also, by having a RETURN clause you're being forced to name the things in your batch, so that when you write the second query (the "action on the batch") you're referring to a consistently named set of things.

A thing I'd try in your situation is to break your query into two chunks, the "batch read" and the "do a bunch of mutations based off of what came back. This part looks to me like the batch read:

MATCH (cdl:Crmdataload {name:'CommitCRM'})
OPTIONAL MATCH (lc:Charge)
WITH max(lc.createdon) as lastnewcharge,timestamp() AS howsoonisnow
WITH apoc.date.format(coalesce(lastnewcharge,0),'ms','yyyy-MM-dd HH:mm:ss.sss','CST') AS lastnewcharge,howsoonisnow
CALL apoc.load.driver('com.extendedsystems.jdbc.advantage.ADSDriver')
call apoc.load.jdbc('jdbc:extendedsystems:advantage://commitcrm:6262;catalog=//commitcrm/CommitCRM/Db','SELECT RECID,SLIPDATE,WORKERID,CARDID,TICKETID,ITEMID,DESCRIPTION
,HOURSAMOUNT,QUANTITY,PRICE,TOTAL,CREATEDATE,UPDATEDATE,UPDATEUSER,CREATEUSER,USER1,CONTACTID,BCRECID,BCCODE,FROMTIME,TOTIME from slips WHERE CREATEDATE > \''+lastnewcharge+'\'') YIELD row

And the "do a bunch of mutations" is everything south of that, starting with MERGE which is a mutation.

After these various CALL statements, you probably just need to add "WITH a, b, c RETURN a, b, c" (whatever naming) -- and that concludes your data fetch query. The rest then can operate over that batch with the same naming.

Hope this helps


(Paul Drangeid) #3

Hmmm... I tried wrapping the query part as suggested (ending with YIELD ROW)
I end up with this error:

Neo.ClientError.Statement.SyntaxError: Procedure call inside a query does not support naming results implicitly (name explicitly using YIELD instead) (line 6, column 1 (offset: 332))
"CALL apoc.periodic.iterate(""

What is being referred to with the 'naming results implicitly'?


(Andrew Bowman) #4

This has to do with the variables YIELDed by a procedure call, and the only variables you can YIELD are defined by the procedure's signature (though you can alias them as you YIELD them).

You can use CALL apoc.help('periodic.iterate') to see the signature of the iterate call. You only need to YIELD a subset of those variables, minimum 1. I typically default to YIELD batches, total, errorMessages and then RETURN those variables on the next line.


(Paul Drangeid) #5

Ok, I added the YIELD batches,total and return them at the end. (btw - would be helpful if that syntax is included in the APOC user guide.... that is left off in those examples.)

I suspect where I may be getting hung up now is that my jdbc select string is concatenated using quoted variables, and then the entire query needs to be quoted again within the apoc.periodic.iterate function call, and my brain is not letting me figure out the dizzying escape single quote, double quote mess I've got myself in.


(Paul Drangeid) #6

I attempted to get around the quoting by pre-building the select string into a variable:

MATCH (cdl:Crmdataload {name:'CommitCRM'})
OPTIONAL MATCH (lc:Crmcharge)
WITH max(lc.createdon) as lastnewcharge,timestamp() AS howsoonisnow, apoc.date.format(coalesce(cdl.chargerefresh,0),'ms','yyyy-MM-dd HH:mm:ss.sss','CST') AS refreshtime
WITH apoc.date.format(coalesce(lastnewcharge,0),'ms','yyyy-MM-dd HH:mm:ss.sss','CST') AS lastnewcharge,howsoonisnow,refreshtime,'SELECT RECID,SLIPDATE,WORKERID,CARDID,TICKETID,ITEMID,DESCRIPTION
,HOURSAMOUNT,QUANTITY,PRICE,TOTAL,CREATEDATE,UPDATEDATE,UPDATEUSER,CREATEUSER,USER1,CONTACTID,BCRECID,BCCODE,FROMTIME,TOTIME from slips WHERE CREATEDATE < \''+lastnewcharge+'\' AND UPDATEDATE > \''+refreshtime+'\'' as thequery
CALL apoc.load.driver('com.extendedsystems.jdbc.advantage.ADSDriver')
CALL apoc.periodic.iterate("call apoc.load.jdbc('jdbc:extendedsystems:advantage://commitcrm:6262;catalog=//commitcrm/CommitCRM/Db',"+thequery+") YIELD row
","do a bunch of mutations"
,{batchSize:1000, parallel:true})YIELD batches,total
return batches,total

Now the error is Neo.ClientError.Procedure.ProcedureCallFailed: Failed to invoke procedureapoc.periodic.iterate: Caused by: org.neo4j.cypher.internal.util.v3_4.SyntaxException: Invalid input 'R': expected whitespace, comment, '{', node labels, MapLiteral, a parameter, a relationship pattern, '(', '.', '[', "=~", IN, STARTS, ENDS, CONTAINS, IS, '^', '*', '/', '%', '+', '-', '=', "&lt;&gt;", "!=", '&lt;', '&gt;', "&lt;=", "&gt;=", AND, XOR, OR, ',' or ')' (line 1, column 132 (offset: 131))


(Paul Drangeid) #7

OK - made progress with this.. hopefully this can help others. One problem was actually fairly simple,as I suspected one difficulty for me was getting my brain wrapped around the quoting, and I was just missing some simple syntax (of course!):

First to stop myself from going completely insane, I take the entire query statement (dynamically generated because I'm using a lookup to get a date field to BUILD the query.) and put it into a variable 'thequery'

Then within the periodic.iterate notice how 'thequery' gets added: Because periodic iterate needs thequery quoted, I have to open double quote, close single quote, add in the contents of 'thequery' open single quote, close double quote, finish the remainder of the apoc.load.jdbc, then close single quote, then move onto the 'do these changes' section of the periodic iterate call. Whew!

The other issue I ran into (not sure if this would be considered a bug) but the mutations part of the apoc.periodic iteration does NOT seem to support carriage returns. I perform several OPTIONAL MATCH and some FOREACH and for readability in my CYPHER script I have line returns between statements.

VisualStudio CODE properly interprets the entire statement as quoted, but if I paste it into the Neo4j browser, it ONLY actually performs the FIRST line (in my case a MERGE statement) and ignores the remaining cypher statements... so to make it work I had to remove all the linefeeds (which makes it VERY difficult to view/edit in the IDE, but will execute properly).

For reference this is running on Neo4j 1.4.9 and a TEST version of APOC (3.4.0.3) that fixed the advantage JDBC queries for me.

Finally, here's the working CYPHER:

MATCH (cdl:Crmdataload {name:'CommitCRM'})
OPTIONAL MATCH (lc:Crmcharge)
WITH max(lc.createdon) as lastnewcharge,timestamp() AS howsoonisnow, apoc.date.format(coalesce(cdl.chargelimiter,0),'ms','yyyy-MM-dd HH:mm:ss.sss','CST') AS historylimit
WITH apoc.date.format(coalesce(lastnewcharge,0),'ms','yyyy-MM-dd HH:mm:ss.sss','CST') AS lastnewcharge,howsoonisnow,historylimit
WITH *,'SELECT RECID,SLIPDATE,WORKERID,CARDID,TICKETID,ITEMID,DESCRIPTION,HOURSAMOUNT,QUANTITY,PRICE,TOTAL,CREATEDATE,UPDATEDATE,UPDATEUSER,CREATEUSER,USER1,CONTACTID,BCRECID,BCCODE,FROMTIME,TOTIME from slips WHERE CREATEDATE > \''+lastnewcharge+'\' AND CREATEDATE > \''+historylimit+'\'' as thequery
CALL apoc.load.driver('com.extendedsystems.jdbc.advantage.ADSDriver')
CALL apoc.periodic.iterate('call apoc.load.jdbc("jdbc:extendedsystems:advantage://commitcrm:6262;catalog=//commitcrm/CommitCRM/Db","'+thequery+'")',
'MERGE (c:Crmcharge {recid:row.RECID}) ON CREATE SET c.date=row.SLIPDATE,c.description=row.DESCRIPTION,c.hours=row.HOURSAMOUNT,c.quantity=row.QUANTITY,c.price=row.PRICE,c.createdon=row.CREATEDATE,c.updatedon=row.UPDATEDATE,c.fromtime=row.FROMTIME,c.totime=row.TOTIME WITH * OPTIONAL MATCH (e:Crmemployee {acctrecid:row.WORKERID}) OPTIONAL MATCH (t:Ticket {recid:row.TICKETID}) OPTIONAL MATCH (i:Crmitem {recid:row.ITEMID}) OPTIONAL MATCH (uu:Crmemployee {acctrecid:row.UPDATEUSER}) OPTIONAL MATCH (cu:Crmemployee {acctrecid:row.CREATEUSER}) OPTIONAL MATCH (ct:Contact {acctrecid:row.CONTACTID}) OPTIONAL MATCH (bc:Contract {recid:row.BCRECID}) FOREACH (ignoreMe in CASE WHEN not(exists(e.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:ENTRY_FOR]->(e)) FOREACH (ignoreMe in CASE WHEN not(exists(t.recid)) THEN [1] ELSE [] END | MERGE (c)-[:PART_OF_TICKET]->(t)) FOREACH (ignoreMe in CASE WHEN not(exists(i.recid)) THEN [1] ELSE [] END | MERGE (c)-[:INCLUDES_ITEM]->(i)) FOREACH (ignoreMe in CASE WHEN not(exists(uu.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:LAST_UPDATED_BY]->(uu)) FOREACH (ignoreMe in CASE WHEN not(exists(cu.acctrecid)) THEN [1] ELSE [] END | MERGE (c)-[:CREATED_BY]->(cu)) FOREACH (ignoreMe in CASE WHEN not(exists(ct.recid)) THEN [1] ELSE [] END | MERGE (c)-[:CLIENT_CONTACT]->(ct)) FOREACH (ignoreMe in CASE WHEN not(exists(bc.recid)) THEN [1] ELSE [] END | MERGE (c)-[:UNDER_CONTRACT]->(bc))',{batchSize:500, parallel:true}) YIELD batches,total,timeTaken,committedOperations,failedOperations,failedBatches,retries,errorMessages,batch,operations,wasTerminated
WITH *
// SET the timestamp for when this :Crmcharge update procedure last ran
MATCH (cdl:Crmdataload {name:'CommitCRM'})
SET cdl.lastnewcharge=howsoonisnow
RETURN *