# Source code for kgx.transformer

import itertools
import os
from os.path import exists
from sys import stderr
from typing import Dict, Generator, List, Optional, Callable, Set

from kgx.config import get_logger
from kgx.error_detection import ErrorType, MessageLevel, ErrorDetecting
from kgx.source import (
    GraphSource,
    Source,
    TsvSource,
    JsonSource,
    JsonlSource,
    ObographSource,
    TrapiSource,
    NeoSource,
    RdfSource,
    OwlSource,
    SssomSource,
)
from kgx.sink import (
    Sink,
    GraphSink,
    JsonSink,
    JsonlSink,
    NeoSink,
    NullSink,
    RdfSink,
    SqlSink,
    TsvSink,
    ParquetSink
)
from kgx.utils.kgx_utils import (
    apply_graph_operations,
    GraphEntityType,
    knowledge_provenance_properties,
)

# Registry of input format name -> Source implementation class, consulted by
# Transformer.get_source(). "tsv"/"csv" share TsvSource and
# "obojson"/"obo-json" are synonyms for ObographSource.
# NOTE(review): "parquet" maps to GraphSource rather than a dedicated
# parquet source — confirm this is intended.
SOURCE_MAP = {
    "tsv": TsvSource,
    "csv": TsvSource,
    "graph": GraphSource,
    "json": JsonSource,
    "jsonl": JsonlSource,
    "obojson": ObographSource,
    "obo-json": ObographSource,
    "trapi-json": TrapiSource,
    "neo4j": NeoSource,
    "nt": RdfSource,
    "owl": OwlSource,
    "sssom": SssomSource,
    "parquet": GraphSource,
}

# Registry of output format name -> Sink implementation class, consulted by
# Transformer.get_sink(). "tsv"/"csv" share TsvSink.
SINK_MAP = {
    "csv": TsvSink,
    "graph": GraphSink,
    "json": JsonSink,
    "jsonl": JsonlSink,
    "neo4j": NeoSink,
    "nt": RdfSink,
    "null": NullSink,
    "sql": SqlSink,
    "tsv": TsvSink,
    "parquet": ParquetSink,
}


# Module-level logger shared by this module.
log = get_logger()


class Transformer(ErrorDetecting):
    """
    The Transformer class is responsible for transforming data from one
    form to another.

    Parameters
    ----------
    stream: bool
        Whether or not to stream (default: False)
    infores_catalog: Optional[str]
        Optional dump of a TSV file of InfoRes CURIE to Knowledge Source mappings
    error_log:
        Where to write any graph processing error message (stderr, by default).

    """

    def __init__(
        self, stream: bool = False, infores_catalog: Optional[str] = None, error_log=None
    ):
        """
        Initialize the Transformer.

        Parameters
        ----------
        stream: bool
            Whether or not to stream
        infores_catalog: Optional[str]
            Optional dump of a TSV file of InfoRes CURIE to Knowledge Source mappings
        error_log:
            Where to write any graph processing error message (stderr, by default).

        """
        ErrorDetecting.__init__(self, error_log)

        self.stream = stream
        # Node/edge filters; populated from the input Source during transform()
        self.node_filters: Dict = {}
        self.edge_filters: Dict = {}
        # Optional callback applied to each record in process()
        self.inspector: Optional[Callable[[GraphEntityType, List], None]] = None
        # In-memory graph store used when no output sink is configured
        self.store = self.get_source("graph")
        # Node identifiers that passed category-based node filters,
        # consulted by process() when applying edge filters
        self._seen_nodes: Set = set()

        # Mapping of knowledge source name -> InfoRes CURIE, optionally
        # preloaded from a two-column tab-delimited catalog file.
        self._infores_catalog: Dict[str, str] = dict()
        if infores_catalog and exists(infores_catalog):
            with open(infores_catalog, "r") as irc:
                for entry in irc:
                    # strip() handles both the trailing newline and blank lines,
                    # so no separate length pre-check is needed
                    entry = entry.strip()
                    if entry:
                        source, infores = entry.split("\t")
                        self._infores_catalog[source] = infores
[docs] def transform( self, input_args: Dict, output_args: Optional[Dict] = None, inspector: Optional[Callable[[GraphEntityType, List], None]] = None, ) -> None: """ Transform an input source and write to an output sink. If ``output_args`` is not defined then the data is persisted to an in-memory graph. The 'inspector' argument is an optional Callable which the transformer.process() method applies to 'inspect' source records prior to writing them out to the Sink. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should *not* mutate the record. Parameters ---------- input_args: Dict Arguments relevant to your input source output_args: Optional[Dict] Arguments relevant to your output sink ( inspector: Optional[Callable[[GraphEntityType, List], None]] Optional Callable to 'inspect' source records during processing. """ sources = [] generators = [] input_format = input_args["format"] prefix_map = input_args.pop("prefix_map", {}) predicate_mappings = input_args.pop("predicate_mappings", {}) node_property_predicates = input_args.pop("node_property_predicates", {}) node_filters = input_args.pop("node_filters", {}) edge_filters = input_args.pop("edge_filters", {}) operations = input_args.pop("operations", []) # Optional process() data stream inspector self.inspector = inspector if input_format in {"neo4j", "graph"}: source = self.get_source(input_format) source.set_prefix_map(prefix_map) source.set_node_filters(node_filters) self.node_filters = source.node_filters self.edge_filters = source.edge_filters source.set_edge_filters(edge_filters) self.node_filters = source.node_filters self.edge_filters = source.edge_filters if "uri" in input_args: default_provenance = input_args["uri"] else: default_provenance = None g = source.parse(default_provenance=default_provenance, **input_args) sources.append(source) 
generators.append(g) else: filename = input_args.pop("filename", {}) for f in filename: source = self.get_source(input_format) source.set_prefix_map(prefix_map) if isinstance(source, RdfSource): source.set_predicate_mapping(predicate_mappings) source.set_node_property_predicates(node_property_predicates) source.set_node_filters(node_filters) self.node_filters = source.node_filters self.edge_filters = source.edge_filters source.set_edge_filters(edge_filters) self.node_filters = source.node_filters self.edge_filters = source.edge_filters default_provenance = os.path.basename(f) g = source.parse(f, default_provenance=default_provenance, **input_args) sources.append(source) generators.append(g) source_generator = itertools.chain(*generators) if output_args: if self.stream: if output_args["format"] in {"tsv", "csv"}: if "node_properties" not in output_args or "edge_properties" not in output_args: error_type = ErrorType.MISSING_PROPERTY self.log_error( entity=f"{output_args['format']} stream", error_type=error_type, message=f"'node_properties' and 'edge_properties' must be defined for output while" f"streaming. 
The exported format will be limited to a subset of the columns.", message_level=MessageLevel.WARNING ) sink = self.get_sink(**output_args) if "reverse_prefix_map" in output_args: sink.set_reverse_prefix_map(output_args["reverse_prefix_map"]) if isinstance(sink, RdfSink): if "reverse_predicate_mapping" in output_args: sink.set_reverse_predicate_mapping( output_args["reverse_predicate_mapping"] ) if "property_types" in output_args: sink.set_property_types(output_args["property_types"]) # stream from source to sink self.process(source_generator, sink) sink.finalize() else: # stream from source to intermediate intermediate_sink = GraphSink(self) intermediate_sink.node_properties.update(self.store.node_properties) intermediate_sink.edge_properties.update(self.store.edge_properties) self.process(source_generator, intermediate_sink) for s in sources: intermediate_sink.node_properties.update(s.node_properties) intermediate_sink.edge_properties.update(s.edge_properties) apply_graph_operations(intermediate_sink.graph, operations) # stream from intermediate to output sink intermediate_source = self.get_source("graph") intermediate_source.node_properties.update( intermediate_sink.node_properties ) intermediate_source.edge_properties.update( intermediate_sink.edge_properties ) # Need to propagate knowledge source specifications here? 
ks_args = dict() for ksf in knowledge_provenance_properties: if ksf in input_args: ks_args[ksf] = input_args[ksf] intermediate_source_generator = intermediate_source.parse( intermediate_sink.graph, **ks_args ) if output_args["format"] in {"tsv", "csv"}: if "node_properties" not in output_args: output_args[ "node_properties" ] = intermediate_source.node_properties log.debug("output_args['node_properties']: " + str(output_args["node_properties"]), file=stderr) if "edge_properties" not in output_args: output_args[ "edge_properties" ] = intermediate_source.edge_properties sink = self.get_sink(**output_args) if "reverse_prefix_map" in output_args: sink.set_reverse_prefix_map(output_args["reverse_prefix_map"]) if isinstance(sink, RdfSink): if "reverse_predicate_mapping" in output_args: sink.set_reverse_predicate_mapping( output_args["reverse_predicate_mapping"] ) if "property_types" in output_args: sink.set_property_types(output_args["property_types"]) else: sink = self.get_sink(**output_args) sink.node_properties.update(intermediate_source.node_properties) sink.edge_properties.update(intermediate_source.edge_properties) self.process(intermediate_source_generator, sink) sink.finalize() self.store.node_properties.update(sink.node_properties) self.store.edge_properties.update(sink.edge_properties) else: # stream from source to intermediate sink = GraphSink(self) self.process(source_generator, sink) sink.node_properties.update(self.store.node_properties) sink.edge_properties.update(self.store.edge_properties) for s in sources: sink.node_properties.update(s.node_properties) sink.edge_properties.update(s.edge_properties) sink.finalize() self.store.node_properties.update(sink.node_properties) self.store.edge_properties.update(sink.edge_properties) apply_graph_operations(sink.graph, operations) # Aggregate the InfoRes catalogs from all sources for s in sources: for k, v in s.get_infores_catalog().items(): self._infores_catalog[k] = v
[docs] def get_infores_catalog(self): """ Return catalog of Information Resource mappings aggregated from all Transformer associated sources """ return self._infores_catalog
[docs] def process(self, source: Generator, sink: Sink) -> None: """ This method is responsible for reading from ``source`` and writing to ``sink`` by calling the relevant methods based on the incoming data. .. note:: The streamed data must not be mutated. Parameters ---------- source: Generator A generator from a Source sink: kgx.sink.sink.Sink An instance of Sink """ for rec in source: if rec: log.debug("length of rec", len(rec), "rec", rec) if len(rec) == 4: # infer an edge record write_edge = True if "subject_category" in self.edge_filters: if rec[0] in self._seen_nodes: write_edge = True else: write_edge = False if "object_category" in self.edge_filters: if rec[1] in self._seen_nodes: if "subject_category" in self.edge_filters: if write_edge: write_edge = True else: write_edge = True else: write_edge = False if write_edge: if self.inspector: self.inspector(GraphEntityType.EDGE, rec) sink.write_edge(rec[-1]) else: # infer a node record if "category" in self.node_filters: self._seen_nodes.add(rec[0]) if self.inspector: self.inspector(GraphEntityType.NODE, rec) # last element of rec is the node properties sink.write_node(rec[-1])
[docs] def save(self, output_args: Dict) -> None: """ Save data from the in-memory store to a desired sink. Parameters ---------- output_args: Dict Arguments relevant to your output sink """ if not self.store: raise Exception("self.store is empty.") source = self.store source.node_properties.update(self.store.node_properties) source.edge_properties.update(self.store.edge_properties) source_generator = source.parse(self.store.graph) if "node_properties" not in output_args: output_args["node_properties"] = source.node_properties if "edge_properties" not in output_args: output_args["edge_properties"] = source.edge_properties sink = self.get_sink(**output_args) sink.node_properties.update(source.node_properties) sink.edge_properties.update(source.edge_properties) if "reverse_prefix_map" in output_args: sink.set_reverse_prefix_map(output_args["reverse_prefix_map"]) if isinstance(sink, RdfSink): if "reverse_predicate_mapping" in output_args: sink.set_reverse_predicate_mapping( output_args["reverse_predicate_mapping"] ) if "property_types" in output_args: sink.set_property_types(output_args["property_types"]) self.process(source_generator, sink) sink.finalize()
[docs] def get_source(self, format: str) -> Source: """ Get an instance of Source that corresponds to a given format. Parameters ---------- format: str The input store format Returns ------- Source: An instance of kgx.source.Source """ if format in SOURCE_MAP: s = SOURCE_MAP[format] return s(self) else: raise TypeError(f"{format} in an unrecognized format")
[docs] def get_sink(self, **kwargs: Dict) -> Sink: """ Get an instance of Sink that corresponds to a given format. Parameters ---------- kwargs: Dict Arguments required for initializing an instance of Sink Returns ------- Sink: An instance of kgx.sink.Sink """ if kwargs["format"] in SINK_MAP: s = SINK_MAP[kwargs["format"]] return s(self, **kwargs) else: raise TypeError(f"{kwargs['format']} in an unrecognized format")