
Very slow performance on batch of MERGE queries

caiou
Node

Hello!
I've been trying to write a Twitter crawler that saves author data to Neo4j. I'm currently running a free tier AuraDB instance, and have been using the following python code (simplified) for my queries:

from typing import Any, List
from neo4j import GraphDatabase

class Client:
  def __init__(self, uri: str, user: str, password: str):
    self.driver = GraphDatabase.driver(uri, auth=(user, password))

    with self.driver.session() as session:
      session.run("CREATE CONSTRAINT author_id IF NOT EXISTS ON (a:Author) ASSERT a.id IS UNIQUE")

  def write(self, query) -> List[Any]:
    with self.driver.session() as session:
      result = session.write_transaction(self.run_query, query)

    return result

  def run_query(self, tx, query):
    result = tx.run(query)
    return result.graph()

This client instance is a singleton, so I'm not recreating the driver object at every request.

My writing query looks like this:

queries = []
for i, tweet in enumerate(tweets):
  tweet_author = tweet.author
  tweet_author_query = f"MERGE (a{i}:Author {{id: {tweet_author.id}, username: '{tweet_author.username}'}})"
  parent_author = tweet.parent.author
  parent_author_query = f"MERGE (pa{i}:Author {{id: {parent_author.id}, username: '{parent_author.username}'}})"
  relationship_query = f"MERGE (a{i})-[:{tweet.kind}]->(pa{i})"

  queries.append(f"{parent_author_query} {tweet_author_query} {relationship_query}")

query = " ".join(queries)
self.db.write(query)

I've read that bigger batches are faster, but I haven't been able to confirm that. For a batch of 100 tweets, the write transaction takes around 20s:

[2022-10-26 22:19:37,870: INFO/ForkPoolWorker-4] Task crawler.use_cases.tasks.tasks.query_tweets[c481a1bb-ca64-4c4c-acf0-24120b577b21] succeeded in 21.27412606299913s: None

For a batch of 1000 tweets, the operation took over 20 minutes, after which I got a defunct connection error:

[2022-10-26 22:21:23,193: INFO/MainProcess] Task crawler.use_cases.tasks.tasks.query_tweets[ec340381-aa35-4589-aa9c-65417fe55d54] received
[2022-10-26 22:52:05,848: ERROR/ForkPoolWorker-4] Failed to read from defunct connection IPv4Address(('xxxxxxxxx.databases.neo4j.io', 7687)) (IPv4Address(('xx.xxx.xxx.xx', 7687)))

I'm pretty lost as to what may be happening. I tried using a unique ID index, but that doesn't seem to have helped. Can anyone help me?

5 Replies

koji
Ninja

Hi @caiou 

First, you can create a composite index like this:

CREATE INDEX author_id_username
FOR (n:Author)
ON (n.id, n.username)

That should make the lookups faster.

Next, you can move the interpolated values into query parameters:
https://neo4j.com/docs/cypher-manual/current/syntax/parameters/
https://neo4j.com/docs/cypher-manual/current/query-tuning/
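For example, a parameterized version of one of the MERGE statements might look like this. The constant query text lets the server reuse a cached query plan; the relationship type REPLIED_TO and the commented session call are illustrative, not from the thread:

```python
# Sketch of the parameter suggestion: keep the Cypher text constant and
# pass the values in a separate dict instead of f-string interpolation.
query = (
    "MERGE (a:Author {id: $author_id, username: $author_username}) "
    "MERGE (pa:Author {id: $parent_id, username: $parent_username}) "
    "MERGE (a)-[:REPLIED_TO]->(pa)"  # REPLIED_TO is a placeholder type
)
params = {
    "author_id": 123,
    "author_username": "alice",
    "parent_id": 456,
    "parent_username": "bob",
}
# With a real driver session this would be executed as:
# session.run(query, params)
```

Because the values never appear in the query string, this also avoids the quoting and injection problems of building Cypher with f-strings.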

Hi, @koji! Thank you for the tips

I've added the index and am now using params, but haven't seen much improvement. My guess is that the specs of the free-tier instance just aren't fast enough to handle such big batches.

Hi @caiou

I found the solution.
You should change your Cypher so that it writes one tweet at a time, and repeat it for the number of tweets.
If the input tweets were already in Neo4j, you could write this even more cleanly with CALL { ... } IN TRANSACTIONS.

In a typical application, the same Cypher statement is executed repeatedly for a given number of writes.
In that case, parameters are effective.

Your code builds a new Cypher statement for every batch and executes it only once.
In that case, passing the variables as parameters does not help.
In addition, the more tweets you concatenate, the larger a single transaction becomes, so the query consumes more memory and slows down.
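One way to keep each transaction small from the client side, even without CALL { } IN TRANSACTIONS, is to split the tweet list into fixed-size chunks and run one transaction per chunk. A generic sketch (the `client.write` call in the comment is hypothetical):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield list(items[start:start + size])

# Each chunk would then be written in its own transaction, e.g.:
# for batch in chunked(tweets, 100):
#     client.write(build_query(batch))
```

Smaller transactions bound the memory each commit needs, at the cost of a few more round trips.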

@koji, thank you a lot for your help!

I've optimized my code as suggested, and you were totally right! Here's what I've managed to come up with:

def create(self, tweets: List[Tweet]) -> None:
    author_queries = []
    parent_queries = []

    for tweet in tweets:
      author_dict = vars(tweet.author)

      if tweet.parent is not None:
        parent_author = tweet.parent.author
        parent_dict = { "parent": vars(parent_author) }

        parent_queries.append(author_dict | parent_dict)
      else:
        author_queries.append(author_dict)

    self.db.write(self._create_author_query(), authors=author_queries)
    self.db.write(self._create_author_with_parent_query(), authors=parent_queries)

  def _create_author_query(self) -> str:
    query = """
      WITH $authors as authors
      UNWIND authors as author
      MERGE (a:Author {id: author.id, name: author.name, username: author.username, created_at: author.created_at})
    """
    return query

  def _create_author_with_parent_query(self) -> str:
    query = """
      WITH $authors as authors
      UNWIND authors as author
      MERGE (a:Author {id: author.id, name: author.name, username: author.username, created_at: author.created_at})
      MERGE (pa:Author {id: author.parent.id, name: author.parent.name, username: author.parent.username, created_at: author.parent.created_at})
      MERGE (a)-[:RETWEETED]->(pa)
    """
    return query

I now use only two queries for a batch of author objects, depending on whether or not I'm parsing retweets. The objects are now provided as lists of params for a transaction function, and the execution speed for my batches was reduced from ~30s to ~2s.
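The `vars()` and dict-merge (`|`, Python 3.9+) pattern used above can be shown in isolation; the `Author` dataclass here is a stand-in for the thread's actual model:

```python
from dataclasses import dataclass

@dataclass
class Author:
    id: int
    username: str

author = Author(id=1, username="alice")
parent = Author(id=2, username="bob")

# vars() on a dataclass instance returns its attribute dict;
# the | operator merges two dicts into a new one.
row = vars(author) | {"parent": vars(parent)}
# row == {"id": 1, "username": "alice", "parent": {"id": 2, "username": "bob"}}
```

Each `row` is then one element of the `$authors` parameter list that UNWIND iterates over on the server.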

Thank you very much!

@caiou UNWIND is another great solution.
I'm glad my comment gave you a hint.