"""
KGX Source for Simple Standard for Sharing Ontology Mappings ("SSSOM")
"""
import gzip
import re
import typing
import pandas as pd
from typing import Optional, Generator, Any, Dict, Tuple
import yaml
from kgx.error_detection import ErrorType, MessageLevel
from kgx.prefix_manager import PrefixManager
from kgx.config import get_logger
from kgx.source import Source
from kgx.utils.kgx_utils import (
sanitize_import,
generate_uuid,
generate_edge_key,
)
from kgx.utils.rdf_utils import process_predicate
# Module-level logger, obtained from kgx.config.get_logger().
log = get_logger()
# SSSOM columns that describe a node rather than the mapping itself, keyed by
# column name and giving the node property each one populates.
SSSOM_NODE_PROPERTY_MAPPING = dict(
    subject_id="id",
    subject_category="category",
    object_id="id",
    object_category="category",
)
[docs]class SssomSource(Source):
"""
SssomSource is responsible for reading data as records
from an SSSOM file.
"""
def __init__(self, owner):
    """
    Initialise the source for the given owner.

    Parameters
    ----------
    owner
        Passed unchanged to the parent class initialiser
    """
    super().__init__(owner)
    # Starts empty; handed to ``process_predicate`` when edges are loaded.
    self.predicate_mapping = dict()
def set_prefix_map(self, m: Dict) -> None:
    """
    Forward a prefix to IRI map to this source's prefix manager.

    Parameters
    ----------
    m: Dict
        Prefix to IRI map
    """
    manager = self.prefix_manager
    manager.set_prefix_map(m)
def set_reverse_prefix_map(self, m: Dict) -> None:
    """
    Forward an IRI to prefix map to this source's prefix manager.

    Parameters
    ----------
    m: Dict
        IRI to prefix map
    """
    manager = self.prefix_manager
    manager.set_reverse_prefix_map(m)
def parse(
    self,
    filename: str,
    format: str,
    compression: Optional[str] = None,
    **kwargs: Any,
) -> typing.Generator:
    """
    Parse a SSSOM TSV

    The file is read in chunks of 10000 rows and every chunk is handed to
    ``load_edges``. The file handle is closed once the generator is
    exhausted or closed.

    Parameters
    ----------
    filename: str
        File to read from
    format: str
        The input file format; accepted for interface compatibility but
        not read by this method
    compression: Optional[str]
        The compression (``gz``); any truthy value makes the file be
        opened with ``gzip``
    kwargs: Dict
        Any additional arguments. ``delimiter`` defaults to a tab.

    Returns
    -------
    Generator
        A generator for node and edge records
    """
    kwargs.setdefault("delimiter", "\t")
    self.parse_header(filename, compression)

    # SSSOM 'mapping provider' may override the default 'knowledge_source' setting?
    if "mapping_provider" in self.graph_metadata:
        kwargs["knowledge_source"] = self.graph_metadata["mapping_provider"]
    # NOTE(review): whatever is left in kwargs after this call is forwarded to
    # pandas.read_csv below - presumably set_provenance_map consumes the
    # provenance keys (including 'knowledge_source'); confirm.
    self.set_provenance_map(kwargs)

    handle = gzip.open(filename, "rb") if compression else open(filename)
    # pandas does not close a handle it did not open, so close it here.
    with handle as FH:
        file_iter = pd.read_csv(
            FH,
            comment="#",
            dtype=str,
            chunksize=10000,
            low_memory=False,
            keep_default_na=False,
            **kwargs,
        )
        for chunk in file_iter:
            yield from self.load_edges(chunk)
def load_node(self, node_data: Dict) -> Optional[Tuple[str, Dict]]:
    """
    Validate and sanitize a node record and return it together with its id.

    Parameters
    ----------
    node_data: Dict
        A node

    Returns
    -------
    Optional[Tuple[str, Dict]]
        A tuple that contains node id and node data, or ``None`` when the
        record is rejected by validation or carries no ``id``
    """
    validated = self.validate_node(node_data)
    if not validated:
        return None

    record = sanitize_import(validated.copy())
    if "id" not in record:
        self.owner.log_error(
            entity=str(record),
            error_type=ErrorType.MISSING_NODE_PROPERTY,
            message="Ignoring node with no 'id'",
            message_level=MessageLevel.WARNING,
        )
        return None

    # Read the id before provenance is attached to the record.
    node_id = record["id"]
    self.set_node_provenance(record)
    self.node_properties.update(list(record.keys()))
    return node_id, record
def load_edges(self, df: pd.DataFrame) -> Generator:
    """
    Feed every row of a pandas.DataFrame through ``load_edge``.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe containing records that represent edges

    Returns
    -------
    Generator
        A generator over everything ``load_edge`` yields, row by row
    """
    rows = df.to_dict("records")
    for row in rows:
        yield from self.load_edge(row)
def load_edge(self, edge: Dict) -> Generator:
    """
    Turn one SSSOM mapping row into node and edge records.

    Columns listed in ``SSSOM_NODE_PROPERTY_MAPPING`` populate the subject
    and object nodes; every other column (except ``predicate_id``) is
    copied onto the edge, followed by the graph metadata (minus
    ``curie_map``). The input mapping is not modified.

    Parameters
    ----------
    edge : Dict
        An edge; must contain ``predicate_id``, ``subject_id`` and
        ``object_id``

    Returns
    -------
    Generator
        A generator for node and edge records: the subject node, the
        object node, then a ``(subject, object, key, edge data)`` tuple.
        Nothing is yielded when edge validation fails or either node
        cannot be loaded.
    """
    element_uri, canonical_uri, predicate, property_name = process_predicate(
        self.prefix_manager, edge["predicate_id"], self.predicate_mapping
    )
    edge_predicate = element_uri or predicate or property_name
    if canonical_uri:
        # NOTE(review): this discards the fallback chosen above even when
        # element_uri is empty - presumably process_predicate only returns
        # a canonical_uri together with an element_uri; confirm.
        edge_predicate = element_uri

    data = self.validate_edge(
        {
            "subject": edge["subject_id"],
            "predicate": edge_predicate,
            "object": edge["object_id"],
        }
    )
    if not data:
        return

    node_fields = {"subject": {}, "object": {}}
    for column, value in edge.items():
        if column == "predicate_id":
            # Already folded into the edge's 'predicate'.
            continue
        if column not in SSSOM_NODE_PROPERTY_MAPPING:
            data[column] = value
            continue
        side = next(
            (name for name in node_fields if column.startswith(name)), None
        )
        if side is None:
            log.info(f"Ignoring {column} {value}")
            continue
        node_key = SSSOM_NODE_PROPERTY_MAPPING[column]
        if node_key == "category" and not PrefixManager.is_curie(value):
            # Categories that are not CURIEs are replaced by a generic one.
            value = "biolink:OntologyClass"
        node_fields[side][node_key] = value

    # Load both nodes before checking either, so each gets validated/logged.
    subject_node = self.load_node(node_fields["subject"])
    object_node = self.load_node(node_fields["object"])
    if not (subject_node and object_node):
        return

    records = [subject_node, object_node]
    for key, value in self.graph_metadata.items():
        if key != "curie_map":
            data[key] = value

    edge_data = sanitize_import(data.copy())
    if "subject" in edge_data and "object" in edge_data:
        if "id" not in edge_data:
            edge_data["id"] = generate_uuid()
        subject_id = edge_data["subject"]
        object_id = edge_data["object"]
        self.set_edge_provenance(edge_data)
        edge_key = generate_edge_key(subject_id, edge_data["predicate"], object_id)
        self.edge_properties.update(list(edge_data.keys()))
        records.append((subject_id, object_id, edge_key, edge_data))
    else:
        self.owner.log_error(
            entity=str(edge_data),
            error_type=ErrorType.MISSING_NODE,
            message="Ignoring edge with either a missing 'subject' or 'object'",
            message_level=MessageLevel.WARNING,
        )
    yield from records