Run a query in multiple threads

I have a few queries that run for a long time. How can I make them run faster? Can I run them in a distributed way? I see the MERGE taking a lot of time.

@hemprasadk

a. neo4j version?

b. do you have indexes to support the merge?

For performance reasons, creating a schema index on the label or property is highly recommended when using MERGE. See Create, show, and delete indexes for more information.

c. See Runtime concepts in the Cypher Manual, which states:

Both the slotted and pipelined runtimes execute queries in a single thread assigned to one CPU core.

The parallel runtime can use multiple threads, but it is an Enterprise Edition feature, so a license is required.
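As a rough sketch of the syntax: the parallel runtime is selected with a `CYPHER runtime = parallel` prefix on the query, and it only applies to read-only queries. The query below is just an illustration that reuses labels and the `$auditEnv` parameter from this thread. It assumes an Enterprise Edition server recent enough to include the parallel runtime.

```cypher
// Read-only aggregation using the parallel runtime (Enterprise Edition).
// Labels, relationship type, and $auditEnv are taken from this thread.
CYPHER runtime = parallel
MATCH (env:Environment {enviromentName: $auditEnv})-[:TAGGING_ENV]-(tag:Tag)
RETURN tag.tagType AS tagType, count(*) AS tags
```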

Please find the details:
Version: 5.22.0
Edition: enterprise

Invalid input 'PARALLEL': expected 'PERIODIC COMMIT' (line 1, column 7 (offset: 6))
"USING PARALLEL XXXXXX"

Can you post the query?

MATCH (pe:PolicyExpression)-[:CONSUMER_POLICY_EXPRESSION]-(n)-[:ASSOCIATED_WITH_ENVIRONMENT]-(env:Environment {enviromentName: $auditEnv})
WITH pe, env
UNWIND pe.values AS tagKey
MATCH (env)-[:TAGGING_ENV {state: "Approved"}]-(tag:Tag {tagKey: tagKey})
WHERE tag.tagType = "CONSUMER"
AND "column" IN tag.levels
WITH pe, tagKey, tag.values AS approvedValues
UNWIND approvedValues AS value
WITH collect({tagKey: tagKey, values: {tagKey: tagKey, tagValue: value}}) AS tagData, pe
UNWIND tagData AS entry
WITH entry.tagKey AS tagKey, entry.values AS value, pe
WITH tagKey, COLLECT(value) AS values1, pe
WITH collect(values1) AS finalList, pe
WITH com.ccb.datalake.cartesianProduct(finalList) AS tagExpressions, pe
UNWIND tagExpressions AS tagExpression
WITH pe, tagExpression, apoc.text.join([t IN tagExpression | t.tagKey + ":" + t.tagValue], ",") AS expressionHashString,
     apoc.util.md5([apoc.text.join([t IN tagExpression | t.tagKey + ":" + t.tagValue], ",")]) AS expressionHash
MERGE (list:TagKeyValueList {expressionHash: expressionHash, expressionHashString: expressionHashString})
MERGE (pe)-[:HAS_TAG_EXPRESSION]->(list)
WITH pe, list, tagExpression
UNWIND tagExpression AS tag
MERGE (tagNode:TagKeyValue {tagKey: tag.tagKey, tagValue: tag.tagValue})
MERGE (list)-[:HAS_TAG]->(tagNode)

@hemprasadk

Not sure if you responded to my initial post and specifically about indexes so as to support the MERGE.

Any comments?

I have not created any indexes. I am new to this and will have to try it.

Will creating indexes for these MERGE clauses help?

When I used CYPHER runtime = parallel, I got this error:
org.springframework.dao.InvalidDataAccessResourceUsageException: The parallel runtime does not support updating queries. Please use another runtime; Error code 'Neo.ClientError.Statement.RuntimeUnsupportedError'

@hemprasadk

MERGE is effectively a create-or-update. Before it can update anything (or decide that it needs to create), it has to find the node, and indexes are what make that lookup fast.
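To make the find-then-create-or-update behaviour concrete, here is a minimal sketch using the `TagKeyValue` label from the query in this thread. The `createdAt` and `lastSeenAt` properties and the `$tagKey` / `$tagValue` parameters are hypothetical and only there for illustration.

```cypher
// MERGE first looks for a node matching the whole pattern
// and creates one only if no match is found.
MERGE (t:TagKeyValue {tagKey: $tagKey, tagValue: $tagValue})
  ON CREATE SET t.createdAt = datetime()   // hypothetical property, set only when the node is created
  ON MATCH  SET t.lastSeenAt = datetime()  // hypothetical property, set only when the node already existed
RETURN t
```

Without an index on the merged properties, the lookup step has to scan every node with that label, which is usually where the time goes.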

And yes, my apologies: the parallel runtime is not valid for Cypher statements that write to the database.

What would be the best indexes for these MERGE clauses?

MERGE (list:TagKeyValueList {expressionHash: expressionHash, expressionHashString: expressionHashString})
MERGE (pe)-[:HAS_TAG_EXPRESSION]->(list)
WITH pe, list, tagExpression
UNWIND tagExpression AS tag
MERGE (tagNode:TagKeyValue {tagKey: tag.tagKey, tagValue: tag.tagValue})
MERGE (list)-[:HAS_TAG]->(tagNode)

CREATE INDEX TagKeyValueList_node_index FOR (n:TagKeyValueList) ON (n.expressionHash, n.expressionHashString)
CREATE INDEX TagKeyValue_node_index FOR (n:TagKeyValue) ON (n.tagKey, n.tagValue)
Will these indexes be fine?
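One way to check, sketched here on the assumption that the two indexes were created with the names above, is to list them and confirm that they have finished populating:

```cypher
// List the two indexes and check that their state is ONLINE
// before re-running the slow query.
SHOW INDEXES
YIELD name, labelsOrTypes, properties, state
WHERE name IN ['TagKeyValueList_node_index', 'TagKeyValue_node_index']
```

Both MERGE clauses specify every property of their composite index with an equality match, which is the case a composite index is meant to serve.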

I refactored the query. I think it is the same, but I don't have test data to verify. I mainly removed the redundant WITH clauses and compacted some things.

MATCH (env:Environment {enviromentName: $auditEnv})
MATCH (pe:PolicyExpression)-[:CONSUMER_POLICY_EXPRESSION]-()-[:ASSOCIATED_WITH_ENVIRONMENT]-(env)
UNWIND pe.values AS tagKey
MATCH (env)-[:TAGGING_ENV {state: "Approved" }]-(tag:Tag {tagKey: tagKey})
WHERE tag.tagType = "CONSUMER" AND "column" IN tag.levels
UNWIND tag.values AS value
WITH tagKey, COLLECT({tagKey: tagKey, tagValue: value}) AS values1, pe
WITH collect(values1) AS finalList, pe
WITH com.ccb.datalake.cartesianProduct(finalList) AS tagExpressions, pe
UNWIND tagExpressions AS tagExpression
// Join the pairs with "," and hash the joined string, as the original query does
WITH pe,
    tagExpression,
    apoc.text.join([t IN tagExpression | t.tagKey + ":" + t.tagValue], ",") AS expressionHashString
WITH pe,
    tagExpression,
    expressionHashString,
    apoc.util.md5([expressionHashString]) AS expressionHash
MERGE (list:TagKeyValueList {expressionHash: expressionHash, expressionHashString: expressionHashString})
MERGE (pe)-[:HAS_TAG_EXPRESSION]->(list)
FOREACH( tag in tagExpression |
    MERGE (tagNode:TagKeyValue {tagKey: tag.tagKey, tagValue: tag.tagValue})
    MERGE (list)-[:HAS_TAG]->(tagNode)
)

You should have an index on Environment(enviromentName), so the first match is fast. You could also use a composite index on TagKeyValue(tagKey, tagValue) to speed up the TagKeyValue MERGE at the end.
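A minimal sketch of those two indexes follows. The name `environment_name_index` is made up for illustration; `TagKeyValue_node_index` is the name proposed earlier in the thread. `IF NOT EXISTS` makes the statements safe to re-run.

```cypher
// Single-property index for the first MATCH on Environment.
// The property name keeps the spelling used in the data model (enviromentName).
CREATE INDEX environment_name_index IF NOT EXISTS
FOR (e:Environment) ON (e.enviromentName);

// Composite index for the TagKeyValue MERGE at the end of the query.
CREATE INDEX TagKeyValue_node_index IF NOT EXISTS
FOR (n:TagKeyValue) ON (n.tagKey, n.tagValue);
```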

You should look at the query plan to see how it is matching nodes and whether additional indexes could help.
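As a sketch of how to inspect the plan, prefix a query with `EXPLAIN` or `PROFILE`. The fragments below reuse patterns from this thread; `$tagKey`, `$tagValue`, and `$auditEnv` are parameters you would need to supply.

```cypher
// EXPLAIN shows the plan without running the query.
EXPLAIN
MERGE (tagNode:TagKeyValue {tagKey: $tagKey, tagValue: $tagValue});

// PROFILE runs the query and reports rows and db hits per operator.
PROFILE
MATCH (env:Environment {enviromentName: $auditEnv})
RETURN count(env);
```

In the plan, an operator such as NodeIndexSeek means an index is being used, while NodeByLabelScan means every node with that label is being read.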