We have a new dedicated server running Neo4j Enterprise.
We recently imported data from an MSSQL database, producing around 100 million nodes and an equal number of relationships.
I have connected the neo4j-driver for Node.js without problems and have run many queries successfully.
However, I am at a point where I need to read/stream around 4 million records from a particular node label.
I have tried both a regular session and a reactive session, but the results never appear, even if I leave the query running for hours. If I add a LIMIT clause to the query, I get results fine for anything up to 500,000. Above that, there is no response at all, and I can't see anything in the logs.
Here is the query:
cypher runtime=slotted
MATCH (c:Campaign)
OPTIONAL MATCH (c)<-[:HAS]-(i:IndustrySector)
WITH c,i
OPTIONAL MATCH (c {campaignId: c.campaignId})-[sent:SENT_EMAIL]->(sp:Person)
OPTIONAL MATCH (c {campaignId: c.campaignId})<-[opened:OPENED]-(op:Person)
OPTIONAL MATCH (c {campaignId: c.campaignId})<-[clicked:CLICKED]-(cp:Person)
WITH c, i, COUNT(DISTINCT(sp)) AS totalSent, COUNT(DISTINCT(op)) AS totalOpened, COUNT(DISTINCT(cp)) AS totalClicked
RETURN c.campaignId AS campaignId, i.industrySectorId AS industrySectorId, c.senddate AS sendDate, c.subjectLine AS subject, c.preHeader AS preHeader, totalSent, totalOpened, totalClicked
As you can see, I have set the Cypher runtime to slotted. I have also tried without this setting, but then the entire result seems to be sent back in one batch rather than streamed row by row.
Here are the two versions I have tried. The first uses a normal session; the second uses a reactive session:
// Normal Session
session.run(query)
  .subscribe({
    onKeys: keys => {
    },
    onNext: record => {
    },
    onCompleted: () => {
    },
    onError: error => {
    }
  });
// Reactive Session (map is imported from 'rxjs/operators')
rxSession.run(query)
  .records()
  .pipe(map(record => record.toObject()))
  .subscribe({
    ...same as above
  })
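To check whether any records arrive at all before the hang, the empty callbacks in the normal-session version could be filled in with a simple counter. The following is only a minimal sketch: `makeCountingObserver`, `logEvery`, and `log` are hypothetical names introduced for illustration and are not part of neo4j-driver. Only the callback names (`onKeys`, `onNext`, `onCompleted`, `onError`) come from the code above.

```javascript
// Sketch only: builds an observer for session.run(query).subscribe(...)
// that counts records and reports progress. `makeCountingObserver`,
// `logEvery`, and `log` are hypothetical names, not part of neo4j-driver.
function makeCountingObserver(logEvery = 10000, log = console.log) {
  const state = { keys: null, count: 0, done: false, error: null };
  const observer = {
    onKeys: keys => {
      state.keys = keys; // remember the column names
    },
    onNext: record => {
      state.count += 1;
      // Log every `logEvery` records so a stall becomes visible.
      if (state.count % logEvery === 0) {
        log(`received ${state.count} records`);
      }
    },
    onCompleted: () => {
      state.done = true;
      log(`completed after ${state.count} records`);
    },
    onError: error => {
      state.error = error;
      log(`failed after ${state.count} records: ${error.message}`);
    }
  };
  return { observer, state };
}

// Usage (assumes an open neo4j-driver session and the query shown earlier):
// const { observer } = makeCountingObserver(10000);
// session.run(query).subscribe(observer);
```

If the counter never moves, the query is presumably still aggregating on the server; if it stops partway, the last logged count shows roughly where the stream stalled.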
One other thing I tried was to use SKIP and LIMIT on the query, processing it in batches of 10k records. This works fine until it reaches 520,000 records, then it just hangs again.
I would be very grateful if anyone could point out where I am going wrong or suggest a better way to achieve what I want. I have looked at the APOC library, in particular apoc.periodic.iterate, but as far as I can tell that is only applicable to write operations.
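For illustration, SKIP/LIMIT batching of this kind might look roughly like the sketch below. It is not the exact code that was run: `fetchInBatches` and `onBatch` are hypothetical names, and wrapping the parameters in `toInteger()` is an assumption made here because plain JavaScript numbers are sent by the driver as floats. The sketch also assumes `baseQuery` ends with an ORDER BY, since paging without a stable order is not guaranteed to return each row exactly once.

```javascript
// Sketch only: pages through a read query with SKIP/LIMIT.
// `fetchInBatches` and `onBatch` are hypothetical names. `session` is assumed
// to behave like a neo4j-driver session: run(query, params) returns a promise
// that resolves to an object with a `records` array.
async function fetchInBatches(session, baseQuery, batchSize, onBatch) {
  let skip = 0;
  while (true) {
    // toInteger() is applied in Cypher because JS numbers arrive as floats
    // (an assumption of this sketch; neo4j.int() would be an alternative).
    const result = await session.run(
      `${baseQuery} SKIP toInteger($skip) LIMIT toInteger($limit)`,
      { skip, limit: batchSize }
    );
    const records = result.records;
    if (records.length > 0) {
      await onBatch(records, skip);
    }
    // A short (or empty) page means the end of the result was reached.
    if (records.length < batchSize) {
      return skip + records.length; // total number of records seen
    }
    skip += batchSize;
  }
}

// Usage (assumes an open neo4j-driver session):
// const total = await fetchInBatches(session, query, 10000, recs => {
//   console.log(`got ${recs.length} records`);
// });
```

Note that each page re-runs the whole query, so as far as I understand the server still has to produce and discard all skipped rows, which may explain why later batches get progressively slower.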
Thanks for any help.