cancel
Showing results for 
Search instead for 
Did you mean: 

Using Spark Structured Streaming API

Hey guys

We are trying to use the spark structured streaming API, but always get the following error:

java.lang.UnsupportedOperationException: Data source org.neo4j.spark.DataSource does not support streamed writing at org.apache.spark.sql.errors.QueryExecutionErrors$.streamedOperatorUnsupportedByDataSourceError(QueryExecutionErrors.scala:430)

Which seems strange as structure streaming should be supported by the connector version 4.1. For testing purposes, we used the following setup in an interactive spark scala shell.

spark-shell --packages org.mongodb.spark:mongo-spark-connector:10.0.2,org.neo4j:neo4j-connector-apache-spark_2.12:4.1.2_for_spark_3

Which should be the correct version of the spark connector right? Then we create a df by readStream from a MongoDB instance.

val dfTxn = spark.readStream.format("mongodb").option("spark.mongodb.connection.uri", "mongodb://<ip>:<port>").option("spark.mongodb.database", "test").option("spark.mongodb.collection", "txn").option("park.mongodb.read.readPreference.name", "primaryPreferred").option("spark.mongodb.change.stream.publish.full.document.only", "true").option("forceDeleteTempCheckpointLocation", "true").load()

Afterwards, we are trying to write it to Neo4j, which is the step where the error described above happens.

val query = dfPaymentTx.writeStream.format("org.neo4j.spark.DataSource").option("url", "bolt://<ip>:<port>").option("save.mode", "Append").option("checkpointLocation", "/tmp/checkpoint/myCheckPoint").option("labels", "Account").option("node.keys", "txn_snd").start()

We really do not understand why we get the Data Source does not support streamed writing error. We are running on an Ubuntu machine and installed Neo4j from here. Is there a problem in using Scala or anything else?

Any help is appreciated

 

1 REPLY 1

santand84
Node

Can you please post the result of the following code?

val list = new Neo4jTable(StructType(Array(StructField("foo", DataTypes.StringType))),
Map("url" -> "bolt://neo4j:7687", "labels" -> "foo").asJava, "").capabilities()
println(list)