Building a simple linked list (nodes chained by :NEXT relationships) should be easy, but it isn't. How can I achieve this?
In the test code below, the 'unwind ... as event' part mimics what arrives from a Neo4j Kafka sink connector. The problem seems to be that unwind does not act sequentially, i.e. it does not, row by row, create the TST node, check and set the 'last' property, and then on the next row link the previous last node to the new one with :NEXT.
The example is reduced to its simplest form by linking only the :TST (tag) nodes. The goal is to build :NEXT links for both these and the 'c.e' child nodes.
Any tips as to how one would go about making such a thing?
unwind [{tag:'123', c:[{e:1}]},{tag:'234', c:[{e:2},{e:3}]},{tag:'345',c:[{e:5}]},{tag:'456',c:[{e:4},{e:6}]}] as event
merge (p:TST {tag:event.tag})
with p, event
match (h:P) where h.n='n'
merge (h)-[:RELATES_TO]->(p)
with p, event, h
optional match (h)-[:RELATES_TO]->(p_last:TST) where p_last.last=TRUE
foreach(_ in case when exists(p_last.last) THEN [1] ELSE [] END |
  set p_last.last=FALSE
  merge (p_last)-[:NEXT]->(p))
set p.last=TRUE
I'm not quite following your example Cypher, and I'm not sure I fully understand what you're trying to do. Could you make a simpler example and include the desired output (or the desired graph when it's done)?
Answering my own question: by using apoc.periodic.iterate to work around the non-sequential behavior of unwind, I managed to get the :NEXT list working:
unwind [{tag:'123', c:[{e:1}]},{tag:'234', c:[{e:2},{e:3}]},{tag:'345',c:[{e:5}]},{tag:'456',c:[{e:4},{e:6}]}] as event
with collect(event) as events
CALL apoc.periodic.iterate("unwind $events as event return event",
"merge (p:TST {uuid:apoc.create.uuid()}) set p.tag=event.tag with p, event match (h:P) where h.n='n' merge (h)-[:RELATES_TO]->(p) with p, event, h optional match (h)-[:RELATES_TO]->(p_last:TST) where p_last.last=TRUE foreach(_ in case when exists(p_last.last) THEN [1] ELSE [] END | set p_last.last=FALSE merge (p_last)-[:NEXT]->(p)) set p.last=TRUE", {batchSize:1, parallel:false, params:{events:events}}) YIELD batches, total, errorMessages
return total
Your suggestions may work for this particular, simple case, but please remember that this is meant to cover a streaming solution in which data comes in from a Kafka Connect sink, so I'm bound by the unwind batch paradigm. The :NEXT lists keep growing, continuously and indefinitely.
Then you'll need to make sure there is only ever one thread / tx consuming from the sink, as concurrent queries will likely encounter race conditions when updating the links. If you can't guarantee that only a single transaction will be updating the list at the same time, then you need to synchronize your updates as mentioned in the article.
If you're appending to the end of the list, the approach would be to match/create the nodes, then collect and link them (you could use apoc.nodes.link for this). Next, match the tail of the list where you want to append; you should probably have a listhead node or similar that maintains relationships to the head and tail of the list, for fast access to the tail. Finally, create the relationship from the tail to the head of the just-created linked list, delete the relationship from the list head to the old tail, and create a new relationship from the list head to the new tail.
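The append steps described here could be sketched roughly as follows. This is only a sketch under assumptions that are not in the thread: a single (:ListHead {name:'tst'}) node already exists, a [:TAIL] relationship from it points at the current last node once the list is non-empty, and every incoming event is new (hence CREATE rather than MERGE). ListHead, TAIL and the name property are hypothetical names. It also relies on collect() keeping the unwind order, which holds in practice but is not guaranteed without an explicit ORDER BY.

```cypher
// Assumption: (:ListHead {name:'tst'}) exists; [:TAIL] points at the current last node, if any.
MATCH (lh:ListHead {name: 'tst'})
OPTIONAL MATCH (lh)-[oldTail:TAIL]->(tail:TST)
WITH lh, oldTail, tail
UNWIND [{tag:'123'}, {tag:'234'}, {tag:'345'}, {tag:'456'}] AS event
// Assumption: every event is new, so CREATE is used instead of MERGE.
CREATE (p:TST {tag: event.tag})
// Collapse the batch into a single row so the rest runs exactly once.
WITH lh, oldTail, tail, collect(p) AS batch
WITH lh, oldTail, tail, batch, head(batch) AS first, last(batch) AS lastNode
// Chain the new nodes to each other: batch[0]-[:NEXT]->batch[1]-[:NEXT]->...
CALL apoc.nodes.link(batch, 'NEXT')
// Splice the new chain onto the old tail (skipped when the list was empty).
FOREACH (_ IN CASE WHEN tail IS NULL THEN [] ELSE [1] END |
  CREATE (tail)-[:NEXT]->(first)
)
// Move the tail pointer from the old tail to the new one.
DELETE oldTail
MERGE (lh)-[:TAIL]->(lastNode)
```

Because the whole batch is collected into one row before linking, nothing here depends on unwind processing its rows one at a time.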
And again, if you need to prevent race conditions from concurrent operations, you need to synchronize/lock on some common node (the listhead node, if you have one) such that only one transaction is updating the list at a time.
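One way to take such a lock is apoc.lock.nodes, which acquires a write lock on the given nodes and holds it until the transaction ends. A minimal sketch, assuming a hypothetical (:ListHead {name:'tst'}) node with a [:TAIL] relationship to the current last node (neither name comes from the thread):

```cypher
// Assumption: ListHead, TAIL and the name property are illustrative names only.
MATCH (lh:ListHead {name: 'tst'})
// Take a write lock on the list head; it is released when the transaction commits or rolls back.
CALL apoc.lock.nodes([lh])
// Read the tail only after the lock is held, so no other transaction can move it underneath us.
OPTIONAL MATCH (lh)-[:TAIL]->(tail:TST)
RETURN lh, tail
```

Anything that reads or rewrites the tail should come after the lock call in the same transaction; a second transaction running the same query would wait at the CALL until the first one finishes.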
I guess I'll have to fall back on your suggestions, as nested apoc.periodic.iterate does not work, i.e. the code below hangs forever:
unwind [{tag:'123', c:[{e:1}]},{tag:'234', c:[{e:2},{e:3}]},{tag:'345',c:[{e:5}]},{tag:'456',c:[{e:4},{e:6}]}] as event
with collect(event) as events
CALL apoc.periodic.iterate("unwind $events as event return event", "merge (p:TST {uuid:apoc.create.uuid()}) set p.tag=event.tag with p, event match (h:P) where h.n='n' merge (h)-[:RELATES_TO]->(p) with p, event, h optional match (h)-[:RELATES_TO]->(p_last:TST) where p_last.last=TRUE foreach(_ in case when exists(p_last.last) THEN [1] ELSE [] END | set p_last.last=FALSE merge (p_last)-[:NEXT]->(p)) set p.last=TRUE with p, event call apoc.periodic.iterate('unwind $event.c as s return s, $p as p','merge (c:Child {uuid:apoc.create.uuid()}) on create set c.e=s.e merge (p)-[:HAS_CHILD]->(c) optional match (p)-[:HAS_CHILD]->(c_last:Child) where c_last.last=TRUE foreach(ignoreMe in case when exists(c_last.last) THEN [1] ELSE [] END | set c_last.last=FALSE merge (c_last)-[:NEXT]->(c)) set c.last=TRUE', {batchSize:1, parallel:false, params:{event:event, p:p}}) YIELD batches, total, errorMessages return 0", {batchSize:1, parallel:false, params:{events:events}}) YIELD batches, total, errorMessages
return 0
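For the child nodes, the nested apoc.periodic.iterate can probably be avoided by collecting each parent's children and linking them in one go with apoc.nodes.link. A sketch, assuming every event is new (CREATE instead of MERGE) and that each parent gets its own :NEXT chain of children, as in the code above. It leaves out the (:P) node and the parent-level :NEXT links, and it relies on collect() keeping the unwind order, which is usual but not guaranteed without an ORDER BY.

```cypher
UNWIND [{tag:'123', c:[{e:1}]}, {tag:'234', c:[{e:2},{e:3}]},
        {tag:'345', c:[{e:5}]}, {tag:'456', c:[{e:4},{e:6}]}] AS event
// Assumption: each event is new, so the parent is created rather than merged.
CREATE (p:TST {tag: event.tag})
WITH p, event
// One row per child of this parent.
UNWIND event.c AS s
CREATE (p)-[:HAS_CHILD]->(c:Child {e: s.e})
// Group the children back per parent, one row per parent.
WITH p, collect(c) AS children
// Chain each parent's children with :NEXT (a single child yields no link).
CALL apoc.nodes.link(children, 'NEXT')
RETURN p.tag AS tag, size(children) AS linkedChildren
```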