apoc.periodic.iterate Gets Stuck on the Last Batch of a Parallel JSON Export

The query below sometimes finishes and sometimes doesn't. When using a batch size of 5000 it finished in 93 hours, which is outrageous. It already has a few optimizations but still gets stuck on the last batch.

"CALL apoc.export.json.query('UNWIND $_batch as row with row.comm as comm MATCH (m:User) USING INDEX m:User(mup_id) where m.mup_id = comm
OPTIONAL MATCH (m)-[rs:OBSERVED_WITH]->(s:Segment)
OPTIONAL MATCH (m)-[rg:OBSERVED_WITH]->(e:Email)
OPTIONAL MATCH (m)-[rh:OBSERVED_WITH]->(h:Hash)
OPTIONAL MATCH (m)-[ra:OBSERVED_WITH]->(a:Adobe)
OPTIONAL MATCH (m)-[rl:OBSERVED_WITH]->(l:Liveramp)
WITH comm AS mup_id,
  COLLECT(distinct {muid: m.uid, first_obs: m.first_obs, last_obs: m.last_obs}) AS uids,
    COLLECT(distinct
      CASE
          WHEN s IS NULL THEN NULL
            ELSE {id: s.segment_id, first_obs: rs.first_obs} END
  ) AS segs,
    COLLECT(distinct
      CASE
          WHEN e IS NULL THEN NULL
            ELSE {id: e.email, first_obs: rg.first_obs} END
  ) AS eml,
   COLLECT(distinct
      CASE
          WHEN h IS NULL THEN NULL
            ELSE {id: h.hash_id, first_obs: rh.first_obs} END
  ) AS hashs,
    COLLECT(distinct
      CASE
          WHEN a IS NULL THEN NULL
            ELSE {id: a.adobe_id, first_obs: ra.first_obs} END
  ) AS adb,
    COLLECT(distinct
      CASE
          WHEN l IS NULL THEN NULL
            ELSE {id: l.liveramp_id, first_obs: rl.first_obs} END
  ) AS lrs RETURN {mup_id:mup_id,uid:uids, segment:segs, email:eml,adobe:adb,liveramp:lrs,hash_id:hashs} as map','/mnt/lv1/export_data/init_test/test-' + $_count+'.json',{useTypes:true, storeNodeIds:false,params:{_batch:$_batch}}) YIELD nodes return sum(nodes)",{batchSize:50000,iterateList:true,parallel:true,concurrency:20,retries:2});

When using a 50k batch size, it gets to 734 files before it gets stuck, which means it reaches approximately 36,700,000 lines. This was the runtime when using the 5k batch size before:

"Starting Init Export at 18:41:20.586Z"
batches: 7334
total: 36665559
timeTaken: 334991
committedOperations: 36665559
failedOperations: 0
failedBatches: 0
retries: 0
errorMessages: {}
batch: {total: 7334, committed: 7334, failed: 0, errors: {}}
operations: {total: 36665559, committed: 36665559, failed: 0, errors: {}}
wasTerminated: FALSE
failedParams: {}

As you can see, it had 7334 batches at 5k apiece, which is 36,670,000, and it completed in ~334k seconds, which is ~93 hours. My guess is that it gets stuck on the last batch, because prior to the final batch it was producing files every few seconds. I think it may be a bug.

  • neo4j version 3.5.5 Enterprise
  • neo4j-graph-algorithms-3.5.6.0-standalone.jar
  • PROFILE of the inner query, limited to 10 users, without the export component:
+-------------------------------------------------------------------------------------+
| Plan      | Statement   | Version      | Planner | Runtime   | Time | DbHits | Rows |
+-------------------------------------------------------------------------------------+
| "PROFILE" | "READ_ONLY" | "CYPHER 3.5" | "COST"  | "SLOTTED" | 4    | 236    | 5    |
+-------------------------------------------------------------------------------------+
+------------------------+----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| Operator               | Estimated Rows | Rows | DB Hits | Cache H/M | Identifiers                                   | Other                                                                                                     |
+------------------------+----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +ProduceResults        |              5 |    5 |       0 |       0/0 | map, lrs, adb, hashs, uids, mup_id, eml, segs |                                                                                                           |
| |                      +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +Projection            |              5 |    5 |       0 |       0/0 | map, lrs, adb, hashs, uids, mup_id, eml, segs | {map : {mup_id: mup_id, uid: uids, segment: segs, email: eml, adobe: adb, liveramp: lrs, hash_id: hashs}} |
| |                      +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +EagerAggregation      |              5 |    5 |      60 |     380/0 | lrs, adb, hashs, uids, mup_id, eml, segs      | mup_id                                                                                                    |
| |                      +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +Apply                 |             26 |   10 |       0 |     380/0 | e, comm, s, rs, rh, a, m, ra, l, rl, h, rg    |                                                                                                           |
| |\                     +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| | +OptionalExpand(All) |             26 |   10 |      30 |      78/0 | e, comm, s, rs, rh, a, m, ra, l, rl, h, rg    | (m)-[rl:OBSERVED_WITH]->(l); l:Liveramp                                                                   |
| | |                    +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| | +OptionalExpand(All) |             26 |   10 |      30 |      78/0 | e, comm, s, rs, rh, a, m, ra, h, rg           | (m)-[ra:OBSERVED_WITH]->(a); a:Adobe                                                                      |
| | |                    +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| | +OptionalExpand(All) |             26 |   10 |      30 |      78/0 | e, comm, s, rs, rh, m, h, rg                  | (m)-[rh:OBSERVED_WITH]->(h); h:Hash                                                                       |
| | |                    +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| | +OptionalExpand(All) |             26 |   10 |      30 |      78/0 | e, comm, s, rs, m, rg                         | (m)-[rg:OBSERVED_WITH]->(e); e:Email                                                                      |
| | |                    +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| | +OptionalExpand(All) |             26 |   10 |      30 |      78/0 | comm, m, rs, s                                | (m)-[rs:OBSERVED_WITH]->(s); s:Segment                                                                    |
| | |                    +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| | +NodeIndexSeek       |             26 |   10 |      15 |      78/0 | comm, m                                       | :User(mup_id)                                                                                             |
| |                      +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +Distinct              |             10 |    5 |       0 |     380/0 | comm                                          | comm                                                                                                      |
| |                      +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +Limit                 |             10 |   10 |       0 |     380/0 | cached[u.mup_id], u                           | 10                                                                                                        |
| |                      +----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
| +NodeIndexScan         |      100120771 |   10 |      11 |       0/0 | cached[u.mup_id], u                           | :User(mup_id)                                                                                             |
+------------------------+----------------+------+---------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------+
  • Relevant entries from neo4j.log and debug.log:
2019-06-26 12:59:07.395+0000 WARN [o.n.k.i.p.Procedures] Retrying operation 0 of 2
2019-06-26 12:59:07.395+0000 WARN [o.n.k.i.p.Procedures] Retrying operation 1 of 2
2019-06-26 12:59:13.745+0000 WARN [o.n.k.i.p.Procedures] Retrying operation 0 of 2
2019-06-26 12:59:13.845+0000 WARN [o.n.k.i.p.Procedures] Retrying operation 1 of 2
2019-06-26 12:59:13.946+0000 WARN [o.n.k.i.p.Procedures] Error during iterate.commit:
2019-06-26 12:59:13.946+0000 WARN [o.n.k.i.p.Procedures] 1 times: java.lang.NullPointerException

These back-to-back OPTIONAL MATCHes and late collections indicate that you likely have a huge cross product issue. Per m, you will get a cross product of every connected :Segment, every connected :Email, every connected :Hash, every connected :Adobe, and every connected :Liveramp. So looking at a single distinct m, for example, if there were 100 emails for it, 5 segments, 5 hashes, 10 adobes, and 3 liveramps, you would have 100 * 5 * 5 * 10 * 3 = 75000 rows for the cross product of all of these. You are doing far too much redundant work, and only lowering the cardinality at the end with your COLLECT(DISTINCT ...) calls, but that is still a ton of work to cope with the huge cross products you are generating.
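
To see the blowup directly, here is a minimal check (a sketch, not part of the original query) that counts the rows produced by the chained OPTIONAL MATCHes for a single mup_id before any collection happens; $someMupId is just a placeholder:

// Counts the cross-product rows for one partition. With 100 emails, 5 segments,
// 5 hashes, 10 adobes and 3 liveramps this returns 100 * 5 * 5 * 10 * 3 = 75000.
MATCH (m:User) USING INDEX m:User(mup_id) WHERE m.mup_id = $someMupId
OPTIONAL MATCH (m)-[:OBSERVED_WITH]->(s:Segment)
OPTIONAL MATCH (m)-[:OBSERVED_WITH]->(e:Email)
OPTIONAL MATCH (m)-[:OBSERVED_WITH]->(h:Hash)
OPTIONAL MATCH (m)-[:OBSERVED_WITH]->(a:Adobe)
OPTIONAL MATCH (m)-[:OBSERVED_WITH]->(l:Liveramp)
RETURN count(*) AS crossProductRows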

While your LIMIT 10 query isn't showing this as a problem, your last batch likely contains some nodes with many connections, and in any case fixing this so it works with any arbitrary number of matched nodes per OPTIONAL MATCH is a good idea to prevent cross products.

You should either be collecting the relevant lists immediately after each OPTIONAL MATCH, or, better, using pattern comprehensions, so you end up with one relevant collection per label and avoid the cross product and cardinality issues. A sketch of the collect-as-you-go approach follows; the pattern comprehension version is further down.
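
As a minimal sketch of the collect-as-you-go approach (assuming the same labels and properties as your query), each OPTIONAL MATCH is immediately followed by a WITH + COLLECT so cardinality drops back to one row per user before the next expansion; only the first two labels are shown, the remaining ones follow the same shape:

UNWIND $_batch as row WITH row.comm as comm
MATCH (m:User) USING INDEX m:User(mup_id) WHERE m.mup_id = comm
OPTIONAL MATCH (m)-[rs:OBSERVED_WITH]->(s:Segment)
// collect right away so we are back to a single row per (comm, m)
WITH comm, m,
  COLLECT(DISTINCT CASE WHEN s IS NULL THEN NULL ELSE {id: s.segment_id, first_obs: rs.first_obs} END) AS segs
OPTIONAL MATCH (m)-[rg:OBSERVED_WITH]->(e:Email)
WITH comm, m, segs,
  COLLECT(DISTINCT CASE WHEN e IS NULL THEN NULL ELSE {id: e.email, first_obs: rg.first_obs} END) AS eml
// ...repeat the same pattern for :Hash, :Adobe and :Liveramp, carrying the collected lists in each WITH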

I would expect this to significantly cut down your execution time.

I'm working on a simplified version of this query, but I'll need to know whether :User(mup_id) is unique, or if there can be multiple users with the same mup_id.

Also, roughly how many connected nodes (:Segment, :Email, :Hash, :Adobe, :Liveramp) are possible per label, per m node? If only one of each is possible, nothing more, then you can disregard my earlier cross product warning. If more than one of each is possible, especially if some can reach double digits (or more), then the warning still stands.

Hey Andrew,
Mup_id is unique; the connected nodes can reach double digits but average about 6. We tried using list comprehensions, but for some reason they messed up the grouping of one map per mup_id, i.e. one mup_id with a list of all its users, segments, emails, hashes, etc., breaking the one-line-per-mup output. The query above at least ensures the grouping is correct.

Per some of the Neo4j staff, I think the recommendation is to move toward a custom Java procedure for this, so we can offload/export data in parallel much more efficiently.

Here's the inner query with pattern comprehensions; give this a try. Note that uid isn't a list, as there should only be one m per mup_id, so it's only a single map.

UNWIND $_batch as row with row.comm as comm 
MATCH (m:User) USING INDEX m:User(mup_id) where m.mup_id = comm
WITH comm AS mup_id, {muid: m.uid, first_obs: m.first_obs, last_obs: m.last_obs} as uid,
 [(m)-[rs:OBSERVED_WITH]->(s:Segment) | {id: s.segment_id, first_obs: rs.first_obs}] as segs,
 [(m)-[rg:OBSERVED_WITH]->(e:Email) | {id: e.email, first_obs: rg.first_obs}] as eml,
 [(m)-[rh:OBSERVED_WITH]->(h:Hash) | {id: h.hash_id, first_obs: rh.first_obs}] as hashs,
 [(m)-[ra:OBSERVED_WITH]->(a:Adobe) | {id: a.adobe_id, first_obs: ra.first_obs}] as adb,
 [(m)-[rl:OBSERVED_WITH]->(l:Liveramp) | {id: l.liveramp_id, first_obs: rl.first_obs}] as lrs
RETURN {mup_id:mup_id, uid:uid, segment:segs, email:eml,adobe:adb,liveramp:lrs,hash_id:hashs} as map

But actually there are many uids per mup_id, so it should be a list. We are looking to do a group-by on mup_id such that all the muids, segment, email, hash, adobe and liveramp ids are mapped to their appropriate mup in a single JSON map. Each line of the file corresponds to one mup, and each mup corresponds to many of the other ids.

I'm a little confused. You previously said "Mup_id is unique" per user, which means you'll only have one :User node per mup_id. Since you're collecting with respect to mup_id, collecting {muid: m.uid, first_obs: m.first_obs, last_obs: m.last_obs} will never give you more than a single element, for the one node with that mup_id.

Ahhh, pardon, I misread it as muid... muid is unique. Mup_id is a partition id representing many muids (and the other ids) for connected components; it is produced via the UnionFind algorithm. When doing the group-by on mup_id, it definitely gets a list of muids related to each mup_id.

Okay, got it. So yes, you can collect() on that, but because there will be multiple nodes per mup_id, we'll also need to change the query. We need to collect all the other terms as well, so with each of those you'll have a list of lists, and we will need to combine (flatten) those lists.

If you have APOC Procedures this is the fastest way:

UNWIND $_batch as row with row.comm as comm 
MATCH (m:User) USING INDEX m:User(mup_id) where m.mup_id = comm
WITH comm AS mup_id, collect({muid: m.uid, first_obs: m.first_obs, last_obs: m.last_obs}) as uids,
 collect([(m)-[rs:OBSERVED_WITH]->(s:Segment) | {id: s.segment_id, first_obs: rs.first_obs}]) as segsLists,
 collect([(m)-[rg:OBSERVED_WITH]->(e:Email) | {id: e.email, first_obs: rg.first_obs}]) as emlLists,
 collect([(m)-[rh:OBSERVED_WITH]->(h:Hash) | {id: h.hash_id, first_obs: rh.first_obs}]) as hashsLists,
 collect([(m)-[ra:OBSERVED_WITH]->(a:Adobe) | {id: a.adobe_id, first_obs: ra.first_obs}]) as adbLists,
 collect([(m)-[rl:OBSERVED_WITH]->(l:Liveramp) | {id: l.liveramp_id, first_obs: rl.first_obs}]) as lrsLists
 WITH mup_id, uids, apoc.coll.flatten(segsLists) as segs, apoc.coll.flatten(emlLists) as eml, 
  apoc.coll.flatten(hashsLists) as hashs, apoc.coll.flatten(adbLists) as adb, apoc.coll.flatten(lrsLists) as lrs
RETURN {mup_id:mup_id, uids:uids, segment:segs, email:eml, adobe:adb, liveramp:lrs, hash_id:hashs} as map
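
If APOC weren't available, a plain-Cypher sketch of the same flattening step could use reduce() in place of apoc.coll.flatten(); it keeps the aliases from the query above and is usually a bit slower, so only the final WITH and RETURN change:

// hypothetical drop-in replacement for the apoc.coll.flatten() step above
WITH mup_id, uids,
  reduce(acc = [], x IN segsLists | acc + x) AS segs,
  reduce(acc = [], x IN emlLists | acc + x) AS eml,
  reduce(acc = [], x IN hashsLists | acc + x) AS hashs,
  reduce(acc = [], x IN adbLists | acc + x) AS adb,
  reduce(acc = [], x IN lrsLists | acc + x) AS lrs
RETURN {mup_id:mup_id, uids:uids, segment:segs, email:eml, adobe:adb, liveramp:lrs, hash_id:hashs} as map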