Mass importing/uploading of data

import

(Massung) #1

I have ~40 GB worth of genetics CSV data in HDFS that I'm attempting to load, which will (roughly) end up creating ~1.2 B nodes and 3x as many relationships.

I'm starting with a - relatively - empty graph and I've pre-created the indexes for the nodes being created and I have a simple program that collects all the part files, then iterates over them running the same LOAD CSV Cypher query on each.

When this program begins running, it is averaging ~40K nodes per second being created. Already - at that rate - it would take ~8 hours to load everything, which is very acceptable. But, after 24 hours, it had only uploaded ~200 M nodes, the rate had declined by an order of magnitude, and after 36 hours it was continuing to decline in performance. Note: this is using Neo4j Community v3.5.1.

I'm basically looking for any - and all - thoughts on how to best go about loading this data into the graph (note: query code is at the end of the post).

Random thoughts that I've had, but before attempting to test each one, I figured others could comment, share experiences, and give advice:

  • Give the machine more CPU + RAM. It's currently a 4 CPU 16 GB RAM machine, which is hardly anything grand. But, while running the query, htop showed almost no CPU usage, and seeing memory usage isn't really possible since Java just steals what it can out the gate (it was using 13 GB, though). I certainly don't mind upping the size of the machine, but before doubling my VM costs, I'd like to be sure I will get what I need for the investment. ;)

  • The indexes are potentially a massive time sink (they are in MySQL when uploading mass data). Is it possible to defer adding data to the indexes until after everything is done? I really would like to avoid deleting and re-creating them.

  • Loading the data from HDFS (via HTTP) is potentially slow, but I don't know if it's that slow. If I downloaded the files locally and then ran the query, would it be faster (again, likely not a long-term solution, though).

  • It's a lot of CSV files (~25,000). Perhaps concatenating them and loading fewer files with far more rows in them might improve performance?

  • The query contains a FOREACH(i IN (CASE test WHEN true THEN [1] ELSE [] END) | ...) clause in it. How slow is this? I could rewrite some of the Spark code to partition on test and run two separate queries.

The query pretty much looks like this (simplified, but nothing that would be performance issue is removed):

using periodic commit
load csv with headers from 'http://...csv' as r
fieldterminator '\t'

// lookup a node by ID and a phenotype referenced by the row
match (q:Analysis) where ID(q) = $id
match (p:Phenotype {name: r.phenotype})

// create the variant if it doesn't already exist
merge (v:Variant {name: r.varId})
on create set v.chromosome = r.chromosome, ...

// create the data node for this row
create (n:MetaAnalysis {pValue: toFloat(r.pValue), ...})

// create the relationships
create (q)-[:PRODUCED]->(n)
create (p)-[:HAS_META_ANALYSIS]->(n)
create (v)-[:HAS_META_ANALYSIS]->(n)

// if this result was flagged as such, add another relationship
foreach(i in (case toBoolean(r.flag) when true then [1] else [] end) |
  create (v)-[:FLAGGED_META_ANALYSIS]->(n)
)

I should note that there is a unique constraint on :Phenotype(name) and :Variant(name), so those look-ups should be quite fast. When I'm referring to indexes that might be causing issues, they are on the properties of :MetaAnalysis (e.g. pValue and beta).

One criteria I'm hoping to maintain is that the solution can't be a one-shot load. While I understand there may be some things I can do now with an empty graph, shortly after the data is loaded Spark will run again with some new, additional data and I will need to update a major portion of the graph (~50M nodes) using new CSV files. That follow-up load should be equally as performant as the initial load.

Again, any links, experiences, thoughts and comments much appreciated! And, if there is a similar topic that I missed somewhere, my apologies. I saw plenty of "load csv" posts, but none that really hit the scale I'm looking at.


(Andrew Bowman) #2

You mentioned you have an index on :Variant(varId), but your query is doing a MERGE on :Variant(name): merge (v:Variant {name: r.varId})

Unless there was a typo here, the index may not be being used, which would explain the slowdown as more :Variant nodes are added to the graph (each successive MERGE must scan every :Variant node...that gets increasingly expensive).

You can EXPLAIN the query to see the query plan that will be used (without running the query), which you can use to double-check that index lookups are going to be used instead of label scans.

Is it possible to defer adding data to the indexes until after everything is done?

Not recommended. MERGEs are like MATCHes followed by CREATEs (if the match failed), so indexes are necessary for MERGEs and MATCHes to be quick.

Separately, this looks like a typo: create (n:MetaAnalysis {n.pValue: toFloat(r.pValue), ...})
The property key shouldn't have a dot in it, I'm guessing this was meant to be pValue: toFloat (...


(Massung) #3

You mentioned you have an index on :Variant(varId) , but your query is doing a MERGE on :Variant(name)

Thanks for the catch, but the index is on :Variant(name). It was my mistake transcribing the index to the post.

this looks like a typo: create (n:MetaAnalysis {n.pValue: ...

Yes, it was. The actual query is correct.

I'll update the post.


(Andrew Bowman) #4

Sounds good. The EXPLAIN of the query plan should help you double-check you're using index lookup where needed.

As for the foreach trick for conditional execution, if this is the last part of the query you can replace that with a simple WHERE clause to filter for the specific case then do the CREATE.