How best to do parallel processing

We have a lot of (n1:EffortUser)-[r1:EFFORT]->(n2:EffortObject) relationships that need to be counted by day and week, i.e. how many :Email EffortObject nodes a given EffortUser has SENT. If you have a lot of users and emails that can take quite some time, so we would like to parallelize this query.

Right now we are using ...

MATCH (n1:EffortUser)-[r1:EFFORT]-(n2:EffortObject) 
WHERE r1.Effort = 'yes' AND r1.TimeEvent >= '2017-01-01' AND r1.TimeEvent <= '2017-12-31'
RETURN DISTINCT n1.Name as User, date(datetime(r1.TimeEvent)) as date, count(distinct r1.IdUnique) as count
ORDER BY User, date

There seem to be a few options to parallelize / optimize this, but all are rather poorly documented, so I need some help please!

I did a bit of research and found the following APOC functions, but try as I might I cannot get them to work (and I could not find much here or on Stack Overflow either). Could someone PLEASE provide some guidance on which of the options below is best, including an example using the sample code above? This is driving me nuts. We have 4 cores and 32 GB of memory, so this should run pretty fast, but I just cannot get it to work.

https://neo4j.com/docs/labs/apoc/current/cypher-execution/
https://neo4j.com/docs/labs/apoc/current/cypher-execution/running-cypher/

apoc.cypher.runMany(cypher :: STRING?, params :: MAP?, config = {} :: MAP?) :: (row :: INTEGER?, result :: MAP?)

CALL apoc.cypher.runMany('cypher;\nstatements;',{params},{config}) - runs each semicolon-separated statement and returns a summary - currently no schema operations

apoc.cypher.mapParallel(fragment :: STRING?, params :: MAP?, list :: LIST? OF ANY?) :: (value :: MAP?)

CALL apoc.cypher.mapParallel(fragment, params, list-to-parallelize) yield value - executes the fragment in parallel batches with the list segments being assigned to _

apoc.cypher.mapParallel2(fragment :: STRING?, params :: MAP?, list :: LIST? OF ANY?, partitions :: INTEGER?, timeout = 10 :: INTEGER?) :: (value :: MAP?)

CALL apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - executes the fragment in parallel batches with the list segments being assigned to _

apoc.cypher.parallel(fragment :: STRING?, params :: MAP?, parallelizeOn :: STRING?) :: (value :: MAP?)

apoc.cypher.parallel2(fragment :: STRING?, params :: MAP?, parallelizeOn :: STRING?) :: (value :: MAP?)

I don't think adding a query profile etc. as per the community guidelines is useful here, but please correct me if I'm wrong. Any help will be greatly appreciated, and I'm happy to add whatever info is required.

Before thinking about parallelization I'd first revisit the data model:
Consider embedding the relationship property filters r1.Effort and r1.TimeEvent into the relationship type. I'm not sure about the semantics of your Effort property.

Embedding time is a good idea if you always run queries on fixed time ranges. If you always query for the same year, you could have :EFFORT_2017 instead of :EFFORT.

With that in place your MATCH will be much more selective and you won't have to filter out as much.
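If you want to try that, a one-off refactoring along these lines could create the year-suffixed relationship types. This is an untested sketch that assumes TimeEvent is a string starting with the year; apoc.periodic.iterate batches the rewrite so the transaction doesn't blow up:

CALL apoc.periodic.iterate(
  "MATCH (u:EffortUser)-[r:EFFORT]->(o:EffortObject) RETURN u, r, o",
  "CALL apoc.create.relationship(u, 'EFFORT_' + left(r.TimeEvent, 4), properties(r), o) YIELD rel
   DELETE r",
  {batchSize: 10000})

Each old :EFFORT relationship is replaced by e.g. :EFFORT_2017 carrying the same properties.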

Once that's done, I'd probably use apoc.cypher.mapParallel2. Note I haven't actually tested the snippet below; it's meant to outline the idea:

MATCH (u:EffortUser) WITH collect(u) AS users
CALL apoc.cypher.mapParallel2(
 "MATCH (_)-[r:EFFORT_2017]-(n:EffortObject) 
  RETURN _.Name as user, date(datetime(r.TimeEvent)) as date, count(distinct r.IdUnique) as count",
  {}, users, 4) YIELD value
RETURN value.user as user, value.date as date, value.count as count

Inside the fragment, _ refers to the element of the current iteration (here, one EffortUser).


Thanks Stefan, and Happy New Year!

Thanks to your example above, as well as @simon's post here, Using parallel queries to sum value of bitcoin outputs connected to bitcoin address nodes (which I found a little later), I got the following apoc.cypher.mapParallel2() Cypher to work:

MATCH (u:EffortUser) 
WITH collect(u) AS users 
CALL apoc.cypher.mapParallel2("
	MATCH (_)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31' 
    RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {}, users, 4) YIELD value 
RETURN value.user as user, value.date as date, sum(value.count) as count 
ORDER BY user, date

Regarding r.Effort = 'yes': I checked, and unfortunately we do need this additional filter, as not all relationships between EffortUser and EffortObject are true 'effort'. Either way, the query takes 16,622 ms with r.Effort = 'yes' and 16,429 ms without, so not a big enough difference to worry about for now.

Regarding your suggestion of ...

Embedding time is a good idea if you always run queries on fixed time ranges. If you always query for the same year, you could have :EFFORT_2017 instead of :EFFORT.

... I decided against implementing this, as the min/max TimeEvent value inputs are very fluid, so I'm not sure it would do us much good. Besides, the above query only took 16,622 ms to complete, which is awesome! I'm making a note of it to revisit later though; there might be something worth exploring when we need those extra few ms :slight_smile:

Anyway, the basic query is working now, which is great. However, when I start customizing it, the results quickly become erratic and I'm not sure why - most likely operator error, but I can't figure it out. Here are the two main issues I'm grappling with:

Issue 1: Query fails when making minor Cypher changes

I'm only removing the and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31' filter; everything else stays the same (see below). But now the query fails and for some reason returns no results.

MATCH (u:EffortUser) 
WITH collect(u) AS users 
CALL apoc.cypher.mapParallel2("
	MATCH (_)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' 
    RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {}, users, 4) YIELD value 
RETURN value.user as user, value.date as date, sum(value.count) as count 
ORDER BY user, date

Here are the PROFILE results ...

Right around the same time my CPU started going to 100% even though the previous queries that returned results never really exceeded 25% (see below).

To me it looks like I'm somehow overloading the process by removing the date range filter - can that be? Are there any additional settings I should use to avoid this? I noticed your sample query passes {}, but I saw @simon using additional parameters here, such as {parallel:true, batchSize:1000, concurrency:4}.
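Looking at the signatures quoted earlier, the second argument to mapParallel2 appears to be a plain params map whose entries become $-parameters inside the fragment. If that's right, one thing it can be used for is passing the date range in instead of hard-coding it - an untested sketch:

MATCH (u:EffortUser) 
WITH collect(u) AS users 
CALL apoc.cypher.mapParallel2("
    MATCH (_)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' and r.TimeEvent >= $from and r.TimeEvent <= $to 
    RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {from: '2016-01-01', to: '2018-12-31'}, users, 4) YIELD value 
RETURN value.user as user, value.date as date, sum(value.count) as count 
ORDER BY user, date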

Issue 2: Additional filter settings are not working but should

This might be related to issue 1 above, but I wasn't sure, so thought I'd outline it anyway, as this is the real challenge in productionizing this query for our GraphQL API setup.

Our graph really looks like this ...

(:Employee)-[]-(:EffortUser)-[]-(:EffortObject)

... so one employee can have multiple users across multiple platforms. It looks a bit like this from left to right, i.e. Employee LINKED_TO one EffortUser (in reality there are multiple here) and multiple EffortObject nodes (emails in this case).

When trying to incorporate the above structure into the Cypher query I hit the same issue: suddenly the query returns no results even though it should, as the link between Employee 'cbartens' and EffortUser clearly exists (see graph example above) ...

MATCH (e:Employee)-[]-(u:EffortUser) 
WHERE e.Name = 'cbartens' 
WITH collect(u) AS users 
CALL apoc.cypher.mapParallel2("
	MATCH (_)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31' 
    RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {}, users, 4) YIELD value 
RETURN value.user as user, value.date as date, sum(value.count) as count 
ORDER BY user, date

Now maybe this is related to issue 1 above, in that the additional filter makes the query more complex and overloads it, but I just don't know. Any ideas?

I tried different Cypher variations (example below), but the outcome is always the same: the query returns no results even though it should, i.e. the nodes and relationships definitely exist.

MATCH (e:Employee) 
WHERE e.Name = 'cbartens' 
WITH collect(e) AS employees 
CALL apoc.cypher.mapParallel2("
	MATCH (_)-[]-(:EffortUser)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31' 
    RETURN _.Name as employee, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {}, employees, 4) YIELD value 
RETURN value.employee as employee, value.date as date, sum(value.count) as count 
ORDER BY employee, date

My suspicion is that there might be relationships that don't have an IdUnique property (the MATCH you're doing matches all relationships regardless of their type).
In that case the inner statement might break on count(distinct r.IdUnique). Maybe replace it with count(distinct coalesce(r.IdUnique, 'n/a')), or filter those relationships out beforehand using WHERE r.IdUnique IS NOT NULL.
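Applied to your failing variant, the guarded fragment might look like this (untested):

MATCH (u:EffortUser) 
WITH collect(u) AS users 
CALL apoc.cypher.mapParallel2("
    MATCH (_)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' 
      AND r.TimeEvent IS NOT NULL 
      AND r.IdUnique IS NOT NULL 
    RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {}, users, 4) YIELD value 
RETURN value.user as user, value.date as date, sum(value.count) as count 
ORDER BY user, date

The two IS NOT NULL checks keep relationships with missing properties from breaking date(datetime(...)) and the distinct count.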


Good thinking ... let me check ... no, sorry, that's not it I think, see below.

same with r.TimeEvent ?


That must be it ... see below ... the last 4 are Effort='yes' relationships

Do I just add where not r.TimeEvent is null to the filter?

@stefan.armbruster this query here works now, but it only returns results by EffortUser; I cannot figure out how to pass the Employee.Name into the results as well.

MATCH (e:Employee)-[]-(u:EffortUser) 
WHERE e.Name = 'cbartens' 
WITH collect(u) AS users 
CALL apoc.cypher.mapParallel2("
	MATCH (_)-[r]-(:EffortObject) 
    WHERE r.Effort = 'yes' and not r.TimeEvent is null and r.TimeEvent >= '2016-01-01' and r.TimeEvent <= '2018-12-31' 
    RETURN _.Name as user, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as events",
    {}, users, 4) YIELD value 
RETURN value.user as user, value.date as date, sum(value.events) as events 
ORDER BY date, user

Also, every time I try something that does not work, the server kind of enters a DoS state, i.e. the CPU goes to 100% and the instance becomes unresponsive and has to be restarted. Is there any way to avoid this? It makes the trial and error needed to get the query right very difficult.


Would it be possible to wrap the above statement in the following?

apoc.cypher.runTimeboxed(cypher :: STRING?, params :: MAP?, timeout :: INTEGER?) :: (value :: MAP?)

apoc.cypher.runTimeboxed('cypherStatement',{params}, timeout) - abort kernelTransaction after timeout ms if not finished

The only other thing I found is this https://github.com/neo4j/neo4j-javascript-driver/issues/318, but I don't think it helps given I use Neo4j Desktop.
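Going by the signature above, the wrapping might look something like this untested sketch. The whole statement becomes a string and the timeout is in ms; I dropped the r.Effort = 'yes' filter here just to avoid nested quote escaping inside the doubly-quoted fragment:

CALL apoc.cypher.runTimeboxed("
    MATCH (u:EffortUser)
    WITH collect(u) AS users
    CALL apoc.cypher.mapParallel2('
        MATCH (_)-[r]-(:EffortObject)
        WHERE r.TimeEvent IS NOT NULL
        RETURN _.Name AS user, count(distinct r.IdUnique) AS events',
        {}, users, 4) YIELD value
    RETURN value.user AS user, sum(value.events) AS events",
    {}, 30000) YIELD value
RETURN value.user as user, value.events as events

If the inner statement hasn't finished after 30 seconds, the transaction should be aborted instead of pinning the CPU indefinitely.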

You can try to move the first traversal into the inner statement:

MATCH (e:Employee)
WITH collect(e) as employees
call apoc.cypher.mapParallel2("match (_)-[r1]-(u:EffortUser)-[r2]-(o:EffortObject) ....

I'd also add a relationship direction, > or <.

If your database seems to become unresponsive, it may be excessive heap usage due to the `collect`. Do you see any suspicious messages in debug.log?
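Spelled out, that inner statement could look like the sketch below - untested, with relationship types and directions assumed from earlier in the thread:

MATCH (e:Employee)
WHERE e.Name = 'cbartens'
WITH collect(e) AS employees
CALL apoc.cypher.mapParallel2("
    MATCH (_)-[:LINKED_TO|:MATCHED_TO]->(u:EffortUser)-[r]->(o:EffortObject)
    WHERE r.Effort = 'yes' AND r.TimeEvent IS NOT NULL
    RETURN _.Name as employee, date(datetime(r.TimeEvent)).year as date, count(distinct r.IdUnique) as count",
    {}, employees, 4) YIELD value
RETURN value.employee as employee, value.date as date, sum(value.count) as count
ORDER BY employee, date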

@stefan.armbruster some more news from the parallel query front ...

Regarding issue 1 - CPU goes to 100% and instance becomes unresponsive

This still happens, and I have no idea why. I will check the logs when I have some time. It would be good to find out - or better yet, to wrap the parallel function in some other function that prevents this from happening. Realistically, a normal user is unlikely to get these queries right on the first try, so there's a lot of trial and error involved, and having to shut down the unresponsive instance every so often is not making that any easier, not to mention having to apologize to colleagues :slight_smile:

Regarding issue 2 - Success, I now have a query that delivers what I need

After some playing around I settled on the following query, which works fine.

MATCH (e:Employee)-[:LINKED_TO|:MATCHED_TO]->(u:EffortUser)
WHERE e.Name = 'cbartens'
WITH collect(u) AS items 
CALL apoc.cypher.mapParallel2("
	OPTIONAL MATCH (_)-[r2]-(o:EffortObject) 
    WHERE r2.Effort = 'yes' and not r2.TimeEvent is null and r2.TimeEvent >= '2017-10-01' and r2.TimeEvent <= '2017-12-31' 
    RETURN date(datetime(r2.TimeEvent)) as date, count(distinct r2.IdUnique) as events",
    {parallel:True, batchSize:2000, concurrency:20}, items, 4) YIELD value 
RETURN value.date as date, sum(value.events) as events 
ORDER BY date

Some key observations that might help others:

  • The parallel query is about 3x faster than the normal query: 5 s vs 15 s.
  • Using MATCH inside the parallel query does not work, but OPTIONAL MATCH does.
  • The parallel query does not seem to cope with longer MATCH paths, i.e. (_)-[r2]-(o:EffortObject) works but (_)-[r1]-(u:EffortUser)-[r2]-(o:EffortObject) does not.
  • The best results come from splitting the MATCH between the normal and the parallel query. The complete MATCH phrase is (:Employee{Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser)-[r]-(:EffortObject), so in my case the normal part of the query takes care of (:Employee{Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser), the results of which are handed off to the parallel query as (_), and the parallel query then does the rest, i.e. (:EffortUser)-[r]-(:EffortObject), or rather (_)-[r]-(:EffortObject).
  • Using RETURN DISTINCT inside the parallel query can break it.
  • The variables you add to the parallel query's RETURN clause matter a lot, to the point where adding or removing the wrong variable can break the entire query.
  • The config part {parallel:True, batchSize:2000, concurrency:20} does impact the query and shaves off another second, reducing it from 5 s to 1 s; batchSize:2000 and concurrency:20 seem to be the optimal settings - anything higher or lower and the query time increases again.
  • Most importantly, the parallel query returns slightly different results from the normal query (details in the table below). For us this delta is low enough to be usable, but I observed a max 28% difference on one day, which needs monitoring.

If you spot anything above that makes no sense or can be optimized further, please let me know! I'd be especially interested in any tips for making sure the parallel query does not return different results ... that is a bit worrying if you ask me (see below).
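For completeness, the sequential equivalent I compare the parallel query against would look roughly like this (reconstructed from the pieces above, untested); diffing its per-date counts against the parallel version is how the delta shows up:

MATCH (:Employee {Name:'cbartens'})-[:LINKED_TO|:MATCHED_TO]->(:EffortUser)-[r]-(:EffortObject)
WHERE r.Effort = 'yes' AND r.TimeEvent IS NOT NULL
  AND r.TimeEvent >= '2017-10-01' AND r.TimeEvent <= '2017-12-31'
RETURN date(datetime(r.TimeEvent)) as date, count(distinct r.IdUnique) as events
ORDER BY date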

Thanks for the great feedback. Would you mind adding this as a GitHub issue as well? Otherwise I see the danger of this getting lost. Thanks in advance.

No worries, happy to @stefan.armbruster. See https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/1373. And thanks again for your help.


I've been trying to use apoc.cypher.mapParallel2() with the latest Neo4j and APOC versions and have clearly observed the issue with it randomly returning partial or no results. I put more details in the GitHub issue: https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/1373#issuecomment-573161323


Update - after running and comparing many more standard vs parallel queries, I'm pretty sure the partial results issue is limited to querying/counting relationships. Whenever I count nodes, the counts returned by the parallel query always match the standard query 100%.

I've observed this problem on counting nodes as well: https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/1373#issuecomment-576059374