Hi everyone, apologies for the lengthy post, but I'm struggling to find a way to improve the performance of my ingestions. I have all the nodes in my database already, I just need to create the relationships between the final set (chromosomes and subjects). I'm certainly no pro at either python or neo4j, so please forgive the amateur attempts!
My scripts iterate through chromosome, then subject and use the scikit-allel package to extract the data from VCF format into numpy/pandas.
My first attempt primarily iterates, first matching all relevant nodes, then merging them in separate queries. Despite this seeming very inefficient, it does appear to be the fastest yet:
for chrom in chrom_list:
chrom_label = "Chromosome_" + str(chrom)
samples = callset[chrom]['samples']
print("Starting " + chrom_label)
variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['AC','AF_AFR', 'AF_AMR', 'AF_ASN', 'AF_EUR', 'AF_MAX', 'CGT', 'CLR', 'CSQ', 'DP', 'DP4', 'ESP_MAF', 'FILTER_LowQual', 'FILTER_MinHWE', 'FILTER_MinVQSLOD', 'FILTER_PASS', 'HWE', 'ICF', 'ID', 'IS', 'PC2', 'PCHI2', 'POS', 'PR', 'QCHI2', 'QUAL', 'REF', 'ALT', 'INDEL', 'SHAPEIT', 'SNP_ID', 'TYPE', 'UGT', 'VQSLOD', 'dbSNPmismatch', 'is_snp', 'numalt'], index='POS')
pos = variants['POS'][:]
pos = pos.tolist()
ref = variants['REF'][:]
alt = variants['ALT'][:]
dpz = callset[chrom]['calldata/DP']
calldata = callset[chrom]['calldata']
gt = allel.GenotypeDaskArray(calldata['GT'])
hap = gt.to_haplotypes()
hap = gt.to_haplotypes()
hap1 = hap[:, ::2]
hap2 = hap[:, 1::2]
list_h1 = hap1[:, 0].compute()
list_h1 = list_h1.tolist()
list_h2 = hap2[:, 0].compute()
[8:48 AM] Iterates through subjects
for i in range(len(samples)):
subject = samples[i]
dp = dpz[:, i]
list_h1 = hap1[:, i].compute()
list_h2 = hap2[:, i].compute()
g = Graph()
print(subject)
print("Subject " + str(i + 1) + " of " + str(len(samples)) + " for " + chrom_label)
s = matcher.match("Subject", subject_id= subject).first()
print(s)
if s is None:
continue
j = 0
nodes = []
[8:49 AM] Matches subjects to individual nodes
for j in range(len(pos)):
h1 = int(list_h1[j])
h2 = int(list_h2[j])
k = int(pos[j])
l = str(ref[j])
m = str(alt[j][h1-1])
o = str(alt[j][h2-1])
if h1 == 0 and h2 == 0:
a = matcher.match(chrom_label, pos=k, bp=l).first()
nodes.append(a)
nodes.append(a)
elif h1 == 0 and h2 > 0:
a = matcher.match(chrom_label, pos=k, bp=l).first()
nodes.append(a)
a = matcher.match(chrom_label, pos=k, bp=o).first()
nodes.append(a)
elif h1 > 0 and h2 == 0:
a = matcher.match(chrom_label, pos=k, bp=m).first()
nodes.append(a)
a = matcher.match(chrom_label, pos=k, bp=l).first()
nodes.append(a)
elif h1 == h2 and h1 > 0:
a = matcher.match(chrom_label, pos=k, bp=m).first()
nodes.append(a)
nodes.append(a)
else:
a = matcher.match(chrom_label, pos=k, bp=m).first()
nodes.append(a)
a = matcher.match(chrom_label, pos=k, bp=o).first()
nodes.append(a)
if j % 10000 == 0:
print(str(j) + " rows complete.")
print(subject + " matching complete.")
print(len(nodes))
[8:50 AM] Merges nodes matched in the previous section
j=0
tx = g.begin()
for j in range(len(pos)):
read_depth = int(dp[j])
h1 = int(list_h1[j])
h2 = int(list_h2[j])
if h1 == 0 and h2 == 0:
x = (2*j)
a = nodes[x]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HOMOZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
elif h1 == 0 and h2 > 0:
x = (2*j)
a = nodes[x]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
y = (2*j)+1
b = nodes[y]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "B":b.identity, "DP":read_depth})
elif h1 > 0 and h2 == 0:
x = (2*j)
a = nodes[x]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
y = (2*j)+1
b = nodes[y]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "B":b.identity, "DP":read_depth})
elif h1 == h2 and h1 > 0:
x = (2*j)
a = nodes[x]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HOMOZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
else:
x = (2*j)
a = nodes[x]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {A} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "A":a.identity,"DP":read_depth})
y = (2*j)+1
b = nodes[y]
tx.run("MATCH (s) WHERE id(s) = {S} MATCH (a) WHERE id(a) = {B} MERGE (s)-[r:HETEROZYGOUS {dp:{DP}}]->(a)", {"S":s.identity, "B":b.identity, "DP":read_depth})
if (j+1) % 10000 == 0:
print(str(j + 1) + " relationships added to graph.")
tx.process()
tx.commit()
tx = g.begin()
tx.commit()
print(chrom_label + " completed.")
My second attempt uses apoc.merge.relationship:
for chrom in chrom_list:
chrom_label = "Chromosome_" + str(chrom)
print("Starting " + chrom_label)
variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['DP', 'ID', 'POS', 'REF', 'ALT', 'numalt'], index='POS')
samples = callset[chrom]['samples']
total_samples = len(samples)
pos = variants['POS'][:].astype(int)
ref = variants['REF'][:].astype(str)
alt1 = variants['ALT'][:,0].astype(str)
alt2 = variants['ALT'][:,1].astype(str)
alt3 = variants['ALT'][:,2].astype(str)
d = {'pos': pos, 'ref': ref,'alt1': alt1, 'alt2': alt2, 'alt3': alt3}
df = pd.DataFrame(data = d)
df['chrom'] = chrom_label
calldata = callset[chrom]['calldata']
gt = allel.GenotypeDaskArray(calldata['GT'])
hap = gt.to_haplotypes()
hap1 = hap[:, ::2]
hap2 = hap[:, 1::2]
dp = callset[chrom]['calldata/DP']
zygosity = ['HOMOZYGOUS','HETEROZYGOUS','HETEROZYGOUS','HETEROZYGOUS',
'HETEROZYGOUS','HOMOZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS',
'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS', 'HETEROZYGOUS',
'HETEROZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS']
bp1 = [df['ref'],df['ref'],df['ref'],df['ref'],
df['alt1'],df['alt1'],df['alt1'],df['alt1'],
df['alt2'], df['alt2'], df['alt2'], df['alt2'],
df['alt3'], df['alt3'], df['alt3'], df['alt3']]
bp2 = [df['ref'], df['alt1'], df['alt2'], df['alt3'],
df['ref'], df['alt1'], df['alt2'], df['alt3'],
df['ref'], df['alt1'], df['alt2'], df['alt3'],
df['ref'], df['alt1'], df['alt2'], df['alt3']]
for i in range(total_samples):
df['subject'] = str(samples[i])
df['h1'] = hap1[:, i].compute().astype(int)
df['h2'] = hap2[:, i].compute().astype(int)
conditions = [
(df['h1'] == 0) & (df['h2'] == 0),
(df['h1'] == 0) & (df['h2'] == 1),
(df['h1'] == 0) & (df['h2'] == 2),
(df['h1'] == 0) & (df['h2'] == 3),
(df['h1'] == 1) & (df['h2'] == 0),
(df['h1'] == 1) & (df['h2'] == 1),
(df['h1'] == 1) & (df['h2'] == 2),
(df['h1'] == 1) & (df['h2'] == 3),
(df['h1'] == 2) & (df['h2'] == 0),
(df['h1'] == 2) & (df['h2'] == 1),
(df['h1'] == 2) & (df['h2'] == 2),
(df['h1'] == 2) & (df['h2'] == 3),
(df['h1'] == 3) & (df['h2'] == 0),
(df['h1'] == 3) & (df['h2'] == 1),
(df['h1'] == 3) & (df['h2'] == 2),
(df['h1'] == 3) & (df['h2'] == 3)]
df['zygosity'] = np.select(conditions, zygosity)
df['bp1'] = np.select(conditions, bp1)
df['bp2'] = np.select(conditions, bp2)
df['dp'] = dp[:,i].astype(int)
dict_a = df.to_dict('records')
entries_to_remove = ('ref','alt1','alt2','alt3','h1','h2','chrom')
for k in entries_to_remove:
d.pop(k, None)
query1 = '''
UNWIND $mapEntry AS mItem
MATCH (s:Subject {subject_id: mItem["subject"]}),
(a:'''
query2 = ''' {pos: mItem["pos"], bp: mItem["bp1"]}),
(b:'''
query3 = ''' {pos: mItem["pos"], bp: mItem["bp2"]})
CALL apoc.merge.relationship(s, mItem["zygosity"], {}, {dp: mItem["dp"]}, a) YIELD rel as rels1
CALL apoc.merge.relationship(s, mItem["zygosity"], {}, {dp: mItem["dp"]}, b) YIELD rel as rels2
RETURN (COUNT(rels1) + COUNT(rels2)) as total'''
full_query = query1 + chrom_label + query2 + chrom_label + query3
tx = graph.begin()
rel_count = tx.evaluate(full_query, mapEntry = dict_a)
tx.commit()
print("Subject " + str(i) + " complete with " + str(rel_count) + " relationships, " + str(i) + " of " + str(total_samples) + ".")
And my most recent attempt uses apoc.periodic.iterate, but if anything actually seems to work more slowly:
for chrom in chrom_list:
chrom_label = "Chromosome_" + str(chrom)
print("Starting " + chrom_label)
variants = allel.VariantChunkedTable(callset[chrom]['variants'], names=['DP', 'ID', 'POS', 'REF', 'ALT', 'numalt'], index='POS')
samples = callset[chrom]['samples']
total_samples = len(samples)
pos = variants['POS'][:].astype(int)
ref = variants['REF'][:].astype(str)
alt1 = variants['ALT'][:,0].astype(str)
alt2 = variants['ALT'][:,1].astype(str)
alt3 = variants['ALT'][:,2].astype(str)
d = {'pos': pos, 'ref': ref,'alt1': alt1, 'alt2': alt2, 'alt3': alt3}
df = pd.DataFrame(data = d)
df['chrom'] = chrom_label
calldata = callset[chrom]['calldata']
gt = allel.GenotypeDaskArray(calldata['GT'])
hap = gt.to_haplotypes()
hap1 = hap[:, ::2]
hap2 = hap[:, 1::2]
dp = callset[chrom]['calldata/DP']
zygosity = ['HOMOZYGOUS','HETEROZYGOUS','HETEROZYGOUS','HETEROZYGOUS',
'HETEROZYGOUS','HOMOZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS',
'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS', 'HETEROZYGOUS',
'HETEROZYGOUS', 'HETEROZYGOUS', 'HETEROZYGOUS', 'HOMOZYGOUS']
bp1 = [df['ref'],df['ref'],df['ref'],df['ref'],
df['alt1'],df['alt1'],df['alt1'],df['alt1'],
df['alt2'], df['alt2'], df['alt2'], df['alt2'],
df['alt3'], df['alt3'], df['alt3'], df['alt3']]
bp2 = [df['ref'], df['alt1'], df['alt2'], df['alt3'],
df['ref'], df['alt1'], df['alt2'], df['alt3'],
df['ref'], df['alt1'], df['alt2'], df['alt3'],
df['ref'], df['alt1'], df['alt2'], df['alt3']]
for i in range(38,total_samples):
df['subject'] = str(samples[i])
df['h1'] = hap1[:, i].compute().astype(int)
df['h2'] = hap2[:, i].compute().astype(int)
conditions = [
(df['h1'] == 0) & (df['h2'] == 0),
(df['h1'] == 0) & (df['h2'] == 1),
(df['h1'] == 0) & (df['h2'] == 2),
(df['h1'] == 0) & (df['h2'] == 3),
(df['h1'] == 1) & (df['h2'] == 0),
(df['h1'] == 1) & (df['h2'] == 1),
(df['h1'] == 1) & (df['h2'] == 2),
(df['h1'] == 1) & (df['h2'] == 3),
(df['h1'] == 2) & (df['h2'] == 0),
(df['h1'] == 2) & (df['h2'] == 1),
(df['h1'] == 2) & (df['h2'] == 2),
(df['h1'] == 2) & (df['h2'] == 3),
(df['h1'] == 3) & (df['h2'] == 0),
(df['h1'] == 3) & (df['h2'] == 1),
(df['h1'] == 3) & (df['h2'] == 2),
(df['h1'] == 3) & (df['h2'] == 3)]
df['zygosity'] = np.select(conditions, zygosity)
df['bp1'] = np.select(conditions, bp1)
df['bp2'] = np.select(conditions, bp2)
df['dp'] = dp[:,i].astype(int)
dict_a = df.to_dict('records')
entries_to_remove = ('ref','alt1','alt2','alt3','h1','h2','chrom')
for k in entries_to_remove:
d.pop(k, None)
query1 = '''
UNWIND $mapEntry AS mItem
CALL apoc.periodic.iterate(
"MATCH (s:Subject {subject_id: '" + mItem.subject + "'}),
(a:'''
query2 = ''' {pos: '" + mItem.pos + "', bp: '" + mItem.bp1 + "'}),
(b:'''
query3 = ''' {pos: '" + mItem.pos + "', bp: '" + mItem.bp2 + "'})
RETURN s, a, b",
"CALL apoc.merge.relationship(s, " + mItem.zygosity + ", {}, {dp: '" + mItem.dp + "'}, a) YIELD rel as rels1
CALL apoc.merge.relationship(s, " + mItem.zygosity + ", {}, {dp: '" + mItem.dp + "'}, b) YIELD rel as rels2
RETURN (COUNT(rels1) + COUNT(rels2)) as total",
{batchSize:40000}) YIELD batches, total, errorMessages
RETURN batches, total, errorMessages'''
full_query = query1 + chrom_label + query2 + chrom_label + query3
tx = graph.begin()
rel_count = tx.evaluate(full_query, mapEntry = dict_a)
tx.commit()
print("Subject " + str(i) + " complete with " + str(rel_count) + " relationships, " + str(i) + " of " + str(total_samples) + ".")
Each chromosome is labelled "Chromosome_n", and each has a node key constraint on bp and pos. The dictionary for the last two outputs in the following format:
[{'pos': 264941,
'ref': 'A',
'alt1': 'G',
'alt2': '',
'alt3': '',
'chrom': 'Chromosome_4',
'subject': 'UK10K_ALS5085249',
'h1': 0,
'h2': 0,
'zygosity': 'HOMOZYGOUS',
'bp1': 'A',
'bp2': 'A',
'dp': 0},
{'pos': 265013,
'ref': 'G',
'alt1': 'A',
'alt2': '',
'alt3': '',
'chrom': 'Chromosome_4',
'subject': 'UK10K_ALS5085249',
'h1': 0,
'h2': 0,
'zygosity': 'HOMOZYGOUS',
'bp1': 'G',
'bp2': 'G',
'dp': 9},
{'pos': 265024,
'ref': 'C',
'alt1': 'T',
'alt2': '',
'alt3': '',
'chrom': 'Chromosome_4',
'subject': 'UK10K_ALS5085249',
'h1': 0,
'h2': 0,
'zygosity': 'HOMOZYGOUS',
'bp1': 'C',
'bp2': 'C',
'dp': 10},...
I tried the batches at both 10,000 and 40,000 (for the chromosome I was testing, there were just under 40,000 relationships) and neither seem to make a substantial performance improvement. There are somewhere between 10-60,000 chromosome nodes per chromosome, with ~2,000 subjects.