Neo4j python driver with asyncio.gather() crashes

I've been trying to run some read queries concurrently using asyncio tasks and gather(). I've tried running them concurrently within a single transaction, within a single session, and across the driver, but all three attempts fail. Does anyone have an idea how to run multiple read queries concurrently using the Neo4j Python driver v5 or later?

Neo4j version: 4.4
Neo4j Python driver version: 5.3.0
Python 3.10

Here is a reproducible example:

import asyncio
import traceback

from neo4j import AsyncGraphDatabase


async def test_1_tx(tx):
    query = "RETURN 1"
    result = await tx.run(query)
    record = await result.value()
    print("test_1_tx", record)

    tasks = []
    for x in range(10):
        tasks.append(asyncio.create_task(test_2_tx(tx)))

    await asyncio.gather(*tasks)

async def test_2_tx(tx):
    query = "RETURN 2"
    result = await tx.run(query)
    record = await result.value()
    print("test_2_tx", record)


async def session_task(driver):
    async with driver.session() as session:
        await session.execute_read(test_1_tx)


async def main():
    uri = "neo4j://localhost:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "pass"),)

    try:
        async with driver.session() as session:
            await session.execute_read(test_1_tx)
    except Exception as e:
        print("asyncio gather within transaction failed")
        print(traceback.format_exc())

    try:
        async with driver.session() as session:
            tasks = []
            for x in range(10):
                tasks.append(asyncio.create_task(session.execute_read(test_2_tx)))

            await asyncio.gather(*tasks)
    except Exception as e:
        print("asyncio gather within session failed")
        print(traceback.format_exc())

    
    try:
        tasks = []
        for x in range(10):
            tasks.append(asyncio.create_task(session_task(driver)))

        await asyncio.gather(*tasks)
    except Exception as e:
        print("asyncio gather within driver failed")
        print(traceback.format_exc())


asyncio.run(main())

Stacktrace of error:


Traceback (most recent call last):
    File "../test.py", line 37, in main
        async with driver.session() as session:
    File "../lib/python3.10/site-packages/neo4j/_async/work/session.py", line 115, in __aexit__
        await self.close()
    File "../lib/python3.10/site-packages/neo4j/_async/work/session.py", line 197, in close
        await self._connection.fetch_all()
    File "../lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 669, in fetch_all
        detail_delta, summary_delta = await self.fetch_message()
    File "../lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 652, in fetch_message
        tag, fields = await self.inbox.pop(
    File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 74, in pop
        await self._buffer_one_chunk()
    File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 53, in _buffer_one_chunk
        await receive_into_buffer(self._socket, self._buffer, 2)
    File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 290, in receive_into_buffer
        n = await sock.recv_into(view[buffer.used:end], end - buffer.used)
    File "../lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 157, in recv_into
        res = await self._wait_for_io(io_fut)
    File "../lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 116, in _wait_for_io
        return await wait_for(io_fut, timeout)
    File "../lib/python3.10/site-packages/neo4j/_async_compat/shims/__init__.py", line 70, in _wait_for
        return await fut 
    File "../lib/python3.10/asyncio/streams.py", line 668, in read
        await self._wait_for_data('read')
    File "../lib/python3.10/asyncio/streams.py", line 487, in _wait_for_data
        raise RuntimeError(
    RuntimeError: read() called while another coroutine is already waiting for incoming data

Another error that occurs regularly:

Failed to read from defunct connection IPv4Address(('localhost', 7687)) (IPv4Address(('127.0.0.1', 7687)))

Hi @Bertel, I remembered your post today when a new version of the Python driver docs was published. There's a specific section on running concurrent transactions with asyncio. Does this help? https://neo4j.com/docs/python-manual/current/concurrency/

Hi, that example shows running a single query. I want to run multiple read queries in a for-loop concurrently, preferably within the same transaction. Do you have an example of this?

Hi,

Lines 15 and 46 of your script are where the issues lie. Just as sessions in the sync world aren't thread-safe, sessions in the async world aren't "task-safe". They are not meant to be used concurrently. Please spawn (at least) one session per Task and don't share sessions across Tasks. The same holds true for transactions: you can't use those concurrently either.
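
To illustrate why sharing fails: the RuntimeError in the traceback is raised by asyncio's stream reader, which lets only one coroutine wait for incoming data at a time. The following is a minimal sketch using only the standard library (it is not driver code, and `demo` is just an illustrative name) that triggers the same error by letting two Tasks read from one `asyncio.StreamReader`:

```python
import asyncio


async def demo():
    # One reader shared by two Tasks, similar in spirit to two Tasks
    # sharing the single connection behind a session or transaction.
    reader = asyncio.StreamReader()
    first = asyncio.create_task(reader.read(1))
    second = asyncio.create_task(reader.read(1))

    # Yield once so both Tasks start; the first one begins waiting for
    # data, the second one finds a waiter already registered.
    await asyncio.sleep(0)

    # Deliver one byte so the first reader can finish.
    reader.feed_data(b"x")

    # return_exceptions=True collects the error instead of raising it.
    return await asyncio.gather(first, second, return_exceptions=True)


results = asyncio.run(demo())
for item in results:
    print(repr(item))
```

The first Task receives the byte, while the second fails with "read() called while another coroutine is already waiting for incoming data", the same message as in the stack trace above.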

Please also have a read of the API docs, as there are some asyncio functions that might catch you off guard by creating Tasks in the background for you.

I've put some comments into your code to explain the problematic areas:
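
As a small illustration of Tasks being created behind the scenes: asyncio.gather() wraps every coroutine it receives in its own Task, whereas a plain await keeps running in the calling Task. A standard-library sketch (the function names are made up for illustration):

```python
import asyncio


async def which_task():
    # Report the Task this coroutine is currently running in.
    return asyncio.current_task()


async def demo():
    caller = asyncio.current_task()

    # A plain await runs the coroutine inside the calling Task.
    direct = await which_task()

    # gather() schedules each coroutine as a separate Task.
    gathered = await asyncio.gather(which_task(), which_task())
    return caller, direct, gathered


caller, direct, gathered = asyncio.run(demo())
print("plain await stays in the caller's Task:", direct is caller)
print("gather() used a different Task:", gathered[0] is not caller)
```

So anything that touches a session or transaction inside coroutines passed to gather() is, in effect, being used from several Tasks.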

import asyncio
import traceback

from neo4j import AsyncGraphDatabase


async def test_1_tx(tx):
    query = "RETURN 1"
    result = await tx.run(query)
    record = await result.value()
    print("test_1_tx", record)

    tasks = []
    for x in range(10):
        # this does not work, you can't "fork" a transaction
        # tasks.append(asyncio.create_task(test_2_tx(tx)))
        await test_2_tx(tx)

    # await asyncio.gather(*tasks)

async def test_2_tx(tx):
    query = "RETURN 2"
    result = await tx.run(query)
    record = await result.value()
    print("test_2_tx", record)


async def session_task(driver):
    async with driver.session() as session:
        await session.execute_read(test_1_tx)


async def main():
    uri = "neo4j://localhost:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "pass"),)

    try:
        async with driver.session() as session:
            await session.execute_read(test_1_tx)
    except Exception as e:
        print("asyncio gather within transaction failed")
        print(traceback.format_exc())

    # try:
    #     async with driver.session() as session:
    #         tasks = []
    #         for x in range(10):
    #             # this does not work, sessions are not safe to be used concurrently
    #             tasks.append(asyncio.create_task(session.execute_read(test_2_tx)))
    #
    #         await asyncio.gather(*tasks)
    # except Exception as e:
    #     print("asyncio gather within session failed")
    #     print(traceback.format_exc())


    try:
        tasks = []
        for x in range(10):
            # this is much better, create the session in the new Task!
            tasks.append(asyncio.create_task(session_task(driver)))

        await asyncio.gather(*tasks)
    except Exception as e:
        print("asyncio gather within driver failed")
        print(traceback.format_exc())


asyncio.run(main())

One closing thought (pun not intended): you don't close the driver object at the end of your script. You might want to use a context manager (async with AsyncGraphDatabase.driver(...) as driver:).
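
Putting both points together, here is a rough sketch of one session per Task with the driver closed by a context manager. The helper names (`read_value`, `run_in_own_session`, `run_concurrently`) are my own for illustration, and the URI and credentials are the placeholders from the original script, so adapt them to your setup:

```python
import asyncio


async def read_value(tx, query):
    # Runs inside a managed transaction; only one query at a time here.
    result = await tx.run(query)
    return await result.value()


async def run_in_own_session(driver, query):
    # Each Task opens and closes its own session, so nothing is shared.
    async with driver.session() as session:
        return await session.execute_read(read_value, query)


async def run_concurrently(driver, queries):
    # gather() turns each coroutine into its own Task.
    return await asyncio.gather(
        *(run_in_own_session(driver, q) for q in queries)
    )


async def main():
    # Imported here so the helpers above can be loaded without the driver.
    from neo4j import AsyncGraphDatabase

    uri = "neo4j://localhost:7687"  # placeholder from the original script
    auth = ("neo4j", "pass")        # placeholder credentials
    # The context manager closes the driver when the block exits.
    async with AsyncGraphDatabase.driver(uri, auth=auth) as driver:
        print(await run_concurrently(driver, ["RETURN 1", "RETURN 2"]))


# To try it against a running server: asyncio.run(main())
```

Note that the queries no longer share a transaction; as far as I can tell, that is the price of running them concurrently, since a single transaction can only process one query at a time.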

Hi, I've run into the same issue.
I have multiple nodes and relationships that I would like to create concurrently.

Any suggestions on how to do that?

Regards,
P