Transformer#
The Transformer class is responsible for connecting a source to a sink: records are read from the source and written to the sink.
The Transformer supports two modes:
- No streaming
- Streaming
No streaming
In this mode, the Transformer reads records from a source and writes to an intermediate graph. One can then use this intermediate graph as a substrate for various graph operations.
from kgx.transformer import Transformer
input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}
t = Transformer(stream=False)
# read from TSV
t.transform(input_args=input_args)
# The intermediate graph store can be accessed via t.store.graph
# write to JSON
t.save(output_args=output_args)
Streaming
In this mode, records are read from a source and written to a sink on the fly.
from kgx.transformer import Transformer
input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'filename': 'graph.json', 'format': 'json'}
t = Transformer(stream=True)
# read from TSV and write to JSON
t.transform(input_args=input_args, output_args=output_args)
Inspecting the Knowledge Data Flow#
Note that the transform operation accepts an optional inspector Callable argument which injects node/edge data stream inspection into the Transformer.process operation of Transformer.transform operations. See the unit test module in the KGX project, tests/integration/test_transform.py, for an example of how to use this callable argument.
This feature, when coupled with streaming mode (stream=True, or the --stream CLI flag) and a 'null' Transformer Sink (i.e. output_args = {'format': 'null'}), allows "just-in-time" processing of the nodes and edges of huge graphs without incurring a large in-memory footprint.
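Below is a minimal sketch of such just-in-time inspection: a TSV graph is streamed through a 'null' sink while an inspector Callable tallies the records passing by. The counting logic is purely illustrative and not part of the KGX API.
from kgx.transformer import Transformer

# Illustrative inspector: tally how many node/edge records stream past.
# The first argument is a GraphEntityType tagging the record as a NODE
# or an EDGE (assumed here to be an Enum); the record must not be mutated.
counts = {'NODE': 0, 'EDGE': 0}

def count_records(entity_type, record):
    counts[entity_type.name] += 1

input_args = {'filename': ['graph_nodes.tsv', 'graph_edges.tsv'], 'format': 'tsv'}
output_args = {'format': 'null'}  # the 'null' sink discards all output

t = Transformer(stream=True)
t.transform(input_args=input_args, output_args=output_args, inspector=count_records)
print(counts)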
Provenance of Nodes and Edges#
Biolink Model 2.0 specified new properties for edge provenance to replace the (now deprecated) provided_by provenance property (the provided_by property may still be used for node annotation).
One or more of these provenance properties may optionally be inserted as dictionary entries into the input arguments to specify default global values for these properties. Such values are used when an edge lacks an explicit provenance property. If one does not specify such a global property, then the algorithm heuristically infers and sets a default knowledge_source value.
from kgx.transformer import Transformer
input_args = {
    'filename': [
        'graph_nodes.tsv',
        'graph_edges.tsv'
    ],
    'format': 'tsv',
    'provided_by': "My Test Source",
    'aggregator_knowledge_source': "My Test Source"
}
t = Transformer()
# read from TSV
t.transform(input_args=input_args)
# use the transformed graph
t.store.graph.nodes()
t.store.graph.edges()
InfoRes Identifier Rewriting#
The provided_by and/or knowledge_source et al. field values of KGX node and edge records generally contain the name of a knowledge source for the node or edge. In some cases (e.g. Monarch), such names in the source knowledge bases can be quite verbose. To normalize such names to a concise standard, the Biolink Model uses Information Resource ("InfoRes") CURIE identifiers.
To help generate and document such InfoRes identifiers, the provenance property values may optionally trigger a rewrite of their knowledge source names to a candidate InfoRes, as follows:
1. Setting the provenance property to a boolean True or the (case-insensitive) string "True" triggers a simple reformatting of knowledge source names into lower-case alphanumeric strings, removing non-alphanumeric characters and replacing the spaces between words with hyphens.
2. Setting the provenance property to a boolean False or the (case-insensitive) string "False" suppresses the given provenance annotation on the output graph.
3. Providing a tuple with a single string argument not equal to "True" treats the string as a standard regular expression to match against knowledge source names. If no second string argument is provided (see below), then a matching substring in the name triggers deletion of the matched pattern. The simple reformatting (as in 1 above) is then applied to the resulting string.
4. Similar to 3 above, except that a second string in the tuple is substituted for the regular-expression-matched string, followed by simple reformatting.
5. Providing a third string in the tuple adds a prefix string (as a separate word) to the names of all the generated InfoRes identifiers. Note that if one sets the first and second elements of the tuple to empty strings, the result is the simple addition of a prefix to the provenance property value. Again, the algorithm then applies the simple reformatting rules, but makes no other internal changes.
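A minimal sketch of these rewrites (the regex, substitution, and prefix values here are purely illustrative):
from kgx.transformer import Transformer

input_args = {
    'filename': ['graph_nodes.tsv', 'graph_edges.tsv'],
    'format': 'tsv',
    # Rule 1: simple reformatting, e.g. "My Test Source" -> a CURIE like infores:my-test-source
    'provided_by': True,
    # Rules 3-5: (regex to match, substitution string, prefix) -- illustrative values
    'aggregator_knowledge_source': (r'\(.+\)', '', 'Test'),
}
t = Transformer()
t.transform(input_args=input_args)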
The unit tests in the KGX project, tests/integration/test_transform.py, provide examples of these various rewrites.
The catalog of inferred InfoRes mappings onto knowledge source names is available programmatically, after completion of a transform call, by using the get_infores_catalog() method of the Transformer class.
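For example (assuming the catalog behaves as a dict-like mapping of knowledge source names to their inferred InfoRes identifiers):
# after t.transform(...) has completed
for source_name, infores in t.get_infores_catalog().items():
    print(source_name, '->', infores)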
kgx.transformer#
- class kgx.transformer.Transformer(stream: bool = False, infores_catalog: Optional[str] = None, error_log=None)[source]#
Bases:
ErrorDetecting
The Transformer class is responsible for transforming data from one form to another.
- Parameters:
stream (bool) – Whether to read and write records on the fly (streaming mode)
infores_catalog (Optional[str]) – Optional file path for the catalog of InfoRes CURIE to knowledge source name mappings
error_log – Optional destination for graph processing error messages
- clear_errors()#
Clears the current error log list
- get_errors(level: Optional[str] = None) Dict #
Get the index list of distinct error messages.
- Parameters:
level (str) – Optional filter (case insensitive) name of error message level (generally either “Error” or “Warning”)
- Returns:
A raw dictionary of entities indexed by [message_level][error_type][message], or just by [error_type][message] if the optional level filter is given
- Return type:
Dict
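A sketch of iterating over this error index (assuming the nested-dictionary shape described above):
errors = t.get_errors(level="Error")
for error_type, messages in errors.items():
    for message, entities in messages.items():
        print(error_type, message, entities)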
- get_infores_catalog()[source]#
Return the catalog of Information Resource mappings aggregated from all Transformer-associated sources.
- get_sink(**kwargs: Dict) Sink [source]#
Get an instance of Sink that corresponds to a given format.
- Parameters:
kwargs (Dict) – Arguments required for initializing an instance of Sink
- Returns:
An instance of kgx.sink.Sink
- Return type:
Sink
- get_source(format: str) Source [source]#
Get an instance of Source that corresponds to a given format.
- Parameters:
format (str) – The input format
- Returns:
An instance of kgx.source.Source
- Return type:
Source
- log_error(entity: str, error_type: ErrorType, message: str, message_level: MessageLevel = MessageLevel.ERROR)#
Log an error to the list of such errors.
- Parameters:
entity – source of parse error
error_type – ValidationError ErrorType
message – message string describing the error
message_level – ValidationError MessageLevel
- process(source: Generator, sink: Sink) None [source]#
This method is responsible for reading from source and writing to sink by calling the relevant methods based on the incoming data.
Note: The streamed data must not be mutated.
- Parameters:
source (Generator) – A generator from a Source
sink (kgx.sink.sink.Sink) – An instance of Sink
- save(output_args: Dict) None [source]#
Save data from the in-memory store to a desired sink.
- Parameters:
output_args (Dict) – Arguments relevant to your output sink
- transform(input_args: Dict, output_args: Optional[Dict] = None, inspector: Optional[Callable[[GraphEntityType, List], None]] = None) None [source]#
Transform an input source and write to an output sink.
If output_args is not defined, then the data is persisted to an in-memory graph.
The inspector argument is an optional Callable which the transformer.process() method applies to 'inspect' source records prior to writing them out to the Sink. The first (GraphEntityType) argument of the Callable tags the record as a NODE or an EDGE. The second argument given to the Callable is the current record itself. This Callable is strictly meant to be procedural and should not mutate the record.
- Parameters:
input_args (Dict) – Arguments relevant to your input source
output_args (Optional[Dict]) – Arguments relevant to your output sink
inspector (Optional[Callable[[GraphEntityType, List], None]]) – Optional Callable to ‘inspect’ source records during processing.