Hello Neo4j Community,
I'm a beginner with Neo4j and Python, and I've been grappling with an issue for about two weeks now that I can't resolve on my own, so I'm reaching out for some help.
I have large CSV files (about 4 million lines per file) that I'm trying to import into a Neo4j database using the py2neo library in Python. Because of the file size, I'm reading and importing each file in chunks with the LOAD CSV Cypher command.
However, my CSV files contain some faulty lines that cause a DatabaseError when I try to load them into the database. When the LOAD CSV command encounters a faulty line, it seems to stop processing the rest of the current chunk and does not move on to the next line.
I've attempted to clean the CSV files beforehand, but given the size of the datasets and the variability of the errors, this approach has proven to be impractical.
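My cleaning attempt was roughly along these lines (a simplified sketch): it only drops rows whose column count doesn't match the header, so the more variable errors, such as broken quoting inside fields, still slip through:

import csv

def drop_malformed_rows(src_path, dst_path):
    # Keep only rows that have exactly as many columns as the header;
    # broken quoting/escaping inside a field still gets through.
    with open(src_path, newline="", encoding="utf-8") as src, \
         open(dst_path, "w", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst)
        header = next(reader)
        writer.writerow(header)
        for row in reader:
            if len(row) == len(header):
                writer.writerow(row)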
Here's my Python script:
from py2neo import Graph, DatabaseError
import re
from entity_attributes import organization_properties, person_properties, acquisitions_properties, degrees_properties, events_properties, category_groups_properties, event_appearances_properties, funds_properties, investors_properties, ipos_properties, jobs_properties, funding_rounds_properties
def connect_to_neo4j_graph(uri, user, password):
    return Graph(uri, auth=(user, password))

class Create_Entity:
    def __init__(self, graph, csv_path):
        self.graph = graph
        self.csv_path = csv_path

    def clear_database_in_batches(self, batch_size=100000):
        while True:
            # Match and detach delete nodes and relationships in batches
            query = f"MATCH (n) WITH n LIMIT {batch_size} DETACH DELETE n"
            result = self.graph.run(query).stats()
            deleted_nodes = result.get("nodes_deleted", 0)
            if deleted_nodes == 0:
                # No more nodes to delete, break the loop
                break

    def add_constraint(self, label, prop):
        self.graph.run(
            f"CREATE CONSTRAINT {label}_unique_{prop} IF NOT EXISTS FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE")

    def load_in_chunks(self, label, properties, chunk_size=100000):
        total_rows = self.graph.run(
            f"LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line RETURN COUNT(*) AS count").evaluate()
        num_chunks = (total_rows + chunk_size - 1) // chunk_size  # Calculate number of chunks
        for chunk_index in range(num_chunks):
            skip_rows = chunk_index * chunk_size
            query = f"""
            LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line
            WITH line SKIP {skip_rows} LIMIT {chunk_size}
            MERGE (n:{label} {properties})
            """
            self.graph.run(query)
class Organization(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///organizations.csv")

class Person(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///people.csv")
# clean " and \
class Acquisitions(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///acquisitions.csv")

class Degrees(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///degrees.csv")

class Events(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///events.csv")

class CategoryGroups(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///category_groups.csv")

class EventAppearances(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///event_appearances.csv")

class Funds(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///funds.csv")

class Investors(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///investors.csv")

class IPOs(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///ipos.csv")

class Jobs(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///jobs.csv")

class FundingRounds(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///funding_rounds.csv")
##############################
graph = connect_to_neo4j_graph("bolt://localhost:7687", "neo4j", "password4graph")
########################
# Create the nodes     #
########################
# Organization
org = Organization(graph)
org.clear_database_in_batches(batch_size=100000)
org.add_constraint("Organization", "uuid")
org.load_in_chunks("Organization", organization_properties, chunk_size=100000)
# Person
person = Person(graph)
person.clear_database_in_batches(batch_size=100000)
person.add_constraint("Person", "uuid")
person.load_in_chunks("Person", person_properties, chunk_size=100000)
# Acquisitions
acq = Acquisitions(graph)
acq.clear_database_in_batches(batch_size=100000)
acq.add_constraint("Acquisitions", "uuid")
acq.load_in_chunks("Acquisitions", acquisitions_properties, chunk_size=100000)
# Degrees
deg = Degrees(graph)
deg.clear_database_in_batches(batch_size=100000)
deg.add_constraint("Degrees", "uuid")
deg.load_in_chunks("Degrees", degrees_properties, chunk_size=100000)
# Events
eve = Events(graph)
eve.clear_database_in_batches(batch_size=100000)
eve.add_constraint("Events", "uuid")
eve.load_in_chunks("Events", events_properties, chunk_size=100000)
# Category Groups
cat_grp = CategoryGroups(graph)
cat_grp.clear_database_in_batches(batch_size=100000)
cat_grp.add_constraint("CategoryGroups", "uuid")
cat_grp.load_in_chunks("CategoryGroups", category_groups_properties, chunk_size=100000)
# Event Appearances
evt_app = EventAppearances(graph)
evt_app.clear_database_in_batches(batch_size=100000)
evt_app.add_constraint("EventAppearances", "uuid")
evt_app.load_in_chunks("EventAppearances", event_appearances_properties, chunk_size=100000)
# Funds
fnd = Funds(graph)
fnd.clear_database_in_batches(batch_size=100000)
fnd.add_constraint("Funds", "uuid")
fnd.load_in_chunks("Funds", funds_properties, chunk_size=100000)
# Investors
inv = Investors(graph)
inv.clear_database_in_batches(batch_size=100000)
inv.add_constraint("Investors", "uuid")
inv.load_in_chunks("Investors", investors_properties, chunk_size=100000)
# IPOs
ipo = IPOs(graph)
ipo.clear_database_in_batches(batch_size=100000)
ipo.add_constraint("IPOs", "uuid")
ipo.load_in_chunks("IPOs", ipos_properties, chunk_size=100000)
# Jobs
job = Jobs(graph)
job.clear_database_in_batches(batch_size=100000)
job.add_constraint("Jobs", "uuid")
job.load_in_chunks("Jobs", jobs_properties, chunk_size=100000)
# Funding Rounds
fr = FundingRounds(graph)
fr.clear_database_in_batches(batch_size=100000)
fr.add_constraint("FundingRounds", "uuid")
fr.load_in_chunks("FundingRounds", funding_rounds_properties, chunk_size=100000)
The relevant part is the load_in_chunks method:
    def load_in_chunks(self, label, properties, chunk_size=100000):
        total_rows = self.graph.run(
            f"LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line RETURN COUNT(*) AS count").evaluate()
        num_chunks = (total_rows + chunk_size - 1) // chunk_size  # Calculate number of chunks
        for chunk_index in range(num_chunks):
            skip_rows = chunk_index * chunk_size
            query = f"""
            LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line
            WITH line SKIP {skip_rows} LIMIT {chunk_size}
            MERGE (n:{label} {properties})
            """
            self.graph.run(query)
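One idea I had (which is why DatabaseError is imported at the top) was to wrap each chunk in a try/except, roughly like the sketch below. But that only skips the whole failing chunk, so up to 100,000 good rows get dropped along with the single faulty line:

    def load_in_chunks_catching_errors(self, label, properties, chunk_size=100000):
        # Sketch only: catch the DatabaseError per chunk so the loop keeps going.
        total_rows = self.graph.run(
            f"LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line RETURN COUNT(*) AS count").evaluate()
        num_chunks = (total_rows + chunk_size - 1) // chunk_size
        for chunk_index in range(num_chunks):
            skip_rows = chunk_index * chunk_size
            query = f"""
            LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line
            WITH line SKIP {skip_rows} LIMIT {chunk_size}
            MERGE (n:{label} {properties})
            """
            try:
                self.graph.run(query)
            except DatabaseError as err:
                # The whole chunk (up to chunk_size rows) is lost, not just the bad line.
                print(f"Skipped chunk {chunk_index} starting at row {skip_rows}: {err}")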
My question is: how can I modify this script so that it skips the faulty lines and continues loading the rest of the rows in the chunk? Is there a way to do this with the LOAD CSV command, or is there another method I should be using to import my CSV files?
I'm a bit stuck and I really need some guidance on how to proceed. Any help would be greatly appreciated.
Thank you in advance!