Are Cypher subqueries (`CALL {}`) synchronous or asynchronous? I thought they were synchronous,
meaning that if a query has two subqueries, execution of the second would not
begin until the first is complete. I also thought that the writes made by the
second subquery would be visible to the first subquery the next time the first
runs (that is, on the next row).
At least with the way I'm calling the subqueries, this is not the case. I
invite suggestions about what I need to do differently to accomplish my goal.
I apologize for the length of this post; I don't know how to shorten it.
Everything that follows runs on neo4j-enterprise v4.4.6 on a well-provisioned
and robust AWS EC2 instance running RockyLinux. The driver code runs under
Python 3.9.6.
I have an interconnected set of processes (described below) in which the results
of the second process are needed by the first. I have been driving these against
Neo4j from a Python process that uses the standard Python driver.
There are two steps to the calculation. There is feedback from the second step
to the first, and that feedback is the source of the issue I see in my combined
Cypher.
Until now, the Python code performs the first task for a given date. It then
performs the second task for the same date. This means that the results of the
second task are available to the first task when it runs for the next date.
For example, instances of PredictedToRise are computed for 2022-02-03. Then
instances of HotSpot are computed for 2022-02-03. Instances of PredictedToRise
are then computed for 2022-02-04. The PredictedToRise instances created for
2022-02-04 are restricted by the HotSpot list created for 2022-02-03.
Since it is driven from Python, the processing for a given date (such as
2022-02-03) always finishes before the processing for the subsequent date
(such as 2022-02-04) begins.
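For illustration, the per-date driver loop looks roughly like this. The function names are hypothetical stand-ins for the two tasks (the real code issues Cypher through the Neo4j Python driver); the point is only the strict ordering:

```python
from datetime import date, timedelta

def compute_predicted_to_rise(d, log):
    # Stand-in for task 1; in reality a Cypher query that reads the
    # HotSpot list stored for d - 1 day.
    log.append(("PredictedToRise", d))

def compute_hot_spots(d, log):
    # Stand-in for task 2; in reality a Cypher query that reads the
    # PredictedToRise instances just created for d.
    log.append(("HotSpot", d))

def drive(start, n_days, log):
    # Strictly sequential: both tasks finish for one date before the
    # next date begins, so task 2's output for date d is always
    # committed before task 1 runs for d + 1 day.
    for i in range(n_days):
        d = start + timedelta(days=i)
        compute_predicted_to_rise(d, log)
        compute_hot_spots(d, log)

log = []
drive(date(2022, 2, 3), 2, log)
```

This is the interleaving the all-Cypher version needs to reproduce: HotSpot for 2022-02-03 strictly before PredictedToRise for 2022-02-04.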
For a variety of reasons, including performance, I attempted to do all this in
Cypher. An all-Cypher approach must produce the same results as the current
approach using the Python driver. I appreciate any guidance about what I must
do differently to accomplish this.
I migrated the two steps into subqueries (using the `CALL{}` directive)
making the apparently incorrect assumption that when processing a sequence of
date instances (produced by the `UNWIND` directive), the (lexically) second
subquery does not begin until the lexically first subquery has completed. I
also assumed that processing on the second date in the UNWIND sequence does
not begin until processing of both subqueries for the first date have
completed.
I've shown pseudocode for the Cypher I use below. The code runs without
complaint. It generates instances of PredictedToRise and HotSpot, and it adds
the expected recentHotSpotsJSON property to each instance of HotSpotMetadata.
Sadly, it produces results that are dramatically different from the current
Python ingestor. In particular, it behaves as if the recentHotSpotsJSON
generated by the second subquery on a previous date is not restricting the
first subquery for the subsequent date.
The instance counts for PredictedToRise and HotSpot match for the first date
that is processed. Those counts are dramatically higher in the all-Cypher
approach for every date after the first. It appears that the recentHotSpotsJSON
updated by the second subquery is not available (in time) to the first subquery.
The recentHotSpotsJSON values are all present when the all-Cypher approach
finishes, but the lists they contain are far too large.
The code in question is handling a sequence of datapoints, where each datapoint
is the value of a specific property (cumulative COVID case count) measured on a
given date (the "pertainsDate") and reported for one of the 3K+ US counties.
Each county is identified by a "stateCountyFIPSCode" assigned by the US Census
Bureau.
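For concreteness, a single datapoint can be pictured like this. Only pertainsDate and stateCountyFIPSCode are named above; the case-count field name is a hypothetical stand-in:

```python
# Hypothetical shape of one datapoint. Only pertainsDate and
# stateCountyFIPSCode appear in the data model described above;
# cumulativeCaseCount stands in for the measured property.
datapoint = {
    "pertainsDate": "2022-02-03",
    "stateCountyFIPSCode": "25017",  # Middlesex County, MA
    "cumulativeCaseCount": 123456,
}
```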
For each date, instances of "PredictedToRise" are generated based on a complex
calculation of current and historical data. "PredictedToRise" is a labeled node
that contains a pertainsDate (the date of the prediction) and a
stateCountyFIPSCode. The existence of an instance of PredictedToRise signifies
that the given county is expected to show a statistically significant increase
on the given date.
After PredictedToRise instances are generated for a given date, those instances
are used to generate instances of "HotSpot", another labeled node that contains
a pertainsDate and stateCountyFIPSCode. The county identified by each instance
of PredictedToRise has geographic neighbors that it may infect. Each infected
neighbor of a PredictedToRise instance (with some complicated filtering) is a
HotSpot for that date.
The feedback is that the PredictedToRise instances for a given date are
filtered by the HotSpot instances for the immediately previous date.
This means that after the HotSpot instances are generated for a given date,
they are collected in a list for that date that is kept separately. The list is
stored and queried as a JSON string for implementation convenience (and to
reduce the memory footprint of all this).
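A minimal sketch of how that per-date list is built and consulted, assuming plain FIPS-code strings (on the Cypher side, apoc.convert.toJson and apoc.text.indexesOf perform the same two operations):

```python
import json

# Hypothetical FIPS codes collected for one date's HotSpot instances.
hot_spot_codes = ["25017", "36061", "25017", "25021"]

# Stored on HotSpotMetadata as a JSON string (deduplicated and sorted,
# mirroring COLLECT(DISTINCT ...) with ORDER BY in the Cypher below).
recent_hot_spots_json = json.dumps(sorted(set(hot_spot_codes)))

# The Cypher filter SIZE(apoc.text.indexesOf(json, code)) = 0 amounts to
# a substring test: keep a county only if its code does not occur
# anywhere in the stored JSON string.
def passes_filter(code: str) -> bool:
    return code not in recent_hot_spots_json
```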
Here is the pseudo-Cypher:
UNWIND sequenceOfDateInstances AS processDate
CALL { // Add instances of PredictedToRise
  WITH processDate
  WITH
    processDate,
    processDate - duration({days: 1}) AS restrictionDate
  MATCH (metadata:Metadata {pertainsDate: restrictionDate})
  WITH
    processDate,
    metadata.recentHotSpotsJSON AS recentHotSpotsJSON
  WITH
    processDate,
    recentHotSpotsJSON,
    <complex-calculation> AS stateCountyFIPSCode
  WHERE
    <complex-calculation>
    AND SIZE(apoc.text.indexesOf(recentHotSpotsJSON, stateCountyFIPSCode)) = 0
  CREATE (predictedToRise:PredictedToRise)
  SET
    predictedToRise.pertainsDate = processDate,
    predictedToRise.stateCountyFIPSCode = stateCountyFIPSCode
}
CALL { // Add instances of HotSpot
  WITH processDate
  MATCH (predictedToRise:PredictedToRise {pertainsDate: processDate})
  WITH
    processDate,
    <complex-calculation using predictedToRise> AS hotSpotFIPSCode
  CREATE (hotSpot:HotSpot)
  SET
    hotSpot.pertainsDate = processDate,
    hotSpot.stateCountyFIPSCode = hotSpotFIPSCode
  WITH
    processDate,
    lookbackPeriod // instance of duration, carried from earlier in the query
  WITH
    processDate,
    processDate - lookbackPeriod AS startDate
  MATCH (hotSpotMetadata:HotSpotMetadata {pertainsDate: processDate})
  MATCH (hotSpot:HotSpot)
  WHERE
    hotSpot.pertainsDate >= startDate
    AND hotSpot.pertainsDate <= processDate
  WITH
    hotSpotMetadata,
    hotSpot.stateCountyFIPSCode AS hotSpotStateCountyFIPSCode
  ORDER BY
    hotSpotStateCountyFIPSCode
  WITH
    hotSpotMetadata,
    apoc.convert.toJson(COLLECT(DISTINCT hotSpotStateCountyFIPSCode)) AS recentHotSpotsJSON
  SET
    hotSpotMetadata.recentHotSpotsJSON = recentHotSpotsJSON
}