Heterogeneous replication from PostgreSQL to Neo4j

Is there any way I can replicate data from PostgreSQL to Neo4j, similar to other heterogeneous replication setups such as Oracle -> SQL Server or Oracle -> PostgreSQL? It doesn't need to be real time; some latency is OK.

Thank you in advance.
sojan

A popular solution is to use Kafka as a message bus. Publish a message to Kafka and you can configure Neo4j as a subscriber (sink) to that topic to save the data in the graph. Neo4j can also publish to Kafka if you need to get data back out.
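Here is a minimal sketch of why a message bus decouples the two sides. A Kafka topic is an append-only log, and each subscriber tracks its own read offset, so the sink can fall behind (the latency the question allows for) and catch up later without losing records. This is a toy in-memory stand-in with hypothetical class names (`InMemoryLog`, `SinkSubscriber`), not the Kafka client API.

```python
from collections import defaultdict
from typing import Any, Dict, List


class InMemoryLog:
    """Toy stand-in for a Kafka cluster: one append-only list per topic."""

    def __init__(self) -> None:
        self._topics: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    def publish(self, topic: str, record: Dict[str, Any]) -> int:
        """Append a record to the topic and return its offset."""
        self._topics[topic].append(record)
        return len(self._topics[topic]) - 1

    def poll(self, topic: str, offset: int, max_records: int) -> List[Dict[str, Any]]:
        """Read up to max_records starting at offset; records are not removed."""
        return self._topics[topic][offset:offset + max_records]


class SinkSubscriber:
    """Toy sink that keeps its own offset, so it can lag behind and catch up."""

    def __init__(self, log: InMemoryLog, topic: str) -> None:
        self.log = log
        self.topic = topic
        self.offset = 0
        self.applied: List[Dict[str, Any]] = []  # stands in for writes to Neo4j

    def run_once(self, max_records: int = 100) -> int:
        batch = self.log.poll(self.topic, self.offset, max_records)
        self.applied.extend(batch)  # a real sink would write the batch to the graph here
        self.offset += len(batch)   # advance the offset only after the batch is applied
        return len(batch)


log = InMemoryLog()
sink = SinkSubscriber(log, "person")
log.publish("person", {"name": "Alice", "age": 30})
log.publish("person", {"name": "Bob", "age": 25})
print(sink.run_once(max_records=1))       # 1
print(sink.run_once())                    # 1
print(sink.run_once())                    # 0
print([r["name"] for r in sink.applied])  # ['Alice', 'Bob']
```

Because the producer never waits for the sink, the source database is unaffected if Neo4j is slow or briefly down; the sink simply resumes from its last offset.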

Replicating data to and from Neo4j is pretty much like any other database replication. There is usually some kind of message bus application that receives the change data capture (CDC) logs and then executes the corresponding INSERT statement (for a relational target) or Cypher statement (for Neo4j) against the target.
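To make the "receive a change event, then execute a statement against the target" step concrete, here is a minimal sketch that turns one row-level change event into a parameterized Cypher statement. It assumes a Debezium-style envelope (an `op` of `c`/`u`/`r`/`d`, plus `before` and `after` row images), which is one common way to capture PostgreSQL changes into Kafka. The function name `change_to_cypher` and its `label`/`key` arguments are hypothetical.

```python
import re
from typing import Any, Dict, Tuple

# Labels and property keys are interpolated into the query text, so restrict them
# to plain identifiers; row values always travel as query parameters.
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def change_to_cypher(event: Dict[str, Any], label: str, key: str) -> Tuple[str, Dict[str, Any]]:
    """Map one Debezium-style change event to a (Cypher query, parameters) pair."""
    if not _IDENT.fullmatch(label) or not _IDENT.fullmatch(key):
        raise ValueError("label and key must be plain identifiers")
    op = event["op"]
    if op in ("c", "u", "r"):  # insert, update, or initial snapshot read
        row = event["after"]
        props = {k: v for k, v in row.items() if k != key}
        query = f"MERGE (n:{label} {{{key}: $key}}) SET n += $props"
        return query, {"key": row[key], "props": props}
    if op == "d":  # delete: only the "before" image is populated
        row = event["before"]
        query = f"MATCH (n:{label} {{{key}: $key}}) DETACH DELETE n"
        return query, {"key": row[key]}
    raise ValueError(f"unsupported op: {op!r}")


event = {
    "op": "u",
    "before": {"id": 1, "name": "Al", "age": 29},
    "after": {"id": 1, "name": "Al", "age": 30},
}
query, params = change_to_cypher(event, "Person", "id")
print(query)   # MERGE (n:Person {id: $key}) SET n += $props
print(params)  # {'key': 1, 'props': {'name': 'Al', 'age': 30}}
```

Merging on the source table's key and then `SET n += $props` makes inserts and updates idempotent, so replaying the same event twice leaves the graph unchanged. `DETACH DELETE` also removes the node's relationships.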

Thanks for the reply, Mike. If you have any documents or use cases, could you share them so I can understand this in more depth?

Thanks
sojan

Here is a good link to help you get started.

There are lots of resources about Kafka in general on the web. The link I shared is about the Neo4j integration.


Thank you so much for pointing me in the right direction, @mike.r.black.

Thanks
sojan

Hi @mike.r.black and other experts,
I have been trying to set up replication from source -> Kafka -> Neo4j, and I was able to complete the first part. Now I am trying to bring up the Neo4j sink connector, but it gives me an error. Could this be caused by a version mismatch?
Kafka version: kafka_2.12-2.5.0
Neo4j sink connector: neo4j-kafka-connect-neo4j-1.0.7
I can also see from the log that Kafka Connect recognizes the Neo4j plugins:
[2020-05-06 13:55:22,819] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/doc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:22,821] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/doc/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-05-06 13:55:22,822] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:22,823] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-05-06 13:55:22,824] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/assets (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:22,825] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/assets/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)
[2020-05-06 13:55:22,825] INFO Loading plugin from: /home/kafka/neo4j-kafka-connect-neo4j-1.0.7/lib (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:239)
[2020-05-06 13:55:23,473] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/lib/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:262)

But when I run the curl command to create the connector, it fails:
curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc/sink-neo4j.json
{
  "servlet": "org.glassfish.jersey.servlet.ServletContainer-782be4eb",
  "message": "Request failed.",
  "url": "/connectors",
  "status": "500"
}

Here are the contents of sink-neo4j.json:
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "person",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "test-error-topic",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "aaaa",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.person": "MERGE (p:Person{name: event.name, age: event.age})"
  }
}
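Each `neo4j.topic.cypher.<topic>` entry is a Cypher template that refers to the incoming record as `event`. The Neo4j Streams documentation describes the sink as running the template over a batch by unwinding the received events; the sketch below assumes that behavior (and a parameter named `events`) and only builds the statement strings, without connecting to Neo4j. `sink_statements` is a hypothetical helper that also checks that every subscribed topic has a matching template.

```python
from typing import Any, Dict, List, Tuple


def sink_statements(
    config: Dict[str, Any], batches: Dict[str, List[Dict[str, Any]]]
) -> List[Tuple[str, Dict[str, Any]]]:
    """Build one (query, params) pair per topic that has pending events."""
    statements = []
    topics = [t.strip() for t in config["topics"].split(",") if t.strip()]
    for topic in topics:
        template = config.get(f"neo4j.topic.cypher.{topic}")
        if template is None:
            # A subscribed topic without a template has nowhere to go.
            raise KeyError(f"no 'neo4j.topic.cypher.{topic}' for subscribed topic {topic!r}")
        events = batches.get(topic, [])
        if events:
            # Assumed wrapping: the template runs once per element of the batch.
            statements.append((f"UNWIND $events AS event {template}", {"events": events}))
    return statements


config = {
    "topics": "person",
    "neo4j.topic.cypher.person": "MERGE (p:Person{name: event.name, age: event.age})",
}
for query, params in sink_statements(config, {"person": [{"name": "Alice", "age": 30}]}):
    print(query)   # UNWIND $events AS event MERGE (p:Person{name: event.name, age: event.age})
    print(params)  # {'events': [{'name': 'Alice', 'age': 30}]}
```

A design note on the template itself: `MERGE` matches the whole pattern, so merging on `{name: event.name, age: event.age}` creates a second `Person` node whenever a person's age changes. Merging on a stable key and setting the remaining properties, e.g. `MERGE (p:Person {name: event.name}) SET p.age = event.age`, avoids that.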

Any idea what is causing the error? I appreciate your time in advance.

Thanks
Sojan

Here is the detailed error from connect.log:
[2020-05-08 10:55:23,887] ERROR Uncaught exception in REST call to /connectors (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
java.lang.NullPointerException
at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:145)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:176)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:365)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:318)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:760)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:547)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1577)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:500)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:834)
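The trace shows a `NullPointerException` inside Kafka Connect's own `ConnectorsResource.createConnector`, with no Neo4j connector classes in the stack, which suggests the failure happens while Kafka Connect handles the REST request rather than inside the Neo4j connector. One cheap thing to rule out is the request body. The sketch below is a hypothetical pre-flight check (`check_connector_payload` is not part of Kafka) that verifies the text is non-empty, parses as JSON, and has the `{"name": ..., "config": {...}}` shape that `POST /connectors` expects.

```python
import json
from typing import List


def check_connector_payload(text: str) -> List[str]:
    """Return problems found in a POST /connectors body; an empty list means none were found."""
    if not text.strip():
        return ["body is empty"]
    try:
        payload = json.loads(text)
    except json.JSONDecodeError as exc:
        return [f"invalid JSON: {exc}"]
    if not isinstance(payload, dict):
        return ["top level must be a JSON object"]
    problems: List[str] = []
    name = payload.get("name")
    if not isinstance(name, str) or not name.strip():
        problems.append("missing or empty 'name'")
    config = payload.get("config")
    if not isinstance(config, dict):
        problems.append("missing 'config' object")
        return problems
    if "connector.class" not in config:
        problems.append("config has no 'connector.class'")
    for key, value in config.items():
        # Connect configs are string maps; nested objects, arrays, and null are suspicious.
        if value is None or isinstance(value, (dict, list)):
            problems.append(f"config value for {key!r} should be a scalar")
    return problems


sample = '{"name": "Neo4jSinkConnector", "config": {"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector"}}'
print(check_connector_payload(""))      # ['body is empty']
print(check_connector_payload(sample))  # []
```

To use it, pass in the file contents, e.g. `check_connector_payload(open("/home/kafka/neo4j-kafka-connect-neo4j-1.0.7/etc/sink-neo4j.json").read())`. An empty result only means the payload shape looks plausible; it does not validate the connector settings themselves.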