Node.js neo4j-driver: trying to read and return a result set of over 4m records

We have a new server running dedicated enterprise Neo4j.

We recently imported data from an MSSQL database, producing around 100m nodes and an equal number of relationships.

I have connected the neo4j-driver for nodejs fine and run many queries successfully.

However, I am at a point where I need to read/stream around 4 million records from a particular node label.

I have used both session and reactive session to try to do this, but the results never appear, even if I leave it for hours. If I put a LIMIT clause on the query, I get results fine for anything up to 500,000. Above that there is no response at all, and I can't see anything in the logs.

Here is the query:

cypher runtime=slotted 
MATCH (c:Campaign)
OPTIONAL MATCH (c)<-[:HAS]-(i:IndustrySector)
WITH c,i
OPTIONAL MATCH (c {campaignId: c.campaignId})-[sent:SENT_EMAIL]->(sp:Person)
OPTIONAL MATCH (c {campaignId: c.campaignId})<-[opened:OPENED]-(op:Person)
OPTIONAL MATCH (c {campaignId: c.campaignId})<-[clicked:CLICKED]-(cp:Person)
WITH c, i, COUNT(DISTINCT(sp)) AS totalSent, COUNT(DISTINCT(op)) AS totalOpened, COUNT(DISTINCT(cp)) AS totalClicked
RETURN c.campaignId AS campaignId, i.industrySectorId AS industrySectorId, c.senddate AS sendDate, c.subjectLine AS subject, c.preHeader AS preHeader, totalSent, totalOpened, totalClicked

As you can see, I have set the Cypher runtime to slotted. I have tried without it, but then it seems to send the entire result back in one batch rather than streaming row by row.

Here are the two versions I have tried. The first uses a normal session, the second a reactive session:

// Normal session
session.run(query)
  .subscribe({
    onKeys: keys => {
    },
    onNext: record => {
    },
    onCompleted: () => {
    },
    onError: error => {
    }
  });

// Reactive session (map is imported from 'rxjs/operators')
rxSession.run(query)
  .records()
  .pipe(map(record => record.toObject()))
  .subscribe({
    ...same as above
  })

One other thing I tried was to use SKIP and LIMIT on the query, processing in batches of 10k records. This works fine until it reaches 520,000 records, then it hangs again.

I would be very grateful if anyone can point out where I am going wrong, or suggest a better way to achieve what I want. I have looked at the APOC library, such as apoc.periodic.iterate, but that is only applicable to write operations.

Thanks for any help.

You are using aggregation

WITH c, i, COUNT(DISTINCT(sp)) AS totalSent, COUNT(DISTINCT(op)) AS totalOpened, COUNT(DISTINCT(cp)) AS totalClicked

Because of this the response can be returned only after processing all the data. Most likely the query is not completed on the server side and is taking a long time to complete.

In the browser, run ":queries" to see if there are any queries running. Most likely you will see this query still running.

Is the campaignId distinct? If so, this part of the query is inefficient:

WITH c,i
OPTIONAL MATCH (c {campaignId: c.campaignId})-[sent:SENT_EMAIL]->(sp:Person)
OPTIONAL MATCH (c {campaignId: c.campaignId})<-[opened:OPENED]-(op:Person)
OPTIONAL MATCH (c {campaignId: c.campaignId})<-[clicked:CLICKED]-(cp:Person)
WITH c, i, COUNT(DISTINCT(sp)) AS totalSent, COUNT(DISTINCT(op)) AS totalOpened, COUNT(DISTINCT(cp)) AS totalClicked

Since you already have the Campaign node "c", I don't see why you are matching it again like this:

OPTIONAL MATCH (c {campaignId: c.campaignId})-[sent:SENT_EMAIL]->(sp:Person)

It could be:

OPTIONAL MATCH (c)-[sent:SENT_EMAIL]->(sp:Person)

This would avoid a second index lookup.

Ok thanks I will try this to see if it improves.

I was specifically including the distinct campaignId as I assumed (I see now wrongly) that the subqueries would require it to match against.

Ok that has improved the performance a fair bit.

Using PROFILE and LIMIT to 1000:

From:
14901409 total db hits in 4724 ms

To:
14820288 total db hits in 2718 ms

That is roughly a 42% reduction in run time from this change alone, although the db hits barely moved.

I have decided to use the apoc.export.csv.query to see if I can stream it to a CSV file instead, then process that csv file later.

However, having added these improvements, the query is still hanging. Currently at: 447726ms with no output.

Are there other improvements I can make here? What I want is the COUNT of how many UNIQUE people were sent an email, how many unique opens, and how many unique clicks, per campaign.

Thanks for your help btw.

Do you need to do optional match for the industry sector? Since you are looking at id, I'm assuming it should exist for all of them.

If so, can you try this query?

MATCH (c:Campaign)-[r:SENT_EMAIL|OPENED|CLICKED]-(p)
WITH c, type(r) as type, count(distinct p) as count
WITH c, {type: type, count: count} as data
WITH c, collect(data) as totals
MATCH (c)<-[:HAS]-(i:IndustrySector)
RETURN c.campaignId AS campaignId, i.industrySectorId AS industrySectorId, c.senddate AS sendDate, c.subjectLine AS subject, c.preHeader AS preHeader, totals

Once that works we can use CASE statement to get the response format as before.
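
As a client-side alternative to a CASE expression, the totals list can also be reshaped in Node.js as each record arrives. This is only a minimal sketch: it assumes each record has been converted with record.toObject(), and that counts arrive either as plain numbers or as the driver's Integer objects (which expose toNumber()). The names flattenTotals, toPlainNumber and COLUMN_BY_TYPE are hypothetical.

```javascript
// Hypothetical helper: turn the `totals` list produced by the query above
// into the totalSent / totalOpened / totalClicked columns of the original query.
const COLUMN_BY_TYPE = {
  SENT_EMAIL: 'totalSent',
  OPENED: 'totalOpened',
  CLICKED: 'totalClicked',
};

// neo4j-driver returns Cypher integers as Integer objects by default;
// plain numbers are passed through unchanged.
function toPlainNumber(value) {
  return value !== null && typeof value === 'object' && typeof value.toNumber === 'function'
    ? value.toNumber()
    : value;
}

function flattenTotals(row) {
  const { totals = [], ...rest } = row;
  // Relationship types that never occur for a campaign are absent from
  // `totals`, so every column starts at 0.
  const flat = { ...rest, totalSent: 0, totalOpened: 0, totalClicked: 0 };
  for (const { type, count } of totals) {
    const column = COLUMN_BY_TYPE[type];
    if (column) flat[column] = toPlainNumber(count);
  }
  return flat;
}
```

For example, it could be called as flattenTotals(record.toObject()) inside the onNext handler.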

So actually, not all campaigns are linked to an Industry Sector. However, what I have decided is that I am only interested in campaigns that DO have an industry sector defined.

Also what I have done, is instead of using IndustrySector as a separate node, instead I have just added an 'industrySectorId' property to the campaign nodes.

However, first I tried using your query without considering this latest change:

Using PROFILE and LIMIT 1000:
119749022 total db hits in 45626 ms

Using the following query:

PROFILE MATCH (c:Campaign)-[r:SENT_EMAIL|OPENED|CLICKED]-(p)
WITH c, type(r) as type, count(distinct p) as count LIMIT 1000
WITH c, {type: type, count: count} as data
WITH c, collect(data) as totals
MATCH (c)<-[:HAS]-(i:IndustrySector)
RETURN c.campaignId AS campaignId, i.industrySectorId AS industrySectorId, c.senddate AS sendDate, c.subjectLine AS subject, c.preHeader AS preHeader, totals

Changing the query to instead only include campaigns that have an industrySectorId defined as property:

108979056 total db hits in 43429 ms

Query:

PROFILE MATCH (c:Campaign)-[r:SENT_EMAIL|OPENED|CLICKED]-(p)
WHERE c.industrySectorId IS NOT NULL
WITH c, type(r) as type, count(distinct p) as count LIMIT 1000
WITH c, {type: type, count: count} as data
WITH c, collect(data) as totals
RETURN c.campaignId AS campaignId, c.industrySectorId AS industrySectorId, c.senddate AS sendDate, c.subjectLine AS subject, c.preHeader AS preHeader, totals

If I try to run it without any LIMIT, it just hangs.

I forgot to mention, I have added an index to the industrySectorId property on Campaign nodes.

It seems you are returning a lot of data. You might be better off splitting the query into individual steps.

First, get the Campaign IDs in the first query:

MATCH (c:Campaign)
WHERE c.industrySectorId IS NOT NULL
RETURN id(c) as campaignId

Then, for each ID in that response, execute the second query:

MATCH (c)-[r:SENT_EMAIL|OPENED|CLICKED]-(p)
WHERE id(c) = $campaignId
WITH c, type(r) as type, count(distinct p) as count LIMIT 1000
WITH c, {type: type, count: count} as data
WITH c, collect(data) as totals
RETURN c.campaignId AS campaignId, c.industrySectorId AS industrySectorId, c.senddate AS sendDate, c.subjectLine AS subject, c.preHeader AS preHeader, totals

You need to pass the campaign ID as parameter.

This executes the queries individually, so the server does not have to aggregate all the data before returning anything.

Also, if you want, you can use multiple threads on the client side to run queries in parallel and use all the processing power available on the server. Say the machine running the Neo4j DB has 8 CPUs: you can execute around 8 queries in parallel.
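
In Node.js the parallel part needs no threads, because the driver calls are asynchronous. A rough sketch of the two-step approach above, under some assumptions: driver is a neo4j-driver Driver created elsewhere, each query gets its own session (a session runs one query at a time), and the list of IDs is small enough to hold in memory. The names runQuery and totalsPerCampaign are hypothetical, and the default of 8 concurrent queries simply mirrors the 8 CPU example.

```javascript
// Sketch only: `driver` is expected to be a neo4j-driver Driver instance,
// created elsewhere with neo4j.driver(uri, auth).
const ID_QUERY = `
  MATCH (c:Campaign)
  WHERE c.industrySectorId IS NOT NULL
  RETURN id(c) AS campaignId`;

const TOTALS_QUERY = `
  MATCH (c)-[r:SENT_EMAIL|OPENED|CLICKED]-(p)
  WHERE id(c) = $campaignId
  WITH c, type(r) AS type, count(DISTINCT p) AS count
  WITH c, collect({type: type, count: count}) AS totals
  RETURN c.campaignId AS campaignId, totals`;

// Run one query in its own session and always close the session afterwards.
async function runQuery(driver, query, params) {
  const session = driver.session();
  try {
    const result = await session.run(query, params);
    return result.records;
  } finally {
    await session.close();
  }
}

// Step 1: fetch the campaign IDs. Step 2: run the per-campaign query with
// at most `concurrency` queries in flight at any time.
async function totalsPerCampaign(driver, onRow, concurrency = 8) {
  const idRecords = await runQuery(driver, ID_QUERY, {});
  const ids = idRecords.map(record => record.get('campaignId'));
  let next = 0;

  // Each worker pulls the next unprocessed ID until none are left.
  async function worker() {
    while (next < ids.length) {
      const campaignId = ids[next++];
      const records = await runQuery(driver, TOTALS_QUERY, { campaignId });
      records.forEach(record => onRow(record.toObject()));
    }
  }

  const workers = Array.from({ length: Math.min(concurrency, ids.length) }, worker);
  await Promise.all(workers);
  return ids.length;
}
```

Rows reach onRow as soon as each campaign's query finishes, so output starts immediately instead of waiting for one global aggregation.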

Also, what are your DB size and page cache size? It is possible that you are touching almost all of the database, and if your page cache is very small you will spend a lot of time on page faults, moving data between disk and memory.

Ok thanks that is a great suggestion.

I'll probably try doing this in batches of around 10k campaigns, because it handles this size with no issue. Will also consider the parallel queries.
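
A minimal sketch of that batching idea, assuming the campaign IDs have already been fetched into an array and that session is a neo4j-driver Session. The batched query is an adaptation of the per-campaign query above (id(c) IN $campaignIds instead of id(c) = $campaignId), not something taken from the thread, and the names chunk and totalsInBatches are hypothetical.

```javascript
// Split an array into consecutive slices of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}

// Batched form of the per-campaign query: one round trip per batch of IDs.
const BATCH_TOTALS_QUERY = `
  MATCH (c)-[r:SENT_EMAIL|OPENED|CLICKED]-(p)
  WHERE id(c) IN $campaignIds
  WITH c, type(r) AS type, count(DISTINCT p) AS count
  WITH c, collect({type: type, count: count}) AS totals
  RETURN c.campaignId AS campaignId, totals`;

// `session` is assumed to be a neo4j-driver Session; batches run one after another.
async function totalsInBatches(session, campaignIds, onRow, batchSize = 10000) {
  for (const batch of chunk(campaignIds, batchSize)) {
    const result = await session.run(BATCH_TOTALS_QUERY, { campaignIds: batch });
    result.records.forEach(record => onRow(record.toObject()));
  }
}
```

Each batch only aggregates over its own campaigns, so memory use on the server stays bounded by the batch size rather than by the full 4m campaigns.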

I did look at changing the page cache size in configuration, however in the config file it said that it is already optimised for the system. So I have left it as default. Have I misunderstood this?

DB size is showing around 72GB overall.

Thanks again, I think this solution will work.

What is the system memory available?

Also, when you are running these queries, take a look at the page faults. If they keep increasing, the system may need more page cache. If this is only an occasional request, you can leave it as it is.

The ideal scenario is a page cache the size of the DB, so that everything is in memory. If that's not possible, you need a page cache that can handle your most used query profiles without causing too many page faults.

Hi

Some other things you can consider:

  1. Implement :HasIndustrySector as a label on Campaign so you don't have to inspect the property, and use the degree count store instead of the traversal. You don't need the p node, so don't access it. Your query would then reduce to:
// get total
MATCH (n:HasIndustrySector) RETURN n.campaignId, "total" AS metric, SIZE( (n)-[:SENT_EMAIL|OPENED|CLICKED]-() ) AS count
UNION
// get sents
MATCH (n:HasIndustrySector) RETURN n.campaignId, "sent" AS metric, SIZE( (n)-[:SENT_EMAIL]->() ) AS count
UNION
// get opens (OPENED and CLICKED point from Person to Campaign)
MATCH (n:HasIndustrySector) RETURN n.campaignId, "opened" AS metric, SIZE( (n)<-[:OPENED]-() ) AS count
...and so on

you can also test apoc.node.degree.out(node, relationshipName) to see if it is faster than the native degree count store
https://neo4j.com/docs/labs/apoc/current/graph-querying/node-querying/

  2. Wrap the query in a parallel Cypher execution statement, since you are just inspecting nodes
    https://neo4j.com/docs/labs/apoc/current/cypher-execution/parallel/

  3. If this will be a common query, create aggregation nodes to store these counts on load

You should see a dramatic improvement from #1, and if that's not enough, try #2.

Ok also looks like a promising suggestion, however I can't simply do SIZE() on the relationships since a single campaign may have sent several emails to a single Person, and there may be several click/open events between the Person and Campaign. So I included the Person node to get the distinct count. Perhaps there is a better way though.

When I get a chance I will try these various methods and see what works best. Thanks

Gotcha - so in that case you can also leverage the relationship to stash data, for example storing on the SENT relationship a list of emailIds that got sent to that person by the campaign or a list of emailIds opened by that person.

Then your campaign level activity would be distinct already - so you can then leverage the degree count store - and if you needed the details you could go into the transactional lists on the relationship.

Alternatively you could model it both ways: keep your existing raw transactional relationships and add a new SENT_SUMMARY relationship that does the above.

Fortunately, the graph model is flexible enough that you can optimize it easily to accommodate your query workloads.

I believe the second method you mention might be the route for this case. The reason is that the client wants to store device details related to the event types (i.e. Opened on mobile device at IP address, Clicked on Desktop at IP address), so I don't think I can aggregate on a single event relationship.

I think what I'll do is amalgamate these suggestions, there's certainly enough for me to play around with. Much appreciated. Not sure what to accept as the solution though.. will come back after I have implemented something.

@PeteM If you like TypeScript you may wish to check out Drivine, which can stream millions of records without back-pressure. (Use Node.js pipes).

The details are on the website:

https://drivine.org

But in summary what's required is:

return this.persistenceManager.openCursor(
        new CursorSpecification<Route>()
            .withStatement(`SOME CYPHER STATEMENT`)
            .bind([param1, param2])
            .batchSize(1000) // default is 100
            .transform(Route) // Turn result into a type if you want, otherwise 'any' type. 
    );

Cursors are AsyncIterable, so they can be used with for await...of syntax and will fetch in batches, or they can be piped into another stream. (Read the docs for when to use which.)

Regarding Streaming Large Datasets:

We raised this issue: https://github.com/neo4j/neo4j-javascript-driver/issues/542

However, in the meantime there is a workaround. The Neo4j driver uses RxJS, which pushes records. For streaming large datasets into a slower sink without memory pressure we want a pull-style API, hence the existence of the RxJS companion library, IxJS.

Meanwhile, native Node.js streams, which Drivine uses, are duplex and can be used to compose RxJS or IxJS streams.
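
The pull-style idea can be illustrated without any library: an async generator fetches the next page only when the consumer asks for more rows, so a slow sink naturally throttles the reads. This is a sketch under assumptions, not Drivine's or the Neo4j driver's API. fetchPage is a hypothetical caller-supplied function, for example one that runs the statement with SKIP $skip LIMIT $limit, and pagedRows and drain are illustrative names.

```javascript
// Pull-style paging: nothing is fetched until the consumer requests more.
// `fetchPage(skip, limit)` is a hypothetical callback that returns a Promise
// of an array of rows.
async function* pagedRows(fetchPage, batchSize = 1000) {
  let skip = 0;
  while (true) {
    const page = await fetchPage(skip, batchSize);
    yield* page;
    if (page.length < batchSize) return; // short page: no more rows
    skip += batchSize;
  }
}

// Example consumer: the next page is requested only after `handleRow` has
// finished with every row of the current page.
async function drain(fetchPage, handleRow, batchSize) {
  let seen = 0;
  for await (const row of pagedRows(fetchPage, batchSize)) {
    await handleRow(row);
    seen += 1;
  }
  return seen;
}
```

Note that paging with SKIP and LIMIT only bounds memory on the client. For a query that aggregates, the server may still have to compute all the skipped rows for every page, which matches the hang reported earlier in this thread.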

(I skimmed the issue without reading all details, btw, so apologies if this answer is not fully relevant)

Thanks for the suggestion. After I adapted the queries from the recommendations and increased the heap size the problem is resolved.

Yes, I saw that you got it solved. I thought that I would share regarding Drivine, in case it is of interest in any case.

Drivine also uses SKIP and LIMIT at present. However, once the linked issue above is implemented, the implementation could be more like the Postgres/Agens version, provided the query results fit within the heap.