Sink#
A Sink can be implemented for any file or any local or remote store to which a graph can be written. A Sink is responsible for writing the nodes and edges of a graph.
A Sink must subclass the kgx.sink.sink.Sink class and must implement the following methods:
- __init__
- write_nodes
- write_edges
- finalize
__init__ method#
The __init__ method is used to instantiate a Sink with the configuration required for writing to a store.
- In the case of files, the __init__ method will take the filename and format as arguments.
- In the case of a graph store like Neo4j, the __init__ method will take the uri, username, and password as arguments.
The __init__ method also has an optional kwargs argument, which can be used to supply a variable number of arguments to this method, depending on the requirements of the store for which the Sink is being implemented.
write_nodes method#
Responsible for receiving a node record and writing it to a file/store.
write_edges method#
Responsible for receiving an edge record and writing it to a file/store.
finalize method#
Any operation that needs to be performed after writing all the nodes and edges to a file/store must be defined in this method.
For example,
- kgx.sink.tsv_sink.TsvSink has a finalize method that closes the file handles and creates an archive, if compression is desired.
- kgx.sink.neo_sink.NeoSink has a finalize method that writes any cached node and edge records.
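The four methods described above can be sketched as a small custom sink. This is a minimal illustration, not KGX code: the Sink class below is a stand-in so the example runs without KGX installed, ListFileSink and its attributes are hypothetical, and the method names follow this section's wording, so check them against the installed kgx.sink.sink.Sink before subclassing it.

```python
import json
from typing import Any, Dict, List


class Sink:
    """Stand-in for kgx.sink.sink.Sink (assumption: it only stores the owner)."""

    def __init__(self, owner: Any) -> None:
        self.owner = owner


class ListFileSink(Sink):
    """Hypothetical sink that caches records and writes them to one JSON file."""

    def __init__(self, owner: Any, filename: str, format: str = "json", **kwargs: Any) -> None:
        super().__init__(owner)
        self.filename = filename
        self.format = format
        self.options = kwargs  # store-specific settings supplied through kwargs
        self.nodes: List[Dict] = []
        self.edges: List[Dict] = []

    def write_nodes(self, record: Dict) -> None:
        # Receive one node record and cache it until finalize() runs.
        self.nodes.append(record)

    def write_edges(self, record: Dict) -> None:
        # Receive one edge record and cache it until finalize() runs.
        self.edges.append(record)

    def finalize(self) -> None:
        # Flush everything that was cached once all records have arrived.
        with open(self.filename, "w", encoding="utf-8") as handle:
            json.dump({"nodes": self.nodes, "edges": self.edges}, handle)
```

Caching in memory keeps the sketch short; a sink for large graphs would more likely write each record as it arrives and use finalize only to close handles.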
kgx.sink.sink#
Base class for all Sinks in KGX.
- class kgx.sink.sink.Sink(owner)[source]#
Bases:
object
A Sink is responsible for writing data as records to a store where the store is a file or a database.
- Parameters:
owner (Transformer) – Transformer to which the GraphSink belongs
- finalize() → None [source]#
Operations that ought to be done after writing all the incoming data should be called by this method.
- set_reverse_prefix_map(m: Dict) → None [source]#
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
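To make the shape of a reverse prefix map concrete, here is a minimal sketch of a dictionary with IRI to prefix mappings and one way such a map could be used to shorten an IRI to a CURIE. The contract helper and the example IRIs are illustrative assumptions; KGX's own contraction logic is not shown in this section and may differ.

```python
from typing import Dict, Optional


def contract(iri: str, reverse_prefix_map: Dict[str, str]) -> Optional[str]:
    """Hypothetical helper: return a CURIE for iri, or None if no IRI base matches."""
    matches = [base for base in reverse_prefix_map if iri.startswith(base)]
    if not matches:
        return None
    base = max(matches, key=len)  # prefer the most specific (longest) IRI base
    return f"{reverse_prefix_map[base]}:{iri[len(base):]}"


# Keys are IRI bases, values are the prefixes they map to (illustrative values).
reverse_prefix_map = {
    "http://purl.obolibrary.org/obo/MONDO_": "MONDO",
    "http://identifiers.org/hgnc/": "HGNC",
}

print(contract("http://purl.obolibrary.org/obo/MONDO_0005002", reverse_prefix_map))
# prints MONDO:0005002
```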
kgx.sink.graph_sink#
GraphSink is responsible for writing to an instance of kgx.graph.base_graph.BaseGraph and must use only the methods exposed by BaseGraph to access the graph.
- class kgx.sink.graph_sink.GraphSink(owner)[source]#
Bases:
Sink
GraphSink is responsible for writing data as records to an in memory graph representation.
The underlying store is determined by the graph store class defined in config (kgx.graph.nx_graph.NxGraph, by default).
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.tsv_sink#
TsvSink is responsible for writing a KGX formatted CSV or TSV using Pandas.
KGX writes two separate files - one for nodes and another for edges.
- class kgx.sink.tsv_sink.TsvSink(owner, filename: str, format: str, compression: Optional[str] = None, **kwargs: Any)[source]#
Bases:
Sink
TsvSink is responsible for writing data as records to a TSV/CSV.
- Parameters:
owner (Transformer) – Transformer to which the GraphSink belongs
filename (str) – The filename to write to
format (str) – The file format (tsv, csv)
compression (str) – The compression type (tar, tar.gz)
kwargs (Any) – Any additional arguments
- set_edge_properties(edge_properties: List) → None [source]#
Update edge properties index with a given list.
- Parameters:
edge_properties (List) – A list of edge properties
- set_node_properties(node_properties: List) → None [source]#
Update node properties index with a given list.
- Parameters:
node_properties (List) – A list of node properties
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.json_sink#
JsonSink is responsible for writing a KGX formatted JSON using the jsonstreams library, which allows for streaming records to the file.
- class kgx.sink.json_sink.JsonSink(owner, filename: str, format: str = 'json', compression: Optional[str] = None, **kwargs: Any)[source]#
Bases:
Sink
JsonSink is responsible for writing data as records to a JSON.
- Parameters:
owner (Transformer) – Transformer to which the GraphSink belongs
filename (str) – The filename to write to
format (str) – The file format (json)
compression (Optional[str]) – The compression type (gz)
kwargs (Any) – Any additional arguments
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.jsonl_sink#
JsonlSink is responsible for writing a KGX formatted JSON Lines using the jsonlines library.
KGX writes two separate JSON Lines files - one for nodes and another for edges.
KGX JSON Lines Format Specification#
The JSON Lines format provides a simple and efficient way to represent KGX data where each line contains a single JSON object representing either a node or an edge. This format combines the advantages of JSON (flexible schema, native support for lists and nested objects) with the streaming capabilities of line-oriented formats.
File Structure#
- {filename}_nodes.jsonl: Contains one node per line, each as a complete JSON object
- {filename}_edges.jsonl: Contains one edge per line, each as a complete JSON object
Node Record Format#
Required Properties#
- id (string): A CURIE that uniquely identifies the node in the graph
- category (array of strings): List of Biolink categories for the node, from the NamedThing hierarchy
Common Optional Properties#
- name (string): Human-readable name of the entity
- description (string): Human-readable description of the entity
- provided_by (array of strings): List of sources that provided this node
- xref (array of strings): List of database cross-references as CURIEs
- synonym (array of strings): List of alternative names for the entity
Edge Record Format#
Required Properties#
- subject (string): CURIE of the source node
- predicate (string): Biolink predicate representing the relationship type
- object (string): CURIE of the target node
- knowledge_level (string): Level of knowledge representation (observation, assertion, concept, statement) according to Biolink Model
- agent_type (string): Autonomous agents for edges (informational, computational, biochemical, biological) according to Biolink Model
Common Optional Properties#
- id (string): Unique identifier for the edge, often a UUID
- relation (string): Relation CURIE from a formal relation ontology (e.g., RO)
- category (array of strings): List of Biolink association categories
- knowledge_source (array of strings): Sources of knowledge (deprecated: provided_by)
- primary_knowledge_source (array of strings): Primary knowledge sources
- aggregator_knowledge_source (array of strings): Knowledge aggregator sources
- publications (array of strings): List of publication CURIEs supporting the edge
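The required properties listed above lend themselves to a simple pre-write check. The sketch below is an assumption-laden illustration rather than part of KGX: missing_properties and the two constants are hypothetical names, and treating empty strings and empty lists as missing is a choice made here, not something the specification above states.

```python
from typing import Dict, Iterable, List

# Required property names, copied from the node and edge record formats above.
NODE_REQUIRED = ("id", "category")
EDGE_REQUIRED = ("subject", "predicate", "object", "knowledge_level", "agent_type")


def missing_properties(record: Dict, required: Iterable[str]) -> List[str]:
    """Return the required property names that are absent or empty in record."""
    return [name for name in required if record.get(name) in (None, "", [])]


node = {"id": "HGNC:11603", "name": "TBX4", "category": ["biolink:Gene"]}
edge = {
    "subject": "HGNC:11603",
    "predicate": "biolink:contributes_to",
    "object": "MONDO:0005002",
}

print(missing_properties(node, NODE_REQUIRED))
# prints []
print(missing_properties(edge, EDGE_REQUIRED))
# prints ['knowledge_level', 'agent_type']
```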
Examples#
Node Example (nodes.jsonl):
Each line in a nodes.jsonl file represents a complete node record. Here are examples of different node types:
{
  "id": "HGNC:11603",
  "name": "TBX4",
  "category": [
    "biolink:Gene"
  ]
}
{
  "id": "MONDO:0005002",
  "name": "chronic obstructive pulmonary disease",
  "category": [
    "biolink:Disease"
  ]
}
{
  "id": "CHEBI:15365",
  "name": "acetaminophen",
  "category": [
    "biolink:SmallMolecule",
    "biolink:ChemicalEntity"
  ]
}
In the actual JSON Lines file, each record is on a single line, without the line breaks and indentation shown above:
{"id":"HGNC:11603","name":"TBX4","category":["biolink:Gene"]}
{"id":"MONDO:0005002","name":"chronic obstructive pulmonary disease","category":["biolink:Disease"]}
{"id":"CHEBI:15365","name":"acetaminophen","category":["biolink:SmallMolecule","biolink:ChemicalEntity"]}
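The single-line form can be produced with the Python standard library alone. The following is a minimal sketch, assuming compact separators are wanted; to_jsonl_line and write_jsonl are hypothetical helper names, and JsonlSink itself uses the jsonlines library rather than this code.

```python
import json
import os
import tempfile
from typing import Dict, Iterable

nodes = [
    {"id": "HGNC:11603", "name": "TBX4", "category": ["biolink:Gene"]},
    {
        "id": "MONDO:0005002",
        "name": "chronic obstructive pulmonary disease",
        "category": ["biolink:Disease"],
    },
]


def to_jsonl_line(record: Dict) -> str:
    # Compact separators drop the spaces json.dumps inserts by default.
    return json.dumps(record, separators=(",", ":"), ensure_ascii=False)


def write_jsonl(records: Iterable[Dict], path: str) -> None:
    # One complete JSON object per line, each terminated by a newline.
    with open(path, "w", encoding="utf-8") as handle:
        for record in records:
            handle.write(to_jsonl_line(record) + "\n")


# Follows the {filename}_nodes.jsonl naming convention with filename "example".
path = os.path.join(tempfile.mkdtemp(), "example_nodes.jsonl")
write_jsonl(nodes, path)
print(to_jsonl_line(nodes[0]))
# prints {"id":"HGNC:11603","name":"TBX4","category":["biolink:Gene"]}
```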
Edge Example (edges.jsonl):
Each line in an edges.jsonl file represents a complete edge record. Here are examples of different edge types:
{
  "id": "a8575c4e-61a6-428a-bf09-fcb3e8d1644d",
  "subject": "HGNC:11603",
  "object": "MONDO:0005002",
  "predicate": "biolink:contributes_to",
  "knowledge_level": "assertion",
  "agent_type": "computational"
}
{
  "id": "urn:uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e",
  "subject": "HGNC:11603",
  "predicate": "biolink:contributes_to",
  "object": "MONDO:0005002",
  "category": [
    "biolink:GeneToDiseaseAssociation"
  ],
  "primary_knowledge_source": [
    "infores:agr"
  ],
  "aggregator_knowledge_source": [
    "infores:monarchinitiative"
  ],
  "publications": [
    "PMID:26634245",
    "PMID:26634244"
  ],
  "knowledge_level": "manual_assertion",
  "agent_type": ""
}
{
  "id": "c7d632b4-6708-4296-9cfe-44bc586d32c8",
  "subject": "CHEBI:15365",
  "predicate": "biolink:affects",
  "qualified_predicate": "biolink:causes",
  "object": "HGNC:11603",
  "object_aspect_qualifier": "biolink:expression",
  "object_direction_qualifier": "biolink:increased",
  "category": [
    "biolink:ChemicalAffectsGeneAssociation"
  ],
  "primary_knowledge_source": [
    "infores:ctd"
  ],
  "aggregator_knowledge_source": [
    "infores:monarchinitiative"
  ],
  "publications": [
    "PMID:12345678"
  ],
  "knowledge_level": "assertion",
  "agent_type": "computational"
}
In an actual JSON Lines file, each record is on a single line, without line breaks or indentation. For example:
{"id":"a8575c4e-61a6-428a-bf09-fcb3e8d1644d","subject":"HGNC:11603","object":"MONDO:0005002","predicate":"biolink:related_to","relation":"RO:0003304","knowledge_level":"assertion","agent_type":"computational"}
{"id":"urn:uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e","subject":"HGNC:11603","predicate":"biolink:contributes_to","object":"MONDO:0005002","relation":"RO:0003304","category":["biolink:GeneToDiseaseAssociation"],"primary_knowledge_source":["infores:gwas-catalog"],"publications":["PMID:26634245","PMID:26634244"],"knowledge_level":"observation","agent_type":"biological"}
Usage Notes#
- All field values should follow the KGX specification and Biolink Model requirements.
- Arrays should be represented as JSON arrays (not pipe-delimited strings).
- For large KGs, JSON Lines offers better streaming performance than monolithic JSON.
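Two of these notes can be illustrated with a short standard-library sketch: reading a JSON Lines file one record at a time, and turning a pipe-delimited string into a JSON-style array. The helper names read_jsonl and split_multivalued are hypothetical and are not part of KGX.

```python
import io
import json
from typing import Dict, Iterable, Iterator, List


def read_jsonl(lines: Iterable[str]) -> Iterator[Dict]:
    """Yield one record per non-empty line, so the whole file is never loaded at once."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def split_multivalued(value: str) -> List[str]:
    """Turn a pipe-delimited string into a list, dropping empty parts."""
    return [part for part in value.split("|") if part]


# An in-memory file stands in for an open {filename}_nodes.jsonl handle.
handle = io.StringIO(
    '{"id":"HGNC:11603","name":"TBX4","category":["biolink:Gene"]}\n'
    '{"id":"MONDO:0005002","category":["biolink:Disease"]}\n'
)
for record in read_jsonl(handle):
    print(record["id"])

print(split_multivalued("PMID:26634245|PMID:26634244"))
# prints ['PMID:26634245', 'PMID:26634244']
```

Because read_jsonl is a generator, it accepts any iterable of lines, including a file object opened on a large edges file.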
- class kgx.sink.jsonl_sink.JsonlSink(owner, filename: str, format: str = 'jsonl', compression: Optional[str] = None, **kwargs: Any)[source]#
Bases:
Sink
JsonlSink is responsible for writing data as records to JSON lines.
- Parameters:
owner (Transformer) – Transformer to which the GraphSink belongs
filename (str) – The filename to write to
format (str) – The file format (jsonl)
compression (Optional[str]) – The compression type (gz)
kwargs (Any) – Any additional arguments
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
kgx.sink.neo_sink#
NeoSink is responsible for writing data to a local or remote Neo4j instance.
- class kgx.sink.neo_sink.NeoSink(owner, uri: str, username: str, password: str, **kwargs: Any)[source]#
Bases:
Sink
NeoSink is responsible for writing data as records to a Neo4j instance.
- Parameters:
owner (Transformer) – Transformer to which the GraphSink belongs
uri (str) – The URI for the Neo4j instance. For example, http://localhost:7474
username (str) – The username
password (str) – The password
kwargs (Any) – Any additional arguments
- create_constraints(categories: Union[set, list]) → None [source]#
Create a unique constraint on node ‘id’ for all categories in Neo4j.
- static generate_unwind_edge_query(edge_predicate: str) → str [source]#
Generate UNWIND cypher query for saving edges into Neo4j.
Query uses self.DEFAULT_NODE_CATEGORY to quickly look up the required subject and object nodes.
- static generate_unwind_node_query(category: str) → str [source]#
Generate UNWIND cypher query for saving nodes into Neo4j.
There should be a CONSTRAINT in Neo4j for self.DEFAULT_NODE_CATEGORY. The query uses self.DEFAULT_NODE_CATEGORY as the node label to increase speed for adding nodes. The query also sets label to self.DEFAULT_NODE_CATEGORY for any node to make sure that the CONSTRAINT applies.
- static sanitize_category(category: List) → List [source]#
Sanitize category for use in UNWIND cypher clause. This method adds escape characters to each element in category list to ensure the category is processed correctly.
- Parameters:
category (List) – Category
- Returns:
Sanitized category list
- Return type:
List
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
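The sanitize_category entry above mentions adding escape characters to category values. One plausible form of that escaping, shown here purely as an assumption, is Cypher's backtick quoting, which is needed for labels such as biolink:Gene that contain a colon. The backtick_quote helper is hypothetical; the exact behaviour of NeoSink.sanitize_category is not shown in this section.

```python
from typing import List


def backtick_quote(categories: List[str]) -> List[str]:
    """Wrap each label in backticks, doubling any backtick inside the label."""
    return ["`" + c.replace("`", "``") + "`" for c in categories]


labels = backtick_quote(["biolink:Gene", "biolink:NamedThing"])
# Joined with ':' the labels can follow a node variable in a Cypher pattern.
print(":".join(labels))
# prints `biolink:Gene`:`biolink:NamedThing`
```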
kgx.sink.rdf_sink#
RdfSink is responsible for writing data as RDF N-Triples.
- class kgx.sink.rdf_sink.RdfSink(owner, filename: str, format: str = 'nt', compression: Optional[bool] = None, reify_all_edges: bool = True, **kwargs: Any)[source]#
Bases:
Sink
RdfSink is responsible for writing data as records to an RDF serialization.
Note
Currently only RDF N-Triples serialization is supported.
- Parameters:
owner (Transformer) – Transformer to which the GraphSink belongs
filename (str) – The filename to write to
format (str) – The file format (nt)
compression (str) – The compression type (gz)
reify_all_edges (bool) – Whether or not to reify all the edges
kwargs (Any) – Any additional arguments
- get_biolink_element(predicate: Any) → Optional[Element] [source]#
Returns a Biolink Model element for a given predicate.
- Parameters:
predicate (Any) – The CURIE of a predicate
- Returns:
The corresponding Biolink Model element
- Return type:
Optional[Element]
- process_predicate(p: Optional[Union[URIRef, str]]) → Tuple [source]#
Process a predicate where the method checks if there is a mapping in Biolink Model.
- Parameters:
p (Optional[Union[URIRef, str]]) – The predicate
- Returns:
A tuple that contains the Biolink CURIE (if available), the Biolink slot_uri CURIE (if available), the CURIE form of p, the reference of p
- Return type:
Tuple
- set_property_types(m: Dict) → None [source]#
Set export type for properties that are not in Biolink Model.
- Parameters:
m (Dict) – A dictionary where the keys are property names and values are their corresponding types.
- set_reverse_predicate_mapping(m: Dict) → None [source]#
Set reverse predicate mappings.
Use this method to update mappings for predicates that are not in Biolink Model.
- Parameters:
m (Dict) – A dictionary where the keys are property names and values are their corresponding IRI.
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings
- uriref(identifier: str) → URIRef [source]#
Generate a rdflib.URIRef for a given string.
- Parameters:
identifier (str) – Identifier as string.
- Returns:
URIRef form of the input identifier
- Return type:
rdflib.URIRef
kgx.sink.parquet_sink#
ParquetSink is responsible for writing data as Parquet table files.
KGX writes two separate files - one for nodes and another for edges.
Sink for Parquet format.
- class kgx.sink.parquet_sink.ParquetSink(owner, filename: str, **kwargs: Any)[source]#
Bases:
Sink
A ParquetSink writes data to Parquet files.
- Parameters:
owner (Transformer) – Transformer to which the ParquetSink belongs
filename (str) – Name of the Parquet file to write to
kwargs (Any) – Any additional arguments
- set_reverse_prefix_map(m: Dict) → None #
Update default reverse prefix map.
- Parameters:
m (Dict) – A dictionary with IRI to prefix mappings