Hi,
I have a large pandas DataFrame that I want to load into a Neo4j database using the Neo4j Python driver. How can I load it in batches?
Or is there a way to use the file (stored on a local drive) directly with the Neo4j Python driver, something like this:
LOAD CSV WITH HEADERS FROM 'file:///genes.csv' AS line CALL { WITH line CREATE (:Gene {symbol: line.symbol}) } IN TRANSACTIONS OF 100 ROWS
Thanks
Hi,
if you want to import CSV files, they must either be located in the import directory on the server or be accessible via HTTP(S) or FTP. See also the docs.
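If the file is in the server's import directory, you can run your LOAD CSV query through the Python driver pretty much as you wrote it. One caveat: CALL { ... } IN TRANSACTIONS has to run in an implicit (auto-commit) transaction, so use session.run rather than a managed transaction (execute_query and execute_write won't work for this). A minimal sketch, assuming the URL, credentials, and genes.csv from your post as placeholders:

import neo4j

URL = "neo4j://localhost:7687"
AUTH = ("neo4j", "pass")

query = (
    "LOAD CSV WITH HEADERS FROM 'file:///genes.csv' AS line "
    "CALL { WITH line CREATE (:Gene {symbol: line.symbol}) } "
    "IN TRANSACTIONS OF 100 ROWS"
)

with neo4j.GraphDatabase.driver(URL, auth=AUTH) as driver:
    with driver.session(database="neo4j") as session:
        # CALL { ... } IN TRANSACTIONS requires an implicit (auto-commit)
        # transaction, which session.run provides
        session.run(query).consume()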
As for batching things manually, here's a suggestion I threw together quickly. Please play around with it, adjust it to your needs, and fine-tune the constants; the batch size in particular depends heavily on the workload.
import asyncio

import neo4j
import numpy as np
import pandas as pd

# some sample data: every (x, y) pair on a 2000 x 2000 grid
data = np.stack(
    np.meshgrid(np.arange(-1000, 1000), np.arange(-1000, 1000)), -1
).reshape(-1, 2)
df = pd.DataFrame(data, columns=["x", "y"])

URL = "neo4j://localhost:7687"
AUTH = ("neo4j", "pass")
DB = "neo4j"
BATCH_SIZE = 10000
MAX_CONCURRENCY = 50


async def upload_batch(semaphore, driver, batch):
    # the semaphore caps the number of batches that are in flight at once
    async with semaphore:
        await driver.execute_query(
            "UNWIND range(0, $len - 1) AS i "
            "CREATE (n:Node {x: $data['x'][i], y: $data['y'][i]})",
            # the driver can't serialize pandas/numpy types, so convert
            # the batch to a plain dict of Python lists first
            data={col: batch[col].tolist() for col in batch.columns},
            len=len(batch),
            database_=DB,
        )


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with neo4j.AsyncGraphDatabase.driver(URL, auth=AUTH) as driver:
        # optionally clear out the DB for testing
        # await driver.execute_query("MATCH (n) DETACH DELETE n", database_=DB)
        tasks = [
            upload_batch(semaphore, driver, df[offset:offset + BATCH_SIZE])
            for offset in range(0, len(df), BATCH_SIZE)
        ]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
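For scale: the sample DataFrame has 4,000,000 rows (a 2000 × 2000 grid), so BATCH_SIZE = 10000 yields 400 batches, of which at most MAX_CONCURRENCY = 50 run concurrently. Plain CREATEs of unconnected nodes, as here, don't contend for locks; if you switch to MERGE or create relationships between existing nodes, concurrent transactions can deadlock, so you may need to lower the concurrency.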