Big CSV Datasets and Errors from Unexpected Characters

Hello Neo4j Community,

I'm a beginner with Neo4j and Python, and I've been grappling with an issue for about two weeks that I can't seem to resolve, so I'm reaching out for help.

I have large CSV files (about 4 million lines each) that I'm trying to import into a Neo4j database using the py2neo library in Python. Because of their size, I read and import the files in chunks using the LOAD CSV Cypher command.

However, my CSV files contain some faulty lines that cause a DatabaseError when loaded. When LOAD CSV encounters such a line, it stops processing the rest of the current chunk rather than moving on to the next line.
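
To give a simplified, invented illustration of the kind of line I mean: a field that starts with a quote but continues after the closing quote (or contains a stray backslash) is enough to abort the load:

    uuid,name,description
    a1b2c3,Acme Corp,Makes anvils
    d4e5f6,"Example" Labs\,text after the closing quote breaks the parsing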

I've attempted to clean the CSV files beforehand, but given the size of the datasets and the variability of the errors, this approach has proven to be impractical.

Here's my Python script:

from py2neo import Graph

from entity_attributes import (
    organization_properties, person_properties, acquisitions_properties,
    degrees_properties, events_properties, category_groups_properties,
    event_appearances_properties, funds_properties, investors_properties,
    ipos_properties, jobs_properties, funding_rounds_properties,
)

def connect_to_neo4j_graph(uri, user, password):
    return Graph(uri, auth=(user, password))

class Create_Entity:
    def __init__(self, graph, csv_path):
        self.graph = graph
        self.csv_path = csv_path

    def clear_database_in_batches(self, batch_size=100000):
        while True:
            # Match and detach delete nodes and relationships in batches
            query = f"MATCH (n) WITH n LIMIT {batch_size} DETACH DELETE n"
            result = self.graph.run(query).stats()

            deleted_nodes = result.get("nodes_deleted", 0)
            if deleted_nodes == 0:
                # No more nodes to delete, break the loop
                break

    def add_constraint(self, label, prop):
        self.graph.run(
            f"CREATE CONSTRAINT {label}_unique_{prop} IF NOT EXISTS FOR (n:{label}) REQUIRE n.{prop} IS UNIQUE")

    def load_in_chunks(self, label, properties, chunk_size=100000):
        total_rows = self.graph.run(
            f"LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line RETURN COUNT(*) AS count").evaluate()
        num_chunks = (total_rows + chunk_size - 1) // chunk_size  # Calculate number of chunks

        for chunk_index in range(num_chunks):
            skip_rows = chunk_index * chunk_size
            query = f"""
                LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line
                WITH line SKIP {skip_rows} LIMIT {chunk_size}
                MERGE (n:{label} {properties})
            """
            self.graph.run(query)

class Organization(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///organizations.csv")

class Person(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///people.csv")

# clean " and \


class Acquisitions(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///acquisitions.csv")


class Degrees(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///degrees.csv")


class Events(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///events.csv")


class CategoryGroups(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///category_groups.csv")


class EventAppearances(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///event_appearances.csv")


class Funds(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///funds.csv")


class Investors(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///investors.csv")


class IPOs(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///ipos.csv")

class Jobs(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///jobs.csv")


class FundingRounds(Create_Entity):
    def __init__(self, graph):
        super().__init__(graph, "file:///funding_rounds.csv")


##############################


graph = connect_to_neo4j_graph("bolt://localhost:7687", "neo4j", "password4graph")



######################
# Creating the nodes #
######################

# Clear the whole database once before loading; clearing again before each
# entity would wipe out the nodes imported in the previous steps
org = Organization(graph)
org.clear_database_in_batches(batch_size=100000)

# Organization
org.add_constraint("Organization", "uuid")
org.load_in_chunks("Organization", organization_properties, chunk_size=100000)

# Person
person = Person(graph)
person.add_constraint("Person", "uuid")
person.load_in_chunks("Person", person_properties, chunk_size=100000)


# Acquisitions
acq = Acquisitions(graph)
acq.add_constraint("Acquisitions", "uuid")
acq.load_in_chunks("Acquisitions", acquisitions_properties, chunk_size=100000)

# Degrees
deg = Degrees(graph)
deg.add_constraint("Degrees", "uuid")
deg.load_in_chunks("Degrees", degrees_properties, chunk_size=100000)

# Events
eve = Events(graph)
eve.add_constraint("Events", "uuid")
eve.load_in_chunks("Events", events_properties, chunk_size=100000)

# Category Groups
cat_grp = CategoryGroups(graph)
cat_grp.add_constraint("CategoryGroups", "uuid")
cat_grp.load_in_chunks("CategoryGroups", category_groups_properties, chunk_size=100000)

# Event Appearances
evt_app = EventAppearances(graph)
evt_app.add_constraint("EventAppearances", "uuid")
evt_app.load_in_chunks("EventAppearances", event_appearances_properties, chunk_size=100000)

# Funds
fnd = Funds(graph)
fnd.add_constraint("Funds", "uuid")
fnd.load_in_chunks("Funds", funds_properties, chunk_size=100000)

# Investors
inv = Investors(graph)
inv.add_constraint("Investors", "uuid")
inv.load_in_chunks("Investors", investors_properties, chunk_size=100000)

# IPOs
ipo = IPOs(graph)
ipo.add_constraint("IPOs", "uuid")
ipo.load_in_chunks("IPOs", ipos_properties, chunk_size=100000)

# Jobs
job = Jobs(graph)
job.add_constraint("Jobs", "uuid")
job.load_in_chunks("Jobs", jobs_properties, chunk_size=100000)

# Funding Rounds
fr = FundingRounds(graph)
fr.add_constraint("FundingRounds", "uuid")
fr.load_in_chunks("FundingRounds", funding_rounds_properties, chunk_size=100000)

The relevant part being:

    def load_in_chunks(self, label, properties, chunk_size=100000):
        total_rows = self.graph.run(
            f"LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line RETURN COUNT(*) AS count").evaluate()
        num_chunks = (total_rows + chunk_size - 1) // chunk_size  # Calculate number of chunks

        for chunk_index in range(num_chunks):
            skip_rows = chunk_index * chunk_size
            query = f"""
                LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line
                WITH line SKIP {skip_rows} LIMIT {chunk_size}
                MERGE (n:{label} {properties})
            """
            self.graph.run(query)

My question is: how can I modify this script so that it skips the faulty lines and continues loading the remaining rows in the chunk? Is there a way to do this with the LOAD CSV command, or is there another method I should be using to import my CSV files?

I'm a bit stuck and I really need some guidance on how to proceed. Any help would be greatly appreciated.

Thank you in advance! 🙂

If you know all the potential issues with each row, you can test for each condition and skip the rows that fail. Put a WHERE clause between your WITH and MERGE clauses to test the conditions a row must pass, which are the negation of the conditions that would exclude it.
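
As a minimal sketch, here is your load_in_chunks with such a filter added. The two predicates are purely illustrative assumptions (a missing uuid, and a stray quote in a name column); replace them with the negation of whatever makes your rows faulty:

    def load_in_chunks(self, label, properties, chunk_size=100000):
        total_rows = self.graph.run(
            f"LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line RETURN COUNT(*) AS count").evaluate()
        num_chunks = (total_rows + chunk_size - 1) // chunk_size

        for chunk_index in range(num_chunks):
            skip_rows = chunk_index * chunk_size
            query = f"""
                LOAD CSV WITH HEADERS FROM '{self.csv_path}' AS line
                WITH line SKIP {skip_rows} LIMIT {chunk_size}
                // illustrative predicates only, replace with the negation
                // of whatever identifies your faulty rows
                WHERE line.uuid IS NOT NULL
                  AND NOT line.name CONTAINS '"'
                MERGE (n:{label} {properties})
            """
            self.graph.run(query)

Attaching the WHERE to the same WITH that does the SKIP/LIMIT matters: rows are filtered after each chunk is selected, so the chunk boundaries stay stable. One caveat: this only helps with rows the CSV parser can actually read; a line that breaks the CSV parsing itself (for example an unbalanced quote) will still abort the whole query before the WHERE is ever evaluated.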