We had seen the warning logs during async tasks in python, The details are:
WARNING - /appdata/appuser/.conda/envs/data_analysis/lib/python3.10/site-packages/neo4j/_async/io/_pool.py:1118: RuntimeWarning: coroutine 'StreamReader.read' was never awaited
WARNING - await self.deactivate(address=address)
WARNING - RuntimeWarning: Enable tracemalloc to get the object allocation traceback
WARNING - /appdata/appuser/.conda/envs/data_analysis/lib/python3.10/site-packages/neo4j/_async/io/_pool.py:1118: RuntimeWarning: coroutine 'StreamWriter.drain' was never awaited
WARNING - await self.deactivate(address=address)
Our code is consist of 100 tasks with async mode to parsing the code for code indexing.
First, for async mode, we made a session on each driver.
async def _parallel_transaction(self, sem, driver, tx_func, i, record, **params):
async with sem:
async with driver.session(database=self.database) as s:
await tx_func(session=s, i=i, record=record, **params)
Logger.info(f"Done to process '{i + 1}' data set")
async def parallel_transaction(self, tx_func, data, **params):
sem = asyncio.Semaphore(self.pool_size)
async with AsyncGraphDatabase.driver(f"{self.uri}:{self.port}", auth=(self.user, self.passwd)) as driver:
coroutines = [
self._parallel_transaction(sem, driver, tx_func, i, record, **params)
for i, record in enumerate(data)
]
await asyncio.gather(*coroutines)
In this, The tx_func is core function to query using cypher. pool_size is 100, we mentioned above.
The warning logs are shown at this point.
query = f"""
MATCH (file: FILE)-[:HAS_FUNCTION]->(func: FUNCTION {{name: '{funcname}'}})
WHERE id(file) = $id
WITH func
{match_query}-[:HAS_TYPEDEF]->(typedef: TYPEDEF)
MERGE do=(func)-[:REFERENCE]->(typedef)
RETURN do
"""
Logger.debug(f"[task {task_num}] Type is: '{ref_type}', name: '{ref_name}', id: '{lastId}' {query}")
result_data, result_summary = await session.execute_write(tx_execute, query=query, id=lastId)
The function tx_execute is below:
async def tx_execute(tx, query, **params):
ret = await tx.run(query, **params)
data = await ret.data()
summary = await ret.consume()
return data, summary
We did write the await keyword on async func, But logs said no await in the async code. But the code is not our code, neo4j code.
What do we do to fix the logs?