Example of how to use C# 8.0 IAsyncEnumerable with the Neo4j driver 4.0?

Thypari
Node

I am using the new async driver and was wondering how to use it with IAsyncEnumerable. Let's say I have 1 million nodes and I want to stream them one by one, so that another method, MethodA, can consume each node whenever it is ready.

        public async IAsyncEnumerable<IRecord> GetRecords()
        {
            var session = _neo4J.Driver.AsyncSession(o => o.WithDatabase("neo4j"));
            try
            {
                var cursor = await session.RunAsync(
                    @"match (node:SomeNode)
                    return node");
                //somehow yield return the records here?
            }
            catch (Exception e)
            {
                var error = e.ToString();
            }
            finally
            {
                await session.CloseAsync();
            }
        }

        // await foreach requires an async method, so this returns Task rather than void.
        private async Task MethodA()
        {
            await foreach (var record in GetRecords())
            {
                // do something
            }
        }

What's the correct way to do this? Are there any async examples for the new 4.0 driver? What happens if nodes in the database change while execution is still inside the await foreach loop?

1 ACCEPTED SOLUTION

charlotte_skard
Graph Buddy

Hello!

Welcome to the community 🙂

You would yield return the content of the .Current property:

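public static async IAsyncEnumerable<IRecord> GetRecords(IDriver driver)
{
	var session = driver.AsyncSession(o => o.WithDatabase("neo4j"));
	try
	{
		var cursor = await session.RunAsync("MATCH (node:Movie) RETURN node ORDER BY node.title");
		// FetchAsync() advances the cursor; it returns false once the stream is exhausted.
		while (await cursor.FetchAsync())
		{
			yield return cursor.Current;
		}
	}
	finally
	{
		// Runs when the stream ends, an exception is thrown, or the consumer stops enumerating early.
		await session.CloseAsync();
	}
}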
  1. FetchAsync() returns true while there is still data in the stream and false once the stream has ended.
  2. cursor.Current contains the current record, which is what you yield.
  3. The catch (Exception e) block has been removed because C# does not allow a yield return inside a try block that has a catch clause; try/finally is allowed.
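
Since the catch block had to come out of the iterator, one option is to handle errors on the consuming side, around the await foreach. Below is a minimal sketch of that idea. It does not use the real driver: FakeCursor, StreamingSketch, GetTitles, ConsumeAsync and Log are all hypothetical names, and FakeCursor only imitates the FetchAsync()/Current shape so the example runs without a database.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the driver's result cursor (assumption:
// it only mimics the FetchAsync()/Current shape, nothing else).
public sealed class FakeCursor
{
    private readonly string[] _titles;
    private readonly int _failAt;   // index at which a failure is simulated, -1 for none
    private int _index = -1;

    public FakeCursor(string[] titles, int failAt = -1)
    {
        _titles = titles;
        _failAt = failAt;
    }

    public string Current => _titles[_index];

    public Task<bool> FetchAsync()
    {
        _index++;
        if (_index == _failAt)
        {
            return Task.FromException<bool>(
                new InvalidOperationException("simulated connection failure"));
        }
        return Task.FromResult(_index < _titles.Length);
    }
}

public static class StreamingSketch
{
    // Records what happened, in order, so the behaviour can be inspected.
    public static readonly List<string> Log = new List<string>();

    // Producer: same try/finally shape as GetRecords, with no catch clause.
    public static async IAsyncEnumerable<string> GetTitles(FakeCursor cursor)
    {
        try
        {
            while (await cursor.FetchAsync())
            {
                yield return cursor.Current;
            }
        }
        finally
        {
            // Stands in for `await session.CloseAsync()`.
            Log.Add("closed");
        }
    }

    // Consumer: the try/catch lives here, around the await foreach.
    public static async Task<List<string>> ConsumeAsync(FakeCursor cursor)
    {
        var seen = new List<string>();
        try
        {
            await foreach (var title in GetTitles(cursor))
            {
                seen.Add(title);
            }
        }
        catch (InvalidOperationException e)
        {
            Log.Add("caught: " + e.Message);
        }
        return seen;
    }
}
```

Calling ConsumeAsync(new FakeCursor(new[] { "Cast Away", "The Matrix" }, failAt: 1)) returns only the first title. The producer's finally block runs first, and the consumer's catch then sees the exception, so cleanup still happens even though the iterator itself has no catch.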

If the nodes change while you're in the loop, you will get the old version. For example, if you were streaming a list of movies ordered by title and someone renamed Cast Away to Cast Back, you would still see Cast Away in your output.

One thing you might also want to consider is the Reactive Extensions version, which would give MethodA even more control over what it receives. I wrote a short post about that here: https://xclave.co.uk/2019/11/27/reactive-neo4j-using-net/

All the best

Chris
