Import json with py2neo

I was building a Python importer for JSON data based on py2neo:

from py2neo import Graph, Node, Relationship

class Neo4jInterface:
    def __init__(self, uri, user, password):
        self.graph = Graph(uri, auth=(user, password))

    def create_node(self, label, **properties):
        node = Node(label, **properties)
        self.graph.create(node)
        return node

    def create_relationship(self, node1, relationship_type, node2, **properties):
        rel = Relationship(node1, relationship_type, node2, **properties)
        self.graph.create(rel)

    def process_json_data(self, data):
        ___node = self.create_node("xxx", **data['__'])

        # Create xxx node and associate it with xxx
        __ = self.create_node("__", **data['__'])
        self.create_relationship(__, "HAS_xxx", __)

# Usage
neo4j_interface = Neo4jInterface("bolt://localhost:7687", "username", "password")

# Assuming you've read your JSON data into a variable called `data`
neo4j_interface.process_json_data(data)

This is just a simple example, and the real code was getting pretty complicated. On top of that, py2neo now seems to be deprecated.

I would like a systematic way to define my nodes and relationships and import them into my Neo4j database.

I can save my data in a wide variety of formats. Currently it is saved on disk as JSON, which I read and process.

I would like some assistance in being able to systematically define the nodes and relationships and import the data.
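
One possible way to make this systematic, sketched under assumptions: describe each node and relationship type as a small spec and generate a parameterized MERGE statement from it. NodeSpec, RelSpec, and the Page/Topic labels below are hypothetical names, not part of py2neo or the official neo4j driver. The sketch only builds Cypher strings and parameter dictionaries, which any driver could then run.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeSpec:
    """Hypothetical description of one node type."""
    label: str              # Neo4j label, e.g. "Page"
    key: str                # property that identifies the node
    properties: tuple = ()  # other JSON fields to copy onto the node


@dataclass(frozen=True)
class RelSpec:
    """Hypothetical description of one relationship type."""
    start: NodeSpec
    rel_type: str
    end: NodeSpec


def node_query(spec):
    # Labels cannot be Cypher parameters, so they are interpolated from the
    # spec (which comes from code, not from the imported data).
    return (
        f"MERGE (n:{spec.label} {{{spec.key}: $key}}) "
        "SET n += $props"
    )


def node_params(spec, record):
    # Pick only the fields named in the spec out of one JSON record.
    props = {p: record[p] for p in spec.properties if p in record}
    return {"key": record[spec.key], "props": props}


def rel_query(spec):
    # Match both endpoints by key, then MERGE the relationship between them.
    return (
        f"MATCH (a:{spec.start.label} {{{spec.start.key}: $start_key}}) "
        f"MATCH (b:{spec.end.label} {{{spec.end.key}: $end_key}}) "
        f"MERGE (a)-[:{spec.rel_type}]->(b)"
    )


# Example specs; labels and field names are placeholders.
PAGE = NodeSpec("Page", "id", ("title", "url"))
TOPIC = NodeSpec("Topic", "id", ("name",))
HAS_TOPIC = RelSpec(PAGE, "HAS_TOPIC", TOPIC)

record = {"id": 12, "title": "Home", "url": "/", "unused": True}
print(node_query(PAGE))
print(node_params(PAGE, record))
print(rel_query(HAS_TOPIC))
```

Keeping the specs as plain data means adding a new node type is one line, and the query text stays parameterized rather than being assembled from the JSON values.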

When I try to create a simple node and import in parallel, I get this error:

/neo_importer.py", line 81, in main
    result = future.result()
             ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/username/Documents/tmp_project/tmp/TEST/../neo_importer.py", line 40, in load_into_neo4j
    conn.run_query(page.to_cypher())
  File "/home/username/Documents/tmp_project/tmp/Neo4jConnection.py", line 32, in run_query
    result = session.run(query)
             ^^^^^^^^^^^^^^^^^^
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/work/session.py", line 317, in run
    self._auto_result._run(
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/work/result.py", line 166, in _run
    self._attach()
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/work/result.py", line 274, in _attach
    self._connection.fetch_message()
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_common.py", line 180, in inner
    func(*args, **kwargs)
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_bolt.py", line 826, in fetch_message
    res = self._process_message(tag, fields)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_bolt5.py", line 370, in _process_message
    response.on_failure(summary_metadata or {})
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_common.py", line 240, in on_failure
    self.connection.reset()
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_bolt5.py", line 324, in reset
    self.fetch_all()
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_bolt.py", line 840, in fetch_all
    detail_delta, summary_delta = self.fetch_message()
                                  ^^^^^^^^^^^^^^^^^^^^
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_bolt.py", line 823, in fetch_message
    tag, fields = self.inbox.pop(
                  ^^^^^^^^^^^^^^^
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_common.py", line 74, in pop
    self._buffer_one_chunk()
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_common.py", line 58, in _buffer_one_chunk
    receive_into_buffer(
  File "/home/username/.local/lib/python3.11/site-packages/neo4j/_sync/io/_common.py", line 305, in receive_into_buffer
    buffer.data += bytearray(end - len(buffer.data))
BufferError: Existing exports of data: object cannot be re-sized

I have no idea how to debug this issue. Any technical assistance would be appreciated.

Sessions, transactions, and results aren't concurrency safe. You have to create a new session per thread/async task. See if this example helps: "Python - generator already executing", post #6 by rouven_bauer.
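
As a rough illustration of that advice (a sketch, not the linked example): one driver is created once and shared, since driver objects are meant to be shared across threads, while every task opens and closes its own session. The function names and the Object label are placeholders, and the URI and auth in example_usage are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

QUERY = "MERGE (o:Object {id: $id}) SET o.name = $name"


def load_one(driver, row):
    # A session is opened per task and never shared between threads.
    with driver.session() as session:
        # consume() exhausts the result before the session is closed.
        session.run(QUERY, row).consume()
    return row["id"]


def load_all(driver, rows, max_workers=8):
    # The driver is shared; only sessions are per-task.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(load_one, driver, row) for row in rows]
        # Collect results only after everything has been submitted.
        return [future.result() for future in futures]


def example_usage():
    # Needs the neo4j package and a running server; not called here.
    from neo4j import GraphDatabase

    rows = [{"id": i, "name": f"Object_{i}"} for i in range(100)]
    with GraphDatabase.driver("bolt://localhost:7687", auth=None) as driver:
        load_all(driver, rows)
```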

Heads-up: I've forked this discussion to its own thread.

Best,
ABK

I am still having concurrency issues, even when doing nothing more than creating the nodes.

Let's say I define uniq_id as the number 12 and run the Cypher query below from a function like this:

        query = """
        MERGE (tl:TopicsList {id: $id})
        """
        parameters = {'id': self.uniq_id}

        conn = Neo4jConnection(uri, auth_type="none")
        conn.run_query(query, parameters)
        conn.close()

from a loop like this:

        with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            futures = []
            for base_filename, url_filename in url_files.items():
                if base_filename in page_processed_files:
                    futures.append(executor.submit(load_into_neo4j, url_filename, page_processed_files[base_filename], uri, user, password))

I get anywhere from 1 to len(futures) nodes with the same id. In the code above, MAX_THREADS = 32.

I've tried reading the documentation here: Unique IDs — Neo4j.rb 10.0.1 documentation

I've tried everything suggested in there and nothing works.

I always get duplicate nodes created.

I've tried run_query, and I've tried session.run.

I've tried defining my own unique ids.

If it's user error please point me in the correct direction.

This is a tad out of my main competence. I'm mostly concerned with drivers (i.e., connectivity) and less with Cypher, Kernel, etc. So take this with a grain of salt.

Have you created a unique index over your id attribute? See Syntax - Cypher Manual under "Create a node property uniqueness constraint". I heard people mention that this is necessary to avoid the kind of race conditions in the DBMS you are experiencing. However, I never played around with it myself.

Let me know either way (if it works, or doesn't), so I can learn something with you :grin:

That's exactly what I was trying to implement, but the more of these features I try to implement, the further I get from the main task of getting data into the db and the more issues I keep running into.

[edit]
What I mean by "I was trying to implement" is: when I try to create an index as recommended in the documentation, I get random crashes and still have duplicate nodes being created.

I do not need to use python and I can transform the data so I am pretty flexible.

I would just like a somewhat stable base to work from.

Just to be 100% explicit and emphasize what Rouven said, you need to create a uniqueness constraint to avoid duplicates, indices won't help.

CREATE CONSTRAINT unique_topics_id IF NOT EXISTS FOR (n:TopicsList) REQUIRE n.id IS UNIQUE

That code will fail sporadically if I try to call it from the code I placed above.

Can you share the full project and some instructions to run into the situation you describe? That will likely help us help you.

relevant code section:

the threadpool loop:

for file in os.listdir(processed_folder_path):
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = []
        for base_filename, OBJECT_filename in OBJECT_files.items():
            if base_filename in OBJECTB_processed_files:
                futures.append(executor.submit(load_into_neo4j, OBJECT_filename, OBJECTB_processed_files[base_filename], uri, user, password))

the function that will crash

def load_into_neo4j(OBJECT_filename, OBJECTB_filename, uri, user, password):
    OBJECT_data = load_from_disk(OBJECT_filename)
    OBJECT = OBJECT.from_dict(OBJECT_data)

    OBJECT.to_cypher("c", uri, user, password)
    # conn = Neo4jConnection(uri, auth_type="none")
    return False

Initially I tried:

  1. creating one connection and passing it into the executor.submit function
  2. creating a db connection within load_into_neo4j, using it, and closing it before the end of the function
  3. creating a connection in the object class, generating the query string, and running the query from the class

I simplified it down to just calling:

        query = """
        MERGE (tl:TopicsList {id: 0000})
        """
        parameters = {'id': 000}

        conn = Neo4jConnection(uri, auth_type="none")
        conn.run_query(query, parameters)
        conn.close()

from within the load_into_neo4j function.

Even calling that simple query from within the function submitted via futures.append(executor.submit(...)) still creates duplicate nodes.

The most confusing part is that multiple runs of the program create different numbers of TopicsList nodes; there should only be one.

Before even running the above program: when you open Cypher Shell or Neo4j browser on your target Neo4j server (assuming a recent version), what is the result of:

SHOW CONSTRAINTS
YIELD *
WHERE entityType = "NODE"
    AND (type = "UNIQUENESS" OR type = "NODE_KEY")
    AND labelsOrTypes = ["TopicsList"]
    AND properties = ["id"]
RETURN name, type

Does it return a row or does it return nothing?
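
If it is easier to check this from Python than from Cypher Shell, the same test can be scripted. A minimal sketch, assuming a server where SHOW CONSTRAINTS returns the columns used in the query above; has_unique_constraint is a hypothetical helper name, driver is a neo4j driver created elsewhere, and the filtering is done in Python rather than in the WHERE clause.

```python
def has_unique_constraint(driver, label, prop):
    # Fetch all constraints, then filter client-side with the same
    # conditions as the Cypher query above.
    with driver.session() as session:
        records = list(session.run("SHOW CONSTRAINTS YIELD *"))
    for record in records:
        if (
            record["entityType"] == "NODE"
            and record["type"] in ("UNIQUENESS", "NODE_KEY")
            and list(record["labelsOrTypes"]) == [label]
            and list(record["properties"]) == [prop]
        ):
            return True
    return False
```

A call such as has_unique_constraint(driver, "TopicsList", "id") returning False corresponds to the query above returning nothing.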

Apologies for the delay.

That cypher command returns nothing because my db is empty.

I typically run MATCH (n) DETACH DELETE n to clear it.

I've created a simple test_case.py file, contents below:

import os
import sys
import json
import pandas as pd
from urllib.parse import urlparse


parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

from concurrent.futures import ThreadPoolExecutor


import time
from neo4j import GraphDatabase

class Neo4jConnection:
    def __init__(self, uri, auth_type="basic", user=None, password=None, realm=None, parameters=None):
        if auth_type == "basic":
            self._driver = GraphDatabase.driver(uri, auth=(user, password))
        elif auth_type == "none":
            self._driver = GraphDatabase.driver(uri, auth=None)
        elif auth_type == "custom":
            # The Python driver has no AuthTokens class; custom_auth is its equivalent.
            from neo4j import custom_auth
            self._driver = GraphDatabase.driver(uri, auth=custom_auth(user, password, realm, "my_scheme", **(parameters or {})))
        else:
            raise ValueError(f"Unsupported auth_type: {auth_type}")

    def close(self):
        self._driver.close()

    def run_query(self, query, parameters=None, database=None):
        # Check if the query is empty or consists only of whitespace
        if not query.strip():
            return

        with self._driver.session(database=database) as session:
            result = session.run(query, parameters)
            records = list(result)

            if not records:
                pass
            return records

    def ensure_database_exists(self, database_name):
        # Check if the database exists
        databases = self.run_query("SHOW DATABASES")
        if database_name not in [record["name"] for record in databases]:
            # Create the database if it doesn't exist
            self.run_query(f"CREATE DATABASE {database_name}")

    def assert_database_exists(self, database_name):
        # Check if the database exists
        databases = self.run_query("SHOW DATABASES")
        if database_name not in [record["name"] for record in databases]:
            raise ValueError(f"Database '{database_name}' does not exist!")

    def remove_database_if_exists(self, database_name):
        # Check if the database exists
        databases = self.run_query("SHOW DATABASES")
        if database_name in [record["name"] for record in databases]:
            # Drop the database if it exists
            self.run_query(f"DROP DATABASE {database_name}")

    def assert_database_not_exists(self, database_name):
        # Check if the database exists
        databases = self.run_query("SHOW DATABASES")
        if database_name in [record["name"] for record in databases]:
            raise ValueError(f"Database '{database_name}' already exists!")





#-------------------------------------------------------------------------
MAX_THREADS = 16
uri         = "bolt://localhost:7687"

def load_from_disk(filename):
    with open(filename, 'r') as f:
        return json.load(f)

def load_into_neo4j(_id, _name, _desc):
    global uri
    conn = Neo4jConnection(uri, auth_type="none")

    query = """
    CREATE (o:Object {id: $id, name: $name, description: $description})
    RETURN o
    """
    
    parameters = {
        'id': _id,
        'name': _name,
        'description': _desc
    }
    # print(
    # f"""
    # id:          {_id}
    # name:        {_name}
    # description: {_desc}
    # """)
    conn.run_query(query, parameters)
    conn.close()
    return False


def main(base_directory=None, uri=None, user=None, password=None):
    base_directory = base_directory
    processed_folder_path = os.path.join(base_directory, "processed")


    X = 4096
    OBJECTS = {}
    for i in range(X):
        obj = {
            "id": i,
            "name": f"Object_{i}",
            "description": f"This is object number {i}"
        }
        OBJECTS[i] = obj
    # print(OBJECTS)


    # Use ThreadPoolExecutor with MAX_THREADS to process files in parallel
    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = []
        # for base_filename, url_filename in OBJECTS.items():
        for _index, _object in OBJECTS.items():
            _id   = _object.get('id', '')
            _name = _object.get('name', '')
            _desc = _object.get('description', '')
            futures.append(executor.submit(load_into_neo4j, _id, _name, _desc))

            for future in futures:
                result = future.result()


if __name__ == "__main__":
    main(sys.argv[0])

I just create some basic JSON objects and add them to the database. Sometimes the program crashes, sometimes there are duplicates, and sometimes there are exactly 4096 objects in the db; I cannot understand why.

As far as creating a uniqueness constraint, where would I place that? Should I run it each time I want to import data into the database or just once when the db is setup initially?

This project dumps a lot of small objects into the db.
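
For many small objects, one option worth trying is to send them in batches with UNWIND instead of one query (and one connection) per object, which cuts down on round trips. A sketch under assumptions: the batch size of 500 is arbitrary, import_in_batches and chunked are hypothetical helpers, and driver is a neo4j 5.x driver created elsewhere (execute_write is its managed-transaction call).

```python
BATCH_QUERY = """
UNWIND $rows AS row
MERGE (o:Object {id: row.id})
SET o.name = row.name, o.description = row.description
"""


def chunked(items, size):
    # Yield consecutive slices of at most `size` items.
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _write_batch(tx, rows):
    # One transaction per batch; the whole list travels as a single parameter.
    tx.run(BATCH_QUERY, rows=rows).consume()
    return len(rows)


def import_in_batches(driver, objects, batch_size=500):
    written = 0
    with driver.session() as session:
        for batch in chunked(objects, batch_size):
            written += session.execute_write(_write_batch, batch)
    return written


# Same shape as the objects generated in test_case.py.
objects = [
    {"id": i, "name": f"Object_{i}", "description": f"This is object number {i}"}
    for i in range(4096)
]
print([len(b) for b in chunked(objects, 500)][-2:])  # [500, 96]
```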

MATCH (n) DETACH DELETE n removes data, not indices/constraints.

You would basically create the uniqueness or node key constraint, once, right after you create the database.
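
In script form, that could be a small setup step that runs before any worker thread starts. A sketch, assuming the TopicsList constraint from earlier in the thread plus a second, hypothetical one for the Object label used in the test case; ensure_constraints is an illustrative name and driver is a neo4j driver created elsewhere.

```python
SETUP_STATEMENTS = [
    "CREATE CONSTRAINT unique_topics_id IF NOT EXISTS "
    "FOR (n:TopicsList) REQUIRE n.id IS UNIQUE",
    # Hypothetical second constraint for the test case's Object nodes.
    "CREATE CONSTRAINT unique_object_id IF NOT EXISTS "
    "FOR (n:Object) REQUIRE n.id IS UNIQUE",
]


def ensure_constraints(driver, statements=SETUP_STATEMENTS):
    # Run from a single thread, before the ThreadPoolExecutor is created.
    with driver.session() as session:
        for statement in statements:
            session.run(statement).consume()
    return len(statements)
```

Because of IF NOT EXISTS the statements are safe to re-run, though once per database is enough. With the constraint in place, a plain CREATE of an id that already exists is rejected with an error instead of producing a duplicate, so MERGE on the id is the usual companion.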

Side note: you can run CREATE DATABASE $name IF NOT EXISTS and DROP DATABASE $name IF EXISTS (unless you run very old Neo4j versions).