-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex_gene_features.py
More file actions
261 lines (250 loc) · 13.3 KB
/
index_gene_features.py
File metadata and controls
261 lines (250 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
Build a searcheable COBS index from the output of extract_genes
"""
#import cobs_index as cobs
from elasticsearch import Elasticsearch
import glob
from joblib import Parallel, delayed
import json
import networkx as nx
import os
#import pyodbc
import re
import shutil
import sys
from tqdm import tqdm
import tempfile
from BacQuerya_processing.secrets import ELASTIC_API_URL, ELASTIC_GENE_API_ID, ELASTIC_GENE_API_KEY, SQL_CONNECTION_STRING
def get_options():
    """Parse and return the command line arguments.

    Returns:
        argparse.Namespace: parsed options for the index type, input/output
        directories, COBS parameters and elastic indexing flags.
    """
    import argparse

    # fixed typo in the user-facing help text ("searcheable" -> "searchable")
    description = 'Generate a searchable COBS index from assemblies or gene sequences'
    parser = argparse.ArgumentParser(description=description,
                                     prog='index-genes')
    io_opts = parser.add_argument_group('Inputs')
    io_opts.add_argument("-t",
                         "--type",
                         dest="type",
                         required=True,
                         help="index assembly files or gene sequences",
                         choices=['assembly', 'gene'],
                         type=str)
    io_opts.add_argument("-i",
                         "--input-dir",
                         dest="input_dir",
                         required=False,
                         help='directory output by extract_genes (required for type=gene)',
                         type=str)
    io_opts.add_argument("-g",
                         "--graph-dir",
                         dest="graph_dir",
                         required=False,
                         help='directory of graph output by panaroo (required for type=gene and --all-genes=False)',
                         type=str)
    io_opts.add_argument("-a",
                         "--assembly-dir",
                         dest="assembly_dir",
                         required=False,
                         help='directory of assembly sequences (required for type=assembly)',
                         type=str)
    io_opts.add_argument("--isolate-dir",
                         dest="isolate_dir",
                         required=False,
                         help='directory of isolate metadata output by extract_assembly_stats.py',
                         type=str)
    io_opts.add_argument("-o",
                         "--output-dir",
                         dest="output_dir",
                         required=True,
                         help="output directory for generated index",
                         type=str)
    io_opts.add_argument("--kmer-length",
                         dest="kmer_length",
                         help="specify kmer length for the constructed index (default = 15)",
                         default=15,
                         type=int)
    io_opts.add_argument("--false-positive",
                         dest="fpr",
                         help="false positive rate for index. Greater fpr means smaller index (default = 0.01).",
                         default=0.01,
                         type=float)
    io_opts.add_argument("--index",
                         dest="index_name",
                         required=False,
                         help="elasticsearch index to create/append to",
                         type=str)
    io_opts.add_argument("--elastic-index",
                         dest="elastic",
                         help="index gene json in elastic index",
                         action='store_true',
                         default=False)
    io_opts.add_argument("--threads",
                         dest="n_cpu",
                         required=False,
                         help="number of threads for extracting features",
                         default=1,
                         type=int)
    io_opts.add_argument("--dirty",
                         dest="dirty",
                         help="keep gene files used to build the index",
                         action='store_true',
                         default=False)
    args = parser.parse_args()
    return args
def elasticsearch_isolates(allIsolatesJson,
                           index_name,
                           isolateMetadataDict):
    """Index gene metadata with Elasticsearch and store per-gene isolate lists in SQL.

    Gene annotations are indexed in elastic; the (large) lists of isolates
    containing each gene are stripped from the elastic document and stored in a
    GENE_METADATA SQL table instead.

    Args:
        allIsolatesJson (dict): gene id -> gene metadata, including the
            "foundIn_*" and "member_annotation_ids" lists that are removed
            before indexing to keep elastic entries small.
        index_name (str): elasticsearch index to create/append to.
        isolateMetadataDict (list): per-isolate metadata rows output by
            extract_assembly_stats.py.

    NOTE(review): the `pyodbc` import at the top of this file is commented
    out, so this function raises NameError unless it is restored — confirm.
    """
    # optional per-isolate attributes that are copied through when present
    optional_isolate_keys = (
        "contig_stats", "scaffold_stats", "In_Silico_St", "In_Silico_Serotype",
        "GPSC", "Country", "Year", "Organism_name",
    )
    # rate of indexing with elastic decreases substantially after about 1500 items
    partitioned_items = [
        list(allIsolatesJson.keys())[i:i + 1500] for i in range(0, len(allIsolatesJson.keys()), 1500)
    ]
    sys.stderr.write('\nIndexing gene metadata\n')
    with pyodbc.connect(SQL_CONNECTION_STRING) as conn:
        with conn.cursor() as cursor:
            # NOTE(review): fails if GENE_METADATA already exists — earlier
            # runs must drop the table first (see the commented DROP below)
            cursor.execute('''CREATE TABLE GENE_METADATA
                           (GENE_ID INT PRIMARY KEY NOT NULL,
                           METADATA TEXT NOT NULL);''')
            #cursor.execute("DROP TABLE GENE_METADATA;")
    for keys in tqdm(partitioned_items):
        # a fresh client per chunk avoids stale connections on long runs
        elastic_client = Elasticsearch([ELASTIC_API_URL],
                                       api_key=(ELASTIC_GENE_API_ID, ELASTIC_GENE_API_KEY))
        # iterate through features
        with pyodbc.connect(SQL_CONNECTION_STRING) as conn:
            with conn.cursor() as cursor:
                for line in tqdm(keys):
                    isolate_labels = allIsolatesJson[line]["foundIn_labels"]
                    isolate_indices = allIsolatesJson[line]["foundIn_indices"]
                    isolate_biosamples = allIsolatesJson[line]["foundIn_biosamples"]
                    isolate_annotationIDs = allIsolatesJson[line]["member_annotation_ids"]
                    # delete isolates from the gene metadata to reduce elastic entry size
                    del allIsolatesJson[line]["foundIn_labels"]
                    del allIsolatesJson[line]["foundIn_indices"]
                    del allIsolatesJson[line]["foundIn_biosamples"]
                    del allIsolatesJson[line]["member_annotation_ids"]
                    # extract supplementary metadata for isolates identified to be containing the gene
                    metaList = []
                    for isolate_row in isolateMetadataDict:
                        if isolate_row["isolate_index"] in isolate_indices:
                            isolate_metadata = {"BioSample": isolate_row["BioSample"],
                                                "sequenceURL": isolate_row["sequenceURL"]}
                            for attr in optional_isolate_keys:
                                if attr in isolate_row:
                                    isolate_metadata[attr] = isolate_row[attr]
                            metaList.append(isolate_metadata)
                    response = elastic_client.index(index=index_name,
                                                    id=int(line),
                                                    body=allIsolatesJson[line],
                                                    request_timeout=60)
                    # store a list of isolates containing the gene in the SQL db.
                    # Parameterised query replaces the old string-built INSERT,
                    # which broke on quotes inside the JSON and was injectable.
                    MetadataJSON = json.dumps({"foundIn_labels": isolate_labels,
                                               "foundIn_indices": isolate_indices,
                                               "foundIn_biosamples": isolate_biosamples,
                                               "member_annotation_ids": isolate_annotationIDs,
                                               "isolateMetadata": metaList})
                    cursor.execute(
                        "INSERT INTO GENE_METADATA (GENE_ID, METADATA) VALUES (?, ?)",
                        int(line), MetadataJSON)
    sys.stderr.write('\nGene metadata was indexed successfully\n')
def write_gene_files(gene_dict, temp_dir):
    """Write one gene sequence to its own .txt file, named by its consistent name."""
    target_path = os.path.join(temp_dir, str(gene_dict["consistentNames"]) + ".txt")
    with open(target_path, "w") as out_handle:
        out_handle.write(gene_dict["sequence"])
def write_assembly_files(assembly_file, temp_dir):
    """Write an assembly's nucleotide sequence to a plain-text file in temp_dir.

    FASTA header lines (starting with '>') are discarded before filtering;
    previously they were only run through the non-ACGT filter, so uppercase
    A/C/G/T characters from header text leaked into the indexed sequence.
    Remaining lines are concatenated and every non-ACGT character removed so
    COBS indexes raw uppercase sequence only.

    Args:
        assembly_file (str): path to a .fna FASTA assembly file.
        temp_dir (str): directory receiving the .txt sequence document.
    """
    with open(assembly_file, "r") as f:
        # drop FASTA headers; keep only sequence lines
        sequence_lines = [line for line in f if not line.startswith(">")]
    assembly_sequence = re.sub(r'[^ACTG]', '', "".join(sequence_lines))
    assembly_basename = os.path.basename(assembly_file).replace(".fna", ".txt")
    with open(os.path.join(temp_dir, assembly_basename), "w") as o:
        o.write(assembly_sequence)
def create_index(temp_dir, output_dir, kmer_length, fpr):
""""Create COBS index"""
params = cobs.CompactIndexParameters()
params.term_size = kmer_length
params.clobber = True # overwrite output and temporary files
params.false_positive_rate = fpr # higher false positive rate -> smaller index
cobs.compact_construct(temp_dir,
os.path.join(output_dir,
str(kmer_length) + "_index.cobs_compact"),
index_params=params)
def main():
    """Main function. Parses command line args and calls functions."""
    args = get_options()
    if not os.path.exists(args.output_dir):
        os.mkdir(args.output_dir)
    # per-sequence documents are staged in a temp dir inside the output dir;
    # joining with "" appends a trailing path separator
    temp_dir = os.path.join(tempfile.mkdtemp(dir=args.output_dir), "")
    if args.type == "gene":
        if args.elastic:
            with open(os.path.join(args.input_dir, "annotatedNodes.json"), "r") as inFeatures:
                geneString = inFeatures.read()
            # supplement the gene metadata with metadata from the isolate metadata
            with open(os.path.join(args.isolate_dir, "isolateAssemblyAttributes.json"), "r") as inIsolates:
                isolateMetadataDict = json.loads(inIsolates.read())["information"]
            isolateGeneDicts = json.loads(geneString)["information"]
            sys.stderr.write('\nBuilding elasticsearch index\n')
            elasticsearch_isolates(isolateGeneDicts,
                                   args.index_name,
                                   isolateMetadataDict)
        # load panaroo graph and write sequence files from COG representatives
        sys.stderr.write('\nLoading panaroo graph\n')
        G = nx.read_gml(os.path.join(args.graph_dir, "final_graph.gml"))
        with open(os.path.join(os.path.dirname(args.graph_dir), "extracted_genes", "panarooPairs.json"), "r") as jsonFile:
            pairString = jsonFile.read()
        pairs = json.loads(pairString)
        representative_sequences = []
        sys.stderr.write('\nWriting gene-specific files for COBS indexing\n')
        # need to apply same constraints as those in extract_genes.py
        # NOTE(review): iterates networkx's private G._node mapping — G.nodes
        # is the public equivalent; confirm before changing
        for node in tqdm(G._node):
            y = G._node[node]
            gene_name = y["name"]
            splitNames = gene_name.split("~~~")
            # need to make sure we're not indexing annotations that have been predicted by prodigal and are not supported by existing annotations
            if not all("PRED_" in name for name in splitNames):
                dna = y["dna"].split(";")
                # map the panaroo gene name back to its stable "consistent" name
                for key in pairs.keys():
                    if pairs[key]["panarooNames"] == y["name"]:
                        document_name = pairs[key]["consistentNames"]
                # one document per representative sequence variant
                for seq in range(len(dna)):
                    representative_sequences.append({"consistentNames": str(document_name) + "_v" + str(seq), "sequence": dna[seq]})
        job_list = [
            representative_sequences[i:i + args.n_cpu] for i in range(0, len(representative_sequences), args.n_cpu)
        ]
        # parallelise writing of gene-specific files for indexing
        for job in tqdm(job_list):
            Parallel(n_jobs=args.n_cpu)(delayed(write_gene_files)(feature,
                                                                  temp_dir) for feature in job)
    if args.type == "assembly":
        assembly_files_compressed = glob.glob(os.path.join(args.assembly_dir, "*.fna"))
        job_list = [
            assembly_files_compressed[i:i + args.n_cpu] for i in range(0, len(assembly_files_compressed), args.n_cpu)
        ]
        # parallelise writing of assembly files to temp_dir for indexing
        for job in tqdm(job_list):
            Parallel(n_jobs=args.n_cpu)(delayed(write_assembly_files)(a,
                                                                      temp_dir) for a in job)
    sys.stderr.write('\nBuilding COBS sequence index\n')
    create_index(temp_dir,
                 args.output_dir,
                 args.kmer_length,
                 args.fpr)
    sys.stderr.write('\nCleaning up raw files\n')
    # keep the intermediate per-sequence files only when --dirty was given
    if not args.dirty:
        shutil.rmtree(temp_dir)
    sys.exit(0)


if __name__ == '__main__':
    main()