
Commit 624bd2f

Merge pull request #312 from TranslatorSRI/improve-logging
This tracks how quickly NodeNorm normalizes identifiers and which identifiers are being normalized, as per #277 (comment). It also removes some unnecessary catch-everything blocks, as per #180.
2 parents e885c64 + 531d7c3 commit 624bd2f
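
The timing and identifier logging itself lands in files not shown in this excerpt. As a rough sketch of the pattern the commit message describes (hypothetical names throughout, not NodeNorm's actual code), one might wrap a lookup like this:

    import logging
    import time

    logger = logging.getLogger("node-norm")

    async def normalize_with_timing(curies, normalize):
        # `normalize` stands in for the real NodeNorm lookup coroutine;
        # only the logging pattern from the commit message is illustrated.
        start = time.perf_counter()
        results = await normalize(curies)
        elapsed = time.perf_counter() - start
        # Log both how fast the batch was normalized and which CURIEs it held.
        logger.info(f"Normalized {len(curies)} identifiers in {elapsed:.3f}s: {curies}")
        return results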

6 files changed: 658 additions & 700 deletions


node_normalizer/load_compendia.py

Lines changed: 119 additions & 127 deletions
@@ -84,48 +84,44 @@ async def load(compendia_files, block_size, dry_run) -> bool:
     # 5-X: conflation databases consisting of canonical_id -> (list of conflated canonical_ids)
     # Each of these databases corresponds to a particular conflation e.g. gene/protein or chemical/drug

-    try:
-        # get the list of files in the directory
-        types_prefixes_redis: redis_adapter.RedisConnection = await get_redis("curie_to_bl_type_db")
-        # for each file validate and process
-
-        # check the validity of the files
-        for comp in compendia_files:
-            if not validate_compendia(comp):
-                logger.warning(f"Compendia file {comp} is invalid.")
-                return False
-
-        for comp in compendia_files:
-            if not validate_compendia(comp):
-                logger.warning(f"Compendia file {comp} is invalid.")
-                return False
-
-
-        for comp in compendia_files:
-            # check the validity of the file
-
-            if not validate_compendia(comp):
-                logger.warning(f"Compendia file {comp} is invalid.")
-                continue
-
-            # try to load the file
-            loaded = await load_compendium(comp, block_size, dry_run)
-            semantic_types_redis_pipeline = types_prefixes_redis.pipeline()
-            # @TODO add meta data about files eg. checksum to this object
-            # semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes}))
-            if dry_run:
-                response = await redis_adapter.RedisConnection.execute_pipeline(semantic_types_redis_pipeline)
-                if asyncio.coroutines.iscoroutine(response):
-                    await response
-            # self.source_prefixes = {}
-            if not loaded:
-                logger.warning(f"Compendia file {comp} did not load.")
-                continue
-        # merge all semantic counts from other files / loaders
-        await merge_semantic_meta_data()
-    except Exception as e:
-        logger.error(f"Exception thrown in load(): {e}")
-        raise e
+    # get the list of files in the directory
+    types_prefixes_redis: redis_adapter.RedisConnection = await get_redis("curie_to_bl_type_db")
+    # for each file validate and process
+
+    # check the validity of the files
+    for comp in compendia_files:
+        if not validate_compendia(comp):
+            logger.warning(f"Compendia file {comp} is invalid.")
+            return False
+
+    for comp in compendia_files:
+        if not validate_compendia(comp):
+            logger.warning(f"Compendia file {comp} is invalid.")
+            return False
+
+
+    for comp in compendia_files:
+        # check the validity of the file
+
+        if not validate_compendia(comp):
+            logger.warning(f"Compendia file {comp} is invalid.")
+            continue
+
+        # try to load the file
+        loaded = await load_compendium(comp, block_size, dry_run)
+        semantic_types_redis_pipeline = types_prefixes_redis.pipeline()
+        # @TODO add meta data about files eg. checksum to this object
+        # semantic_types_redis_pipeline.set(f"file-{str(comp)}", json.dumps({"source_prefixes": self.source_prefixes}))
+        if dry_run:
+            response = await redis_adapter.RedisConnection.execute_pipeline(semantic_types_redis_pipeline)
+            if asyncio.coroutines.iscoroutine(response):
+                await response
+        # self.source_prefixes = {}
+        if not loaded:
+            logger.warning(f"Compendia file {comp} did not load.")
+            continue
+    # merge all semantic counts from other files / loaders
+    await merge_semantic_meta_data()

     # return to the caller
     return True
@@ -237,97 +233,93 @@ def get_ancestors(input_type):

     # init a line counter
     line_counter: int = 0
-    try:
-        term2id_redis: redis_adapter.RedisConnection = await get_redis("eq_id_to_id_db")
-        id2eqids_redis: redis_adapter.RedisConnection = await get_redis("id_to_eqids_db")
-        id2type_redis: redis_adapter.RedisConnection = await get_redis("id_to_type_db")
-        info_content_redis: redis_adapter.RedisConnection = await get_redis("info_content_db")
-
-        term2id_pipeline = term2id_redis.pipeline()
-        id2eqids_pipeline = id2eqids_redis.pipeline()
-        id2type_pipeline = id2type_redis.pipeline()
-        info_content_pipeline = info_content_redis.pipeline()
-
-        with open(compendium_filename, "r", encoding="utf-8") as compendium:
-            logger.info(f"Processing {compendium_filename}...")
-
-            # for each line in the file
-            for line in compendium:
-                line_counter = line_counter + 1
-
-                # load the line into memory
-                instance: dict = json.loads(line)
-
-                # save the identifier
-                # "The" identifier is the first one in the presorted identifiers list
-                identifier: str = instance["identifiers"][0]["i"]
-
-                # We want to accumulate statistics for each implied type as well, though we are only keeping the
-                # leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same
-                # list on output.
-                semantic_types = get_ancestors(instance["type"])
-
-                # for each semantic type in the list
-                for semantic_type in semantic_types:
-                    # save the semantic type in a set to avoid duplicates
-                    semantic_types.add(semantic_type)
-
-                    # create a source prefix if it has not been encountered
-                    if source_prefixes.get(semantic_type) is None:
-                        source_prefixes[semantic_type] = {}
-
-                    # go through each equivalent identifier in the data row
-                    # each will be assigned the semantic type information
-                    for equivalent_id in instance["identifiers"]:
-                        # split the identifier to just get the data source out of the curie
-                        source_prefix: str = equivalent_id["i"].split(":")[0]
-
-                        # save the source prefix if not already there
-                        if source_prefixes[semantic_type].get(source_prefix) is None:
-                            source_prefixes[semantic_type][source_prefix] = 1
-                        # else just increment the count for the semantic type/source
-                        else:
-                            source_prefixes[semantic_type][source_prefix] += 1
-
-                        # equivalent_id might be an array, where the first element is
-                        # the identifier, or it might just be a string. not worrying about that case yet.
-                        equivalent_id = equivalent_id["i"]
-                        term2id_pipeline.set(equivalent_id.upper(), identifier)
-                        # term2id_pipeline.set(equivalent_id, identifier)
-
-                id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"]))
-                id2type_pipeline.set(identifier, instance["type"])
-
-                # if there is information content add it to the cache
-                if "ic" in instance and instance["ic"] is not None:
-                    info_content_pipeline.set(identifier, instance["ic"])
-
-                if not dry_run and line_counter % block_size == 0:
-                    await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline)
-                    await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline)
-                    await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline)
-                    await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline)
-
-                    # Pipeline executed create a new one error
-                    term2id_pipeline = term2id_redis.pipeline()
-                    id2eqids_pipeline = id2eqids_redis.pipeline()
-                    id2type_pipeline = id2type_redis.pipeline()
-                    info_content_pipeline = info_content_redis.pipeline()
-
-                logger.info(f"{line_counter} {compendium_filename} lines processed")
-
-            if not dry_run:
+    term2id_redis: redis_adapter.RedisConnection = await get_redis("eq_id_to_id_db")
+    id2eqids_redis: redis_adapter.RedisConnection = await get_redis("id_to_eqids_db")
+    id2type_redis: redis_adapter.RedisConnection = await get_redis("id_to_type_db")
+    info_content_redis: redis_adapter.RedisConnection = await get_redis("info_content_db")
+
+    term2id_pipeline = term2id_redis.pipeline()
+    id2eqids_pipeline = id2eqids_redis.pipeline()
+    id2type_pipeline = id2type_redis.pipeline()
+    info_content_pipeline = info_content_redis.pipeline()
+
+    with open(compendium_filename, "r", encoding="utf-8") as compendium:
+        logger.info(f"Processing {compendium_filename}...")
+
+        # for each line in the file
+        for line in compendium:
+            line_counter = line_counter + 1
+
+            # load the line into memory
+            instance: dict = json.loads(line)
+
+            # save the identifier
+            # "The" identifier is the first one in the presorted identifiers list
+            identifier: str = instance["identifiers"][0]["i"]
+
+            # We want to accumulate statistics for each implied type as well, though we are only keeping the
+            # leaf type in the file (and redis). so now is the time to expand. We'll regenerate the same
+            # list on output.
+            semantic_types = get_ancestors(instance["type"])
+
+            # for each semantic type in the list
+            for semantic_type in semantic_types:
+                # save the semantic type in a set to avoid duplicates
+                semantic_types.add(semantic_type)
+
+                # create a source prefix if it has not been encountered
+                if source_prefixes.get(semantic_type) is None:
+                    source_prefixes[semantic_type] = {}
+
+                # go through each equivalent identifier in the data row
+                # each will be assigned the semantic type information
+                for equivalent_id in instance["identifiers"]:
+                    # split the identifier to just get the data source out of the curie
+                    source_prefix: str = equivalent_id["i"].split(":")[0]
+
+                    # save the source prefix if not already there
+                    if source_prefixes[semantic_type].get(source_prefix) is None:
+                        source_prefixes[semantic_type][source_prefix] = 1
+                    # else just increment the count for the semantic type/source
+                    else:
+                        source_prefixes[semantic_type][source_prefix] += 1
+
+                    # equivalent_id might be an array, where the first element is
+                    # the identifier, or it might just be a string. not worrying about that case yet.
+                    equivalent_id = equivalent_id["i"]
+                    term2id_pipeline.set(equivalent_id.upper(), identifier)
+                    # term2id_pipeline.set(equivalent_id, identifier)
+
+            id2eqids_pipeline.set(identifier, json.dumps(instance["identifiers"]))
+            id2type_pipeline.set(identifier, instance["type"])
+
+            # if there is information content add it to the cache
+            if "ic" in instance and instance["ic"] is not None:
+                info_content_pipeline.set(identifier, instance["ic"])
+
+            if not dry_run and line_counter % block_size == 0:
                 await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline)
                 await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline)
                 await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline)
                 await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline)

-                logger.info(f"{line_counter} {compendium_filename} total lines processed")
+                # Pipeline executed create a new one error
+                term2id_pipeline = term2id_redis.pipeline()
+                id2eqids_pipeline = id2eqids_redis.pipeline()
+                id2type_pipeline = id2type_redis.pipeline()
+                info_content_pipeline = info_content_redis.pipeline()
+
+            logger.info(f"{line_counter} {compendium_filename} lines processed")
+
+        if not dry_run:
+            await redis_adapter.RedisConnection.execute_pipeline(term2id_pipeline)
+            await redis_adapter.RedisConnection.execute_pipeline(id2eqids_pipeline)
+            await redis_adapter.RedisConnection.execute_pipeline(id2type_pipeline)
+            await redis_adapter.RedisConnection.execute_pipeline(info_content_pipeline)
+
+            logger.info(f"{line_counter} {compendium_filename} total lines processed")

-        print(f"Done loading {compendium_filename}...")
-    except Exception as e:
-        logger.error(f"Exception thrown in load_compendium({compendium_filename}), line {line_counter}: {e}")
-        return False
+    print(f"Done loading {compendium_filename}...")

     # return to the caller
     return True
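
Both removed handlers were catch-everything blocks of the kind #180 flags: load() logged the exception and immediately re-raised it, while load_compendium() swallowed it and returned False, discarding the traceback. A minimal sketch of the two patterns, with a hypothetical process() helper that is not code from this repo:

    import logging

    logger = logging.getLogger(__name__)

    def process(path: str) -> None:
        # Hypothetical stand-in for the real per-file loading work.
        if not path.endswith(".jsonl"):
            raise ValueError(f"unsupported file: {path}")

    def load_swallowing(path: str) -> bool:
        # Removed pattern: every error collapses into a log line and False,
        # so the caller never sees the traceback or the exception type.
        try:
            process(path)
        except Exception as e:
            logger.error(f"Exception thrown in load({path}): {e}")
            return False
        return True

    def load_propagating(path: str) -> bool:
        # Pattern after this commit: unexpected exceptions propagate with
        # their full traceback to the caller or a top-level handler.
        process(path)
        return True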

node_normalizer/load_conflation.py

Lines changed: 21 additions & 25 deletions
@@ -146,38 +146,34 @@ async def load_conflation(self, conflation: dict, block_size: int) -> bool:
         conflation_redis_connection_name = conflation["redis_db"]
         # init a line counter
         line_counter: int = 0
-        try:
-            conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name)
-            conflation_pipeline = conflation_redis.pipeline()
+        conflation_redis: RedisConnection = await self.get_redis(conflation_redis_connection_name)
+        conflation_pipeline = conflation_redis.pipeline()

-            with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile:
-                logger.info(f"Processing {conflation_file}...")
+        with open(f"{self._conflation_directory}/{conflation_file}", "r", encoding="utf-8") as cfile:
+            logger.info(f"Processing {conflation_file}...")

-                # for each line in the file
-                for line in cfile:
-                    line_counter = line_counter + 1
+            # for each line in the file
+            for line in cfile:
+                line_counter = line_counter + 1

-                    # load the line into memory
-                    instance: dict = json.loads(line)
-
-                    for identifier in instance:
-                        # We need to include the identifier in the list of identifiers so that we know its position
-                        conflation_pipeline.set(identifier, line)
+                # load the line into memory
+                instance: dict = json.loads(line)

-                    if self._test_mode != 1 and line_counter % block_size == 0:
-                        await RedisConnection.execute_pipeline(conflation_pipeline)
-                        # Pipeline executed create a new one error
-                        conflation_pipeline = conflation_redis.pipeline()
-                        logger.info(f"{line_counter} {conflation_file} lines processed")
+                for identifier in instance:
+                    # We need to include the identifier in the list of identifiers so that we know its position
+                    conflation_pipeline.set(identifier, line)

-                if self._test_mode != 1:
+                if self._test_mode != 1 and line_counter % block_size == 0:
                     await RedisConnection.execute_pipeline(conflation_pipeline)
-                    logger.info(f"{line_counter} {conflation_file} total lines processed")
+                    # Pipeline executed create a new one error
+                    conflation_pipeline = conflation_redis.pipeline()
+                    logger.info(f"{line_counter} {conflation_file} lines processed")
+
+            if self._test_mode != 1:
+                await RedisConnection.execute_pipeline(conflation_pipeline)
+                logger.info(f"{line_counter} {conflation_file} total lines processed")

-            print(f"Done loading {conflation_file}...")
-        except Exception as e:
-            logger.error(f"Exception thrown in load_conflation({conflation_file}), line {line_counter}: {e}")
-            return False
+        print(f"Done loading {conflation_file}...")

         # return to the caller
         return True
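
Both loaders keep the same batching idiom that survives this diff: buffer SET commands on a pipeline, execute once every block_size lines, start a fresh pipeline after each flush, and flush the remainder at the end. A standalone sketch of that idiom, assuming redis-py's asyncio client rather than this repo's redis_adapter wrapper:

    import redis.asyncio as redis

    async def load_lines(lines, block_size: int = 1000):
        # `lines` is any iterable of (key, value) pairs, e.g. parsed
        # from a JSON-lines file as in the loaders above.
        r = redis.Redis()
        pipe = r.pipeline()
        line_counter = 0
        for key, value in lines:
            pipe.set(key, value)
            line_counter += 1
            # Flush a full block, then start a fresh pipeline so buffered
            # commands don't grow without bound on large input files.
            if line_counter % block_size == 0:
                await pipe.execute()
                pipe = r.pipeline()
        # Flush whatever is left after the final partial block.
        await pipe.execute()

Recreating the pipeline after each execute() is the same move the "# Pipeline executed create a new one" comments mark in the diffs above.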
