Database holds onto the file after apoc.load.csv, causing a permission error

Hey there,

I'm currently developing a Python library to streamline the import of event data into a Neo4j database (GitHub - PromG-dev/promg-core). The import flow is pretty straightforward: users specify the location of the input file, I preprocess it, move it to the import folder of the database, and kick off the import. Once the import is done, I tidy up by deleting the file from the import folder. However, after upgrading my database from version 5.9.0 to 5.10.0, I hit a snag. The error persists in version 5.17 (last tested).

The issue arises when I attempt to delete the file post-import. I encounter a PermissionError, signaling that the file is still in use by another process. It seems the database is holding onto the file longer than anticipated, causing a conflict with my cleanup operation.

More specifically, I get this error: PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: '<ne04j>\\import\\import_file.csv'

To remedy this, I'm seeking advice on how to synchronize my code with the completion of the import process. Currently, I'm using Python 3.11 along with the Neo4j driver version 5.10.0, running Neo4j version 5.10 with both APOC and APOC extended installed.

Here's a minimal snippet of the code I'm using for the import process:

# Import file
exec_query(query='''CALL apoc.periodic.iterate('
                        CALL apoc.load.csv("import_file.csv") yield map as row return row',
                        'CREATE (record:Record)
                        SET record += row'
                    , {batchSize:10000, parallel:true, retries: 1});''')

# Delete the file from the import directory
path = Path(self.get_import_directory(), "import_file.csv")
os.remove(path)

And here's the exec_query function responsible for executing queries:

def exec_query(query: str, **kwargs) \
    -> Optional[List[Dict[str, Any]]]:
    """
    Write a transaction of the query to  the server and return the result
    @param query: string, query to be executed
    @param database: string, Name of the database
    @return: The result of the query or None
    """

    def run_query(tx: neo4j.Transaction, _query: str, **_kwargs) \
        -> Optional[tuple[List[Dict[str, Any]], neo4j.ResultSummary]]:

        """
            Run the query and return the result of the query
            @param tx: transaction class on which we can perform queries to the database
            @param _query: string
            @return: The result of the query or None if there is no result
        """
        # get the results after the query is executed
        try:
            _result = tx.run(_query, _kwargs)
            _result_records = _result.data()
            _summary = _result.consume() #consume and exhaust the results
        except Exception as inst:
            self.close_connection()
            print(inst)
        else:
            if _result_records is not None and _result_records != []:  
            # return the values if result is not none or empty list
                return _result_records, _summary
            else:
                return None, _summary

    with self.driver.session(database="neo4j") as session:
        result, summary = session.execute_write(run_query, query, **kwargs)
        return result

Any insights on how I can ensure that my code waits for the import to finish before attempting file deletion would be greatly appreciated. I suspect there might be something amiss with my transactions. Thanks in advance!
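A stopgap that is not suggested anywhere in this thread, sketched here only as an assumption about what might help: retry the deletion a few times when Windows reports that the file is still in use. The helper name remove_with_retry and its parameters are hypothetical, and this only hides the symptom; it does not explain why the file handle stays open.

```python
import os
import time
from pathlib import Path


def remove_with_retry(path: Path, attempts: int = 5, delay_s: float = 0.5) -> bool:
    """Try to delete `path`, retrying while another process still holds it.

    Returns True once the file is gone, False if every attempt failed.
    """
    for attempt in range(attempts):
        try:
            os.remove(path)
            return True
        except FileNotFoundError:
            return True  # already gone, nothing left to clean up
        except PermissionError:
            # On Windows this is typically WinError 32: the file is open elsewhere.
            if attempt < attempts - 1:
                time.sleep(delay_s * (attempt + 1))  # linear back-off
    return False
```

In the snippet above, os.remove(path) would become remove_with_retry(path), with the False case logged or raised as appropriate.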

Quick semi-related note:

I see that you catch and swallow exceptions in your transaction function. Keep in mind that the driver uses the outcome of the transaction function to decide whether the transaction should be committed or rolled back. When a query fails (tx.run or an operation on _result) with a Neo4jError, the transaction is most likely broken beyond recovery. Because the error is swallowed, the driver will nevertheless try to commit the transaction. Depending on the driver version, you'll then get either an exception from the driver (probably TransactionError) or a possibly harder-to-understand error from the server.

Hi,

Thank you.
So, just to check, can I just remove the try except block to ensure that I don't swallow the error?

Will the rollback then happen automatically? And is there a way to check whether the transaction was committed or rolled back?

Kind regards,
Ava

Yes, you can remove it. On any exception raised from the transaction function, the driver will roll back the transaction (if it's not already broken anyway). To check whether the transaction has been committed or rolled back, wrap the call to session.execute_write in try/except. If an exception raised within the transaction function is deemed retryable (e.g., lost connectivity, transient server errors), the driver retries the transaction; otherwise, the exception is re-raised at the call site of session.execute_write.

I highly recommend giving the driver manual a read, in particular the page "Run your own transactions" in the Neo4j Python Driver Manual.
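One related caveat, stated as an assumption about default APOC behaviour rather than something confirmed in this thread: apoc.periodic.iterate runs its batches in separate inner transactions and, as far as I know, reports failed batches in its result row (fields such as failedBatches and errorMessages) instead of raising. A committed outer transaction therefore does not by itself prove that every batch succeeded. A minimal sketch of a check on the returned records, with the hypothetical helper name check_iterate_result:

```python
from typing import Any, Dict, List, Optional


def check_iterate_result(records: Optional[List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Raise if apoc.periodic.iterate reported failed batches.

    `records` is the list of dicts returned by exec_query for a
    CALL apoc.periodic.iterate(...) query (a single row is expected).
    """
    if not records:
        raise RuntimeError("apoc.periodic.iterate returned no result row")
    row = records[0]
    if row.get("failedBatches", 0) > 0:
        raise RuntimeError(
            f"{row['failedBatches']} of {row.get('batches')} batches failed: "
            f"{row.get('errorMessages')}"
        )
    return row
```

It would be called on the return value of exec_query before the file is deleted.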

Really weird, this shouldn't happen; we usually close the file once consumption ends.
Please open a GitHub issue on APOC.

Can you replace your use of apoc.load.csv with a plain LOAD CSV WITH HEADERS, which does the same?

You can also use CALL IN TRANSACTIONS now in 5.x, which doesn't support parallel execution yet (coming soon).
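A rough sketch of what that suggestion could look like from Python, under a few assumptions: the file sits in the import folder (hence the file:/// URL), the helper names build_load_csv_query and import_csv are made up for illustration, and the configuration options of apoc.load.csv (null values, mapping) are not reproduced and would have to be handled in Cypher. CALL { ... } IN TRANSACTIONS only runs in an implicit (auto-commit) transaction, so the sketch uses session.run instead of session.execute_write.

```python
def build_load_csv_query(batch_size: int = 10000) -> str:
    """Build a LOAD CSV query that commits every `batch_size` rows."""
    return (
        "LOAD CSV WITH HEADERS FROM $url AS row\n"
        "CALL {\n"
        "  WITH row\n"
        "  CREATE (record:Record)\n"
        "  SET record += row\n"
        f"}} IN TRANSACTIONS OF {int(batch_size)} ROWS"
    )


def import_csv(session, file_name: str, batch_size: int = 10000):
    """Run the import in an auto-commit transaction and wait for it to finish.

    `session` is assumed to be a neo4j.Session (anything with .run() works).
    """
    result = session.run(
        build_load_csv_query(batch_size),
        url=f"file:///{file_name}",  # resolved relative to the import folder
    )
    return result.consume()  # blocks until the server has finished the query
```

With a real driver this would be called as import_csv(session, "import_file.csv") inside a with self.driver.session(database="neo4j") block.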

Thank you for your feedback and the pointers.

For reference, for anyone else interested, I changed exec_query as follows:

def exec_query(self, query: str, **kwargs) \
   -> Optional[List[Dict[str, Any]]]:
  """
  Write a transaction of the query to  the server and return the result
  @param query: string, query to be executed
  @param database: string, Name of the database
  @return: The result of the query or None
  """
  
  def run_query(tx: neo4j.Transaction, _query: str, **_kwargs) -> Tuple[
      Optional[List[Dict[str, Any]]], neo4j.ResultSummary]:
  
      """
          Run the query and return the result of the query
          @param tx: transaction class on which we can perform queries to the database
          @param _query: string
          @return: The result and the summary of the query 
      """
      # get the results after the query is executed
      _result = tx.run(_query, _kwargs)
      _result_records = _result.data()  # obtain dict representation
      _summary = _result.consume()  # exhaust the result
  
      return _result_records, _summary
  
  
  with self.driver.session(database="neo4j") as session:
      try: # try to commit the transaction, if the transaction fails, it is rolled back automatically
          result, summary = session.execute_write(run_query, query, **kwargs)
          return result
      except Exception as inst: # let user know the transaction failed and close the connection
          self.close_connection()
          print("Latest transaction was rolled back")
          print(f"This was your latest query: {query}")
          print(inst)

Thank you for your reply.

I opened an issue: apoc.load.csv does not close the file on consumption end · Issue #4078 · neo4j-contrib/neo4j-apoc-procedures · GitHub.

I did not show this, but I use some of the configuration options such as null values and the mapping. I will see if I can work around it.

Thank you, I will look into this.