cancel
Showing results for 
Search instead for 
Did you mean: 

Handling embedded Neo4j transactions in multi-threaded Java

Hi,
I am dealing with a graph of approx 22m nodes and 250m relations. For each relation, I need to compute a numeric property. Now, I guess it is reasonable to do this multithreaded. My problem is that although the serial code is running smoothly (but would take too long), the multithreaded code runs in a big deal of issues.
Some of the errors:

java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 2147483647 out of bounds for length 4
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j.computeAndSetExclusivityAndTransferProbParallel(TestExclusivityCalculatorNeo4j.java:131)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j.main(TestExclusivityCalculatorNeo4j.java:185)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 2147483647 out of bounds for length 4
at org.neo4j.internal.recordstorage.RecordPropertyCursor.currentBlock(RecordPropertyCursor.java:154)
at org.neo4j.internal.recordstorage.RecordPropertyCursor.propertyKey(RecordPropertyCursor.java:170)
at org.neo4j.kernel.impl.newapi.DefaultPropertyCursor.propertyKey(DefaultPropertyCursor.java:186)
at org.neo4j.kernel.impl.core.RelationshipEntity.hasProperty(RelationshipEntity.java:407)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j$CalcRunnable.call(TestExclusivityCalculatorNeo4j.java:82)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j$CalcRunnable.call(TestExclusivityCalculatorNeo4j.java:58)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

java.util.concurrent.ExecutionException: org.neo4j.graphdb.NotFoundException: Node 1 not found
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j.computeAndSetExclusivityAndTransferProbParallel(TestExclusivityCalculatorNeo4j.java:131)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j.main(TestExclusivityCalculatorNeo4j.java:185)
Caused by: org.neo4j.graphdb.NotFoundException: Node 1 not found
at org.neo4j.kernel.impl.core.NodeEntity.getRelationshipSelectionIterator(NodeEntity.java:760)
at org.neo4j.kernel.impl.core.NodeEntity.lambda$innerGetRelationships$0(NodeEntity.java:148)
at org.neo4j.graphdb.ResourceIterable.iterator(ResourceIterable.java:78)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j$CalcRunnable.call(TestExclusivityCalculatorNeo4j.java:78)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j$CalcRunnable.call(TestExclusivityCalculatorNeo4j.java:58)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

java.util.concurrent.ExecutionException: org.neo4j.graphdb.NotFoundException: org.neo4j.internal.kernel.api.exceptions.EntityNotFoundException: Unable to load RELATIONSHIP with id 227441015.
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j.computeAndSetExclusivityAndTransferProbParallel(TestExclusivityCalculatorNeo4j.java:131)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j.main(TestExclusivityCalculatorNeo4j.java:185)
Caused by: org.neo4j.graphdb.NotFoundException: org.neo4j.internal.kernel.api.exceptions.EntityNotFoundException: Unable to load RELATIONSHIP with id 227441015.
at org.neo4j.kernel.impl.core.RelationshipEntity.singleRelationship(RelationshipEntity.java:545)
at org.neo4j.kernel.impl.core.RelationshipEntity.hasProperty(RelationshipEntity.java:403)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j$CalcRunnable.call(TestExclusivityCalculatorNeo4j.java:82)
at main.java.dbpedia201510.TestExclusivityCalculatorNeo4j$CalcRunnable.call(TestExclusivityCalculatorNeo4j.java:58)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.neo4j.internal.kernel.api.exceptions.EntityNotFoundException: Unable to load RELATIONSHIP with id 227441015.
... 8 more

Sometimes, I don't get these errors but directly a JVM crash.
I suspect I am not using Transactions correctly, and I cannot find any Neo4j 4.0 examples of how to use transactions in a multi-threaded environment.

The code:

void computeAndSetExclusivityAndTransferProbParallel(GraphDatabaseService db){
      
      ExecutorService exec =  Executors.newFixedThreadPool(20);
      try(Transaction tx = db.beginTx() ){
       int count = 0; 
      List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
 
       for (Node n: tx.getAllNodes()){   
         CalcRunnable r = new CalcRunnable(db, n);
         Future<Integer> f = exec.submit(r);
         futures.add(f);
       }
      
       for (Future<Integer> f: futures) {
             Integer result = 0;
             try {
                   result = f.get();        
             }
             catch (InterruptedException e) {
                e.printStackTrace();
             } catch (ExecutionException e) {
                e.printStackTrace();
             }
             count = result+count;
             System.out.println("relations = " + count);
      }
      tx.commit();
      }catch(Exception ex) {
         ex.printStackTrace();
         exec.shutdown();
      }
      exec.shutdown();

   }

and the worker:

class CalcRunnable implements Callable<Integer> {
      
      GraphDatabaseService db;
      Node n;
      
      
      protected CalcRunnable(GraphDatabaseService db, Node n) {
         this.db = db;
         this.n = n;
      }
   
   @Override
   public Integer call() {
      System.out.println("Solving node " + n.getId());
      int countall = 0;
      try (Transaction t = db.beginTx()) { 
         for (Relationship r: n.getRelationships()){
             String type = r.getType().name();  
             System.out.println(type);
            
            if (!r.hasProperty("Exclusivity")) {     
               System.out.println("no exclusivity found"); 
            }
            else {
               double excl = (double)r.getProperty("Exclusivity");
               System.out.println(excl);
            }
            countall++;            
         }
         
         t.commit();
      }      
         return countall;
      }
      
   }

Not sure this is relevant, but I run it with 300G pagecache and -Xmx500G.

Thanks a lot for any hint.

0 REPLIES 0