Neo4j Spark Connector: writing through a query

  • Neo4j 4.4
  • Spark (PySpark) 3.3.1

I have a Cypher query that works well in the Neo4j Browser; it selects two nodes (id 188 and id 189) and creates a new edge with property type='test' between them:

match (n:entity) where id(n)=188 with n
match (m:entity) where id(m)=189 with m, n
create (n)-[r:relation {type:'test'}]->(m)

Now I am trying to do the same through PySpark. First I read these two nodes using the 'query' option:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the two nodes with a Cypher query and name the columns 'm' and 'n'
result = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)=188 with n match (m:entity) where id(m)=189 return m, n") \
    .load() \
    .toDF('m', 'n')

I get these two nodes as the result:

+---------------------+---------------------+
|                    m|                    n|
+---------------------+---------------------+
|{189, [entity], G...|{188, [entity], G...|
+---------------------+---------------------+
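As a side note, it can help to inspect how the connector represents these nodes. A quick sketch, assuming the connector's usual node schema, where the internal id arrives as the '<id>' field of each struct column:

from pyspark.sql.functions import col

# Show the struct schema of the node columns
result.printSchema()

# Pull out the internal ids; '<id>' is assumed to be the node-id struct field
result.select(col("n").getField("<id>"), col("m").getField("<id>")).show()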

Then I tried to use m and n to write a new edge through a query as well:

result.write\
    .format("org.neo4j.spark.DataSource")\
    .mode("Overwrite")\
    .option("url", "bolt://localhost:7687") \
    .option("query", "create (n)-[r:relation {type:'test'}]->(m)")\
    .save()

Yet I realized this script creates two brand-new nodes rather than using the existing m and n to build the new edge.

How can I pass m and n from the reading script into the writing script?

Can't you just run the entire query in the 'write' query, and not use the read query at all?

match (n:entity) where id(n)=188
match (m:entity) where id(m)=189
create (n)-[r:relation {type:'test'}]->(m)

Hi Gary,

Thank you for the reply. I believe I have to call 'write' on a DataFrame or Dataset, or some other existing object, which I suppose has to come from a read.

from pyspark.sql import *

spark = SparkSession.builder.getOrCreate()

spark.write\
    .format("org.neo4j.spark.DataSource")\
    .mode("Overwrite")\
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)=188 match (m:entity) where id(m)=189 create (n)-[r:relation {type:'test'}]->(m)")\
    .save()

Above is my attempt to put the whole Cypher statement in the 'query' option and use write mode directly, yet I got:

AttributeError: 'SparkSession' object has no attribute 'write'

which is why I guess it won't work to skip the read mode.
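For what it's worth, a write-only query can still be driven without a meaningful read. Assuming (as discussed below) that the connector prepends 'UNWIND $events AS event' and runs the write query once per DataFrame row, a one-row dummy DataFrame executes the full Cypher exactly once. A sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One dummy row, so the query below runs exactly once
dummy = spark.createDataFrame([(1,)], ["dummy"])

dummy.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)=188 match (m:entity) where id(m)=189 create (n)-[r:relation {type:'test'}]->(m)") \
    .save()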

Full disclosure: I have not used the Spark connector, nor Spark itself.

I think the error results from invoking the 'write' method on the SparkSession, which, per the error message, has no 'write' attribute. You want to invoke 'write' on the data frame from your first query, which is assigned to 'result'. Try that first.

If that does not work, try the following. From what I read, I believe the connector prepends 'unwind $events as event' to your query, where $events holds the data frame rows. As such, you can access each row of your data frame as 'event'. Again, I may be wrong... worth a try.

result.write\
    .format("org.neo4j.spark.DataSource")\
    .mode("Overwrite")\
    .option("url", "bolt://localhost:7687") \
    .option("query", "with event.n as n, event.m as m  create (n)-[r:relation {type:'test'}]->(m)")\
    .save()

Try this: return only the internal ids from the read query, then re-match the nodes by id inside the write query. (A whole node column comes through to Cypher as a map of properties, not a bound node, so create cannot link it directly.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read back only the internal ids of the two nodes
result = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", """
       match (n:entity) where id(n)=188
       match (m:entity) where id(m)=189
       return id(m) AS m, id(n) AS n
    """) \
    .load() \
    .toDF('m', 'n')

# ... and then re-match the nodes by id and create the edge.
# The connector prepends 'UNWIND $events AS event', so each DataFrame row
# is available in the query as 'event'.
result.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n) = event.n match (m:entity) where id(m) = event.m create (n)-[r:relation {type:'test'}]->(m)") \
    .save()
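
To check that the edge actually got created, a quick read-back sketch:

check = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity)-[r:relation {type:'test'}]->(m:entity) return id(n) AS n, r.type AS type, id(m) AS m") \
    .load()

check.show()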

Btw, you should not rely on internal ids; match on your own unique properties, backed by constraints, instead.
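
For instance, the connector also has a declarative relationship-write mode that matches nodes by property keys instead of internal ids. A sketch, assuming a hypothetical unique 'name' property on :entity (backed by a uniqueness constraint, e.g. CREATE CONSTRAINT entity_name IF NOT EXISTS FOR (e:entity) REQUIRE e.name IS UNIQUE) and hypothetical 'src_name'/'dst_name' columns:

# Hypothetical edge list keyed by the nodes' unique 'name' property
edges = spark.createDataFrame([("alpha", "test", "beta")],
                              ["src_name", "type", "dst_name"])

# 'Append' creates the relationship; source/target save mode 'Match'
# only matches existing nodes instead of creating them
edges.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Append") \
    .option("url", "bolt://localhost:7687") \
    .option("relationship", "relation") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", ":entity") \
    .option("relationship.source.save.mode", "Match") \
    .option("relationship.source.node.keys", "src_name:name") \
    .option("relationship.target.labels", ":entity") \
    .option("relationship.target.save.mode", "Match") \
    .option("relationship.target.node.keys", "dst_name:name") \
    .option("relationship.properties", "type") \
    .save()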

Cheers
Andrea