Using parallel queries to sum value of bitcoin outputs connected to bitcoin address nodes

performance
cypher
(Simon) #1

We have imported the bitcoin blockchain into a neo4j graph database. The DB schema looks like this:

For each address, for a given entity (company) I would like to calculate the current bitcoin balance of an address.

To do this we must sum up the bitcoinValue property of all :Output nodes that belong to each :Address node via the [:LOCKED_BY] that do not have an [:UNLOCKED_BY].

My current workflow to do this is like so:

// set the query parameter first.
:params "entity" : "Binance"

Then run the query:

// match addresses labelled with the entity of interest
match (a :Address)-->(e :Entity)
where e.name = $entity
// distinct must be included or the query will run for a very long time
with distinct a

// match outputs locked by address
match (a)<-[:LOCKED_BY]-(o: Output)
// exclude those outputs that have been subsequently spent
where not (o)-[:UNLOCKED_BY]->()

return
  a.address as address,
  round(sum(o.bitcoinValue)*100000000)/100000000 as balance

This works fine, but for a large number of addresses it can take some time. Is there a way to parallelise this using apoc.cypher.parallel() or some other apoc query? There wasn't documentation for these functions that I could find.

Many thanks.

0 Likes

(Simon) #2

I found the answer was to use apoc.cypher.mapParallel2(). The documentation was a little hard to understand because there were no examples, but this reduced the run-time of the above query from about 12 seconds for 300k addresses to around 2 seconds. We have longer queries where this technique is proving useful, but this is what I did:

# match addresses labelled with the entity of interest
match (a :Address)-->(e :Entity)
where e.name = $entity

# mapParallel will iterate over a list, so we `collect`
with collect(distinct a) as addresses

# The first argument is the cypher code we want to run as a string
# The second argument is a map of parameters, e.g. {parallel: true}
# The third argument is the list to iterate over
# (in this example `addresses` from above)
# The fourth argument is an integer to split the list into partitions,
# (though I'm not sure how this relates to the batchSize or concurrency
# parameters from the third argument)
CALL apoc.cypher.mapParallel2("
   optional match (_)<-[:LOCKED_BY]-(o: Output)
   where not (o)-[:UNLOCKED_BY]->()
   return _.address as address,
   round(sum(o.bitcoinValue)*100000000)/100000000 as balance"
, {parallel:True, batchSize:1000, concurrency:20}, addresses,  20) yield value

# Extract the columns we want to return from the list (returned by `yield value`)
return
  value.address as address,
  value.balance as balance

Hope this is helpful to someone and open to suggestions if this is not the correct way to use parallelism.

0 Likes