Return query result as IAsyncEnumerable

Hi! I'm new to Neo4j and graph databases in general. I'm working on a .NET project that may integrate graph processing in the near future, and I'm evaluating my options at the moment.
My scenario: I need to send a lot of nodes to the browser (on the order of 20 million entries), where they will be rendered on a map (this will be a whole other challenge ^^).
As I don't want to buffer all entries and send them in one go, I stream them to the browser; .NET supports this beautifully via the IAsyncEnumerable abstraction. So the streaming communication between endpoint and browser works. What I'm unsure about is how to fetch the data from Neo4j in the most efficient manner. My first test just returns the connections between the entries of a very small test data set.

Starting with the most naive approach (the following methods are located in a service):

await using var session = driver.AsyncSession();
var cursor = await session.ExecuteReadAsync(tx => tx.RunAsync(_query));
await foreach (var record in cursor.WithCancellation(cancellationToken))
{
    yield return new Connection
    (
        record["SourceNodeId"].As<string>(),
        record["TargetNodeId"].As<string>()
    );
}

results in an exception:

Neo4j.Driver.ResultConsumedException: Cannot access records on this result any more as the result has already been consumed or the query runner where the result is created has already been closed.
   at Neo4j.Driver.Internal.Result.ResultCursor.AssertNotConsumed()
   at Neo4j.Driver.Internal.Result.ResultCursor.MoveNextAsync()

Ok, makes sense - the transaction (the query runner that created the cursor) is closed as soon as the ExecuteReadAsync callback returns, which happens before I start iterating.

The fetching has to happen in a separate Task so that I'm not restricted by the lifetime of the query runner - sounds like a job for System.Threading.Channels. With a little help from Claude I came up with:

// Create a channel to buffer results
var channel = Channel.CreateUnbounded<Connection>();

// Start a background task to fetch and buffer all results
_ = Task.Run(async () =>
{
    try
    {
        await using var session = driver.AsyncSession();
        await session.ExecuteReadAsync(async tx =>
        {
            var cursor = await tx.RunAsync(_query);

            // Process all records while transaction is open
            await foreach (var record in cursor.WithCancellation(cancellationToken))
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                await channel.Writer.WriteAsync(new Connection
                (
                    record["SourceNodeId"].As<string>(),
                    record["TargetNodeId"].As<string>()
                ), cancellationToken);
            }

            return true;
        });

        // Signal completion
        channel.Writer.Complete();
    }
    catch (Exception ex)
    {
        channel.Writer.Complete(ex);
    }
}, cancellationToken);

// Return the buffered results as they become available
await foreach (var connection in channel.Reader.ReadAllAsync(cancellationToken))
{
    // await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    yield return connection;
}

and it works! The results arrive one by one in the browser, nice!
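
A side note on the channel version: Channel.CreateUnbounded lets the producer run arbitrarily far ahead of a slow client, so with millions of records the buffer could grow large. Below is a minimal sketch of the same producer/consumer pattern with a bounded channel. The helper name Relay and the capacity parameter are hypothetical and not part of the code above; the source sequence stands in for the cursor loop.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedStreaming
{
    // Hypothetical helper: relays a source sequence through a bounded channel
    // so the producer can be at most `capacity` items ahead of the consumer.
    public static async IAsyncEnumerable<T> Relay<T>(
        IAsyncEnumerable<T> source,
        int capacity,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            // WriteAsync waits while the buffer is full (backpressure).
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,
            SingleWriter = true
        });

        var producer = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source.WithCancellation(cancellationToken))
                {
                    await channel.Writer.WriteAsync(item, cancellationToken);
                }

                channel.Writer.Complete();
            }
            catch (Exception ex)
            {
                // Surface producer failures to the reader.
                channel.Writer.Complete(ex);
            }
        });

        await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return item;
        }

        await producer;
    }
}
```

One caveat with this sketch: if the consumer stops enumerating early, the producer stays parked in WriteAsync until the cancellation token is cancelled, so the token should be tied to the lifetime of the request.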

Now to my question: Is there an easier way to accomplish this? I don't mind using Channels, but I'm curious if there is an API that is better suited for the job.

Thanks!

Can't you find a way to "paginate" your query and fetch chunks as needed?

You risk wasting a lot of resources on data that never reaches the client or is never used (consider a map overlaying 20M items, versus humans managing about 7 items at a time).

I don't see a way to do pagination, but I'll implement an optimization where I will only load the entities that are in the current viewport.

But I'll solve the client problems later, atm I'm focusing on optimizing the data fetching from Neo4j.

I've just tested this code and it worked fine:

    [HttpGet("async")]
    public async IAsyncEnumerable<MyModel> Async()
    {
        var query = "MATCH (N) RETURN N.name AS name, N.uuid AS uuid";

        // Dispose the session once the enumeration has finished
        await using var session = _driver.AsyncSession();
        var cursor = await session.RunAsync(query);

        while (await cursor.FetchAsync())
        {
            yield return new MyModel
            {
                Id = cursor.Current["uuid"].As<string>(),
                Name = cursor.Current["name"].As<string>()
            };
        }
    }

(You can just add the cancellation tokens wherever you need them; apart from that, the code looks clean, doesn't it?)

Thanks! Yes, looping over the results directly in the controller/endpoint works, but what I wanted to achieve was moving the query logic into a service class. This would enable me to quickly switch implementations or isolate the graph db during testing.

Hi Johanndev

You have the right idea. We generally recommend using either the fluent-style ExecutableQuery found on the Driver object, or the transaction functions ExecuteRead/Write as you have done.

However, in your specific use case there is a caveat that I want to highlight. The ExecuteRead/Write and ExecutableQuery interfaces make use of the driver's built-in retry mechanism. So if a transient error, such as a connectivity issue, occurs, the driver will roll back the transaction and start it again. With you sending the results to a browser, I suspect that this kind of retry behaviour is not what you want, as it would start sending the records from the beginning again.

This leaves two options. As mentioned above, making use of the Session.Run interface is one way to go. This is the simplest, and has auto-commit behaviour.

The second, should you want more control or want to do something like batch multiple queries together, is to handle transactions yourself. This would involve using the Session.BeginTransactionAsync and Transaction.Run, Transaction.Commit and Transaction.Rollback methods. You would have to handle any surfaced errors yourself and decide when to commit and when to roll back.
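
As a rough illustration of that second option, here is a minimal sketch of a service method that manages the transaction itself while streaming. The class name ConnectionService and the Connection record are assumptions modelled on the code earlier in this thread, and error handling is reduced to "roll back unless the whole result was read"; treat it as a starting point rather than a recommended implementation.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using Neo4j.Driver;

// Assumed shape of the model used earlier in the thread.
public record Connection(string SourceNodeId, string TargetNodeId);

// Hypothetical service class, not part of the driver.
public class ConnectionService
{
    private readonly IDriver _driver;

    public ConnectionService(IDriver driver) => _driver = driver;

    public async IAsyncEnumerable<Connection> StreamAsync(
        string query,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using var session = _driver.AsyncSession();
        var tx = await session.BeginTransactionAsync();
        var reachedEnd = false;

        try
        {
            var cursor = await tx.RunAsync(query);

            // Records are pulled while the transaction is still open.
            while (await cursor.FetchAsync())
            {
                cancellationToken.ThrowIfCancellationRequested();

                yield return new Connection(
                    cursor.Current["SourceNodeId"].As<string>(),
                    cursor.Current["TargetNodeId"].As<string>());
            }

            reachedEnd = true;
            await tx.CommitAsync();
        }
        finally
        {
            // Runs on errors, cancellation, or when the caller stops enumerating early.
            if (!reachedEnd)
            {
                await tx.RollbackAsync();
            }
        }
    }
}
```

Because nothing here goes through the retry mechanism, a transient failure surfaces to the caller as an exception in the middle of the stream, and it is up to the endpoint to decide how to report it.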

That's a bunch of information, I hope it helps clarify things for you.

Hey @AndyHeap-NeoTec - thank you very much for your in-depth explanation, this makes a lot of sense :) I have a lot to learn, I'm currently watching some of the tutorial series on YouTube.

@joshcornejo - Apologies, you had already provided a working solution, but I was so fixated on the code being located directly in the controller action instead of in a service that I didn't inspect your code properly.

I've now settled on this method in my service:

await using var session = driver.AsyncSession();
var cursor = await session.RunAsync(_query);

await foreach (var record in cursor.WithCancellation(cancellationToken))
{
    yield return record.AsObject<Connection>();
}

Incredibly powerful stuff :)

Thanks to both of you!

@johanndev there's an extension method you could use there to simplify that code even more:

var session = driver.AsyncSession();
var cursor = await session.RunAsync(_query);
return cursor.AsObjects<Connection>(cancellationToken);

This will give you an IAsyncEnumerable<Connection> - the code in the extension method looks identical to the loop you have there.

Nice, thanks for the info!