Source code for kgx.utils.kgx_utils

import importlib
import re
import time
import uuid
import sqlite3
from enum import Enum
from functools import lru_cache
from typing import List, Dict, Set, Optional, Any, Union
import stringcase
from inflection import camelize
from linkml_runtime.linkml_model.meta import (
    TypeDefinitionName,
    EnumDefinition,
    ElementName,
    SlotDefinition,
    ClassDefinition,
    TypeDefinition,
    Element,
)
from bmt import Toolkit
from cachetools import LRUCache
import pandas as pd
import numpy as np
from prefixcommons.curie_util import contract_uri
from prefixcommons.curie_util import expand_uri
from kgx.config import get_logger, get_jsonld_context, get_biolink_model_schema
from kgx.graph.base_graph import BaseGraph

curie_lookup_service = None
cache = None

log = get_logger()

CORE_NODE_PROPERTIES = {"id", "name"}
CORE_EDGE_PROPERTIES = {"id", "subject", "predicate", "object", "type"}
XSD_STRING = "xsd:string"

tk = Toolkit()


class GraphEntityType(Enum):
    GRAPH = "graph"
    NODE = "node"
    EDGE = "edge"

# Biolink 2.0 "Knowledge Source" association slots, # including the deprecated 'provided_by' slot provenance_slot_types = { "knowledge_source": str, "primary_knowledge_source": str, "original_knowledge_source": str, "aggregator_knowledge_source": list, "supporting_data_source": list, "provided_by": list, } column_types = { "publications": list, "qualifiers": list, "category": list, "synonym": list, "same_as": list, "negated": bool, "xrefs": list, } column_types.update(provenance_slot_types) knowledge_provenance_properties = set(provenance_slot_types.keys()) extension_types = {"csv": ",", "tsv": "\t", "csv:neo4j": ",", "tsv:neo4j": "\t", "sql": "|"} archive_read_mode = {"tar": "r", "tar.gz": "r:gz", "tar.bz2": "r:bz2"} archive_write_mode = {"tar": "w", "tar.gz": "w:gz", "tar.bz2": "w:bz2"} archive_format = { "r": "tar", "r:gz": "tar.gz", "r:bz2": "tar.bz2", "w": "tar", "w:gz": "tar.gz", "w:bz2": "tar.bz2", } is_provenance_property_multivalued = { "knowledge_source": True, "primary_knowledge_source": False, "original_knowledge_source": False, "aggregator_knowledge_source": True, "supporting_data_source": True, "provided_by": True, } is_property_multivalued = { "id": False, "subject": False, "object": False, "predicate": False, "description": False, "synonym": True, "in_taxon": False, "same_as": True, "name": False, "has_evidence": False, "category": True, "publications": True, "type": False, "relation": False, } is_property_multivalued.update(is_provenance_property_multivalued)
def camelcase_to_sentencecase(s: str) -> str:
    """
    Convert CamelCase to sentence case.

    Parameters
    ----------
    s: str
        Input string in CamelCase

    Returns
    -------
    str
        string in sentence case form

    """
    return stringcase.sentencecase(s).lower()

def snakecase_to_sentencecase(s: str) -> str:
    """
    Convert snake_case to sentence case.

    Parameters
    ----------
    s: str
        Input string in snake_case

    Returns
    -------
    str
        string in sentence case form

    """
    return stringcase.sentencecase(s).lower()

@lru_cache(maxsize=1024)
def sentencecase_to_snakecase(s: str) -> str:
    """
    Convert sentence case to snake_case.

    Parameters
    ----------
    s: str
        Input string in sentence case

    Returns
    -------
    str
        string in snake_case form

    """
    return stringcase.snakecase(s).lower()

@lru_cache(maxsize=1024)
def sentencecase_to_camelcase(s: str) -> str:
    """
    Convert sentence case to CamelCase.

    Parameters
    ----------
    s: str
        Input string in sentence case

    Returns
    -------
    str
        string in CamelCase form

    """
    return camelize(stringcase.snakecase(s))

@lru_cache(maxsize=1024)
def format_biolink_slots(s: str) -> str:
    """
    Format a slot name as a Biolink CURIE, leaving
    existing 'biolink:' CURIEs untouched.
    """
    if re.match("biolink:.+", s):
        return s
    else:
        formatted = sentencecase_to_snakecase(s)
        return f"biolink:{formatted}"

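# Illustrative usage (a sketch, not part of the module): existing 'biolink:'
# CURIEs pass through unchanged; anything else is snake_cased and prefixed
# (assuming stringcase converts spaces to underscores, as it does here).
#
#   >>> format_biolink_slots("biolink:related_to")
#   'biolink:related_to'
#   >>> format_biolink_slots("related to")
#   'biolink:related_to'
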
def contract(
    uri: str, prefix_maps: Optional[List[Dict]] = None, fallback: bool = True
) -> str:
    """
    Contract a given URI to a CURIE, based on mappings from `prefix_maps`.
    If no prefix map is provided then the default mappings from
    prefixcommons-py are used.

    This method will return the URI as the CURIE if no mapping is found.

    Parameters
    ----------
    uri: str
        A URI
    prefix_maps: Optional[List[Dict]]
        A list of prefix maps to use for mapping
    fallback: bool
        Determines whether to fall back to default prefix mappings, as
        determined by `prefixcommons.curie_util`, when the URI prefix is
        not found in `prefix_maps`.

    Returns
    -------
    str
        A CURIE corresponding to the URI

    """
    curie = uri
    default_curie_maps = [
        get_jsonld_context("monarch_context"),
        get_jsonld_context("obo_context"),
    ]
    if prefix_maps:
        curie_list = contract_uri(uri, prefix_maps)
        if len(curie_list) == 0:
            if fallback:
                curie_list = contract_uri(uri, default_curie_maps)
                if curie_list:
                    curie = curie_list[0]
        else:
            curie = curie_list[0]
    else:
        curie_list = contract_uri(uri, default_curie_maps)
        if len(curie_list) > 0:
            curie = curie_list[0]
    return curie

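# Illustrative usage (a sketch, not part of the module; the exact CURIE
# returned depends on the bundled 'monarch_context'/'obo_context' mappings):
#
#   >>> contract("http://purl.obolibrary.org/obo/GO_0008150")
#   'GO:0008150'
#
# A URI with no known mapping is returned unchanged.
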
def expand(
    curie: str, prefix_maps: Optional[List[dict]] = None, fallback: bool = True
) -> str:
    """
    Expand a given CURIE to a URI, based on mappings from `prefix_maps`.

    This method will return the CURIE as the URI if no mapping is found.

    Parameters
    ----------
    curie: str
        A CURIE
    prefix_maps: Optional[List[dict]]
        A list of prefix maps to use for mapping
    fallback: bool
        Determines whether to fall back to default prefix mappings, as
        determined by `prefixcommons.curie_util`, when the CURIE prefix is
        not found in `prefix_maps`.

    Returns
    -------
    str
        A URI corresponding to the CURIE

    """
    default_curie_maps = [
        get_jsonld_context("monarch_context"),
        get_jsonld_context("obo_context"),
    ]
    if prefix_maps:
        uri = expand_uri(curie, prefix_maps)
        if uri == curie and fallback:
            uri = expand_uri(curie, default_curie_maps)
    else:
        uri = expand_uri(curie, default_curie_maps)
    return uri

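# Illustrative usage (a sketch; the expansion assumes the default contexts
# map the 'GO' prefix to the OBO PURL namespace):
#
#   >>> expand("GO:0008150")
#   'http://purl.obolibrary.org/obo/GO_0008150'
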
_default_toolkit = None
_toolkit_versions: Dict[str, Toolkit] = dict()

def get_toolkit(biolink_release: Optional[str] = None) -> Toolkit:
    """
    Get an instance of bmt.Toolkit. If no instance is defined,
    then one is instantiated and returned.

    Parameters
    ----------
    biolink_release: Optional[str]
        URL to the (Biolink) Model Schema to be used for validation
        (default: None, use the default Biolink Model Toolkit schema)

    Returns
    -------
    bmt.Toolkit
        An instance of bmt.Toolkit

    """
    global _default_toolkit, _toolkit_versions
    if biolink_release:
        if biolink_release in _toolkit_versions:
            toolkit = _toolkit_versions[biolink_release]
        else:
            schema = get_biolink_model_schema(biolink_release)
            toolkit = Toolkit(schema=schema)
            _toolkit_versions[biolink_release] = toolkit
    else:
        if _default_toolkit is None:
            _default_toolkit = Toolkit()
        toolkit = _default_toolkit
        biolink_release = toolkit.get_model_version()
        if biolink_release not in _toolkit_versions:
            _toolkit_versions[biolink_release] = toolkit
    return toolkit

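# Illustrative usage (a sketch, not part of the module): toolkit instances
# are cached per Biolink Model release, so repeated lookups for the same
# release return the same object.
#
#   >>> tk1 = get_toolkit()
#   >>> tk2 = get_toolkit(tk1.get_model_version())
#   >>> tk1 is tk2
#   True
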
def generate_edge_key(s: str, edge_predicate: str, o: str) -> str:
    """
    Generates an edge key based on a given subject, predicate, and object.

    Parameters
    ----------
    s: str
        Subject
    edge_predicate: str
        Edge label
    o: str
        Object

    Returns
    -------
    str
        Edge key as a string

    """
    return "{}-{}-{}".format(s, edge_predicate, o)

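# Example: the key is simply subject, predicate and object joined by hyphens.
#
#   >>> generate_edge_key("HGNC:11603", "biolink:related_to", "MONDO:0005002")
#   'HGNC:11603-biolink:related_to-MONDO:0005002'
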
def get_curie_lookup_service():
    """
    Get an instance of kgx.curie_lookup_service.CurieLookupService

    Returns
    -------
    kgx.curie_lookup_service.CurieLookupService
        An instance of ``CurieLookupService``

    """
    global curie_lookup_service
    if curie_lookup_service is None:
        from kgx.curie_lookup_service import CurieLookupService

        curie_lookup_service = CurieLookupService()
    return curie_lookup_service

def get_cache(maxsize=10000):
    """
    Get an instance of cachetools.cache

    Parameters
    ----------
    maxsize: int
        The max size for the cache (``10000``, by default)

    Returns
    -------
    cachetools.cache
        An instance of cachetools.cache

    """
    global cache
    if cache is None:
        cache = LRUCache(maxsize)
    return cache

def current_time_in_millis():
    """
    Get current time in milliseconds.

    Returns
    -------
    int
        Time in milliseconds

    """
    return int(round(time.time() * 1000))

def get_prefix_prioritization_map() -> Dict[str, List]:
    """
    Get prefix prioritization map as defined in Biolink Model.

    Returns
    -------
    Dict[str, List]
        A map of Biolink categories to an ordered list of preferred
        identifier prefixes

    """
    toolkit = get_toolkit()
    prefix_prioritization_map = {}
    descendants = toolkit.get_descendants("named thing")
    descendants.append("named thing")
    for d in descendants:
        element = toolkit.get_element(d)
        if element and "id_prefixes" in element:
            prefixes = element.id_prefixes
            key = format_biolink_category(element.name)
            prefix_prioritization_map[key] = prefixes
    return prefix_prioritization_map

def get_type_for_property(p: str) -> str:
    """
    Get type for a property.

    Parameters
    ----------
    p: str
        A property name

    Returns
    -------
    str
        The type for a given property

    """
    toolkit = get_toolkit()
    e = toolkit.get_element(p)
    t = XSD_STRING
    if e:
        if isinstance(e, ClassDefinition):
            t = "uriorcurie"
        elif isinstance(e, TypeDefinition):
            t = e.uri
        elif isinstance(e, EnumDefinition):
            t = "uriorcurie"
        else:
            r = e.range
            if isinstance(r, SlotDefinition):
                t = r.range
                t = get_type_for_property(t)
            elif isinstance(r, TypeDefinitionName):
                t = get_type_for_property(r)
            elif isinstance(r, ElementName):
                t = get_type_for_property(r)
            else:
                t = XSD_STRING
    if t is None:
        t = XSD_STRING
    return t

def prepare_data_dict(d1: Dict, d2: Dict, preserve: bool = True) -> Dict:
    """
    Given two dict objects, make a new dict object that is the intersection of the two.

    If a key is known to be multivalued then its value is converted to a list.
    If a key is already multivalued then it is updated with new values.
    If a key is single valued, and a new unique value is found, then the existing value
    is converted to a list and the new value is appended to this list.

    Parameters
    ----------
    d1: Dict
        Dict object
    d2: Dict
        Dict object
    preserve: bool
        Whether or not to preserve values for conflicting keys

    Returns
    -------
    Dict
        The intersection of d1 and d2

    """
    new_data = {}
    for key, value in d2.items():
        if isinstance(value, (list, set, tuple)):
            new_value = [x for x in value]
        else:
            new_value = value

        if key in is_property_multivalued:
            if is_property_multivalued[key]:
                # value for key is supposed to be multivalued
                if key in d1:
                    # key is in data
                    if isinstance(d1[key], (list, set, tuple)):
                        # existing key has value type list
                        new_data[key] = d1[key]
                        if isinstance(new_value, (list, set, tuple)):
                            new_data[key] += [
                                x for x in new_value if x not in new_data[key]
                            ]
                        else:
                            if new_value not in new_data[key]:
                                new_data[key].append(new_value)
                    else:
                        if key in CORE_NODE_PROPERTIES or key in CORE_EDGE_PROPERTIES:
                            log.debug(
                                f"cannot modify core property '{key}': {d2[key]} vs {d1[key]}"
                            )
                        else:
                            # existing key does not have value type list; converting to list
                            new_data[key] = [d1[key]]
                            if isinstance(new_value, (list, set, tuple)):
                                new_data[key] += [
                                    x for x in new_value if x not in new_data[key]
                                ]
                            else:
                                if new_value not in new_data[key]:
                                    new_data[key].append(new_value)
                else:
                    # key is not in data; adding
                    if isinstance(new_value, (list, set, tuple)):
                        new_data[key] = [x for x in new_value]
                    else:
                        new_data[key] = [new_value]
            else:
                # key is not multivalued; adding/replacing as-is
                if key in d1:
                    if isinstance(d1[key], (list, set, tuple)):
                        new_data[key] = d1[key]
                        if isinstance(new_value, (list, set, tuple)):
                            new_data[key] += [x for x in new_value]
                        else:
                            new_data[key].append(new_value)
                    else:
                        if key in CORE_NODE_PROPERTIES or key in CORE_EDGE_PROPERTIES:
                            log.debug(
                                f"cannot modify core property '{key}': {d2[key]} vs {d1[key]}"
                            )
                        else:
                            if preserve:
                                new_data[key] = [d1[key]]
                                if isinstance(new_value, (list, set, tuple)):
                                    new_data[key] += [
                                        x for x in new_value if x not in new_data[key]
                                    ]
                                else:
                                    new_data[key].append(new_value)
                            else:
                                new_data[key] = new_value
                else:
                    new_data[key] = new_value
        else:
            # treating key as multivalued
            if key in d1:
                # key is in data
                if key in CORE_NODE_PROPERTIES or key in CORE_EDGE_PROPERTIES:
                    log.debug(
                        f"cannot modify core property '{key}': {d2[key]} vs {d1[key]}"
                    )
                else:
                    if isinstance(d1[key], (list, set, tuple)):
                        # existing key has value type list
                        new_data[key] = d1[key]
                        if isinstance(new_value, (list, set, tuple)):
                            new_data[key] += [
                                x for x in new_value if x not in new_data[key]
                            ]
                        else:
                            new_data[key].append(new_value)
                    else:
                        # existing key does not have value type list; converting to list
                        if preserve:
                            new_data[key] = [d1[key]]
                            if isinstance(new_value, (list, set, tuple)):
                                new_data[key] += [
                                    x for x in new_value if x not in new_data[key]
                                ]
                            else:
                                new_data[key].append(new_value)
                        else:
                            new_data[key] = new_value
            else:
                new_data[key] = new_value
    for key, value in d1.items():
        if key not in new_data:
            new_data[key] = value
    return new_data

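# Illustrative usage (a sketch with hypothetical input values): multivalued
# keys from both dicts are merged, and keys present on only one side are
# carried over.
#
#   >>> prepare_data_dict(
#   ...     {"id": "HGNC:11603", "category": ["biolink:Gene"]},
#   ...     {"category": ["biolink:NamedThing"], "name": "TBX4"},
#   ... )
#   {'category': ['biolink:Gene', 'biolink:NamedThing'], 'name': 'TBX4', 'id': 'HGNC:11603'}
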
def apply_filters(
    graph: BaseGraph,
    node_filters: Dict[str, Union[str, Set]],
    edge_filters: Dict[str, Union[str, Set]],
) -> None:
    """
    Apply filters to graph and remove nodes and edges that
    do not pass given filters.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        The graph
    node_filters: Dict[str, Union[str, Set]]
        Node filters
    edge_filters: Dict[str, Union[str, Set]]
        Edge filters

    """
    apply_node_filters(graph, node_filters)
    apply_edge_filters(graph, edge_filters)

def apply_node_filters(
    graph: BaseGraph, node_filters: Dict[str, Union[str, Set]]
) -> None:
    """
    Apply filters to graph and remove nodes that do not pass given filters.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        The graph
    node_filters: Dict[str, Union[str, Set]]
        Node filters

    """
    nodes_to_remove = []
    for node, node_data in graph.nodes(data=True):
        pass_filter = True
        for k, v in node_filters.items():
            if k == "category":
                if not any(x in node_data[k] for x in v):
                    pass_filter = False
        if not pass_filter:
            nodes_to_remove.append(node)
    for node in nodes_to_remove:
        # removing node that fails category filter
        log.debug(f"Removing node {node}")
        graph.remove_node(node)

def apply_edge_filters(
    graph: BaseGraph, edge_filters: Dict[str, Union[str, Set]]
) -> None:
    """
    Apply filters to graph and remove edges that do not pass given filters.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        The graph
    edge_filters: Dict[str, Union[str, Set]]
        Edge filters

    """
    edges_to_remove = []
    for subject_node, object_node, key, data in graph.edges(keys=True, data=True):
        pass_filter = True
        for k, v in edge_filters.items():
            if k == "predicate":
                if data[k] not in v:
                    pass_filter = False
            elif k == "relation":
                if data[k] not in v:
                    pass_filter = False
        if not pass_filter:
            edges_to_remove.append((subject_node, object_node, key))
    for edge in edges_to_remove:
        # removing edge that fails edge filters
        log.debug(f"Removing edge {edge}")
        graph.remove_edge(edge[0], edge[1], edge[2])

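# Illustrative usage (a sketch; assumes `graph` is a populated implementation
# of BaseGraph, e.g. kgx's NxGraph, with 'category' on nodes and 'predicate'
# on edges):
#
#   apply_filters(
#       graph,
#       node_filters={"category": {"biolink:Gene", "biolink:Disease"}},
#       edge_filters={"predicate": {"biolink:related_to"}},
#   )
#
# Nodes whose categories share no member with the node filter set, and edges
# whose predicate is not in the edge filter set, are removed in place.
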
def generate_uuid():
    """
    Generates a UUID.

    Returns
    -------
    str
        A UUID

    """
    return f"urn:uuid:{uuid.uuid4()}"

def generate_edge_identifiers(graph: BaseGraph):
    """
    Generate unique identifiers for edges in a graph that do not
    have an ``id`` field.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph

    """
    for u, v, data in graph.edges(data=True):
        if "id" not in data:
            data["id"] = generate_uuid()

def sanitize_import(data: Dict, list_delimiter: Optional[str] = None) -> Dict:
    """
    Sanitize key-value pairs in dictionary.

    This should be used to ensure proper syntax and types for
    node and edge data as it is imported.

    Parameters
    ----------
    data: Dict
        A dictionary containing key-value pairs
    list_delimiter: Optional[str]
        Optionally provide a delimiter character or string to be used to
        split strings into lists.

    Returns
    -------
    Dict
        A dictionary containing processed key-value pairs

    """
    tidy_data = {}
    for key, value in data.items():
        new_value = remove_null(value)
        if new_value is not None:
            tidy_data[key] = _sanitize_import_property(key, new_value, list_delimiter)
    return tidy_data

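# Illustrative usage (a sketch with hypothetical input values): delimited
# strings for known list-typed columns are split, de-duplicated and sorted;
# 'negated' is cast with bool(), so any non-empty string (even "False")
# becomes True.
#
#   >>> sanitize_import(
#   ...     {"category": "biolink:NamedThing|biolink:Gene", "negated": "True"},
#   ...     list_delimiter="|",
#   ... )
#   {'category': ['biolink:Gene', 'biolink:NamedThing'], 'negated': True}
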
@lru_cache(maxsize=1)
def _get_all_multivalued_slots() -> Set[str]:
    return set([sentencecase_to_snakecase(x) for x in tk.get_all_multivalued_slots()])


def _sanitize_import_property(key: str, value: Any, list_delimiter: Optional[str]) -> Any:
    """
    Sanitize value for a key for the purpose of import.

    Casts all values to primitive types like str or bool according to the
    specified type in ``column_types``. If a list_delimiter is provided,
    strings will be split into lists using the delimiter.

    Parameters
    ----------
    key: str
        Key corresponding to a node/edge property
    value: Any
        Value corresponding to the key
    list_delimiter: Optional[str]
        Delimiter character or string used to split strings into lists

    Returns
    -------
    value: Any
        Sanitized value

    """
    if key in column_types:
        if column_types[key] == list:
            if isinstance(value, (list, set, tuple)):
                value = [
                    v.replace("\n", " ").replace("\t", " ") if isinstance(v, str) else v
                    for v in value
                ]
                new_value = list(value)
            elif isinstance(value, str):
                value = value.replace("\n", " ").replace("\t", " ")
                new_value = (
                    [x for x in value.split(list_delimiter) if x]
                    if list_delimiter
                    else [value]
                )
            else:
                new_value = [str(value).replace("\n", " ").replace("\t", " ")]
            # remove duplication in the list
            value_set: Set = set()
            for entry in new_value:
                # Make sure it's hashable
                if isinstance(entry, (dict, list)):
                    # Skip unhashable types
                    continue
                value_set.add(entry)
            new_value = sorted(list(value_set))
        elif column_types[key] == bool:
            try:
                new_value = bool(value)
            except:
                new_value = False
        # the rest of this if/else block doesn't seem right:
        # it's not checking the type against the expected type even though one exists
        elif isinstance(value, (str, float)):
            new_value = value
        else:
            # we might want to raise an exception or somehow indicate a type mismatch in the input data
            new_value = str(value).replace("\n", " ").replace("\t", " ")
    else:
        if isinstance(value, (list, set, tuple)):
            value = [
                v.replace("\n", " ").replace("\t", " ") if isinstance(v, str) else v
                for v in value
            ]
            new_value = list(value)
        elif isinstance(value, str):
            multivalued_slots = _get_all_multivalued_slots()
            if list_delimiter and list_delimiter in value:
                value = value.replace("\n", " ").replace("\t", " ")
                new_value = [x for x in value.split(list_delimiter) if x]
            elif key in multivalued_slots:
                new_value = [value]
            else:
                new_value = value.replace("\n", " ").replace("\t", " ")
        elif isinstance(value, bool):
            try:
                new_value = bool(value)
            except:
                new_value = False
        elif isinstance(value, (str, float)):
            new_value = value
        else:
            new_value = str(value).replace("\n", " ").replace("\t", " ")
    return new_value

def build_export_row(data: Dict, list_delimiter: Optional[str] = None) -> Dict:
    """
    Sanitize key-value pairs in dictionary.

    This should be used to ensure proper syntax and types for
    node and edge data as it is exported.

    Parameters
    ----------
    data: Dict
        A dictionary containing key-value pairs
    list_delimiter: Optional[str]
        Optionally provide a delimiter character or string to be used to
        convert lists into strings.

    Returns
    -------
    Dict
        A dictionary containing processed key-value pairs

    """
    tidy_data = {}
    for key, value in data.items():
        new_value = remove_null(value)
        if new_value:
            tidy_data[key] = _sanitize_export_property(key, new_value, list_delimiter)
    return tidy_data

def _sanitize_export_property(key: str, value: Any, list_delimiter: Optional[str] = None) -> Any:
    """
    Sanitize value for a key for the purpose of export.

    Casts all values to primitive types like str or bool according to the
    specified type in ``column_types``. If a list_delimiter is provided,
    lists will be converted into strings using the delimiter.

    Parameters
    ----------
    key: str
        Key corresponding to a node/edge property
    value: Any
        Value corresponding to the key
    list_delimiter: Optional[str]
        Optionally provide a delimiter character or string to be used to
        convert lists into strings.

    Returns
    -------
    value: Any
        Sanitized value

    """
    if key in column_types:
        if column_types[key] == list:
            if isinstance(value, (list, set, tuple)):
                value = [
                    v.replace("\n", " ").replace('\\"', "").replace("\t", " ")
                    if isinstance(v, str)
                    else v
                    for v in value
                ]
                new_value = (
                    list_delimiter.join([str(x) for x in value])
                    if list_delimiter
                    else value
                )
            else:
                new_value = (
                    str(value).replace("\n", " ").replace('\\"', "").replace("\t", " ")
                )
        elif column_types[key] == bool:
            try:
                new_value = bool(value)
            except:
                new_value = False
        else:
            new_value = (
                str(value).replace("\n", " ").replace('\\"', "").replace("\t", " ")
            )
    else:
        if type(value) == list:
            value = [
                v.replace("\n", " ").replace('\\"', "").replace("\t", " ")
                if isinstance(v, str)
                else v
                for v in value
            ]
            new_value = (
                list_delimiter.join([str(x) for x in value]) if list_delimiter else value
            )
            column_types[key] = list
        elif type(value) == bool:
            try:
                new_value = bool(value)
                column_types[key] = bool
                # this doesn't seem right, shouldn't column_types come from the biolink model?
            except:
                new_value = False
        else:
            new_value = (
                str(value).replace("\n", " ").replace('\\"', "").replace("\t", " ")
            )
    return new_value

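# Illustrative usage (a sketch with hypothetical input values): on export,
# known list-typed columns are joined with the delimiter. Note that
# build_export_row drops falsy values, so a `negated: False` entry would be
# omitted entirely.
#
#   >>> build_export_row(
#   ...     {"category": ["biolink:Gene", "biolink:NamedThing"], "negated": True},
#   ...     list_delimiter="|",
#   ... )
#   {'category': 'biolink:Gene|biolink:NamedThing', 'negated': True}
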
def remove_null(input: Any) -> Any:
    """
    Remove any null values from input.

    Parameters
    ----------
    input: Any
        Can be a str, list or dict

    Returns
    -------
    Any
        The input without any null values

    """
    new_value: Any = None
    if isinstance(input, (list, set, tuple)):
        # value is a list, set or a tuple
        new_value = []
        for v in input:
            x = remove_null(v)
            if x:
                new_value.append(x)
    elif isinstance(input, dict):
        # value is a dict
        new_value = {}
        for k, v in input.items():
            x = remove_null(v)
            if x:
                new_value[k] = x
    elif isinstance(input, str):
        # value is a str
        if not is_null(input):
            new_value = input
    else:
        if not is_null(input):
            new_value = input
    return new_value

def is_null(item: Any) -> bool:
    """
    Checks if a given item is null or corresponds to null.

    This method checks for: ``None``, ``numpy.nan``, ``pandas.NA``,
    ``pandas.NaT``, an empty string, and a single-space string.

    Parameters
    ----------
    item: Any
        The item to check

    Returns
    -------
    bool
        Whether the given item is null or not

    """
    null_values = {np.nan, pd.NA, pd.NaT, None, "", " "}
    return item in null_values

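# Example: membership is tested against the literal set above.
#
#   >>> is_null(None), is_null(""), is_null(" ")
#   (True, True, True)
#   >>> is_null("n/a")
#   False
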
def apply_graph_operations(graph: BaseGraph, operations: List) -> None:
    """
    Apply graph operations to a given graph.

    Parameters
    ----------
    graph: kgx.graph.base_graph.BaseGraph
        An instance of BaseGraph
    operations: List
        A list of graph operations with configuration

    """
    for operation in operations:
        op_name = operation["name"]
        op_args = operation["args"]
        module_name = ".".join(op_name.split(".")[0:-1])
        function_name = op_name.split(".")[-1]
        f = getattr(importlib.import_module(module_name), function_name)
        f(graph, **op_args)

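# Illustrative usage (a sketch; 'my_package.my_ops.annotate' is a hypothetical
# operation, not part of kgx): each entry names a fully qualified function
# that is imported and invoked as f(graph, **args).
#
#   operations = [
#       {"name": "my_package.my_ops.annotate", "args": {"threshold": 0.5}},
#   ]
#   apply_graph_operations(graph, operations)
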
def create_connection(db_file):
    """
    Create a database connection to the SQLite database specified by db_file.

    :param db_file: database file
    :return: Connection object or None
    """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
    except sqlite3.Error as e:
        # sqlite3.connect raises sqlite3.Error, not ConnectionError
        print(e)
    return conn

def close_connection(conn):
    """
    Close a database connection to the SQLite database.

    :param conn: Connection object
    :return: the (closed) Connection object, or None
    """
    try:
        if conn:
            conn.close()
    except sqlite3.Error as e:
        print(e)
    return conn

def drop_existing_tables(conn):
    """
    Drop all tables in the SQLite database behind the given connection.

    :param conn: Connection object
    :return: None
    """
    try:
        # Get a cursor object
        c = conn.cursor()
        # Get a list of all tables in the database
        c.execute("SELECT name FROM sqlite_master WHERE type='table';")
        table_names = [row[0] for row in c.fetchall()]
        # Loop through the table names and drop each table
        for table_name in table_names:
            drop_table_sql = f"DROP TABLE IF EXISTS {table_name};"
            c.execute(drop_table_sql)
        # Commit the changes
        conn.commit()
    except sqlite3.Error as e:
        print(f"An error occurred while removing all tables and data from the SQLite database: {e}")

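# Illustrative usage (a sketch; 'kgx.db' is a hypothetical file name): a
# typical lifecycle for the SQLite helpers above.
#
#   conn = create_connection("kgx.db")
#   if conn:
#       drop_existing_tables(conn)
#       close_connection(conn)
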