Large Batch Job - Help would be incredibly appreciated

Hello everyone,

I'm fairly new to Neo4j and Cypher. I'm very much enjoying the learning journey and the support this community provides. It's incredible how much documentation exists, and for that I'm very grateful. I'm starting to step into uncharted territory, though, and I'm having trouble with some of the more advanced queries, relatively speaking.

I've been able to experiment with all sorts of Python code and have written a few helper libraries for my project. In short, I'm trying to run a large batch job (many GB of data). I prepopulated the database using the command-line CSV import tool, which worked quite well. I'm now at the data analysis / processing step of my project, and I thought my queries were going well until I discovered that I had pretty much ruined my dataset by carelessly performing two UNWIND operations back to back. That's no biggie, as I have the VM snapshotted. Here was my query:

UNWIND [['a1','b2'],['b2','b3']] as datapair
UNWIND datapair as dataelement
MATCH (n:some_node_label {some_property: dataelement})-->(c:cluster_node)
WITH DISTINCT(c.cluster_id) as cids,c
UNWIND cids as cid
MATCH (c)<--(a:random_node)
WITH count(a) as c_count,c
ORDER BY c_count DESC
WITH collect(c) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN "success" as status

Basically, what I wanted to do was pass in thousands of these pairs in a single query but process each of them independently of one another; they are in no way related. Insert the sigh. Well, this of course "unwound" all of the elements within every pair and merged every single one of them together at the end of the query. In lieu of thousands of individual queries (run serially, mind you), I was hoping to process everything in a single request via a for loop of some sort, which I now know Cypher doesn't support. I also tried the FOREACH hack, but discovered that APOC procedures can't be called from within it, and I'm not sure it would have helped anyway.
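
To illustrate the flattening, here's a minimal, self-contained demo (the connection details are placeholders) that shows every element arriving on one flat stream of rows:

from neo4j import GraphDatabase

DEMO = """
UNWIND [['a1','b2'],['b2','b3']] AS datapair
UNWIND datapair AS dataelement
RETURN dataelement
"""

with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret")) as driver:
    with driver.session() as session:
        # prints ['a1', 'b2', 'b2', 'b3'] -- one flat stream, so a later
        # collect() with no grouping key lumps every element together
        print([r["dataelement"] for r in session.run(DEMO)])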

I've even tried experimenting with py2neo and tx.append, but that no longer seems to be supported, nor am I certain it would help. Is there an efficient way of sending a list of queries for the server to process without having to make a separate call each time? Or perhaps a way to send a list of queries for the server to process in parallel via multiple sessions? Working from Python so far, I'm not having much luck figuring out how to speed up or parallelize my requests on the server.

For now I'm stuck issuing thousands of these Cypher queries via a single Python Bolt session, which is incredibly slow. The query essentially figures out which of the two cluster_nodes is the larger and merges the smaller one into it. I have verified that this part works by looking at not only the relationship counts but also the internal Neo4j IDs; the node with the largest number of relationships persists.

UNWIND ['a1','b2'] as dataelement
MATCH (n:some_node_label {some_property: dataelement})-->(c:cluster_node)
WITH DISTINCT(c.cluster_id) as cids,c
UNWIND cids as cid
MATCH (c)<--(a:random_node)
WITH count(a) as c_count,c
ORDER BY c_count DESC
WITH collect(c) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN "success" as status

I would be incredibly appreciative of any help or guidance. Happy to pay it forward when the right time comes.

Regards,
Al

@stefan.armbruster @michael.hunger


I think what you need to do is keep passing the "datapair" through as a grouping key, so it always groups by that.
I don't fully understand what you do with the cluster IDs; you should probably keep the cluster ID as a grouping key as well.

UNWIND $datalist as datapair
UNWIND datapair as dataelement
MATCH (n:some_node_label {some_property: dataelement})-->(c:cluster_node)
// do the ordering of cluster nodes by degree first
WITH * ORDER BY size( (c)<--(:random_node) )  DESC
// keep the pair and cluster id as grouping keys
WITH datapair, c.cluster_id as cids, collect(c) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN count(*) as updates

For large updates you can either pass those lists of pairs in chunks as parameters via the Python driver, where the chunk size is determined by how many update operations (the node merges) you are doing per chunk; usually it should be between 10k and 50k updates per transaction, i.e. per chunk.
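
For example, with the official Neo4j Python driver (a rough sketch using the same statement as above; connection details and names are placeholders):

from neo4j import GraphDatabase

MERGE_PAIRS = """
UNWIND $datalist as datapair
UNWIND datapair as dataelement
MATCH (n:some_node_label {some_property: dataelement})-->(c:cluster_node)
WITH * ORDER BY size( (c)<--(:random_node) ) DESC
WITH datapair, c.cluster_id as cids, collect(c) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN count(*) as updates
"""

def merge_in_chunks(driver, pairs, chunk_size=10000):
    # one write transaction per chunk keeps each tx in the 10k-50k update range
    with driver.session() as session:
        for start in range(0, len(pairs), chunk_size):
            chunk = pairs[start:start + chunk_size]
            session.write_transaction(
                lambda tx: tx.run(MERGE_PAIRS, datalist=chunk).consume())

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret"))
merge_in_chunks(driver, [['a1', 'b2'], ['b2', 'b3']])
driver.close()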

Alternatively, you can move the same logic into apoc.periodic.iterate, which auto-batches the transactional updates.

call apoc.periodic.iterate("
UNWIND $datalist as datapair RETURN datapair
","
UNWIND datapair as dataelement
MATCH (n:some_node_label {some_property: dataelement})-->(c:cluster_node)
// do the ordering of cluster nodes by degree first
WITH * ORDER BY size( (c)<--(:random_node) )  DESC
// keep the pair and cluster id as grouping keys
WITH datapair, c.cluster_id as cids, collect(c) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN count(*) as updates
", {batchSize: 1000})

Thank you very much for the response @michael.hunger. Sorry for any confusion surrounding the cluster IDs; I'll try to provide more context. Before I do: would your example merge all of the data elements from every datapair in $datalist into the same cluster? My original challenge was that Cypher treated my query as a single statement and ended up merging all of the elements across both UNWIND lists. I was really hoping the data pairs within $datalist would be treated as individual, independent pairs, similar to a for loop.

  1. For example, where a and c are two different node types:
    a-type nodes have a single connection to a c-type node,
    while a c node can have many connections from different a nodes.
    This is a hub-and-spoke model, if you will.

  2. Node a1 is connected to c1;
    node a2 is connected to c2.

  3. Out of a1 and a2, which c node has the most a nodes connected to it? Merge the c node with the lower number of a-type relationships into the c node with the higher number. In the end, if c2 has the most relationships, both a1 and a2 will point to c2, and c1 will be discarded entirely. (A tiny reproducible setup follows below.)
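
To make that concrete, here's a tiny, hypothetical setup for the scenario above; the :LINKED relationship type and all of the values are made up purely for illustration:

from neo4j import GraphDatabase

# hypothetical toy graph: CID-2 has two :random_node spokes, CID-1 has none
SETUP = """
CREATE (:some_node_label {some_property:'a1'})-[:LINKED]->(:cluster_node {cluster_id:'CID-1'})
CREATE (:some_node_label {some_property:'a2'})-[:LINKED]->(c2:cluster_node {cluster_id:'CID-2'})
CREATE (:random_node)-[:LINKED]->(c2)
CREATE (:random_node)-[:LINKED]->(c2)
"""

with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret")) as driver:
    with driver.session() as session:
        session.run(SETUP).consume()
        # after processing the pair ['a1','a2'], CID-2 should survive and
        # CID-1 should be merged away, leaving both a1 and a2 pointing at CID-2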

I'm hoping to pass in a number of unrelated individual lists which are processed independently of one another. The goal is not to merge all of the elements within the main list into one another.

After testing the query you provided, I found that collect(c) as nodes returned only a single node per row.

UNWIND $datalist as datapair
UNWIND datapair as dataelement
MATCH (n:some_node_label {some_property: dataelement})-->(c:cluster_node)
// do the ordering of cluster nodes by degree first
WITH * ORDER BY size( (c)<--(:random_node) )  DESC
// keep the pair and cluster id as grouping keys
WITH datapair, c.cluster_id as cids, collect(c) as nodes
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN count(*) as updates

Results in: [screenshot of the result table, where each row's nodes column contains only a single node]

In the example above, two different datapairs were included in the original query, namely [['a1','a2'],['a3','a4']]. The ordering of the nodes, however, was correct from top to bottom: CID-2 has the highest node count for the first datapair, and CID-3 for the second.

Ideally, nodes would contain the following (shortened version), which is also the correct ordering:

row 1: CID-2, CID-1
row 2: CID-3, CID-4

Right, but you said you don't want to group the nodes just by cluster but by datapair too!

If you group by cluster only you should get larger groups per cluster.

WITH c.cluster_id as cids, collect(c) as nodes // removed datapair

Thank you for your help. In this example, all elements are merged into a single cluster, which is not the desired outcome. Since posting this question, I've started to think WCC / Union Find might be a better fit. I'll post a new question for it, since it differs from this thread quite a bit.

Thinking about this more, I could likely have explained it better. The outer list is a list of lists, and I was hoping to process each inner list independently of the next in the batch job. When two UNWINDs are performed in sequence, every node across the whole list of lists ends up being merged together, which is the problem.

I ended up figuring out exactly what I was looking for. The two UNWIND operations are unnecessary; only one is needed. Excluding the ordering, here is a short version of the Cypher query I'll end up using. It's still not the most optimal, and I'll definitely take a look at the example query you provided demonstrating apoc.periodic.iterate. Thank you for all of your help!!

UNWIND [['a1','b2'],['b2','b3']] as row
MATCH (n:some_node_label)-->(c:cluster_node)
WHERE n.cluster_id IN row
WITH collect(c) as nodes,row
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN *
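
For anyone following along, here is roughly how I plan to send the pairs from Python. It's just a sketch: the connection details are placeholders, and I've swapped RETURN * for a count so the driver gets back a single summary row:

from neo4j import GraphDatabase

FINAL_QUERY = """
UNWIND $datalist AS row
MATCH (n:some_node_label)-->(c:cluster_node)
WHERE n.cluster_id IN row
WITH collect(c) AS nodes, row
CALL apoc.refactor.mergeNodes(nodes, {properties:'discard'})
YIELD node
RETURN count(*) AS merged
"""

with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "secret")) as driver:
    with driver.session() as session:
        # all pairs travel in one request; nodes are still collected per pair
        result = session.run(FINAL_QUERY, datalist=[['a1','b2'],['b2','b3']])
        print(result.single()["merged"])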