Hey there,
I'm developing a Python library to streamline importing event data into a Neo4j database (GitHub - PromG-dev/promg-core). The import flow is straightforward: users specify the location of the input file, which I preprocess and move to the import folder of the database before starting the import. Once the import is done, I clean up by deleting the file from the import folder. However, after upgrading my database from version 5.9.0 to 5.10.0, I hit a snag. The error persists in version 5.17, the most recent version I tested.
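For reference, the stage/import/clean-up flow described above can be sketched as a context manager. `staged_import_file` is a hypothetical helper, not part of promg-core. It assumes the preprocessing has already produced `source`, and it copies the file rather than moving it. On Windows, the `os.remove` in the `finally` block is exactly where a lingering lock on the file would surface as a `PermissionError`.

```python
import os
import shutil
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def staged_import_file(source: Path, import_dir: Path) -> Iterator[str]:
    """Copy `source` into the database import folder and remove it afterwards.

    Yields the bare file name, which is how apoc.load.csv refers to a file
    that sits directly in the import folder.
    """
    target = Path(import_dir, source.name)
    shutil.copyfile(source, target)
    try:
        yield target.name
    finally:
        # Runs even if the import query raises, so no stale file is left behind.
        os.remove(target)
```

Usage would look like `with staged_import_file(Path("events.csv"), import_dir) as file_name:` followed by the import query that loads `file_name`.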
The issue arises when I try to delete the file after the import: I get a `PermissionError` saying that the file is still in use by another process. It seems the database holds on to the file longer than expected, which conflicts with my cleanup step.
More specifically, I get this error: `PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: '<neo4j>\\import\\import_file.csv'`
To remedy this, I'm seeking advice on how to synchronize my code with the completion of the import process. Currently, I'm using Python 3.11 along with the Neo4j driver version 5.10.0, running Neo4j version 5.10 with both APOC and APOC extended installed.
Here's a minimal snippet of the code I'm using for the import process:
```python
import os
from pathlib import Path

# Import file
self.exec_query(query='''CALL apoc.periodic.iterate('
    CALL apoc.load.csv("import_file.csv") yield map as row return row',
    'CREATE (record:Record)
    SET record += row',
    {batchSize:10000, parallel:true, retries: 1});''')
# Delete the file from the import directory
path = Path(self.get_import_directory(), "import_file.csv")
os.remove(path)
```
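As a stopgap while the root cause is unclear, the deletion could be retried for a short while when Windows reports the file as in use. This is a hypothetical helper (`remove_with_retry`), and the attempt count and delay are arbitrary assumptions; it only papers over the symptom and still fails if the server never releases the file handle.

```python
import os
import time
from pathlib import Path
from typing import Callable, Union

PathLike = Union[str, Path]


def remove_with_retry(path: PathLike, attempts: int = 10, delay: float = 0.5,
                      remove: Callable[[PathLike], None] = os.remove) -> int:
    """Delete `path`, retrying while it is reported as in use.

    Returns the number of attempts that were needed. Re-raises the last
    PermissionError if the file is still locked after `attempts` tries.
    `remove` is injectable so the retry logic can be tested without a lock.
    """
    for attempt in range(1, attempts + 1):
        try:
            remove(path)
            return attempt
        except PermissionError:
            # WinError 32: another process (presumably the Neo4j server)
            # still has the file open; give up on the last attempt.
            if attempt == attempts:
                raise
            time.sleep(delay)
    raise ValueError("attempts must be at least 1")
```

In the snippet above, `os.remove(path)` would become `remove_with_retry(path)`.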
And here's the `exec_query` function responsible for executing queries:
```python
from typing import Any, Dict, List, Optional

import neo4j


def exec_query(self, query: str, **kwargs) \
        -> Optional[List[Dict[str, Any]]]:
    """
    Write a transaction of the query to the server and return the result
    @param query: string, query to be executed
    @param kwargs: parameters passed on to the query
    @return: The result of the query or None
    """
    def run_query(tx: neo4j.ManagedTransaction, _query: str, **_kwargs) \
            -> tuple[Optional[List[Dict[str, Any]]], neo4j.ResultSummary]:
        """
        Run the query and return its records and summary
        @param tx: managed transaction on which we can perform queries to the database
        @param _query: string, query to be executed
        @return: The records of the query (None if there are none) and the result summary
        """
        # Exceptions are not caught here: execute_write needs to see them
        # to roll back (and, for transient errors, retry) the transaction.
        _result = tx.run(_query, _kwargs)
        _result_records = _result.data()
        _summary = _result.consume()  # consume and exhaust the results
        if _result_records:
            # return the values if the result is not an empty list
            return _result_records, _summary
        return None, _summary

    try:
        with self.driver.session(database="neo4j") as session:
            result, summary = session.execute_write(run_query, query, **kwargs)
    except Exception as inst:
        # Report and re-raise instead of returning None from run_query,
        # which made the tuple unpacking above fail with an unrelated TypeError.
        print(inst)
        self.close_connection()
        raise
    return result
```
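Before deleting the file, it may also help to check the single summary row that `apoc.periodic.iterate` returns, since `exec_query` passes it back as a one-element list of dicts. As far as I know, `apoc.periodic.iterate` only returns once all batches have run, so a failed batch here would point at the import itself rather than at the driver. The helper name `check_iterate_result` is hypothetical, and the column names (`failedBatches`, `failedOperations`, `errorMessages`, `wasTerminated`, `total`) are assumed from the APOC 5 documentation; verify them against your installed version.

```python
from typing import Any, Dict, List, Optional


def check_iterate_result(records: Optional[List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Raise if the apoc.periodic.iterate summary row reports a problem.

    Assumes `records` is what exec_query returns for a single
    apoc.periodic.iterate call: a list holding one summary dict.
    """
    if not records:
        raise RuntimeError("apoc.periodic.iterate returned no summary row")
    row = records[0]
    if row.get("wasTerminated"):
        raise RuntimeError("apoc.periodic.iterate was terminated")
    if row.get("failedBatches", 0) or row.get("failedOperations", 0):
        # errorMessages maps each error message to how often it occurred.
        raise RuntimeError(f"import failed: {row.get('errorMessages')}")
    return row
```

It would be called as `check_iterate_result(self.exec_query(query=...))` right before the file is removed.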
Any insights on how I can ensure that my code waits for the import to finish before attempting file deletion would be greatly appreciated. I suspect there might be something amiss with my transactions. Thanks in advance!