Help me merge 170M relationships with LOAD CSV

Please help me!

I am working on a project to compare/benchmark Neo4j with Postgres using a dataset of 176M research citations from Semantic Scholar.

I definitely should have asked for help sooner, but I like to try to figure things out for myself as much as I can...

I have a script that converts the original JSON data into CSV format in normalized tables for easy import into both Neo4j and Postgres. Each JSON file has 1 million records. I have tables for Papers (id, title, DOI, year), Authors (id, name), paperAuthors (author_id, paper_id), outCitations (id, outcit_id), and inCitations (id, incit_id).
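
For reference, the relationship files are pipe-delimited with a header row; a row of the cites file looks roughly like this (using two of the paper ids from my test query further down):

id|outcit_id
892cbd4fe56bd56cbdb69810631648e6f8d1e407|24c6fc7aa8968d0a6a7a7b8d3bb002c53a29df13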

Importing to postgres took an average of about 5s/1M rows if I turned off all the indexes. Adding indexes and deleting duplicates at the end took about 9hrs for the full database (I later learned that I could speed this up by filtering and creating a new table, rather than deleting...)

However, the neo4j import has been giving me trouble from the start...

I first tried the command-line tool "neo4j-admin import" according to this guide:

I started on an AWS EC2 m5.large instance (8GB RAM), but quickly ran out of memory. I could not find any guidelines about how much RAM was needed, and I did not like the all-or-nothing results with that tool, so I rewrote everything to use Cypher's "LOAD CSV" instead (that way I could at least work incrementally if I ran out of memory along the way).

I was able to get through 50M records this way on an r5.xlarge (32GB RAM), though it ran out of memory building the relationships for the last million records. It took ~1.6 hours to build the nodes (Paper, Author) and ~28 hours to build the relationships (CITES, IS_CITED_BY, HAS_AUTHOR).

However, I have been running an r5.2xlarge (64GB RAM) all week and it is still not finished... All of the nodes (Paper, Author) are loaded, but I am only up to 87M of 176M relationships. The processing time has been steadily increasing, and it is now around 3hrs per million records.

My original query looks like this:

LOAD CSV WITH HEADERS 
FROM "data/csv/s2-corpus-090-cites.csv" AS row
FIELDTERMINATOR "|"
MATCH (p1:Paper {id:row.id}), (p2:Paper {id:row.outcit_id})
MERGE (p1)-[:CITES]->(p2);

I paused the process (kill -19) to troubleshoot, trying a single query:

match (p:Paper {id:"892cbd4fe56bd56cbdb69810631648e6f8d1e407"}),
  (q:Paper {id:"24c6fc7aa8968d0a6a7a7b8d3bb002c53a29df13"})
return p,q;
1 row available after 14 ms, consumed after another 1 ms

explain match (p:Paper {id:"892cbd4fe56bd56cbdb69810631648e6f8d1e407"}),
  (q:Paper {id:"24c6fc7aa8968d0a6a7a7b8d3bb002c53a29df13"}) return p,q;
+------------------------+----------------+-------------+------------+------------+
| Operator               | Estimated Rows | Identifiers | Ordered by | Other      |
+------------------------+----------------+-------------+------------+------------+
| +ProduceResults        |              1 | p, q        | p.id ASC   |            |
| |                      +----------------+-------------+------------+------------+
| +CartesianProduct      |              1 | p, q        | p.id ASC   |            |
| |\                     +----------------+-------------+------------+------------+
| | +NodeUniqueIndexSeek |              1 | q           | q.id ASC   | :Paper(id) |
| |                      +----------------+-------------+------------+------------+
| +NodeUniqueIndexSeek   |              1 | p           | p.id ASC   | :Paper(id) |
+------------------------+----------------+-------------+------------+------------+

14ms * 1M rows = 14000s ~ 4 hours, so that may explain the holdup...

I read that splitting the matches can be faster:

match (p:Paper {id:"892cbd4fe56bd56cbdb69810631648e6f8d1e407"})
match (q:Paper {id:"24c6fc7aa8968d0a6a7a7b8d3bb002c53a29df13"})
return p,q;

However, it did not seem to make any difference doing it this way; EXPLAIN looks the same.
(It returned in 0ms, but I am pretty sure that was due to caching, since repeating the original query also returned in 0ms when I ran it again.)

I tried "explain" on my main import query too, though I am not sure how to interpret it:

EXPLAIN LOAD CSV WITH HEADERS
FROM "data/csv/s2-corpus-090-cites.csv" AS row
FIELDTERMINATOR "|"
MATCH (p1:Paper {id:row.id})
MATCH (p2:Paper {id:row.outcit_id})
MERGE (p1)-[:CITES]->(p2);

+-----------------------------------+----------------+------------------------+------------------------------+
| Operator                          | Estimated Rows | Identifiers            | Other                        |
+-----------------------------------+----------------+------------------------+------------------------------+
| +ProduceResults                   |              1 | anon[204], p1, p2, row |                              |
| |                                 +----------------+------------------------+------------------------------+
| +EmptyResult                      |              1 | anon[204], p1, p2, row |                              |
| |                                 +----------------+------------------------+------------------------------+
| +Apply                            |              1 | anon[204], p1, p2, row |                              |
| |\                                +----------------+------------------------+------------------------------+
| | +AntiConditionalApply           |              1 | anon[204], p1, p2      |                              |
| | |\                              +----------------+------------------------+------------------------------+
| | | +MergeCreateRelationship      |              1 | anon[204], p1, p2      |                              |
| | | |                             +----------------+------------------------+------------------------------+
| | | +Argument                     |              1 | p1, p2                 |                              |
| | |                               +----------------+------------------------+------------------------------+
| | +AntiConditionalApply           |              1 | anon[204], p1, p2      |                              |
| | |\                              +----------------+------------------------+------------------------------+
| | | +Optional                     |              1 | anon[204], p1, p2      |                              |
| | | |                             +----------------+------------------------+------------------------------+
| | | +ActiveRead                   |              0 | anon[204], p1, p2      |                              |
| | | |                             +----------------+------------------------+------------------------------+
| | | +Expand(Into)                 |              0 | anon[204], p1, p2      | (p1)-[anon[204]:CITES]->(p2) |
| | | |                             +----------------+------------------------+------------------------------+
| | | +LockNodes                    |              1 | p1, p2                 | p1, p2                       |
| | | |                             +----------------+------------------------+------------------------------+
| | | +Argument                     |              1 | p1, p2                 |                              |
| | |                               +----------------+------------------------+------------------------------+
| | +Optional                       |              1 | anon[204], p1, p2      |                              |
| | |                               +----------------+------------------------+------------------------------+
| | +ActiveRead                     |              0 | anon[204], p1, p2      |                              |
| | |                               +----------------+------------------------+------------------------------+
| | +Expand(Into)                   |              0 | anon[204], p1, p2      | (p1)-[anon[204]:CITES]->(p2) |
| | |                               +----------------+------------------------+------------------------------+
| | +Argument                       |              1 | p1, p2                 |                              |
| |                                 +----------------+------------------------+------------------------------+
| +Apply                            |              1 | p1, p2, row            |                              |
| |\                                +----------------+------------------------+------------------------------+
| | +ValueHashJoin                  |              1 | p1, p2, row            | p2.id = row.outcit_id        |
| | |\                              +----------------+------------------------+------------------------------+
| | | +NodeUniqueIndexSeek(Locking) |              1 | p1, row                | :Paper(id)                   |
| | |                               +----------------+------------------------+------------------------------+
| | +NodeUniqueIndexSeek(Locking)   |              1 | p2, row                | :Paper(id)                   |
| |                                 +----------------+------------------------+------------------------------+
| +LoadCSV                          |              1 | row                    |                              |
+-----------------------------------+----------------+------------------------+------------------------------+

If it makes any difference, I have autocommit turned off... Maybe I should be turning that on? (though I have 64GB RAM, and I haven't crashed yet)

I have adjusted neo4j.conf to account for my available RAM as follows:

dbms.memory.heap.initial_size=23900m
dbms.memory.heap.max_size=23900m
dbms.memory.pagecache.size=27g

Ubuntu (18.04) on AWS has the swapfile disabled, so it should not be a virtual memory issue...

Any ideas??

You need to batch the transaction; the simplest way is to prefix your LOAD CSV with USING PERIODIC COMMIT, i.e. USING PERIODIC COMMIT LOAD CSV ....
Another, more advanced option is to use apoc.periodic.iterate - but I think in your case the first option will be good enough.

If you don't do batching, all the data ends up in one huge transaction that exhausts all your memory.
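
Roughly, your query above would then become the following (only the first line is new; 10000 is just a starting batch size, and note that a periodic-commit query has to run as its own auto-committed statement, not inside an open transaction):

USING PERIODIC COMMIT 10000
LOAD CSV WITH HEADERS
FROM "data/csv/s2-corpus-090-cites.csv" AS row
FIELDTERMINATOR "|"
MATCH (p1:Paper {id:row.id}), (p2:Paper {id:row.outcit_id})
MERGE (p1)-[:CITES]->(p2);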

Thanks @stefan.armbruster!

I tried that today (using periodic commit 10000), which gave a pretty good improvement in processing time (down to ~2.5 hrs from ~3hrs per million records), but it is still too slow to finish in time for my project... (up to 94M, still 82M to go...)

Still open to suggestions!

The first thing is to play with the batch size: maybe try USING PERIODIC COMMIT 1000, USING PERIODIC COMMIT 10000, ... The value needs to be aligned with your heap size.

You could try to import using apoc.periodic.iterate with parallel:true. However, that might cause lock contention. Parallel imports are a good idea if you can provide the data in an order that doesn't cause lock contention, e.g. if you have split the relationship import into multiple files, each covering a distinct region of your graph with little or no overlap.
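
A rough sketch of what that could look like for one of your cites files, assuming APOC is installed (keep parallel:false unless the input is partitioned as described above, and treat batchSize as a tuning knob):

CALL apoc.periodic.iterate(
  'LOAD CSV WITH HEADERS FROM "data/csv/s2-corpus-090-cites.csv" AS row FIELDTERMINATOR "|" RETURN row',
  'MATCH (p1:Paper {id: row.id}) MATCH (p2:Paper {id: row.outcit_id}) MERGE (p1)-[:CITES]->(p2)',
  {batchSize: 10000, parallel: false}
);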

Another way to improve speed - but this is a rather advanced approach, be warned: write a stored procedure for doing the import. That procedure should cache index lookups in a simple Java map structure to prevent duplicate index operations.

What AMI are you running? Double-check that you have ENA enabled; see "Enable enhanced networking with the Elastic Network Adapter (ENA) on Linux instances" in the Amazon Elastic Compute Cloud documentation for details.

Do you have indices created for Paper.id?

If you create the indices ahead of the LOAD you should see a good improvement in performance.
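
In Cypher (3.x syntax) that would look something like the following; the uniqueness constraint also creates the backing index on Paper(id) / Author(id) that the MATCHes rely on:

CREATE CONSTRAINT ON (p:Paper) ASSERT p.id IS UNIQUE;
CREATE CONSTRAINT ON (a:Author) ASSERT a.id IS UNIQUE;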

If you have already done this, then never mind.

Thanks @stefan.armbruster, @yyyguy!

With regard to periodic commit, are there any guidelines for choosing the commit size according to available RAM?

I set up on a very large AWS instance (r5.2xlarge, 64GB RAM, running Ubuntu 18.04) to make sure I did not run out of memory, so I think larger commits are probably not a problem...

Parallel imports may be a problem, because the rows in the relationship files were created by parsing JSON files containing records for individual papers, so consecutive rows are likely to refer to the same paper.

Regarding indexes, I am pretty sure they are turned on... I used py2neo for this, at the start of my import script:

graph.schema.create_uniqueness_constraint(label,property) #<-- "Paper", "id" and "Author", "id"
graph.schema.create_index(label,property) #<-- "Paper", "id" and "Author", "id"

Also, I believe the "EXPLAIN" query above shows that an index lookup is occurring:
NodeUniqueIndexSeek | 1 | q | q.id ASC | :Paper(id) |

The "id"s came from semantic scholar, and they are 40 digit hexadecimal numbers, stored as strings, like:
"88ac1990157843539f2a98262ba86bdd26a271af"

I updated my neo4j-admin import script again, and was able to import 10M records in around 13 min on an r5.xlarge (32GB RAM... didn't want to take any chances):

2019-10-20 00:12:09,783 - subprocess: 'IMPORT DONE in 13m 24s 380ms.'
2019-10-20 00:12:09,783 - subprocess: 'Imported:'
2019-10-20 00:12:09,783 - subprocess: '24038139 nodes'
2019-10-20 00:12:09,783 - subprocess: '32592355 relationships'
2019-10-20 00:12:09,783 - subprocess: '64240293 properties'
2019-10-20 00:12:09,783 - subprocess: 'Peak memory usage: 1.83 GB'

Since I got this working, I may just abort my current import (currently at 98M) and try again using this technique... So long as I have enough RAM it should be okay, and it seems to be much faster.
The neo4j-admin import tool is very picky about file permissions though... I basically had to run:

sudo chmod 777 /var/lib/neo4j/data/databases
sudo chmod 666 /var/log/neo4j/debug.log

to get it to work... I am using Python's subprocess.Popen to run the process, and I even tried playing with the user id using pwd and os.setuid, but that did not seem to solve it:

rec = pwd.getpwnam('neo4j')  # look up the neo4j user's uid/gid
os.setgid(rec.pw_gid)        # set the group first,
os.setuid(rec.pw_uid)        # then drop to the neo4j user

I guess it is supposed to be user='neo4j' and group='adm' (though by default 'neo4j' belongs to group 'neo4j', and I could not figure out how to change the group to 'adm')

Another idea suitable for initial imports is using the batch importer; see "Import" in the Operations Manual. This is way faster than any other import method, but it is only suitable for initial data loads.

Hi @stefan.armbruster
I tried that initially and my database crashed from lack of memory. I then made the mistake of rewriting my import script to use LOAD CSV (Cypher), because it could at least save partial progress. Unfortunately, when I did that I still ran out of memory once I had loaded a few million records.

The actual solution was to just use a larger instance. I finally ran it again last night with the neo4j-admin import tool on my r5.2xlarge (64GB) instance, and it finished in about 2.7 hours (2h 41m)!
Peak memory usage was 16.35 GB, so I probably could have gotten away with an r5.xlarge (32GB).
Here are the relevant bits from my log file:

2019-10-20 08:06:19,733 - subprocess: 'Available resources:'
2019-10-20 08:06:19,733 - subprocess: 'Total machine memory: 62.13 GB'
2019-10-20 08:06:19,733 - subprocess: 'Free machine memory: 421.90 MB'
2019-10-20 08:06:19,733 - subprocess: 'Max heap memory : 22.37 GB'
2019-10-20 08:06:19,733 - subprocess: 'Processors: 8'
2019-10-20 08:06:19,733 - subprocess: 'Configured max memory: 35.79 GB'
2019-10-20 08:06:19,733 - subprocess: 'High-IO: false'
2019-10-20 08:06:19,733 - subprocess: ''
2019-10-20 08:07:29,717 - subprocess: 'WARNING: heap size 22.37 GB is unnecessarily large for completing this import.'
2019-10-20 08:07:29,775 - subprocess: 'The abundant heap memory will leave less memory for off-heap importer caches. Suggested heap size is 1.08 GBImport starting 2019-10-20 08:07:29.770+0000'
2019-10-20 08:07:29,776 - subprocess: 'Estimated number of nodes: 647.51 M'
2019-10-20 08:07:29,776 - subprocess: 'Estimated number of node properties: 1.56 G'
2019-10-20 08:07:29,776 - subprocess: 'Estimated number of relationships: 1.63 G'
2019-10-20 08:07:29,776 - subprocess: 'Estimated number of relationship properties: 0.00'
2019-10-20 08:07:29,776 - subprocess: 'Estimated disk space usage: 130.36 GB'
2019-10-20 08:07:29,776 - subprocess: 'Estimated required memory usage: 8.96 GB'

2019-10-20 08:07:29,807 - subprocess: '(1/4) Node import 2019-10-20 08:07:29.806+0000'
2019-10-20 08:07:29,807 - subprocess: 'Estimated number of nodes: 647.51 M'
2019-10-20 08:07:29,807 - subprocess: 'Estimated disk space usage: 78.64 GB'
2019-10-20 08:07:29,807 - subprocess: 'Estimated required memory usage: 8.96 GB'

2019-10-20 08:55:02,927 - subprocess: '(2/4) Relationship import 2019-10-20 08:55:02.927+0000'
2019-10-20 08:55:02,929 - subprocess: 'Estimated number of relationships: 1.63 G'
2019-10-20 08:55:02,929 - subprocess: 'Estimated disk space usage: 51.72 GB'
2019-10-20 08:55:02,929 - subprocess: 'Estimated required memory usage: 16.35 GB'

2019-10-20 10:17:43,556 - subprocess: '(3/4) Relationship linking 2019-10-20 10:17:43.555+0000'
2019-10-20 10:17:43,556 - subprocess: 'Estimated required memory usage: 3.41 GB'

2019-10-20 10:39:48,994 - subprocess: '(4/4) Post processing 2019-10-20 10:39:48.994+0000'
2019-10-20 10:39:48,994 - subprocess: 'Estimated required memory usage: 1020.01 MB'

2019-10-20 10:47:49,791 - subprocess: 'IMPORT DONE in 2h 41m 29s 405ms.'
2019-10-20 10:47:49,791 - subprocess: 'Imported:'
2019-10-20 10:47:49,791 - subprocess: '235200946 nodes'
2019-10-20 10:47:49,791 - subprocess: '1614730156 relationships'
2019-10-20 10:47:49,791 - subprocess: '729701634 properties'
2019-10-20 10:47:49,791 - subprocess: 'Peak memory usage: 16.35 GB'
2019-10-20 10:47:50,957 - subprocess: 'There were bad entries which were skipped and logged into /home/ubuntu/postgres-vs-neo4j/logs/neo4j.report'
2019-10-20 10:47:51,535 - Processing time: 9695.747298727976 s

So now I just have to run my benchmarking tests and I will be finished!
The idea is to come up with queries that really take advantage of the Neo4j graph architecture, to show where it has an advantage over Postgres.

Here are some examples I have so far:

  1. Find the top ten papers with the most citations
MATCH (:Paper)-[r:CITES]->(p:Paper)
RETURN p, COUNT(r)
ORDER BY COUNT(r) DESC
LIMIT 10;

^-- This kind of query Postgres usually wins (by a small margin, after warmup), since my many-to-many join tables ("cites", "is_cited_by", "has_author") all have b-tree indexes, so the lookups are very fast.

  2. Find the top ten papers with the most citations of citations of citations...
MATCH (:Paper)-[r:CITES *1..]->(p:Paper)
RETURN p, COUNT(r)
ORDER BY COUNT(r) DESC
LIMIT 10;

^-- This kind of structure I think is where Neo4j will probably win out... I am reading up on recursive CTEs to accomplish a similar query in Postgres, but I expect the performance there to be pretty bad...

  3. Find the top ten authors with the most papers
MATCH (:Paper)-[r:HAS_AUTHOR]->(a:Author)
RETURN a,COUNT(r)
ORDER BY COUNT(r) DESC
LIMIT 10;
  4. Find the top ten authors whose papers have the most direct citations
MATCH (:Paper)-[r:CITES]->(:Paper)-[:HAS_AUTHOR]->(a:Author)
RETURN a, COUNT(r)
ORDER BY COUNT(r) DESC
LIMIT 10;
  5. Find the top ten authors who have had the most influence on the literature
    (largest number of citations of citations of citations...)
MATCH (:Paper)-[r:CITES *1..]->(:Paper)-[:HAS_AUTHOR]->(a:Author)
RETURN a, COUNT(r)
ORDER BY COUNT(r) DESC
LIMIT 10;

These are all aggregations, so I am not sure how that will affect the performance of Neo4j...
I could also maybe try ranking the most influential papers by a single author. I haven't tested it yet, but I think this is the correct syntax:

MATCH (:Paper)-[r:CITES *1..]->(p:Paper)-[:HAS_AUTHOR]->(a:Author)
WHERE a.name = "Giacomo Rizzolatti"
RETURN p.title, COUNT(r)
ORDER BY COUNT(r) DESC
LIMIT 10;

Any other suggestions are welcome!

What immediately comes to my mind is using the graph algo library to calculate each author's PageRank to score their relevance. There's an online training course available; see "Announcing Data Science with Neo4j And Applied Graph Algorithms Online Training Courses".
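
A rough sketch with the (pre-GDS) Graph Algorithms library, assuming it is installed. This ranks the papers directly over the CITES graph; ranking authors would additionally need a projected author-to-author relationship:

CALL algo.pageRank.stream('Paper', 'CITES', {iterations: 20, dampingFactor: 0.85})
YIELD nodeId, score
MATCH (p:Paper) WHERE id(p) = nodeId
RETURN p.title AS title, score
ORDER BY score DESC
LIMIT 10;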

If you want to use solely Cypher, I'd use queries with variable path length (just like you did). The shortestPath function could also be interesting to showcase.

Looks like you're well on your way to getting this all worked out. Using just plain Cypher and none of the algo procedures, for simple aggregations I had better luck writing the query like this:

MATCH (p:Paper)
RETURN p, SIZE( (:Paper)-[:CITES]->(p) ) AS cite_count
ORDER BY cite_count DESC
LIMIT 10;

I've found this to generate a better explain plan and perform better.

Hi @stefan.armbruster @mike_r_black!
Thanks for all of your suggestions!

I think I am going to open another topic about query optimization, just to keep things better organized, if that is okay (I should have done that first, I guess). I do have a lot of questions about that too, so I would appreciate it if you could take a look.

By the way, I finally got around to opening up my HTTP port for my Dash app, so you can check out my project if you like! I am mostly done with it, though some of the tests have not completed yet, so some of the graphs will not show any data yet. I have it running on a free-tier t2.micro instance, so I may leave it up there for a while if anyone is interested. (It probably cannot handle much traffic, but it shouldn't need to.)
http://data-atsu.me/

You can see the accompanying presentation for a bit of explanation too (I am still making some edits to this, so it may change over the next few days):
http://data-atsu.me/slides