I've been trying to run some read queries concurrently using asyncio tasks and gather(). I've tried to run the queries concurrently within a transaction. session and driver, but they all fail. Does anyone have an idea on how to run multiple read queries concurrently using neo4j python driver >v5?
Neo4j version: 4.4
Neo4j Python driver version: 5.3.0
Python 3.10
Here is a reproducible problem:
import asyncio
import traceback
from neo4j import AsyncGraphDatabase
async def test_1_tx(tx):
query = "RETURN 1"
result = await tx.run(query)
record = await result.value()
print("test_1_tx", record)
tasks = []
for x in range(10):
tasks.append(asyncio.create_task(test_2_tx(tx)))
await asyncio.gather(*tasks)
async def test_2_tx(tx):
query = "RETURN 2"
result = await tx.run(query)
record = await result.value()
print("test_2_tx", record)
async def session_task(driver):
async with driver.session() as session:
await session.execute_read(test_1_tx)
async def main():
uri = "neo4j://localhost:7687"
driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "pass"),)
try:
async with driver.session() as session:
await session.execute_read(test_1_tx)
except Exception as e:
print("asyncio gather within transaction failed")
print(traceback.format_exc())
try:
async with driver.session() as session:
tasks = []
for x in range(10):
tasks.append(asyncio.create_task(session.execute_read(test_2_tx)))
await asyncio.gather(*tasks)
except Exception as e:
print("asyncio gather within session failed")
print(traceback.format_exc())
try:
tasks = []
for x in range(10):
tasks.append(asyncio.create_task(session_task(driver)))
await asyncio.gather(*tasks)
except Exception as e:
print("asyncio gather within driver failed")
print(traceback.format_exc())
asyncio.run(main())
Stacktrace of error:
Traceback (most recent call last):
File "../test.py", line 37, in main
async with driver.session() as session:
File "../lib/python3.10/site-packages/neo4j/_async/work/session.py", line 115, in __aexit__
await self.close()
File "../lib/python3.10/site-packages/neo4j/_async/work/session.py", line 197, in close
await self._connection.fetch_all()
File "../lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 669, in fetch_all
detail_delta, summary_delta = await self.fetch_message()
File "../lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 652, in fetch_message
tag, fields = await self.inbox.pop(
File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 74, in pop
await self._buffer_one_chunk()
File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 53, in _buffer_one_chunk
await receive_info_buffer(self._socket, self._buffer, 2)
File "../lib/python3.10/site-packages/neo4j/_async/io/_common.py", line 290, in receive_info_buffer
n = await sock.recv_info(view[buffer.used:end]), end - buffer.used)
File "../lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 157, in recv_into
res = await self._wait_for_io(io_fut)
File "../lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 116, in _wait_for_io
return await wait_for(io_fut, timeout)
File "../lib/python3.10/site-packages/neo4j/_async_compat/shims/__init__.py", line 70, in _wait_for
return await fut
File "../lib/python3.10/asyncio/streams.py", line 668, in read
await self._wait_for_data('read')
File "../lib/python3.10/asyncio/streams.py", line 487, in _wait_for_data
raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Another regular error that occurs:
Failed to read from defunct connection IPv4Address(('localhost', 7687)) (IPv4Address(('127.0.0.1', 7687)))