Is it possible to retry processing a message after a race condition?

Hi All,

TL;DR: I want to know whether a message can be automatically reprocessed when the topic handler's query throws an error/exception.

I'm running Neo4j 3.5.14 with the Kafka integration plugin (version 3.5.6), with both Sink and Source enabled.

Context

We are consuming messages (from within Neo4j) from two different topics that affect the same nodes: one topic is conversation-created and the other is part-tagged.

Since processing is asynchronous and we don't always receive the messages in their natural order, both topics' handlers use MERGE on a NodeKey over (external_id, app_id), so each node is unique across the whole graph.
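When the two handlers run one after the other, MERGE on the composite key makes them order-independent. The sketch below is a minimal in-memory model of that idea, not Neo4j code: a dict stands in for the graph, and `merge_part`, the handler functions, and the `conversation`/`tagged` properties are hypothetical names chosen for illustration.

```python
from typing import Dict, Tuple

# Hypothetical in-memory stand-in for the graph: one entry per (external_id, app_id) NodeKey.
Graph = Dict[Tuple[str, str], Dict[str, object]]


def merge_part(graph: Graph, external_id: str, app_id: str, props: Dict[str, object]) -> Dict[str, object]:
    """Model of a sequential MERGE: match on the key, create the node only if missing, then SET props."""
    key = (external_id, app_id)
    node = graph.setdefault(key, {"external_id": external_id, "app_id": app_id})
    node.update(props)
    return node


def conversation_created(graph: Graph) -> None:
    # Hypothetical payload; the real handlers' properties are not shown in the post.
    merge_part(graph, "27062231877-553480989", "zqzv6i6r", {"conversation": True})


def part_tagged(graph: Graph) -> None:
    merge_part(graph, "27062231877-553480989", "zqzv6i6r", {"tagged": True})


# Apply the two messages in both possible orders, strictly one after the other.
in_order: Graph = {}
conversation_created(in_order)
part_tagged(in_order)

out_of_order: Graph = {}
part_tagged(out_of_order)
conversation_created(out_of_order)

# Either way there is exactly one node for the key, carrying both handlers' properties.
print(in_order == out_of_order, len(in_order))
```

The model only covers sequential execution; it says nothing about what happens when both MERGEs run concurrently.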

The problem is that the part-tagged handler sometimes finishes executing while the conversation-created handler is still processing, so the two MERGEs end up as:

  • part-tagged --> CREATE
  • conversation-created --> the transaction decided it should CREATE, but then fails because the unique node was created in the meantime
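The interleaving above behaves like a classic check-then-act race. This is a simplified sketch under the assumption that each MERGE splits into a MATCH step and a CREATE step with nothing guarding the key in between; it deliberately ignores whatever locking Neo4j itself may take during MERGE. `Store` and `ConstraintViolation` are hypothetical names.

```python
from typing import Dict, Optional, Tuple

Key = Tuple[str, str]


class ConstraintViolation(Exception):
    """Stand-in for the NodeKey violation reported in the log (IndexEntryConflictException)."""


class Store:
    """Hypothetical committed graph state with a unique index on the (external_id, app_id) key."""

    def __init__(self) -> None:
        self.nodes: Dict[Key, Dict[str, object]] = {}

    def create(self, key: Key, props: Dict[str, object]) -> None:
        # The unique index rejects a second node with the same key.
        if key in self.nodes:
            raise ConstraintViolation(f"Node already exists with key {key}")
        self.nodes[key] = dict(props)


KEY: Key = ("27062231877-553480989", "zqzv6i6r")
store = Store()

# 1. Both handlers run the MATCH half of MERGE before either one has committed.
conversation_found = KEY in store.nodes
tagged_found = KEY in store.nodes

# 2. part-tagged takes the CREATE branch and commits first.
if not tagged_found:
    store.create(KEY, {"tagged": True})

# 3. conversation-created acts on its stale MATCH result and also takes the CREATE branch.
failure: Optional[ConstraintViolation] = None
if not conversation_found:
    try:
        store.create(KEY, {"conversation": True})
    except ConstraintViolation as exc:
        failure = exc  # nothing from conversation-created gets written

# Only the part-tagged result survives.
print(store.nodes)
```

In this model the surviving node carries only the part-tagged properties, which is the same end state the post reports finding in the graph.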

An example from neo4j.log:

class.name: streams.kafka.KafkaAutoCommitEventConsumer, exception.class.name: org.neo4j.graphdb.QueryExecutionException, exception.message: Node(11049075) already exists with label `Part` and properties `external_id` = '27062231877-553480989', `app_id` = 'zqzv6i6r', exception.stacktrace:
org.neo4j.graphdb.QueryExecutionException: Node(11049075) already exists with label `Part` and properties `external_id` = '27062231877-553480989', `app_id` = 'zqzv6i6r'
    at org.neo4j.kernel.impl.query.QueryExecutionKernelException.asUserException(QueryExecutionKernelException.java:35)
    at org.neo4j.graphdb.facade.spi.ClassicCoreSPI.executeQuery(ClassicCoreSPI.java:88)
    at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:421)
    at org.neo4j.kernel.impl.factory.GraphDatabaseFacade.execute(GraphDatabaseFacade.java:404)
    at streams.StreamsEventSinkQueryExecution.write(StreamsEventSinkQueryExecution.kt:27)
    at streams.service.StreamsSinkService.writeWithCypherTemplate(StreamsSinkService.kt:37)
    at streams.service.StreamsSinkService.writeForTopic(StreamsSinkService.kt:43)
    at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:114)
    at streams.kafka.KafkaEventSink$createJob$1$timeMillis$1.invoke(KafkaEventSink.kt:30)
    at streams.kafka.KafkaAutoCommitEventConsumer.executeAction(KafkaAutoCommitEventConsumer.kt:95)
    at streams.kafka.KafkaAutoCommitEventConsumer.readSimple(KafkaAutoCommitEventConsumer.kt:87)
    at streams.kafka.KafkaAutoCommitEventConsumer.read(KafkaAutoCommitEventConsumer.kt:134)
    at streams.kafka.KafkaEventSink$createJob$1.invokeSuspend(KafkaEventSink.kt:110)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
    at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
    at kotlinx.coroutines.AbstractContinuation.run(AbstractContinuation.kt:19)
    at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)
Caused by: org.neo4j.kernel.impl.query.QueryExecutionKernelException: Node(11049075) already exists with label `Part` and properties `external_id` = '27062231877-553480989', `app_id` = 'zqzv6i6r'
    at org.neo4j.cypher.internal.javacompat.ExecutionEngine.executeQuery(ExecutionEngine.java:89)
    at org.neo4j.graphdb.facade.spi.ClassicCoreSPI.executeQuery(ClassicCoreSPI.java:84)
    ... 18 more
Caused by: org.neo4j.cypher.CypherExecutionException: Node(11049075) already exists with label `Part` and properties `external_id` = '27062231877-553480989', `app_id` = 'zqzv6i6r'
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslationSupport$class.translateException(ExceptionTranslationSupport.scala:35)
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslatingQueryContext.translateException(ExceptionTranslatingQueryContext.scala:42)
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslatingQueryContext$ExceptionTranslatingOperations.setProperty(ExceptionTranslatingQueryContext.scala:307)
    at org.neo4j.cypher.internal.runtime.interpreted.UpdateCountingQueryContext$CountingOps.setProperty(UpdateCountingQueryContext.scala:193)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.BaseCreatePipe.setProperty(CreatePipe.scala:70)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.BaseCreatePipe$$anon$1.accept(CreatePipe.scala:50)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.BaseCreatePipe$$anon$1.accept(CreatePipe.scala:49)
    at org.neo4j.values.virtual.MapValue$MapWrappingMapValue.foreach(MapValue.java:99)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.BaseCreatePipe.setProperties(CreatePipe.scala:49)
    at org.neo4j.cypher.internal.runtime.slotted.pipes.EntityCreateSlottedPipe$$anonfun$createNode$1.apply(CreateSlottedPipe.scala:44)
    at org.neo4j.cypher.internal.runtime.slotted.pipes.EntityCreateSlottedPipe$$anonfun$createNode$1.apply(CreateSlottedPipe.scala:44)
    at scala.Option.foreach(Option.scala:257)
    at org.neo4j.cypher.internal.runtime.slotted.pipes.EntityCreateSlottedPipe.createNode(CreateSlottedPipe.scala:44)
    at org.neo4j.cypher.internal.runtime.slotted.pipes.MergeCreateNodeSlottedPipe$$anonfun$internalCreateResults$2.apply(CreateSlottedPipe.scala:144)
    at org.neo4j.cypher.internal.runtime.slotted.pipes.MergeCreateNodeSlottedPipe$$anonfun$internalCreateResults$2.apply(CreateSlottedPipe.scala:143)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:732)
    at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:708)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.toIndexedSeq(TraversableOnce.scala:300)
    at scala.collection.AbstractIterator.toIndexedSeq(Iterator.scala:1334)
    at org.neo4j.cypher.internal.runtime.slotted.pipes.EagerSlottedPipe.internalCreateResults(EagerSlottedPipe.scala:37)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:76)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.runtime.interpreted.pipes.PipeWithSource.createResults(Pipe.scala:72)
    at org.neo4j.cypher.internal.compatibility.v3_5.runtime.executionplan.BaseExecutionResultBuilderFactory$BaseExecutionWorkflowBuilder.build(DefaultExecutionResultBuilderFactory.scala:61)
    at org.neo4j.cypher.internal.compatibility.InterpretedRuntime$InterpretedExecutionPlan.run(InterpretedRuntime.scala:84)
    at org.neo4j.cypher.internal.compatibility.CypherCurrentCompiler$CypherExecutableQuery$$anonfun$execute$3.apply(CypherCurrentCompiler.scala:204)
    at org.neo4j.cypher.internal.compatibility.CypherCurrentCompiler$CypherExecutableQuery$$anonfun$execute$3.apply(CypherCurrentCompiler.scala:190)
    at org.neo4j.cypher.exceptionHandler$runSafely$.apply(exceptionHandler.scala:89)
    at org.neo4j.cypher.internal.compatibility.CypherCurrentCompiler$CypherExecutableQuery.execute(CypherCurrentCompiler.scala:223)
    at org.neo4j.cypher.internal.ExecutionEngine.execute(ExecutionEngine.scala:101)
    at org.neo4j.cypher.internal.javacompat.ExecutionEngine.executeQuery(ExecutionEngine.java:85)
    ... 19 more
Caused by: org.neo4j.kernel.api.exceptions.schema.UniquePropertyValueValidationException: New data does not satisfy CONSTRAINT ON ( label[7]:label[7] ) ASSERT (label[7].property[28], label[7].property[44]) IS NODE KEY: Both node 11049075 and node -1 share the property value ( String("27062231877-553480989"), String("zqzv6i6r") )
    at org.neo4j.kernel.impl.newapi.Operations.validateNoExistingNodeWithExactValues(Operations.java:510)
    at org.neo4j.kernel.impl.newapi.Operations.lambda$nodeSetProperty$1(Operations.java:575)
    at org.neo4j.kernel.impl.newapi.NodeSchemaMatcher.onMatchingSchema(NodeSchemaMatcher.java:73)
    at org.neo4j.kernel.impl.newapi.Operations.nodeSetProperty(Operations.java:572)
    at org.neo4j.cypher.internal.runtime.interpreted.TransactionBoundQueryContext$NodeOperations.setProperty(TransactionBoundQueryContext.scala:605)
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslatingQueryContext$ExceptionTranslatingOperations$$anonfun$setProperty$1.apply$mcV$sp(ExceptionTranslatingQueryContext.scala:307)
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslatingQueryContext$ExceptionTranslatingOperations$$anonfun$setProperty$1.apply(ExceptionTranslatingQueryContext.scala:307)
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslatingQueryContext$ExceptionTranslatingOperations$$anonfun$setProperty$1.apply(ExceptionTranslatingQueryContext.scala:307)
    at org.neo4j.cypher.internal.compatibility.v3_5.ExceptionTranslationSupport$class.translateException(ExceptionTranslationSupport.scala:33)
    ... 70 more
Caused by: IndexEntryConflictException{propertyValues=( String("27062231877-553480989"), String("zqzv6i6r") ), addedNodeId=-1, existingNodeId=11049075}
    ... 79 more

If I look up the node with that (external_id, app_id), it exists, but it contains only the result of the part-tagged handler.

What I've tried
I've tried a few dirty tricks, without success:

  • Adding an apoc.util.sleep call to wait a few seconds in the fast topic's handler
  • Reducing max.poll.records to 1 in neo4j.conf, which slowed processing down (a lot) but still ran into these errors

The question
I have implemented a DLQ topic that catches all these errors, BUT, before implementing an external/internal consumer that reprocesses/resends the messages in the DLQ: is there a way to automatically retry a message whose query fails? And is this good practice?
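For reference, this is roughly what a client-side retry could look like in a consumer you write yourself (for example, one reading the DLQ); it is a minimal sketch, not a plugin setting. It assumes the failing work can be wrapped in a function and that the conflict surfaces as a recognizable exception; `run_with_retry`, `ConstraintViolation`, and the simulated handler are hypothetical names, and the dict again stands in for the graph.

```python
import time
from typing import Callable, Dict, List, Tuple, Type, TypeVar

T = TypeVar("T")


class ConstraintViolation(Exception):
    """Hypothetical stand-in for the NodeKey conflict raised by the failing query."""


def run_with_retry(
    work: Callable[[], T],
    retries: int = 3,
    base_delay: float = 0.1,
    retry_on: Tuple[Type[BaseException], ...] = (ConstraintViolation,),
) -> T:
    """Run `work`; on one of `retry_on`, sleep base_delay * 2**attempt and retry, up to `retries` extra times."""
    for attempt in range(retries + 1):
        try:
            return work()
        except retry_on:
            if attempt == retries:
                raise  # out of retries: let the caller send the message to the DLQ
            time.sleep(base_delay * (2 ** attempt))
    raise ValueError("retries must be >= 0")


# Usage: the first attempt hits the race; on the retry, MERGE matches the existing node.
KEY = ("27062231877-553480989", "zqzv6i6r")
graph: Dict[Tuple[str, str], Dict[str, object]] = {KEY: {"tagged": True}}  # part-tagged already committed
attempts: List[int] = []


def conversation_created() -> Dict[str, object]:
    attempts.append(len(attempts) + 1)
    if len(attempts) == 1:
        # Simulated first failure: this attempt's MATCH ran before part-tagged committed.
        raise ConstraintViolation("Node already exists with label `Part`")
    node = graph.setdefault(KEY, {})  # MERGE: match the existing node (or create it)
    node["conversation"] = True       # SET the handler's properties
    return node


result = run_with_retry(conversation_created, base_delay=0.0)
print(len(attempts), graph[KEY])
```

Retrying is only reasonable here because the handlers use MERGE, which is idempotent: re-running it after the conflict matches the node that now exists instead of creating a second one. Retrying a non-idempotent query such as a plain CREATE could duplicate data.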

BONUS Question
Does this leave the DB in an inconsistent state?

Thank you SO MUCH in advance :nerd_face: