Bulk statement to improve update throughput

Hi, I am working on an open-source project that reads values from OPC UA, and I now want to write the live values from industrial machines to Neo4j. Most databases offer some kind of bulk statement, or at least a prepared statement that can be reused without re-parsing it every time.

I didn't find anything like this in the Neo4j Java API. When I run a transaction with about 100 statements, I get a maximum throughput of about 200 updates per second, which is really poor.

Here is my Cypher statement:
MERGE (n:OpcUaNode {
system : $system,
address : $address
})
SET n += {
status : $status,
doubleValue : $doubleValue,
serverTime : $serverTime,
sourceTime : $sourceTime
}

Any hints to speed up such updates would be great!
best regards,
Andreas

1 ACCEPTED SOLUTION

I think I found it. The solution is to pass all rows as a single list parameter and UNWIND it, so one statement handles the whole batch.

Kotlin Example

// ${"$"} emits a literal $ inside a Kotlin raw string, so Cypher sees the parameter $rows
val query = """
            UNWIND ${"$"}rows AS row
            MERGE (n:OpcUaNode {
              system : row.system,
              address : row.address
            })
            SET n += {
              status : row.status,
              doubleValue : row.doubleValue,
              serverTime : row.serverTime,
              sourceTime : row.sourceTime
            }
            """.trimIndent()

....

  val rows = mutableListOf<Map<String, Any?>>()
  // fill rows....
  session?.writeTransaction { tx ->
    tx.run(query, parameters("rows", rows))
    valueCounterOutput += counter
  }
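
For anyone wondering what "fill rows" could look like, here is a minimal sketch, not the project's actual code. The Reading class, the toRow and toBatches helpers, the sample values, and the batch size of 1000 are all assumptions for illustration. Each resulting batch would be passed as the rows parameter of one tx.run call.

```kotlin
// Hypothetical container for one OPC UA reading; the field names mirror
// the properties used in the Cypher statement above.
data class Reading(
    val system: String,
    val address: String,
    val status: String,
    val doubleValue: Double,
    val serverTime: String,
    val sourceTime: String
)

// Convert one reading into the map shape that row.<name> expects in the query.
fun Reading.toRow(): Map<String, Any?> = mapOf(
    "system" to system,
    "address" to address,
    "status" to status,
    "doubleValue" to doubleValue,
    "serverTime" to serverTime,
    "sourceTime" to sourceTime
)

// Split the readings into batches of at most batchSize rows
// (1000 is an assumed starting point, not a measured optimum).
fun toBatches(readings: List<Reading>, batchSize: Int = 1000): List<List<Map<String, Any?>>> =
    readings.map { it.toRow() }.chunked(batchSize)

fun main() {
    // Made-up sample data standing in for live OPC UA values.
    val readings = (1..2500).map { i ->
        Reading("plc1", "ns=2;s=Tag$i", "Good", i * 0.5, "2020-01-01T00:00:00Z", "2020-01-01T00:00:00Z")
    }
    val batches = toBatches(readings)
    println(batches.map { it.size }) // [1000, 1000, 500]
}
```

Bounding the batch size keeps a single transaction from growing without limit when many values arrive at once; the right size probably depends on the data and the server, so it is worth measuring.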

@andreas.vogler

Also be sure to have an index on :OpcUaNode covering the properties system and address. A MERGE either creates a node or matches an existing one, so before it can update it first has to find the node, and an index makes that lookup fast.
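
As a sketch of how that index could be created once at startup: the index name opcua_node_key and the runCypher hook are assumptions, and the statement uses the syntax of recent Neo4j 4.x and 5.x releases. On 3.x the equivalent should be CREATE INDEX ON :OpcUaNode(system, address).

```kotlin
// Cypher for a composite index on the MERGE key.
// The index name is an assumption; any unique name works.
val createIndex = """
    CREATE INDEX opcua_node_key IF NOT EXISTS
    FOR (n:OpcUaNode) ON (n.system, n.address)
""".trimIndent()

// runCypher is a hypothetical hook; with the Java driver it could wrap
// a call such as session.run(statement).
fun ensureIndex(runCypher: (String) -> Unit) = runCypher(createIndex)

fun main() {
    // Prints the statement instead of executing it, so the sketch runs without a database.
    ensureIndex { statement -> println(statement) }
}
```

If each system/address pair must be unique anyway, a uniqueness or node key constraint might be worth considering instead, since it also comes with an index.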