Operating in batches on a massive list

apoc
(Simon) #1

We have imported the Bitcoin blockchain into Neo4j. I am trying to find the balance of every single bitcoin address on the network, but there are around 470 million addresses.

A transaction sends bitcoin to addresses, which may then send that bitcoin on in a new transaction.
From the left-most red (:Transaction) node, we see two outputs created, each belonging to a separate (:Address) node in blue. One of these outputs is then spent in an onward transaction, whilst the other is still locked to the address.


To find the balance of a given address we can use the following query:

MATCH
  (a :Address)<--(o :Output)
WHERE a.address = "someBitcoinAddress"
AND NOT (o)-[:UNLOCKED_BY]->()
RETURN
  a.address as Address,
  sum(o.bitcoinValue) as balance

This works great for one address, but with 470 million addresses and around 1.5 billion outputs we start to see problems with DB responsiveness. Is there a good way to split a massive list into batches, operate on the batches, and preferably stream results to CSV in those batches as well? This is as far as I got, but I'm not sure it is doing what I expect, as the DB still grinds to a halt:

// match all address nodes
MATCH
  (a :Address)
WITH
  collect(DISTINCT a) AS addresses
// try and batch the large list into chunks
CALL
  apoc.coll.partition(addresses,1000000) YIELD value AS addressBatches
// work on each chunk
UNWIND
  addressBatches AS batch
// apoc.mapParallel is expecting a list so collect the batch into a list
WITH
  collect(batch) as addresses
// run multi-threaded cypher statement on this batch
CALL
  apoc.cypher.mapParallel2(
    "
    MATCH (_)<--(o :Output)
    WHERE NOT (o)-[:UNLOCKED_BY]->()
    RETURN
      _.address AS Address,
      round(sum(o.bitcoinValue)*100000000)/100000000 AS Balance
    ",
    {parallel:True},
    addresses,
    24,
    7200
  ) YIELD value
RETURN
  value.Address,
  value.Balance

Does anyone have any suggestions of how I could do this more efficiently?

Many thanks,

Simon


(M. David Allen) #2

Consider trying apoc.periodic.iterate. I think it's going to do something similar but give you better control over your code flow. You stream all of the addresses to it, and it divides them into batches for you and gives you control over parallelism.

Also understand with the bitcoin dataset that you're going to want as much RAM and page cache as you can afford. Since you're calculating balances for every address, this is going to require pretty much the entire database, which may end up with you loading and reloading a lot of data from disk again and again into your page cache. The bigger your page cache, the better this will perform.

I probably wouldn't try to stream the results of this to CSV. Instead, I would either write separate nodes holding your results (which you could reconstitute later) or use something like neo4j-streams to publish a message with the total for each wallet, thereby creating a stream of results which you could then manipulate separately however you chose.
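
A minimal sketch of the "separate nodes" option, assuming APOC is installed. The :AddressBalance label, its property names, and the batch size are placeholders chosen for illustration, not anything from the original dataset, and whether parallel execution helps will depend on the hardware.

```cypher
CALL apoc.periodic.iterate(
  // Outer statement: streams address nodes one by one,
  // so nothing is collected into a single huge list.
  "MATCH (a:Address) RETURN a",
  // Inner statement: runs per address inside each batch.
  // OPTIONAL MATCH keeps addresses with no unspent outputs (balance 0).
  "OPTIONAL MATCH (a)<--(o:Output)
   WHERE NOT (o)-[:UNLOCKED_BY]->()
   WITH a, sum(o.bitcoinValue) AS balance
   CREATE (:AddressBalance {address: a.address, balance: balance})",
  // batchSize is a guess; tune it to the available heap.
  {batchSize: 10000, parallel: true}
)
YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
```

Each batch commits in its own transaction, so a failure part-way through leaves the earlier batches in place, and errorMessages reports what went wrong.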


(Benjamin Squire) #3

Have you tried apoc.periodic.iterate? It looks like you may be running into memory issues given the size of your graph. The general form is apoc.periodic.iterate([Cypher to get the nodes you want to process in batches], [what you want to do to each batch or each node in a batch], {parallel:true, iterateList:true, batchSize:100000}). Here is my shot at it:

CALL apoc.periodic.iterate(
  "MATCH (a:Address) RETURN a",
  "WITH a MATCH (a)<--(o:Output) WHERE NOT (o)-[:UNLOCKED_BY]->() RETURN a.address AS Address, round(sum(o.bitcoinValue)*100000000)/100000000 AS Balance",
  {parallel:true, batchSize:100000, iterateList:true}
) YIELD...

I started to post this and then realized that apoc.periodic.iterate doesn't yield the results back. So unless you are willing to write the results back to the graph as a new node or property, you will not be able to achieve this with apoc.periodic.iterate.
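
One way around that limitation, sketched under assumptions: write each balance back as a property in batches, then export in a second step with apoc.export.csv.query. The balance property name and the balances.csv file name are made up for this example, and file export only works when apoc.export.file.enabled=true is set in the APOC configuration.

```cypher
// Step 1: store the balance on each address node, batch by batch.
CALL apoc.periodic.iterate(
  "MATCH (a:Address) RETURN a",
  "OPTIONAL MATCH (a)<--(o:Output)
   WHERE NOT (o)-[:UNLOCKED_BY]->()
   WITH a, sum(o.bitcoinValue) AS balance
   SET a.balance = round(balance * 100000000) / 100000000",
  {batchSize: 10000, parallel: true}
)
YIELD batches, total, errorMessages
RETURN batches, total, errorMessages;

// Step 2: export the stored balances to a CSV file.
CALL apoc.export.csv.query(
  "MATCH (a:Address) RETURN a.address AS Address, a.balance AS Balance",
  "balances.csv",
  {}
)
YIELD file, rows
RETURN file, rows;
```

Splitting the work this way keeps the expensive aggregation in small transactions, while the export becomes a plain scan over one label.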
