I'm trying, without success, to stream all nodes from a Neo4j graph database while calculating each node's path, using Node.js and the Neo4j JavaScript driver. I use two queries: the first fetches all nodes, and in its observer's onNext callback I run a second query to fetch the path for each node and push it to the buffer. The problem seems to be that the buffer is closed in the onCompleted callback of the first query, which does not wait for the code running in onNext to finish. The result is this error:
Error [ERR_STREAM_PUSH_AFTER_EOF]: stream.push() after EOF
Here is my service method that streams the data into the buffer:
async fetchSiteMap(): Promise<Readable> {
  const transaction = this.session.getSession().beginTransaction();
  const buffer = new stream.Readable();
  buffer._read = () => {};
  await transaction.run( `
    MATCH (r:Category) RETURN r.code AS code
  `)
    .subscribe( {
      onNext: async node => {
        const result = await transaction.run(`MATCH path = (:Category {code: $code})<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
          WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
          RETURN nodes
          ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1`, {code: node.get( 'code' )});
        const records: Record[] = get(result, 'records');
        if (isEmpty(records)) {
          return [];
        }
        const newRecords = records.map(record => record.get('nodes'));
        const path = compact(newRecords[0].map((segment: { properties: GraphProperty }, index: number) => {
          if (index === 0) {
            return;
          }
          return CategoryNodePath.fromGraph(<NodeGraphProperty>segment.properties);
        }));
        buffer.push(JSON.stringify(path));
      },
      onCompleted: async () => {
        buffer.push(null)
        transaction.commit();
        this.session.close();
      },
      onError: () => {
        buffer.push(null);
      },
    } );
  return buffer;
}
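
To illustrate the suspected cause, here is a minimal sketch that does not use the real driver. It assumes that the observer callbacks are invoked without waiting for any promise returned by onNext, so onCompleted can fire while the per-node queries are still in flight. `fetchPath`, `emitCodes`, `streamPaths`, and `collect` are hypothetical stand-ins, not part of the Neo4j API. The sketch collects the pending promises and only pushes `null` once all of them have settled:

```typescript
import { Readable } from "stream";

// Hypothetical stand-in for the per-node path query; resolves asynchronously.
async function fetchPath(code: string): Promise<string[]> {
  await new Promise<void>(resolve => setTimeout(resolve, 5));
  return ["root", code];
}

interface Observer {
  onNext(code: string): void;
  onCompleted(): void;
  onError(err: Error): void;
}

// Hypothetical stand-in for the first query's result. Assumption: like the
// observer in the question, it does not wait for work started in onNext
// before calling onCompleted.
function emitCodes(codes: string[], observer: Observer): void {
  for (const code of codes) {
    observer.onNext(code);
  }
  observer.onCompleted();
}

function streamPaths(codes: string[]): Readable {
  const buffer = new Readable({ read() {} });
  const pending: Promise<void>[] = [];

  emitCodes(codes, {
    onNext: code => {
      // Remember the in-flight work instead of letting it run unobserved.
      pending.push(
        fetchPath(code).then(path => {
          buffer.push(JSON.stringify(path));
        })
      );
    },
    onCompleted: () => {
      // End the stream only after every pending push has happened.
      Promise.all(pending).then(
        () => buffer.push(null),
        err => buffer.destroy(err)
      );
    },
    onError: err => buffer.destroy(err),
  });

  return buffer;
}

// Helper that drains a readable into an array of string chunks.
async function collect(readable: Readable): Promise<string[]> {
  const chunks: string[] = [];
  for await (const chunk of readable) {
    chunks.push(chunk.toString());
  }
  return chunks;
}
```

Applied to the service above, the same idea would mean moving `buffer.push(null)`, the commit, and the session close behind a `Promise.all` over the per-node queries. I have not verified that against the real driver.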