Performance Issues Merging Node Relationships

Hi everyone, apologies for the lengthy post, but I'm struggling to find a way to improve the performance of my ingestions. I have all the nodes in my database already; I just need to create the relationships between the final set (chromosomes and subjects). I'm certainly no pro at either Python or Neo4j, so please forgive the amateur attempts!

My scripts iterate through each chromosome, then each subject, and use the scikit-allel package to extract the data from VCF format into numpy/pandas.

My first attempt is mostly iterative: it first matches all the relevant nodes, then merges the relationships in separate queries. Despite seeming very inefficient, it appears to be the fastest approach so far:

for chrom in chrom_list:
    chrom_label = "Chromosome_" + str(chrom)
    samples = callset[chrom]['samples']
    print("Starting " + chrom_label)
    variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['AC','AF_AFR', 'AF_AMR', 'AF_ASN', 'AF_EUR', 'AF_MAX', 'CGT', 'CLR', 'CSQ', 'DP', 'DP4', 'ESP_MAF', 'FILTER_LowQual', 'FILTER_MinHWE', 'FILTER_MinVQSLOD', 'FILTER_PASS', 'HWE', 'ICF', 'ID', 'IS', 'PC2', 'PCHI2', 'POS', 'PR', 'QCHI2', 'QUAL', 'REF', 'ALT', 'INDEL', 'SHAPEIT', 'SNP_ID', 'TYPE', 'UGT', 'VQSLOD', 'dbSNPmismatch', 'is_snp', 'numalt'], index='POS')
    pos = variants['POS'][:]
    pos = pos.tolist()
    ref = variants['REF'][:]
    alt = variants['ALT'][:]
    dpz = callset[chrom]['calldata/DP']
    calldata = callset[chrom]['calldata']
    gt = allel.GenotypeDaskArray(calldata['GT'])
    hap = gt.to_haplotypes()
    hap1 = hap[:, ::2]
    hap2 = hap[:, 1::2]


    # Iterate through subjects
    for i in range(len(samples)):
        subject = samples[i]
        dp = dpz[:, i]
        list_h1 = hap1[:, i].compute()
        list_h2 = hap2[:, i].compute()
        g = Graph()
        print(subject)
        print("Subject " + str(i + 1) + " of " + str(len(samples)) + " for " + chrom_label)
        s = matcher.match("Subject", subject_id=subject).first()
        print(s)
        if s is None:
            continue
        nodes = []


        # Match the subject to individual variant nodes
        for j in range(len(pos)):
            h1 = int(list_h1[j])
            h2 = int(list_h2[j])
            k = int(pos[j])
            l = str(ref[j])
            m = str(alt[j][h1-1])
            o = str(alt[j][h2-1])
            if h1 == 0 and h2 == 0:
                a = matcher.match(chrom_label, pos=k, bp=l).first()
                nodes.append(a)
                nodes.append(a)

            elif h1 == 0 and h2 > 0:
                a = matcher.match(chrom_label, pos=k, bp=l).first()
                nodes.append(a)
                a = matcher.match(chrom_label, pos=k, bp=o).first()
                nodes.append(a)

            elif h1 > 0 and h2 == 0:
                a = matcher.match(chrom_label, pos=k, bp=m).first()
                nodes.append(a)
                a = matcher.match(chrom_label, pos=k, bp=l).first()
                nodes.append(a)

            elif h1 == h2 and h1 > 0:
                a = matcher.match(chrom_label, pos=k, bp=m).first()
                nodes.append(a)
                nodes.append(a)

            else:
                a = matcher.match(chrom_label, pos=k, bp=m).first()
                nodes.append(a)
                a = matcher.match(chrom_label, pos=k, bp=o).first()
                nodes.append(a)
            if j % 10000 == 0:
                print(str(j) + " rows complete.")
        print(subject + " matching complete.")
        print(len(nodes))


        # Merge relationships to the nodes matched in the previous section
        tx = g.begin()
        for j in range(len(pos)):
            read_depth = int(dp[j])
            h1 = int(list_h1[j])
            h2 = int(list_h2[j])

            if h1 == 0 and h2 == 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HOMOZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
            elif h1 == 0 and h2 > 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
                y = (2*j)+1
                b = nodes[y]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "B":b.identity, "DP":read_depth})
            elif h1 > 0 and h2 == 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
                y = (2*j)+1
                b = nodes[y]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "B":b.identity, "DP":read_depth})
            elif h1 == h2 and h1 > 0:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HOMOZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
            else:
                x = (2*j)
                a = nodes[x]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
                y = (2*j)+1
                b = nodes[y]
                tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "B":b.identity, "DP":read_depth})
            if (j+1) % 10000 == 0:
                print(str(j + 1) + " relationships added to graph.")
                tx.process()
                tx.commit()
                tx = g.begin()
        tx.commit()
                
    print(chrom_label + " completed.")

My second attempt uses apoc.merge.relationship:

for chrom in chrom_list:
    chrom_label = "Chromosome_" + str(chrom)
    print("Starting " + chrom_label)
    variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['DP', 'ID', 'POS', 'REF', 'ALT', 'numalt'], index='POS')
    samples = callset[chrom]['samples']
    total_samples = len(samples)
    pos = variants['POS'][:].astype(int)
    ref = variants['REF'][:].astype(str)
    alt1 = variants['ALT'][:,0].astype(str)
    alt2 = variants['ALT'][:,1].astype(str)
    alt3 = variants['ALT'][:,2].astype(str)
    d = {'pos': pos, 'ref': ref,'alt1': alt1, 'alt2': alt2, 'alt3': alt3}
    df = pd.DataFrame(data = d)
    df['chrom'] = chrom_label
    calldata = callset[chrom]['calldata']
    gt = allel.GenotypeDaskArray(calldata['GT'])
    hap = gt.to_haplotypes()
    hap1 = hap[:, ::2]
    hap2 = hap[:, 1::2]
    dp = callset[chrom]['calldata/DP']
    zygosity = ['HOMOZYGOUS',   'HETEROZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS',
                'HETEROZYGOUS', 'HOMOZYGOUS',   'HETEROZYGOUS', 'HETEROZYGOUS',
                'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS',   'HETEROZYGOUS',
                'HETEROZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS']
    bp1 = [df['ref'],  df['ref'],  df['ref'],  df['ref'],
           df['alt1'], df['alt1'], df['alt1'], df['alt1'],
           df['alt2'], df['alt2'], df['alt2'], df['alt2'],
           df['alt3'], df['alt3'], df['alt3'], df['alt3']]
    bp2 = [df['ref'], df['alt1'], df['alt2'], df['alt3'],
           df['ref'], df['alt1'], df['alt2'], df['alt3'],
           df['ref'], df['alt1'], df['alt2'], df['alt3'],
           df['ref'], df['alt1'], df['alt2'], df['alt3']]
    for i in range(total_samples):
        df['subject'] = str(samples[i])
        df['h1'] = hap1[:, i].compute().astype(int)
        df['h2'] = hap2[:, i].compute().astype(int)
        conditions = [
            (df['h1'] == 0) & (df['h2'] == 0), 
            (df['h1'] == 0) & (df['h2'] == 1), 
            (df['h1'] == 0) & (df['h2'] == 2), 
            (df['h1'] == 0) & (df['h2'] == 3), 
            (df['h1'] == 1) & (df['h2'] == 0), 
            (df['h1'] == 1) & (df['h2'] == 1), 
            (df['h1'] == 1) & (df['h2'] == 2), 
            (df['h1'] == 1) & (df['h2'] == 3), 
            (df['h1'] == 2) & (df['h2'] == 0), 
            (df['h1'] == 2) & (df['h2'] == 1), 
            (df['h1'] == 2) & (df['h2'] == 2), 
            (df['h1'] == 2) & (df['h2'] == 3), 
            (df['h1'] == 3) & (df['h2'] == 0), 
            (df['h1'] == 3) & (df['h2'] == 1), 
            (df['h1'] == 3) & (df['h2'] == 2), 
            (df['h1'] == 3) & (df['h2'] == 3)]
        df['zygosity'] = np.select(conditions, zygosity)
        df['bp1'] = np.select(conditions, bp1)
        df['bp2'] = np.select(conditions, bp2)
        df['dp'] = dp[:,i].astype(int)
        dict_a = df.to_dict('records')
        entries_to_remove = ('ref', 'alt1', 'alt2', 'alt3', 'h1', 'h2', 'chrom')
        for record in dict_a:
            for k in entries_to_remove:
                record.pop(k, None)  # strip helper columns from each record (not from d)
        query1 = '''
                    UNWIND $mapEntry AS mItem
                    MATCH (s:Subject {subject_id: mItem["subject"]}), 
                    (a:'''
        query2 = ''' {pos: mItem["pos"], bp: mItem["bp1"]}),
                    (b:'''
        query3 = ''' {pos: mItem["pos"], bp: mItem["bp2"]})
                    CALL apoc.merge.relationship(s, mItem["zygosity"], {}, {dp: mItem["dp"]},  a) YIELD rel as rels1
                    CALL apoc.merge.relationship(s, mItem["zygosity"], {}, {dp: mItem["dp"]}, b) YIELD rel as rels2
                    RETURN (COUNT(rels1) + COUNT(rels2)) as total'''
        full_query = query1 + chrom_label + query2 + chrom_label + query3
        tx = graph.begin()
        rel_count = tx.evaluate(full_query, mapEntry = dict_a)
        tx.commit()
        print("Subject " + str(i) + " complete with " + str(rel_count) + " relationships, " + str(i) + " of " + str(total_samples) + ".")

And my most recent attempt uses apoc.periodic.iterate, but if anything it actually seems to run more slowly:

for chrom in chrom_list:
    chrom_label = "Chromosome_" + str(chrom)
    print("Starting " + chrom_label)
    variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['DP', 'ID', 'POS', 'REF', 'ALT', 'numalt'], index='POS')
    samples = callset[chrom]['samples']
    total_samples = len(samples)
    pos = variants['POS'][:].astype(int)
    ref = variants['REF'][:].astype(str)
    alt1 = variants['ALT'][:,0].astype(str)
    alt2 = variants['ALT'][:,1].astype(str)
    alt3 = variants['ALT'][:,2].astype(str)
    d = {'pos': pos, 'ref': ref,'alt1': alt1, 'alt2': alt2, 'alt3': alt3}
    df = pd.DataFrame(data = d)
    df['chrom'] = chrom_label
    calldata = callset[chrom]['calldata']
    gt = allel.GenotypeDaskArray(calldata['GT'])
    hap = gt.to_haplotypes()
    hap1 = hap[:, ::2]
    hap2 = hap[:, 1::2]
    dp = callset[chrom]['calldata/DP']
    zygosity = ['HOMOZYGOUS',   'HETEROZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS',
                'HETEROZYGOUS', 'HOMOZYGOUS',   'HETEROZYGOUS', 'HETEROZYGOUS',
                'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS',   'HETEROZYGOUS',
                'HETEROZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS']
    bp1 = [df['ref'],  df['ref'],  df['ref'],  df['ref'],
           df['alt1'], df['alt1'], df['alt1'], df['alt1'],
           df['alt2'], df['alt2'], df['alt2'], df['alt2'],
           df['alt3'], df['alt3'], df['alt3'], df['alt3']]
    bp2 = [df['ref'], df['alt1'], df['alt2'], df['alt3'],
           df['ref'], df['alt1'], df['alt2'], df['alt3'],
           df['ref'], df['alt1'], df['alt2'], df['alt3'],
           df['ref'], df['alt1'], df['alt2'], df['alt3']]
    for i in range(38, total_samples):  # starts at sample 38, presumably resuming a partial run
        df['subject'] = str(samples[i])
        df['h1'] = hap1[:, i].compute().astype(int)
        df['h2'] = hap2[:, i].compute().astype(int)
        conditions = [
            (df['h1'] == 0) & (df['h2'] == 0), 
            (df['h1'] == 0) & (df['h2'] == 1), 
            (df['h1'] == 0) & (df['h2'] == 2), 
            (df['h1'] == 0) & (df['h2'] == 3), 
            (df['h1'] == 1) & (df['h2'] == 0), 
            (df['h1'] == 1) & (df['h2'] == 1), 
            (df['h1'] == 1) & (df['h2'] == 2), 
            (df['h1'] == 1) & (df['h2'] == 3), 
            (df['h1'] == 2) & (df['h2'] == 0), 
            (df['h1'] == 2) & (df['h2'] == 1), 
            (df['h1'] == 2) & (df['h2'] == 2), 
            (df['h1'] == 2) & (df['h2'] == 3), 
            (df['h1'] == 3) & (df['h2'] == 0), 
            (df['h1'] == 3) & (df['h2'] == 1), 
            (df['h1'] == 3) & (df['h2'] == 2), 
            (df['h1'] == 3) & (df['h2'] == 3)]
        df['zygosity'] = np.select(conditions, zygosity)
        df['bp1'] = np.select(conditions, bp1)
        df['bp2'] = np.select(conditions, bp2)
        df['dp'] = dp[:,i].astype(int)
        dict_a = df.to_dict('records')
        entries_to_remove = ('ref', 'alt1', 'alt2', 'alt3', 'h1', 'h2', 'chrom')
        for record in dict_a:
            for k in entries_to_remove:
                record.pop(k, None)  # strip helper columns from each record (not from d)
        query1 = '''
                    UNWIND $mapEntry AS mItem
                    CALL apoc.periodic.iterate(
                    "MATCH (s:Subject {subject_id: '" + mItem.subject + "'}),
                    (a:'''
        query2 = ''' {pos: " + toString(mItem.pos) + ", bp: '" + mItem.bp1 + "'}),
                    (b:'''
        query3 = ''' {pos: " + toString(mItem.pos) + ", bp: '" + mItem.bp2 + "'})
                    RETURN s, a, b",
                    "CALL apoc.merge.relationship(s, '" + mItem.zygosity + "', {}, {dp: " + toString(mItem.dp) + "}, a) YIELD rel AS rels1
                    CALL apoc.merge.relationship(s, '" + mItem.zygosity + "', {}, {dp: " + toString(mItem.dp) + "}, b) YIELD rel AS rels2
                    RETURN (COUNT(rels1) + COUNT(rels2)) AS total",
                    {batchSize:40000}) YIELD batches, total, errorMessages
                    RETURN batches, total, errorMessages'''
        full_query = query1 + chrom_label + query2 + chrom_label + query3
        tx = graph.begin()
        rel_count = tx.evaluate(full_query, mapEntry = dict_a)
        tx.commit()
        print("Subject " + str(i) + " complete with " + str(rel_count) + " relationships, " + str(i) + " of " + str(total_samples) + ".")

Each chromosome is labelled "Chromosome_n", and each label has a node key constraint on bp and pos. The dictionary for the last two attempts is output in the following format:

[{'pos': 264941,
  'ref': 'A',
  'alt1': 'G',
  'alt2': '',
  'alt3': '',
  'chrom': 'Chromosome_4',
  'subject': 'UK10K_ALS5085249',
  'h1': 0,
  'h2': 0,
  'zygosity': 'HOMOZYGOUS',
  'bp1': 'A',
  'bp2': 'A',
  'dp': 0},
 {'pos': 265013,
  'ref': 'G',
  'alt1': 'A',
  'alt2': '',
  'alt3': '',
  'chrom': 'Chromosome_4',
  'subject': 'UK10K_ALS5085249',
  'h1': 0,
  'h2': 0,
  'zygosity': 'HOMOZYGOUS',
  'bp1': 'G',
  'bp2': 'G',
  'dp': 9},
 {'pos': 265024,
  'ref': 'C',
  'alt1': 'T',
  'alt2': '',
  'alt3': '',
  'chrom': 'Chromosome_4',
  'subject': 'UK10K_ALS5085249',
  'h1': 0,
  'h2': 0,
  'zygosity': 'HOMOZYGOUS',
  'bp1': 'C',
  'bp2': 'C',
  'dp': 10},...

I tried batch sizes of both 10,000 and 40,000 (for the chromosome I was testing, there were just under 40,000 relationships) and neither seems to make a substantial performance improvement. There are somewhere between 10,000 and 60,000 chromosome nodes per chromosome, with ~2,000 subjects.

Forgive the obvious question, but do you know about database indexes? That can speed things up a lot:
https://neo4j.com/docs/cypher-manual/current/administration/indexes-for-search-performance/
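
For example, if your Subject nodes aren't indexed on subject_id yet, something along these lines might help (a sketch using the Neo4j 3.x CREATE INDEX syntax, to match the parameter style in your queries; your node key constraint should already give pos and bp a backing index on the chromosome labels):

graph.run("CREATE INDEX ON :Subject(subject_id)")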

I'm just starting my Neo4j experience, but one thing you might consider: where you are using empty values for things like alt1, leave the property out entirely. When a property doesn't exist, Neo4j returns NULL, and I suspect it's faster for Neo4j to return NULL for a missing property than to fetch the property, get the empty string back, and then compare that empty string against another string.
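
For instance, something like this (a sketch, reusing your dict_a records) would strip the empty strings before they ever reach the database:

# drop empty-string properties from each record before sending to Neo4j
dict_a = [{k: v for k, v in rec.items() if v != ''} for rec in dict_a]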

Also... I suggest that using encoded integers instead of letters for your base pairs could speed things up.

E.g. define in your Python:

A = 1
C = 2
G = 3
T = 4

'A' is a string, which is going to be more expensive to compare than an integer (though it's hard to tell by how much...)
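
A minimal sketch of what I mean (the particular codes are arbitrary, and this only works cleanly if every bp value is a single base, which your indels may break):

import pandas as pd

BASE_CODES = {'A': 1, 'C': 2, 'G': 3, 'T': 4}
df = pd.DataFrame({'bp1': ['A', 'G', 'C'], 'bp2': ['A', 'A', 'T']})
df['bp1'] = df['bp1'].map(BASE_CODES)  # integer comparisons from here on
df['bp2'] = df['bp2'].map(BASE_CODES)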

Most of my other suggestions concern Python (which I know better):

I also see a fair number of string-to-int conversions. If you can keep ints as ints instead of strings, that will help too.
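
For example, converting once at the boundary and staying in the native dtype afterwards (a sketch):

import numpy as np

pos = np.array(['264941', '265013'])  # strings as parsed
pos = pos.astype(np.int64)            # one conversion...
print(pos.sum())                      # ...then work with ints directly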

I don't know how often the prints are used, but you're better off with formatted strings, as they avoid string concatenation. In Python 3:

print(f"Starting {chrom_label}")
and:
chrom_label = f"Chromosome_{chrom}"

Do you know about "list comprehension" in python? If you can avoid explicit for loops in python you're better off. (That's because Python is an interpreted language, so its execution of for loops is much slower than in a compiled language.) See: List Comprehensions in Python - PythonForBeginners.com

Pandas dataframes can be slow. You might consider using the dask library instead; it can parallelize your data across multiple cores. It's a little trickier to use than straight pandas, and it will make a bigger difference if the dataframe is large.
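
A minimal sketch (the partition count is arbitrary; tune it to your core count):

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({'subject': ['a', 'a', 'b'], 'dp': [0, 9, 10]})
ddf = dd.from_pandas(df, npartitions=4)                 # split across partitions
means = ddf.groupby('subject')['dp'].mean().compute()   # computed in parallel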

You have some (what appear to be) complex constant assignments inside a loop, e.g. zygosity = [...], conditions = [...], entries_to_remove = (...) and d = {...}. That is going to be slow, especially the second one, which sits inside a double for loop. Lists should also be made into numpy arrays, which are generally faster than Python lists for numeric work. Converting a dictionary into a DF is also slow (df = pd.DataFrame(data = d)). Why not make a DF once, then clone it? That would be faster.
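
For example, a sketch of the hoisting:

import numpy as np

# built once, outside any loop
ZYGOSITY = np.array(['HOMOZYGOUS', 'HETEROZYGOUS'])
ENTRIES_TO_REMOVE = ('ref', 'alt1')

records = [{'ref': 'A', 'alt1': 'G', 'pos': 1}]
for chrom in ['1', '2']:
    for record in records:
        for key in ENTRIES_TO_REMOVE:  # constants are reused, not rebuilt each pass
            record.pop(key, None)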

Do you know about enumerate? You can replace:

for i in range(len(samples)):
    subject = samples[i]

with:

for i, subject in enumerate(samples):

Also, do you know about profiling in Python? It can help you find your performance bottlenecks; it's hard to say whether your slowness is due to Python or Neo4j. It also lets you avoid optimizing code that doesn't need to be optimized! (Not all of my suggestions will buy you large improvements.)

See: Python Profiling | Medium

and Easy Python memory profiling for data scientists and scientists with Fil
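
For example, with the built-in cProfile (ingest here is just a hypothetical stand-in for your ingestion code):

import cProfile
import pstats

def ingest():
    return sum(i * i for i in range(1_000_000))

cProfile.run('ingest()', 'ingest.prof')
pstats.Stats('ingest.prof').sort_stats('cumulative').print_stats(10)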

Good luck!