For a given entity (company), I would like to calculate the current bitcoin balance of each of its addresses.
To do this, we sum the `bitcoinValue` property of every :Output node that is linked to the :Address node by a [:LOCKED_BY] relationship and that has no [:UNLOCKED_BY] relationship (i.e. has not been spent).
My current workflow is as follows:
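A minimal in-memory sketch of that rule, assuming each :Output is reduced to a hypothetical `Output` record holding the address it is locked by, its `bitcoinValue`, and an optional spending reference (`None` meaning no [:UNLOCKED_BY]). The names `Output` and `unspent_balances` are illustrative only, not part of any Neo4j or APOC API.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class Output:
    # Hypothetical stand-in for an :Output node
    locked_by: str                     # address reached via [:LOCKED_BY]
    bitcoin_value: float               # the bitcoinValue property
    unlocked_by: Optional[str] = None  # spending reference, None if unspent


def unspent_balances(outputs: list[Output]) -> dict[str, float]:
    """Sum bitcoinValue of unspent outputs per address (no rounding yet)."""
    totals: dict[str, float] = defaultdict(float)
    for o in outputs:
        # Same filter as: where not (o)-[:UNLOCKED_BY]->()
        if o.unlocked_by is None:
            totals[o.locked_by] += o.bitcoin_value
    return dict(totals)


outs = [
    Output("A", 0.5),
    Output("A", 0.25),
    Output("A", 1.0, unlocked_by="tx1"),  # spent, so ignored
    Output("B", 0.1, unlocked_by="tx2"),  # B has no unspent outputs
]
print(unspent_balances(outs))  # {'A': 0.75}
```

Like the non-optional `match` in the first query below, this sketch leaves out addresses with no unspent outputs. The `optional match` in the mapParallel2 version would instead return them with a balance of 0.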
// set the query parameter first.
:params {entity: "Binance"}
Then run the query:
// match addresses labelled with the entity of interest
match (a :Address)-->(e :Entity)
where e.name = $entity
// distinct must be included or the query will run for a very long time
with distinct a
// match outputs locked by address
match (a)<-[:LOCKED_BY]-(o: Output)
// exclude those outputs that have been subsequently spent
where not (o)-[:UNLOCKED_BY]->()
return
a.address as address,
round(sum(o.bitcoinValue)*100000000)/100000000 as balance
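The `round(x*100000000)/100000000` expression in the query rounds the floating-point sum to 8 decimal places (1 satoshi), which removes summation noise such as `0.1 + 0.2`. Below is a small Python illustration of the same arithmetic. The integer-satoshi variant at the end is an alternative that is assumed here for comparison and is not something the query does. Python's `round()` rounds exact halves to even, while Cypher's `round()` rounds them up, so the two can differ only at exact half-satoshi values.

```python
SATS_PER_BTC = 100_000_000


def round_to_satoshi(btc: float) -> float:
    # Same arithmetic as round(sum(o.bitcoinValue)*100000000)/100000000
    return round(btc * SATS_PER_BTC) / SATS_PER_BTC


raw = 0.1 + 0.2
print(raw)                    # 0.30000000000000004
print(round_to_satoshi(raw))  # 0.3

# Alternative (assumption, not what the query does): keep values as integer
# satoshis, sum exactly, and convert to BTC only for display.
sats = [10_000_000, 20_000_000]
print(sum(sats) / SATS_PER_BTC)  # 0.3
```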
This works, but for a large number of addresses it can take a while. Is there a way to parallelise it using apoc.cypher.parallel() or another APOC procedure? I couldn't find any documentation for these procedures.
The answer turned out to be apoc.cypher.mapParallel2(). The documentation was a little hard to follow because it has no examples, but this reduced the run time of the query above, for about 300k addresses, from about 12 seconds to around 2 seconds. We have longer queries where this technique is also proving useful, but this is what I did for this one:
// match addresses labelled with the entity of interest
match (a :Address)-->(e :Entity)
where e.name = $entity
// mapParallel2 iterates over a list, so we `collect` the addresses first
with collect(distinct a) as addresses
// The first argument is the Cypher fragment to run, as a string;
// inside it, `_` refers to the current element of the list (here, an address)
// The second argument is a map of parameters, e.g. {parallel: true}
// The third argument is the list to iterate over
// (in this example `addresses` from above)
// The fourth argument is the number of partitions to split the list into
// (though I'm not sure how this relates to the batchSize or concurrency
// parameters in the second argument)
CALL apoc.cypher.mapParallel2("
optional match (_)<-[:LOCKED_BY]-(o: Output)
where not (o)-[:UNLOCKED_BY]->()
return _.address as address,
round(sum(o.bitcoinValue)*100000000)/100000000 as balance"
, {parallel:true, batchSize:1000, concurrency:20}, addresses, 20) yield value
// Extract the columns we want to return from the map returned by `yield value`
return
value.address as address,
value.balance as balance
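To make the partitioning idea concrete, here is a rough Python analogy of what a "map in parallel over partitions" step does. The list is split into a fixed number of chunks, the same fragment runs on each chunk concurrently, and the resulting rows are concatenated. This is my own illustration under that assumption, not APOC's implementation. `partition`, `map_parallel` and the stand-in fragment are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def partition(items: list[T], partitions: int) -> list[list[T]]:
    """Split items into at most `partitions` contiguous, near-equal chunks."""
    if partitions < 1:
        raise ValueError("partitions must be >= 1")
    size = max(1, -(-len(items) // partitions))  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


def map_parallel(fragment: Callable[[list[T]], list[R]],
                 items: list[T], partitions: int) -> list[R]:
    """Run `fragment` on every chunk concurrently; concatenate its rows in order."""
    chunks = partition(items, partitions)
    with ThreadPoolExecutor(max_workers=max(1, len(chunks))) as pool:
        results = pool.map(fragment, chunks)  # yields results in chunk order
        return [row for rows in results for row in rows]


# Hypothetical fragment: one row per address, like the Cypher fragment above
def balance_rows(addresses: list[str]) -> list[dict]:
    return [{"address": a, "balance": 0.0} for a in addresses]


rows = map_parallel(balance_rows, ["a1", "a2", "a3", "a4", "a5"], 2)
print([r["address"] for r in rows])  # ['a1', 'a2', 'a3', 'a4', 'a5']
```

In Python, threads only speed things up when the fragment mostly waits on I/O, such as a database call. In the Cypher version the work runs inside Neo4j, so treat this only as a picture of the data flow, not as a performance recipe.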
I hope this helps someone. I'm open to suggestions if this is not the correct way to use parallelism.