Parallel Cypher & APOC

Hi all! We have a broad network, and when traversing the tree we are attempting to leverage all cores on our machine, parallelising the process (to speed up results!). We have attempted apoc.cypher.mapParallel with little success, and while we can get apoc.periodic.iterate to run across all cores, its performance is still no better than single-threaded execution.

I would love to know if it's simply due to query structure (i.e. whether apoc.path queries should sit in the outer statement, not the inner one) or if there is something else I am missing. The first of the two snippets below runs in parallel with little speed improvement; the second does not even produce batched updates.

    CALL apoc.periodic.iterate("
        MATCH (p:Vertex:Root)
        WITH collect(p) as endNodes
        MATCH (n:Vertex) WHERE NOT EXISTS(n.depth)
        CALL apoc.path.expandConfig(n, {relationshipFilter:'<CONNECTS',
            limit:1, terminatorNodes:endNodes})
        YIELD path
        RETURN n, length(path) as depth
    ","
        SET n.depth = depth
    ", {batchSize:1000, parallel:true, iterateList:true, concurrency:70})

    CALL apoc.periodic.iterate("
        MATCH (p:Vertex:Loc)
        WITH collect(p) as endNodes
        MATCH (n:Vertex) WHERE NOT EXISTS(n.locs)
        CALL apoc.path.subgraphNodes(n, {endNodes:endNodes, relationshipFilter:'CONNECTS>', maxLevel:80}) YIELD node
        RETURN n, count(node.id) as locs, count(distinct node.dslam) as dslams
    ","
        SET n.locs = locs, n.dslams = dslams
    ", {batchSize:1000, parallel:true, iterateList:true, concurrency:70})

The nodes are indexed on their unique id. I hope someone more adept at APOC and parallel Cypher can point me in the right direction!

Sam

Hi Sam, would you be able to share your model and, ideally, some of its characteristics, like size and average degree of the nodes?
You mention it's a tree, and I see there is a Root label - are there multiple trees?
Also, you seem to be computing all paths from root nodes to a set of leaf nodes?
I'm trying to understand whether there are model optimisations that could improve the performance of your queries.

Cheers,

JB.

Hi Jesus, happy to.

Yes, many trees, but due to DQ issues we can't separate them beforehand (we would have to run connected components first, which seems like unnecessary overhead). The longest tree is around 90 nodes deep; most in-degrees are between 1 and 2, and the same goes for out-degrees. It is around 7 million nodes and 7 million relationships. Hence my disappointment at the time taken - it's definitely not the biggest graph out there!

So the first query (happy to take a different, more performant tack) is there to identify loops and break the offending relationship - the one where the parent's "depth" from the root is greater than the child's. That is why every node needs this identifier. This query, while still slow, is less heavy than the second.
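For illustration, once every node has a "depth", the loop-breaking step I have in mind looks roughly like this (a sketch only - the exact comparison and direction depend on our DQ issues):

    // Sketch: delete CONNECTS relationships that point "backwards",
    // i.e. from a parent further from the root than its child.
    CALL apoc.periodic.iterate("
        MATCH (parent:Vertex)-[r:CONNECTS]->(child:Vertex)
        WHERE parent.depth > child.depth
        RETURN r
    ","
        DELETE r
    ", {batchSize:1000})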

The second is there to identify how many leaves sit under any given node. That figure feeds later calculations (weighted averages etc.), so it is essential; the metric has value in itself too, and it needs to be on every node.

Hopefully that helps give some context - thanks for responding!

S

Would you have a test data generator to recreate the database?

Did you test the statements in isolation and check their PROFILE output?

I'd be interested in the cardinalities of:

  • (p:Vertex:Root)
  • (n:Vertex) WHERE NOT EXISTS(n.depth)

Did you index :Vertex(depth)?
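For reference, in the Neo4j 3.x syntax these queries already use (NOT EXISTS(n.depth)), that index would be created with:

    CREATE INDEX ON :Vertex(depth);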

It could help to set a different uniqueness, e.g. node-global or so.
I think most of the work is in the first query, which would benefit from parallelizing.

Could you try this too:

    CALL apoc.periodic.iterate("
        MATCH (p:Vertex:Root)
        WITH collect(p) as endNodes
        MATCH (n:Vertex) WHERE NOT EXISTS(n.depth)
        RETURN n, endNodes
    ","
        CALL apoc.path.expandConfig(n, {relationshipFilter:'<CONNECTS',
            limit:1, terminatorNodes:endNodes})
        YIELD path
        WITH n, length(path) as depth
        SET n.depth = depth
    ", {batchSize:1000, parallel:true, iterateList:true, concurrency:70})
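On the uniqueness point: expandConfig accepts a uniqueness option, so the inner expansion could look like this (sketch - NODE_GLOBAL visits each node at most once across the whole expansion, which can cut traversal work considerably):

    CALL apoc.path.expandConfig(n, {relationshipFilter:'<CONNECTS',
        limit:1, terminatorNodes:endNodes,
        uniqueness:'NODE_GLOBAL'})   // default is RELATIONSHIP_PATH
    YIELD path
    WITH n, length(path) as depth
    SET n.depth = depth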

Thanks Michael - no test generator unfortunately; it hasn't been on the priority list. By cardinalities do you mean counts? The former is around 30k; the latter is just a catch for when a run crashes halfway through, so I don't recalculate "depth". As this process is what sets "depth", I haven't indexed it, no. The individual queries do not run quickly enough for me to capture an accurate PROFILE, I apologise. I have trialled the query you gave (with the path expansion in the inner statement) - it grows quickly and then crashes.

However, you may be able to help me with a separate approach. I have run Connected Components (Union Find) to create partitions, and I want to run the basic statement in parallel across them. Would apoc.cypher.parallel with those partitions be the most appropriate way, or have I missed a better APOC procedure for parallelisation? The shape I have in mind is sketched below.
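This is a sketch only - it assumes the Union Find run has written a partition property onto every :Vertex and that each partition contains a single :Root - and it drives apoc.periodic.iterate by partition id (batchSize:1) rather than by node, so each partition runs as its own concurrent transaction:

    CALL apoc.periodic.iterate("
        MATCH (n:Vertex) RETURN distinct n.partition as part
    ","
        MATCH (root:Vertex:Root {partition: part})
        MATCH (m:Vertex {partition: part}) WHERE NOT EXISTS(m.depth)
        CALL apoc.path.expandConfig(m, {relationshipFilter:'<CONNECTS',
            limit:1, terminatorNodes:[root]})
        YIELD path
        SET m.depth = length(path)
    ", {batchSize:1, parallel:true, concurrency:16})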

Cheers,
S

I saw the same issues with mapParallel(2) not scaling as much as I intended; I have to go back and investigate.

I really wonder why it "crashes" - that rather sounds like a setup issue. What are your settings for heap and page-cache, and how many CPUs do you have?
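That is, the usual neo4j.conf memory settings (values here purely as placeholders):

    dbms.memory.heap.initial_size=32g
    dbms.memory.heap.max_size=32g
    dbms.memory.pagecache.size=16g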

But really having a copy of the graph to reproduce would help a lot.

Hmm, that's a shame - I would love to switch to a partition-parallelisation approach.

I would say it's the process in combination with the setup - our setup is massive, but the process seems to grow exponentially very quickly. This graph is too small, against 32/64 GB of heap (we have tried both), 72 CPUs with 16 cores each, and plenty of RAM, to be breaking that easily.

Yeah, apologies - I cannot spend the time manually recreating it, but generically it is a tree, with some dual parentage and loops (the DQ issues the above queries are trying to resolve). There are identifiable roots and leaves.

S

Samuel, thanks for raising this point. At the moment I am running the commands for creating relationships on the big Yelp database, as per the tutorial (The Neo4j Graph Data Science Library Manual v2.2 - Neo4j Graph Data Science). I had to make several adjustments to resize the initial and max heap, and so far I have not managed to complete the creation. One thing I am particularly curious about is how Cypher leverages multithreaded processing, as my computer has 12 cores yet during this operation only two are being used.

So, looking at the command below (Yelp example), how can we incorporate elements that trigger multithreading?

Best
Roberto Shimizu

    CALL apoc.periodic.iterate("
        CALL apoc.load.json('file:///dataset/business.json') YIELD value RETURN value
    ","
        MERGE (b:Business {id: value.business_id})
        SET b += apoc.map.clean(value, ['attributes','hours','business_id','categories','address','postal_code'], [])
        WITH b, value.categories as categories
        UNWIND categories as category
        MERGE (c:Category {id: category})
        MERGE (b)-[:IN_CATEGORY]->(c)
    ", {batchSize: 10000, iterateList: true});

If the JSON is one element per row it should parallelize - note, though, that apoc.periodic.iterate only multithreads when you pass parallel:true; it defaults to false.

It could be that some of your MERGE statements are waiting on each other's locks, especially for shared categories and such.

You could try to import businesses first,
then categories,
and then connect them by category in a third pass, as sketched below.
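A rough sketch of that three-pass shape, reusing the file and fields from the statement above (treat it as illustrative, not a tested import script):

    // Pass 1: businesses only - no shared nodes, so batches don't contend.
    CALL apoc.periodic.iterate("
        CALL apoc.load.json('file:///dataset/business.json') YIELD value RETURN value
    ","
        MERGE (b:Business {id: value.business_id})
        SET b += apoc.map.clean(value, ['attributes','hours','business_id','categories','address','postal_code'], [])
    ", {batchSize: 10000, parallel: true});

    // Pass 2: distinct categories, single-threaded (small set, avoids MERGE races).
    CALL apoc.periodic.iterate("
        CALL apoc.load.json('file:///dataset/business.json') YIELD value
        UNWIND value.categories as category
        RETURN distinct category
    ","
        MERGE (:Category {id: category})
    ", {batchSize: 10000});

    // Pass 3: relationships; parallel MERGEs on shared categories can still
    // deadlock, so keep this pass single-threaded if that happens.
    CALL apoc.periodic.iterate("
        CALL apoc.load.json('file:///dataset/business.json') YIELD value RETURN value
    ","
        MATCH (b:Business {id: value.business_id})
        UNWIND value.categories as category
        MATCH (c:Category {id: category})
        MERGE (b)-[:IN_CATEGORY]->(c)
    ", {batchSize: 10000});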