Neo4j Spark Connector issue: class not found

I have a 3-node Spark 3.3.2 cluster, with Hive on the master node [192.168.0.0].
Now I want to write a table into Neo4j 5.5, which runs on another cluster [172.0.0.1]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName(appname) \
    .master("spark://192.168.0.0:7077") \
    .config("spark.sql.warehouse.dir", hdfs_add) \
    .config("spark.driver.host", spark_local_ip) \
    .config("spark.driver.port", spark_port) \
    .enableHiveSupport().getOrCreate()

df = spark.table("abc")

df.write.format("org.neo4j.spark.DataSource") \
    .mode("Overwrite") \
    .option("url", "neo4j://172.0.0.1:7687") \
    .option("authentication.basic.username", "neo4j") \
    .option("authentication.basic.password", "123456") \
    .option("query",
            """
            xxx
            """
            ) \
    .save()

Then I received the following error:

23/06/19 18:15:30 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) (192.168.30.111 executor 1): java.lang.ClassNotFoundException: org.neo4j.spark.writer.Neo4jDataWriterFactory
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1933)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

It seems that the class "org.neo4j.spark.DataSource" was found, but "org.neo4j.spark.writer.Neo4jDataWriterFactory" was not, even though I confirmed that both classes are present in my neo4j-connector-spark jar.
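
To confirm that, I listed the jar entries. A rough sketch of that kind of check (the jar filename below is a placeholder, not my exact version):

import zipfile

# Placeholder jar name -- substitute the actual connector jar file.
jar_path = "neo4j-connector-apache-spark_2.12-5.0.0_for_spark_3.jar"

with zipfile.ZipFile(jar_path) as jar:
    entries = set(jar.namelist())

# Both class files show up in my jar.
for cls in ("org/neo4j/spark/DataSource.class",
            "org/neo4j/spark/writer/Neo4jDataWriterFactory.class"):
    print(cls, "->", "found" if cls in entries else "missing")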

By the way, when I use "local[*]" as the Spark master, the script succeeds. However, I need to use the cluster as the master, since it should be much faster.
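
Do I need to ship the connector jar to the executors explicitly, e.g. via spark.jars? A sketch of what I assume that would look like in the session builder (the jar path is a placeholder, not my actual location):

from pyspark.sql import SparkSession

# Sketch only: spark.jars should make Spark distribute the listed jar from the
# driver to the executors; the path below is a placeholder.
spark = SparkSession.builder.appName(appname) \
    .master("spark://192.168.0.0:7077") \
    .config("spark.sql.warehouse.dir", hdfs_add) \
    .config("spark.driver.host", spark_local_ip) \
    .config("spark.driver.port", spark_port) \
    .config("spark.jars", "/path/to/neo4j-connector-apache-spark_2.12-5.0.0_for_spark_3.jar") \
    .enableHiveSupport().getOrCreate()

Or is there something else I'm missing?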