Transformer

The Transformer class is responsible for connecting a source to a sink: records are read from the source and written to the sink.

The Transformer supports two modes:

  • No streaming

  • Streaming

No streaming

In this mode, the Transformer reads records from a source and writes them to an intermediate in-memory graph, which can then serve as a substrate for various graph operations.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=False)

# read from TSV
t.transform(input_args=input_args)

# The intermediate graph store can be accessed via t.store.graph

# write to JSON
t.save(output_args=output_args)

Streaming

In this mode, records are read from a source and written to a sink on the fly, without an intermediate graph.

from kgx.transformer import Transformer

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}

t = Transformer(stream=True)

# read from TSV and write to JSON
t.transform(input_args=input_args, output_args=output_args)
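The practical difference between the two modes can be illustrated with a toy model built only on Python generators. Everything in it (`read_records`, `run_non_streaming`, `run_streaming` and the event log) is hypothetical and is not KGX code; it only shows why streaming keeps the memory footprint small: each record is written as soon as it is read, instead of first being accumulated in an intermediate store.

```python
from typing import Iterator, List, Tuple


def read_records(records: List[str], log: List[Tuple[str, str]]) -> Iterator[str]:
    """Hypothetical source: yields one record at a time and logs each read."""
    for rec in records:
        log.append(("read", rec))
        yield rec


def run_non_streaming(records: List[str]) -> List[Tuple[str, str]]:
    """Read everything into an intermediate store first, then write it all out."""
    log: List[Tuple[str, str]] = []
    store = list(read_records(records, log))  # the whole "graph" is held in memory
    for rec in store:
        log.append(("write", rec))
    return log


def run_streaming(records: List[str]) -> List[Tuple[str, str]]:
    """Write each record as soon as it is read; nothing is accumulated."""
    log: List[Tuple[str, str]] = []
    for rec in read_records(records, log):
        log.append(("write", rec))
    return log


print(run_non_streaming(["n1", "n2"]))
# [('read', 'n1'), ('read', 'n2'), ('write', 'n1'), ('write', 'n2')]
print(run_streaming(["n1", "n2"]))
# [('read', 'n1'), ('write', 'n1'), ('read', 'n2'), ('write', 'n2')]
```

In the non-streaming run all reads precede all writes, which is what makes the intermediate graph available for further operations; in the streaming run reads and writes interleave.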

Inspecting the Knowledge Data Flow

Note that the transform() method accepts an optional inspector Callable argument, which injects inspection of the node and edge data stream into the Transformer.process() step of Transformer.transform(). See the unit test module tests/integration/test_transform.py in the KGX project for an example of how to use this callable argument.

This feature, when coupled with streaming (the --stream option, or Transformer(stream=True)) and a ‘null’ Transformer Sink (i.e. output_args = {'format': 'null'}), allows “just-in-time” processing of the nodes and edges of huge graphs without incurring a large in-memory footprint.
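A minimal inspector might simply tally the records it is shown. The sketch below assumes the `(GraphEntityType, record)` calling convention from the transform() signature documented further down; the import path `kgx.utils.kgx_utils` for `GraphEntityType` is an assumption, so a stand-in enum is defined when that import fails. `CountingInspector` is a hypothetical name.

```python
from collections import Counter
from enum import Enum
from typing import Any

try:
    # Assumed import location of GraphEntityType in KGX; verify against your KGX version.
    from kgx.utils.kgx_utils import GraphEntityType
except ImportError:
    # Hypothetical stand-in so this sketch runs without KGX installed.
    class GraphEntityType(Enum):
        NODE = "node"
        EDGE = "edge"


class CountingInspector:
    """Tally nodes and edges without mutating the records it is shown."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def __call__(self, entity_type: GraphEntityType, rec: Any) -> None:
        # Only read the entity type; the record itself must be left unchanged.
        if entity_type == GraphEntityType.NODE:
            self.counts["nodes"] += 1
        elif entity_type == GraphEntityType.EDGE:
            self.counts["edges"] += 1
```

With KGX installed, an instance would be passed as `t.transform(input_args=input_args, output_args={'format': 'null'}, inspector=inspector)` on a `Transformer(stream=True)`, after which `inspector.counts` holds the totals.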

Provenance of Nodes and Edges

Biolink Model 2.0 specified new properties for edge provenance to replace the (now deprecated) provided_by provenance property (the provided_by property may still be used for node annotation).

One or more of these provenance properties may optionally be inserted as dictionary entries into the input arguments to specify default global values for these properties. Such values will be used when an edge lacks an explicit provenance property. If one does not specify such a global property, then the algorithm heuristically infers and sets a default knowledge_source value.

from kgx.transformer import Transformer

input_args = {
    'filename': [
        'graph_nodes.tsv',
        'graph_edges.tsv'
    ],
    'format': 'tsv',
    'provided_by': "My Test Source",
    'aggregator_knowledge_source': "My Test Source"
}

t = Transformer()

# read from TSV 
t.transform(input_args=input_args)

# use the transformed graph
t.store.graph.nodes()
t.store.graph.edges()
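The fallback rule described above (a global value applies only when an edge lacks an explicit provenance property) can be sketched as follows. The helper `apply_default_provenance` is hypothetical and only illustrates the rule as stated; it is not KGX's implementation and does not model the heuristic `knowledge_source` inference.

```python
from typing import Any, Dict


def apply_default_provenance(edge: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `edge` where each default fills a provenance property only if it is missing."""
    result = dict(edge)
    for key, value in defaults.items():
        # An explicit, non-empty value already on the edge always wins over the global default.
        if not result.get(key):
            result[key] = value
    return result


defaults = {"aggregator_knowledge_source": "My Test Source"}
print(apply_default_provenance({"subject": "X:1", "object": "X:2"}, defaults))
# {'subject': 'X:1', 'object': 'X:2', 'aggregator_knowledge_source': 'My Test Source'}
print(apply_default_provenance({"aggregator_knowledge_source": "Other Source"}, defaults))
# {'aggregator_knowledge_source': 'Other Source'}
```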

InfoRes Identifier Rewriting

The provided_by and/or knowledge_source (and related) field values of KGX node and edge records generally contain the name of the knowledge source for the node or edge. In some cases (e.g. Monarch), such source names can be quite verbose. To normalize them to a concise standard, Biolink Model uses Information Resource (“InfoRes”) CURIE identifiers.

To help generate and document such InfoRes identifiers, the provenance property values may optionally trigger a rewrite of their knowledge source names to a candidate InfoRes, as follows:

  1. Setting the provenance property to a boolean True or the (case-insensitive) string “True” triggers a simple reformatting of knowledge source names into lowercase alphanumeric strings: non-alphanumeric characters are removed, and the spaces delimiting words are replaced with hyphens.

  2. Setting the provenance property to a boolean False or the (case-insensitive) string “False” suppresses the given provenance annotation on the output graph.

  3. If a tuple with a single string argument (not equal to “True”) is provided, the string is treated as a standard regular expression to match against knowledge source names. If no other string argument is given (see below), any matching substring is deleted from the name. The simple reformatting (as in 1 above) is then applied to the resulting string.

  4. Similar to 3 above, except that a second string in the tuple is substituted for the matched substring, followed by the simple reformatting.

  5. A third string in the tuple is added as a prefix (as a separate word) to the names of all generated InfoRes identifiers. Note that if the first and second elements of the tuple are empty strings, the result is simply the prefix added to the provenance property value. The simple reformatting rules are then applied, but no other internal changes are made.

The unit tests provide examples of these various rewrites, in the KGX project tests/integration/test_transform.py.
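The numbered rules can be sketched in plain Python. This is a hypothetical re-implementation written from the rule descriptions only: `simple_reformat` and `rewrite_infores` are illustrative names, the `infores:` prefix on the result is an assumption, and KGX's exact character handling may differ, so the unit tests remain the authoritative examples.

```python
import re
from typing import Optional, Tuple, Union

# A rewrite spec is a bool, a "True"/"False" string, or a tuple of up to three strings.
RewriteSpec = Union[bool, str, Tuple[str, ...]]


def simple_reformat(name: str) -> str:
    """Rule 1: lowercase, drop non-alphanumerics, join the remaining words with hyphens."""
    words = (re.sub(r"[^a-z0-9]", "", word) for word in name.lower().split())
    return "-".join(word for word in words if word)


def rewrite_infores(name: str, spec: RewriteSpec) -> Optional[str]:
    """Return a candidate InfoRes CURIE for `name`, or None if the annotation is suppressed."""
    if isinstance(spec, str):
        if spec.lower() not in ("true", "false"):
            raise ValueError(f"not a rewrite spec: {spec!r}")
        spec = spec.lower() == "true"
    if spec is True:
        return "infores:" + simple_reformat(name)  # rule 1
    if spec is False:
        return None  # rule 2: drop this provenance annotation
    # Rules 3-5: pad the tuple to (pattern, substitute, prefix).
    pattern, substitute, prefix = (tuple(spec) + ("", "", ""))[:3]
    if pattern:
        name = re.sub(pattern, substitute, name)  # delete (rule 3) or replace (rule 4) matches
    if prefix:
        name = f"{prefix} {name}"  # rule 5: prefix added as a separate word
    return "infores:" + simple_reformat(name)


print(rewrite_infores("Monarch Initiative Data Source", (r"\s*Data Source",)))
# infores:monarch-initiative
print(rewrite_infores("Gene Ontology", ("", "", "Monarch")))
# infores:monarch-gene-ontology
```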

The catalog of inferred InfoRes mappings onto knowledge source names is available programmatically after the transform() call completes, via the get_infores_catalog() method of the Transformer class.
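Assuming get_infores_catalog() returns a plain dictionary mapping knowledge source names to InfoRes CURIEs (check the return value in your KGX version), a small helper such as the hypothetical `dump_infores_catalog` below can write it out as TSV for review. The mapping used in the demo is made-up sample data.

```python
import csv
import io
from typing import Dict, TextIO


def dump_infores_catalog(catalog: Dict[str, str], stream: TextIO) -> None:
    """Write one tab-separated (knowledge source name, InfoRes CURIE) row per mapping."""
    writer = csv.writer(stream, delimiter="\t", lineterminator="\n")
    for source_name, infores in sorted(catalog.items()):
        writer.writerow([source_name, infores])


buffer = io.StringIO()
dump_infores_catalog({"Monarch Initiative": "infores:monarch-initiative"}, buffer)
print(buffer.getvalue(), end="")
# Monarch Initiative	infores:monarch-initiative
```

After a transform, the call would look like `dump_infores_catalog(t.get_infores_catalog(), sys.stdout)`; the `infores_catalog` constructor parameter may make such a helper unnecessary.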

kgx.transformer

class kgx.transformer.Transformer(stream: bool = False, infores_catalog: Optional[str] = None, error_log=None)

Bases: ErrorDetecting

The Transformer class is responsible for transforming data from one form to another.

Parameters:
  • stream (bool) – Whether or not to stream (default: False)

  • infores_catalog (Optional[str]) – Optional path of a TSV file to which the InfoRes CURIE to knowledge source mappings are dumped

  • error_log – Where to write graph processing error messages (stderr by default).

clear_errors()

Clears the current error log list

get_errors(level: Optional[str] = None) -> Dict

Get the index list of distinct error messages.

Parameters:

level (str) – Optional filter (case insensitive) name of error message level (generally either “Error” or “Warning”)

Returns:

A raw dictionary of entities indexed by [message_level][error_type][message] or only just [error_type][message] specific to a given message level if the optional level filter is given

Return type:

Dict
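The nested shape of the unfiltered result ([message_level][error_type][message] → entities) can be walked as in the sketch below. `count_error_entities` is a hypothetical helper, and the sample dictionary, error type names and messages are made up for illustration; the leaves are assumed to be iterables of entity identifiers.

```python
from typing import Dict, Iterable, Optional


def count_error_entities(
    errors: Dict[str, Dict[str, Dict[str, Iterable[str]]]],
    level: Optional[str] = None,
) -> Dict[str, int]:
    """Count affected entities per error type, optionally for one message level only."""
    totals: Dict[str, int] = {}
    for message_level, by_type in errors.items():
        # Case-insensitive level filter, like the documented `level` argument.
        if level is not None and message_level.lower() != level.lower():
            continue
        for error_type, by_message in by_type.items():
            for entities in by_message.values():
                totals[error_type] = totals.get(error_type, 0) + len(list(entities))
    return totals


sample = {  # hypothetical get_errors() output
    "ERROR": {"MISSING_NODE_PROPERTY": {"Node has no category": ["X:1", "X:2"]}},
    "WARNING": {"DEPRECATED_ELEMENT": {"Deprecated predicate used": ["X:3"]}},
}
print(count_error_entities(sample))  # {'MISSING_NODE_PROPERTY': 2, 'DEPRECATED_ELEMENT': 1}
print(count_error_entities(sample, "warning"))  # {'DEPRECATED_ELEMENT': 1}
```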

get_infores_catalog()

Return catalog of Information Resource mappings aggregated from all Transformer-associated sources.

get_sink(**kwargs: Dict) -> Sink

Get an instance of Sink that corresponds to a given format.

Parameters:

kwargs (Dict) – Arguments required for initializing an instance of Sink

Returns:

An instance of kgx.sink.Sink

Return type:

Sink

get_source(format: str) -> Source

Get an instance of Source that corresponds to a given format.

Parameters:

format (str) – The input store format

Returns:

An instance of kgx.source.Source

Return type:

Source

log_error(entity: str, error_type: ErrorType, message: str, message_level: MessageLevel = MessageLevel.ERROR)

Log an error to the list of such errors.

Parameters:
  • entity – source of the parse error

  • error_type – ValidationError ErrorType

  • message – message string describing the error

  • message_level – ValidationError MessageLevel

process(source: Generator, sink: Sink) -> None

This method is responsible for reading from source and writing to sink by calling the relevant methods based on the incoming data.

Note

The streamed data must not be mutated.

Parameters:
  • source (Generator) – A generator from a Source

  • sink (kgx.sink.sink.Sink) – An instance of Sink
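To make the contract of process() concrete, here is a toy, self-contained model of such a loop. `ListSink`, `toy_process` and the record shapes are assumptions for illustration only: nodes are taken to arrive as `(id, properties)` pairs and edges as `(subject, object, key, properties)` 4-tuples, and the `write_node`/`write_edge` names on the stand-in sink are not guaranteed to match `kgx.sink.Sink`. It shows two points from the documentation: records are dispatched one at a time as the generator yields them, and an optional inspector sees each record without changing it.

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

Record = Tuple[Any, ...]


class ListSink:
    """Hypothetical stand-in for a Sink that keeps written records in lists."""

    def __init__(self) -> None:
        self.nodes: List[dict] = []
        self.edges: List[dict] = []

    def write_node(self, record: dict) -> None:
        self.nodes.append(record)

    def write_edge(self, record: dict) -> None:
        self.edges.append(record)


def toy_process(
    source: Iterator[Record],
    sink: ListSink,
    inspector: Optional[Callable[[str, Record], None]] = None,
) -> None:
    """Dispatch each streamed record to the sink by its (assumed) tuple shape."""
    for rec in source:
        # Assumption: edges are 4-tuples, nodes are (id, properties) pairs.
        kind = "EDGE" if len(rec) == 4 else "NODE"
        if inspector is not None:
            inspector(kind, rec)  # look at the record, never mutate it
        if kind == "EDGE":
            sink.write_edge(rec[-1])
        else:
            sink.write_node(rec[-1])


def records() -> Iterator[Record]:
    yield ("X:1", {"id": "X:1"})
    yield ("X:1", "X:2", "e1", {"subject": "X:1", "object": "X:2"})


seen: List[str] = []
sink = ListSink()
toy_process(records(), sink, inspector=lambda kind, rec: seen.append(kind))
print(len(sink.nodes), len(sink.edges), seen)  # 1 1 ['NODE', 'EDGE']
```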

save(output_args: Dict) -> None

Save data from the in-memory store to a desired sink.

Parameters:

output_args (Dict) – Arguments relevant to your output sink

transform(input_args: Dict, output_args: Optional[Dict] = None, inspector: Optional[Callable[[GraphEntityType, List], None]] = None) -> None

Transform an input source and write to an output sink.

If output_args is not defined then the data is persisted to an in-memory graph.

The ‘inspector’ argument is an optional Callable which the transformer.process() method applies to ‘inspect’ source records prior to writing them out to the Sink. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should not mutate the record.

Parameters:
  • input_args (Dict) – Arguments relevant to your input source

  • output_args (Optional[Dict]) – Arguments relevant to your output sink (if omitted, the data is persisted to an in-memory graph)

  • inspector (Optional[Callable[[GraphEntityType, List], None]]) – Optional Callable to ‘inspect’ source records during processing.

write_report(outstream: Optional[TextIO] = None, level: Optional[str] = None) -> None

Write the errors collected by get_errors() to an output stream (e.g. a file).

Parameters:
  • outstream (TextIO) – The stream to which to write

  • level (str) – Optional filter (case insensitive) name of error message level (generally either “Error” or “Warning”)