I have a Cypher query that works well in the Neo4j Browser; it selects two nodes (ID 188 and ID 189) and creates a new edge (with property type='test') between them:
match (n:entity) where id(n)=188 with n
match (m:entity) where id(m)=189 with m, n
create (n)-[r:relation {type:'test'}]->(m)
Now I am trying to do the same through PySpark. First I read these two nodes using the 'query' option:
from pyspark.sql import *
spark = SparkSession.builder.getOrCreate()
result = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)=188 with n match (m:entity) where id(m)=189 return m, n") \
    .load().toDF('m', 'n')
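To sanity-check the read, the returned DataFrame can be inspected with the standard DataFrame methods, e.g.:

result.printSchema()
result.show(truncate=False)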
Thank you for the reply. I suspect I have to apply write mode to a DataFrame or Dataset, or some other existing object, which I suppose is produced by read mode.
from pyspark.sql import *
spark = SparkSession.builder.getOrCreate()
spark.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity) where id(n)=188 match (m:entity) where id(m)=189 create (n)-[r:relation {type:'test'}]->(m)") \
    .save()
Above is my attempt to put the whole Cypher statement in the 'query' option, using write mode directly, but I was told:
AttributeError: 'SparkSession' object has no attribute 'write'
which is why I guess skipping the read step won't work.
Full disclosure: I have not used the Spark connector, nor Spark itself.
I think the error occurs because you are invoking the 'write' method on the SparkSession, which, per the error message, has no such attribute. You want to invoke 'write' on the DataFrame from your first query, which is bound to 'result'. Try that first.
If that does not work, try the following. From what I read, I believe the connector prepends 'unwind $events as event' to your query, where $events represents the DataFrame rows. As such, you can access each row of your DataFrame as 'event'. Again, I may be wrong... worth a try.
result.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "with event.n as n, event.m as m create (n)-[r:relation {type:'test'}]->(m)") \
    .save()
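In other words, if my reading of the docs is right, the statement the server would effectively execute should look roughly like this (an assumed expansion on my part, not verified):

unwind $events as event
with event.n as n, event.m as m
create (n)-[r:relation {type:'test'}]->(m)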
from pyspark.sql import *
spark = SparkSession.builder.getOrCreate()
result = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", """
        match (n:entity) where id(n)=188 with n
        match (m:entity) where id(m)=189
        return id(m) AS m, id(n) AS n
    """) \
    .load() \
    .toDF('m', 'n')
# and then
result.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n) where id(n) = event.n match (m) where id(m) = event.m create (n)-[r:relation {type:'test'}]->(m)") \
    .save()
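If the write succeeds, a quick read-back with the same connector options should show the new relationship (an untested sketch):

check = spark.read \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n)-[r:relation]->(m) where id(n)=188 and id(m)=189 return r.type AS type") \
    .load()
check.show()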
By the way, you should not rely on internal ids; define unique property constraints and match on those properties instead.
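For example, here is a hypothetical sketch: 'uid' is a made-up property name, the n_uid/m_uid columns are assumed to exist in the DataFrame, and the constraint syntax varies across Neo4j versions:

# run once in Neo4j, e.g. in the Browser (Neo4j 5 syntax shown):
#   create constraint entity_uid if not exists for (e:entity) require e.uid is unique
result.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "bolt://localhost:7687") \
    .option("query", "match (n:entity {uid: event.n_uid}) match (m:entity {uid: event.m_uid}) create (n)-[r:relation {type:'test'}]->(m)") \
    .save()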