Bulk statement to improve update throughput

Hi, I am working on an open-source project that reads values from OPC UA, and I now want to write the live values from industrial machines via OPC UA to Neo4j. Databases typically offer some kind of bulk statement, or at least a prepared statement that can be reused without re-parsing the statement every time.

I didn't find anything like this in the Neo4j Java API. A transaction with about 100 statements gives me a maximum throughput of about 200 updates per second, which is really poor.

Here is my Cypher statement:
MERGE (n:OpcUaNode {
  system : $system,
  address : $address
})
SET n += {
  status : $status,
  doubleValue : $doubleValue,
  serverTime : $serverTime,
  sourceTime : $sourceTime
}

Any hints to speed up such updates would be great!
best regards,
Andreas

1 ACCEPTED SOLUTION

I think I found it. The solution is to pass all rows as a single list parameter and use UNWIND, so that one statement handles the whole batch.

Kotlin Example

val query = """
            UNWIND ${"$"}rows AS row
            MERGE (n:OpcUaNode {
              system : row.system,
              address : row.address
            })
            SET n += {
              status : row.status,
              doubleValue : row.doubleValue,
              serverTime : row.serverTime,
              sourceTime : row.sourceTime
            }
            """.trimIndent()

....

  val rows = mutableListOf<Map<String, Any?>>()
  // fill rows....
  session?.writeTransaction { tx ->
    tx.run(query, parameters("rows", rows))
    valueCounterOutput += counter
  }
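
The "fill rows" step is not shown in the post. As a rough sketch of one way to do it, assuming a hypothetical `Reading` data class whose fields mirror the properties in the query, and an arbitrary batch size of 1000 (neither comes from the original post), the rows could be built and split into bounded batches like this. Each batch would then be passed as the `rows` parameter of one transaction.

```kotlin
// Hypothetical value type; the field names mirror the properties used in the UNWIND query.
// The field types (String timestamps, nullable Double) are assumptions for illustration.
data class Reading(
    val system: String,
    val address: String,
    val status: String,
    val doubleValue: Double?,
    val serverTime: String,
    val sourceTime: String
)

// Convert one reading into the map shape that `row.<name>` expects in the query.
fun Reading.toRow(): Map<String, Any?> = mapOf(
    "system" to system,
    "address" to address,
    "status" to status,
    "doubleValue" to doubleValue,
    "serverTime" to serverTime,
    "sourceTime" to sourceTime
)

// Split the readings into batches so each transaction carries a bounded number of rows.
// The default batch size is an assumption and should be tuned by measurement.
fun toBatches(readings: List<Reading>, batchSize: Int = 1000): List<List<Map<String, Any?>>> =
    readings.map { it.toRow() }.chunked(batchSize)

fun main() {
    val readings = (1..2500).map { i ->
        Reading("plc1", "ns=2;s=Tag$i", "Good", i.toDouble(),
                "2022-01-01T00:00:00Z", "2022-01-01T00:00:00Z")
    }
    val batches = toBatches(readings)
    println(batches.map { it.size })  // prints [1000, 1000, 500]
}
```

Each element of `batches` has the same `List<Map<String, Any?>>` shape as `rows` above, so it can be handed to `parameters("rows", batch)` unchanged.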


2 REPLIES

@andreas.vogler

Also be sure to have an index on :OpcUaNode for the properties system and address. A MERGE is effectively a CREATE or an UPDATE, and before it can UPDATE it first has to find the node, so an index will help here.
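
A minimal sketch of such an index, kept as a Kotlin string in the style of the query above. This assumes Neo4j 4.x or later (older 3.x versions use the `CREATE INDEX ON :OpcUaNode(system, address)` form instead), and the index name `opcua_node_system_address` is made up for illustration.

```kotlin
// Composite index on the two properties that the MERGE matches on.
// The index name is hypothetical; IF NOT EXISTS makes the statement safe to re-run.
val createIndex = """
    CREATE INDEX opcua_node_system_address IF NOT EXISTS
    FOR (n:OpcUaNode) ON (n.system, n.address)
""".trimIndent()

fun main() {
    // With the Neo4j Java driver this would be run once at startup,
    // for example via session.run(createIndex), before any batches are written.
    println(createIndex)
}
```

If the combination of system and address is meant to be unique, a uniqueness or node key constraint on the same properties may be worth considering instead, since it is also backed by an index.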
