Hello!
I've been trying to write a Twitter crawler that saves author data to Neo4j. I'm running a free-tier AuraDB instance and have been using the following Python code (simplified) for my queries:
```python
from neo4j import GraphDatabase
from neo4j.graph import Graph


class Client:
    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        with self.driver.session() as session:
            # Unique constraint on Author.id; Neo4j backs it with an index
            session.run("CREATE CONSTRAINT author_id IF NOT EXISTS "
                        "FOR (a:Author) REQUIRE a.id IS UNIQUE")

    def write(self, query: str) -> Graph:
        with self.driver.session() as session:
            return session.write_transaction(self.run_query, query)

    def close(self) -> None:
        # Call once on shutdown; closing after each write breaks the singleton
        self.driver.close()

    def run_query(self, tx, query: str) -> Graph:
        result = tx.run(query)
        return result.graph()
```
This client instance is a singleton, so I'm not recreating the driver object on every request.
My write query is built like this:
```python
queries = []
for i, tweet in enumerate(tweets):
    tweet_author = tweet.author
    tweet_author_query = f"MERGE (a{i}:Author {{id: {tweet_author.id}, username: '{tweet_author.username}'}})"
    parent_author = tweet.parent.author
    parent_author_query = f"MERGE (pa{i}:Author {{id: {parent_author.id}, username: '{parent_author.username}'}})"
    relationship_query = f"MERGE (a{i})-[:{tweet.kind}]->(pa{i})"
    queries.append(f"{parent_author_query} {tweet_author_query} {relationship_query}")

query = " ".join(queries)
self.db.write(query)
```
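To see what this loop actually sends to the server, it can be run against stand-in objects. In this sketch `Author`, `Tweet`, and `build_query` are hypothetical stand-ins that carry only the fields the loop reads, and `"REPLIED_TO"` is an assumed value of `tweet.kind`.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Author:  # hypothetical stand-in for the crawler's author type
    id: int
    username: str


@dataclass
class Tweet:  # hypothetical stand-in with only the fields the loop reads
    author: Author
    kind: str  # used as the relationship type (assumed values)
    parent: Optional["Tweet"] = None


def build_query(tweets: List[Tweet]) -> str:
    """Same string-building loop as above, wrapped in a function."""
    queries = []
    for i, tweet in enumerate(tweets):
        a, pa = tweet.author, tweet.parent.author
        queries.append(
            f"MERGE (pa{i}:Author {{id: {pa.id}, username: '{pa.username}'}}) "
            f"MERGE (a{i}:Author {{id: {a.id}, username: '{a.username}'}}) "
            f"MERGE (a{i})-[:{tweet.kind}]->(pa{i})"
        )
    return " ".join(queries)


parent = Tweet(author=Author(1, "alice"), kind="POSTED")
tweets = [Tweet(author=Author(100 + n, f"user{n}"), kind="REPLIED_TO", parent=parent)
          for n in range(3)]
query = build_query(tweets)
print(query.count("MERGE"))  # 9: three MERGE clauses per tweet
```

Every tweet contributes three `MERGE` clauses and two new variable names, so a batch of 100 tweets becomes a single statement with 300 clauses and a batch of 1000 becomes one with 3000, with every value inlined as a literal.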
I've read that larger batches should be faster, but that hasn't been my experience. For a batch of 100 tweets, the write transaction takes around 20 s:
```
[2022-10-26 22:19:37,870: INFO/ForkPoolWorker-4] Task crawler.use_cases.tasks.tasks.query_tweets[c481a1bb-ca64-4c4c-acf0-24120b577b21] succeeded in 21.27412606299913s: None
```
For a batch of 1000 tweets, the operation ran for over 30 minutes (going by the log timestamps below) and then failed with a defunct connection error:
```
[2022-10-26 22:21:23,193: INFO/MainProcess] Task crawler.use_cases.tasks.tasks.query_tweets[ec340381-aa35-4589-aa9c-65417fe55d54] received
[2022-10-26 22:52:05,848: ERROR/ForkPoolWorker-4] Failed to read from defunct connection IPv4Address(('xxxxxxxxx.databases.neo4j.io', 7687)) (IPv4Address(('xx.xxx.xxx.xx', 7687)))
```
I'm lost as to what is happening. I added a unique constraint on `id` (which Neo4j backs with an index), but it didn't seem to help. Can anyone help me?
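For comparison, the batching pattern commonly recommended in Neo4j's documentation and blog posts sends the data as a query parameter and expands it server-side with `UNWIND`, so the statement text stays the same whatever the batch size. Below is a sketch of that shape for this data model; it has not been measured against AuraDB. `UNWIND_TEMPLATE`, `group_rows`, and `write_batch` are hypothetical names, and the sketch assumes `tweet.kind` only ever holds a small, trusted set of relationship types (Cypher cannot take a relationship type as a parameter, hence one statement per kind).

```python
from collections import defaultdict
from typing import Any, Dict, List

# Hypothetical template: %s is filled with the relationship type, since
# Cypher cannot parameterize relationship types. Everything else is $rows.
UNWIND_TEMPLATE = """
UNWIND $rows AS row
MERGE (pa:Author {id: row.parent_id})
  SET pa.username = row.parent_username
MERGE (a:Author {id: row.author_id})
  SET a.username = row.author_username
MERGE (a)-[:%s]->(pa)
"""


def group_rows(tweets) -> Dict[str, List[Dict[str, Any]]]:
    """Turn tweets into plain parameter maps, grouped by tweet.kind."""
    rows = defaultdict(list)
    for tweet in tweets:
        rows[tweet.kind].append({
            "author_id": tweet.author.id,
            "author_username": tweet.author.username,
            "parent_id": tweet.parent.author.id,
            "parent_username": tweet.parent.author.username,
        })
    return dict(rows)


def write_batch(tx, tweets) -> None:
    """Transaction function: session.write_transaction(write_batch, tweets)."""
    for kind, rows in group_rows(tweets).items():
        # kind is interpolated into the text, so it must come from a trusted set
        tx.run(UNWIND_TEMPLATE % kind, rows=rows)
```

Merging on `id` alone and setting `username` separately also sidesteps a failure mode of merging on both properties: if a user renames their account, a `MERGE` on `{id, username}` finds no match and tries to create a second node with the same `id`, which the unique constraint rejects.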