Stream neo4j query result in same transaction

mehdikwa
Node Link

I'm trying, without success, to stream all nodes from a Neo4j graph database while calculating their paths, using Node.js and the Neo4j JavaScript driver. I'm using two queries: the first fetches all nodes, and in the observer's onNext callback I run a second query to fetch the path for each node and push it to the buffer. The problem seems to be that the buffer is closed in the onCompleted callback of the first query, which does not wait for the code still running in onNext to finish.

Error [ERR_STREAM_PUSH_AFTER_EOF]: stream.push() after EOF

Here is my service that streams the data into the buffer:

async fetchSiteMap(): Promise<Readable> {
const transaction = this.session.getSession().beginTransaction();
const buffer = new stream.Readable();
buffer._read = () => {};
await transaction.run( `
  MATCH (r:Category) RETURN r.code AS code
  `)
  .subscribe( {
    onNext: async node => {
      const result = await transaction.run(`MATCH path = (:Category {code: $code})<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
          WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
          RETURN nodes
          ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1`, {code: node.get( 'code' )});

      const records: Record[] = get(result, 'records');
      if (isEmpty(records)) {
        return [];
      }

      const newRecords = records.map(record => record.get('nodes'));

      const path = compact(newRecords[0].map((segment: { properties: GraphProperty }, index: number) => {
        if (index === 0) {
          return;
        }

        return CategoryNodePath.fromGraph(<NodeGraphProperty>segment.properties);
      }));

      buffer.push(JSON.stringify(path));
    },
    onCompleted: async () => {
      buffer.push(null)
      transaction.commit();
      this.session.close();
    },
    onError: () => {
      buffer.push(null);
    },
  } );

return buffer;

}

2 REPLIES

adam_cowley
Neo4j

What is the stack trace for that error? Does it come from the neo4j driver? You may be closing the transaction when the inner stream has completed for the paths rather than the outer stream of categories.

One thing I've spotted is that you could get away with doing this all in a single Cypher query rather than one to get the code and another to find the paths. Because you're running the query for all categories, you could just stream them directly out of the first query.

transaction.run(`
    MATCH path = (:Category)<-[hasChild:HAS_CHILD*]-(rootNode:RootNode)
          WITH rootNode, REVERSE(nodes(path)) as nodes, COLLECT(path) as paths, reduce(sum = 0, rel in hasChild | sum + rel.position) as weight
          RETURN nodes
          ORDER BY rootNode.position, SIZE(paths) asc, weight asc LIMIT 1
`)
.subscribe({ /* Same code as you have now */ })

It doesn't solve the problem you've reported but it could be a workaround.

mehdikwa
Node Link

Thanks Adam, the error comes from the API controller when I call the resource that streams the HTTP response. As you mentioned, I would indeed have preferred to run this in a single query, but I can't see how you would search for the path of each node without first getting the node's code. Your query won't work because LIMIT 1 returns only a single path. Without a node code in the match, it will just retrieve the path of the first node found.
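
For the ERR_STREAM_PUSH_AFTER_EOF error itself, one possible approach that keeps the two-query design is to remember the promise started by each onNext call and end the stream only once all of them have settled. The following is a minimal sketch in plain Node.js, not tested against the driver: fakeSubscribe and lookupPath are hypothetical stand-ins for the subscribe call and the per-category path query, and streamPaths is an illustrative name.

```typescript
import { Readable } from "node:stream";

// Hypothetical observer shape, modelled on the callbacks used in the question.
interface Observer<T> {
  onNext: (item: T) => void;
  onCompleted: () => void;
  onError: (error: Error) => void;
}

// Hypothetical stand-in for the outer query: it emits every item and then
// completes immediately, without waiting for any work that onNext started.
function fakeSubscribe<T>(items: T[], observer: Observer<T>): void {
  for (const item of items) {
    observer.onNext(item);
  }
  observer.onCompleted();
}

// Hypothetical stand-in for the per-category path query.
async function lookupPath(code: string): Promise<string[]> {
  await new Promise(resolve => setTimeout(resolve, 5));
  return ["root", code];
}

function streamPaths(codes: string[]): Readable {
  const buffer = new Readable({ read() {} });
  // Every piece of asynchronous work started by onNext is tracked here.
  const pending: Promise<void>[] = [];

  fakeSubscribe(codes, {
    onNext: code => {
      pending.push(
        lookupPath(code).then(path => {
          buffer.push(JSON.stringify(path));
        })
      );
    },
    onCompleted: () => {
      // Signal end-of-stream only after all tracked work has finished,
      // so no push() can happen after the null push.
      Promise.all(pending)
        .then(() => buffer.push(null))
        .catch(error => buffer.destroy(error));
    },
    onError: error => buffer.destroy(error),
  });

  return buffer;
}
```

In the real service, the transaction commit and session close would presumably need to move into the same Promise.all continuation so that they also run after the last inner query. That part is an assumption and is not shown here.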