So the purpose of apoc.periodic.commit() is to execute the same query over and over until it returns 0. The prototypical use case is something like this, say for deleting all nodes of a certain label:
CALL apoc.periodic.commit("
MATCH (n:ToDelete)
WITH n LIMIT $limit
DETACH DELETE n
RETURN count(n)", {limit:1000})
Note that the source set (the :ToDelete nodes) diminishes with each run until none are left; apoc.periodic.commit() is really designed for cases where the source set shrinks like this. If we used it in some other case instead, such as when we need to filter by a property:
CALL apoc.periodic.commit("
MATCH (n:Node)
WHERE n.toDelete = true
WITH n LIMIT $limit
DETACH DELETE n
RETURN count(n)", {limit:1000})
This is far less efficient. We match to and filter nodes until we find our first batch of 1000, delete them, and then start over from the very beginning, matching to (and filtering out) the same nodes we already rejected previously, then matching further until we reach our next batch of 1000...and then we start over matching from the beginning again. Every batch starts over from a fresh MATCH. (For this case apoc.periodic.iterate() would be preferred instead of commit(); a sketch follows below.)
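For reference, a rough sketch of the iterate() form for that property-filtered delete could look like this (same :Node/toDelete example as above; the batch size is just illustrative):

CALL apoc.periodic.iterate(
  "MATCH (n:Node) WHERE n.toDelete = true RETURN n",  // driving query, streamed once
  "DETACH DELETE n",                                  // runs per batch of matched nodes
  {batchSize:1000})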
That's why your query doesn't work without a node holding an offset value (which you read and then update): apoc.periodic.commit() isn't meant to work with this kind of offset/paging query unless you have some way to store and update the offset, as you're doing.
If you had a good way to stream all of the JSON in a single call (streaming it, not needing a limit and offset), then you could use apoc.periodic.iterate(), which streams in results, pausing at each batch to execute the update query for that batch, then continuing to stream (from where it left off) until it has another batch to update.
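To give a rough idea of that shape (hypothetical: it assumes your API had an endpoint returning everything at once with the same items/name structure, and that your Auth value gets passed through iterate()'s params config):

CALL apoc.periodic.iterate(
  // driving query: stream the whole result once, one row per customer
  "CALL apoc.load.jsonParams('https://URL/api/v1/customers', {Authorization:$Auth}, null) YIELD value
   UNWIND value.items AS customer
   RETURN customer",
  // update query: executed in batches of 1000 customer rows
  "CREATE (:actiCustomer {name: customer.name})",
  {batchSize:1000, params:{Auth:$Auth}})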
As to what's going on with updating your offset, this is because of the cardinality of your query: operations in Cypher produce rows, and they execute per row.
You've already made a limited call to get JSON, yielding 100 rows of values. Then you UNWIND value.item (I don't know how many items are supposed to be per row, but this multiplies out your rows accordingly) and create your :actiCustomer nodes, then limit those rows to 100 (I don't know why). For each of those 100 rows you MATCH to your single :ImportHelper node, and then your SET runs once per row, incrementing the offset by a total of 100 (it used to be a total of 10000 when you had + 100 before, since that was executed 100 times).
To improve this, you should probably be using FOREACH instead of UNWIND. Remember that UNWIND multiplies out your rows (it produces a row for every element of the unwound list); it isn't a looping construct (it only appears that way because Cypher operations execute per row). If you use FOREACH instead, the Cypher inside the FOREACH will execute per element of the list, but it won't increase cardinality: you'll still have your original 100 rows. Here's a small illustration of the difference:
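(A toy comparison; the counter property and :Example label are made up, just to show how often the SET runs.)

// UNWIND: the list becomes 3 rows, so the SET runs 3 times and adds 3
MATCH (helper:ImportHelper)
UNWIND [1, 2, 3] AS x
SET helper.counter = helper.counter + 1;

// FOREACH: cardinality stays at 1 row, so the SET after it runs once and adds 1
MATCH (helper:ImportHelper)
FOREACH (x IN [1, 2, 3] | CREATE (:Example {value: x}))
SET helper.counter = helper.counter + 1;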
Then you can perform your count aggregation, which gives you a single row with the count, and you can update your import helper, which will happen only that one time (since you now have only a single row). Then you return the count (or 0, via the CASE, when fewer than 100 came back, so the commit loop knows to stop). Also, you already matched on your importHelper earlier and it's still in scope, so there's no need to match to it again:
CALL apoc.periodic.commit("
MATCH (importHelper:ImportHelper)
CALL apoc.load.jsonParams('https://URL/api/v1/customers?limit=100&offset=' + importHelper.offset, {Authorization:$Auth}, null)
YIELD value
FOREACH (customer IN value.items |
  CREATE (actiCust:actiCustomer {name: customer.name}))
WITH importHelper, count(value) AS count
SET importHelper.offset = importHelper.offset + count
RETURN CASE WHEN count < 100 THEN 0 ELSE count END AS count
", {Auth:$Auth})