Sink#

A Sink can be implemented for any file, or any local or remote store, to which a graph can be written. A Sink is responsible for writing the nodes and edges of a graph.

A Sink must subclass the kgx.sink.sink.Sink class and must implement the following methods:

  • __init__

  • write_node

  • write_edge

  • finalize

__init__ method#

The __init__ method is used to instantiate a Sink with configurations required for writing to a store.

  • In the case of files, the __init__ method will take the filename and format as arguments

  • In the case of a graph store like Neo4j, the __init__ method will take the uri, username, and password as arguments.

The __init__ method also accepts optional kwargs, which can be used to supply a variable number of additional arguments, depending on the requirements of the store for which the Sink is being implemented.

write_node method#

  • Responsible for receiving a node record and writing it to a file/store

write_edge method#

  • Responsible for receiving an edge record and writing it to a file/store

finalize method#

Any operation that needs to be performed after writing all the nodes and edges to a file/store must be defined in this method.

For example,

  • kgx.sink.tsv_sink.TsvSink has a finalize method that closes the file handles and creates an archive, if compression is desired

  • kgx.sink.neo_sink.NeoSink has a finalize method that writes any cached node and edge records
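The four methods can be put together in a minimal sketch. It uses the method names from the API reference on this page (write_node, write_edge). To stay runnable without KGX installed, `Sink` below is a local stand-in for kgx.sink.sink.Sink, and `ListSink`, its `store` argument, and the `batch_size` keyword are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, List, Optional


class Sink:
    """Local stand-in for kgx.sink.sink.Sink (real code would import that class)."""

    def __init__(self, owner: Any) -> None:
        self.owner = owner

    def write_node(self, record: Any) -> None:
        raise NotImplementedError

    def write_edge(self, record: Any) -> None:
        raise NotImplementedError

    def finalize(self) -> None:
        raise NotImplementedError


class ListSink(Sink):
    """Hypothetical sink that collects records in an in-memory store."""

    def __init__(self, owner: Any, store: Optional[Dict[str, List]] = None, **kwargs: Any) -> None:
        super().__init__(owner)
        # `store` plays the role that a filename or a database URI plays in a real sink.
        self.store = store if store is not None else {"nodes": [], "edges": []}
        # Store-specific options arrive through kwargs.
        self.options = kwargs
        self.closed = False

    def write_node(self, record: Dict) -> None:
        self.store["nodes"].append(record)

    def write_edge(self, record: Dict) -> None:
        self.store["edges"].append(record)

    def finalize(self) -> None:
        # Anything that must happen after the last record is written goes here.
        self.closed = True


sink = ListSink(owner=None, batch_size=10)
sink.write_node({"id": "HGNC:11603", "category": ["biolink:Gene"]})
sink.write_node({"id": "MONDO:0005002", "category": ["biolink:Disease"]})
sink.write_edge({"subject": "HGNC:11603", "predicate": "biolink:contributes_to", "object": "MONDO:0005002"})
sink.finalize()
```

A real implementation would replace the list appends with writes to the target file or database and release its resources in finalize.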

kgx.sink.sink#

Base class for all Sinks in KGX.

class kgx.sink.sink.Sink(owner)[source]#

Bases: object

A Sink is responsible for writing data as records to a store where the store is a file or a database.

Parameters:

owner (Transformer) – Transformer to which the Sink belongs

finalize() None[source]#

Operations that ought to be done after writing all the incoming data should be called by this method.

set_reverse_prefix_map(m: Dict) None[source]#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings
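To make the shape of the argument concrete, here is a sketch of how an IRI-to-prefix dictionary can be used to contract an IRI into a CURIE. The `contract` helper is hypothetical and is not part of KGX; how KGX applies the map internally is not described here.

```python
from typing import Dict, Optional


def contract(iri: str, reverse_prefix_map: Dict[str, str]) -> Optional[str]:
    """Return a CURIE for `iri`, or None if no IRI prefix in the map matches."""
    # Try the longest IRI prefix first so that nested namespaces resolve correctly.
    for base in sorted(reverse_prefix_map, key=len, reverse=True):
        if iri.startswith(base):
            return f"{reverse_prefix_map[base]}:{iri[len(base):]}"
    return None


# Keys are IRI prefixes, values are CURIE prefixes.
reverse_prefix_map = {
    "http://purl.obolibrary.org/obo/MONDO_": "MONDO",
    "https://w3id.org/biolink/vocab/": "biolink",
}
curie = contract("http://purl.obolibrary.org/obo/MONDO_0005002", reverse_prefix_map)
unknown = contract("http://example.org/thing/1", reverse_prefix_map)
```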

write_edge(record) None[source]#

Write an edge record to the underlying store.

Parameters:

record (Any) – An edge record

write_node(record) None[source]#

Write a node record to the underlying store.

Parameters:

record (Any) – A node record

kgx.sink.graph_sink#

GraphSink is responsible for writing to an instance of kgx.graph.base_graph.BaseGraph and must use only the methods exposed by BaseGraph to access the graph.

class kgx.sink.graph_sink.GraphSink(owner)[source]#

Bases: Sink

GraphSink is responsible for writing data as records to an in memory graph representation.

The underlying store is determined by the graph store class defined in config (kgx.graph.nx_graph.NxGraph, by default).

finalize() None[source]#

Perform any operations after writing nodes and edges to graph.

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) None[source]#

Write an edge record to graph.

Parameters:

record (Dict) – An edge record

write_node(record: Dict) None[source]#

Write a node record to graph.

Parameters:

record (Dict) – A node record

kgx.sink.tsv_sink#

TsvSink is responsible for writing a KGX formatted CSV or TSV using Pandas.

KGX writes two separate files - one for nodes and another for edges.

class kgx.sink.tsv_sink.TsvSink(owner, filename: str, format: str, compression: Optional[str] = None, **kwargs: Any)[source]#

Bases: Sink

TsvSink is responsible for writing data as records to a TSV/CSV.

Parameters:
  • owner (Transformer) – Transformer to which the GraphSink belongs

  • filename (str) – The filename to write to

  • format (str) – The file format (tsv, csv)

  • compression (Optional[str]) – The compression type (tar, tar.gz)

  • kwargs (Any) – Any additional arguments

finalize() None[source]#

Close file handles and create an archive if compression mode is defined.
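The archiving step can be sketched with the standard library's tarfile module. This is an illustration under assumptions, not the TsvSink implementation: the `archive_files` helper and the `graph_nodes.tsv` / `graph_edges.tsv` file names are hypothetical.

```python
import os
import tarfile
import tempfile
from typing import List


def archive_files(paths: List[str], archive_path: str, compression: str = "tar.gz") -> str:
    """Bundle already-closed files into a tar or tar.gz archive."""
    mode = "w:gz" if compression == "tar.gz" else "w"
    with tarfile.open(archive_path, mode) as tar:
        for path in paths:
            # Store only the base name so the archive has no directory structure.
            tar.add(path, arcname=os.path.basename(path))
    return archive_path


# Create two small files that stand in for the node and edge output.
workdir = tempfile.mkdtemp()
paths = []
for suffix, header in (("nodes", "id\tcategory\n"), ("edges", "subject\tpredicate\tobject\n")):
    path = os.path.join(workdir, f"graph_{suffix}.tsv")
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(header)
    paths.append(path)

archive = archive_files(paths, os.path.join(workdir, "graph.tar.gz"))
with tarfile.open(archive) as tar:
    members = sorted(tar.getnames())
```

The file handles are closed before archiving so that all buffered rows are on disk when the archive is built.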

set_edge_properties(edge_properties: List) None[source]#

Update edge properties index with a given list.

Parameters:

edge_properties (List) – A list of edge properties

set_node_properties(node_properties: List) None[source]#

Update node properties index with a given list.

Parameters:

node_properties (List) – A list of node properties

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) None[source]#

Write an edge record to the underlying store.

Parameters:

record (Dict) – An edge record

write_node(record: Dict) None[source]#

Write a node record to the underlying store.

Parameters:

record (Dict) – A node record

kgx.sink.json_sink#

JsonSink is responsible for writing a KGX formatted JSON using the jsonstreams library, which allows for streaming records to the file.

class kgx.sink.json_sink.JsonSink(owner, filename: str, format: str = 'json', compression: Optional[str] = None, **kwargs: Any)[source]#

Bases: Sink

JsonSink is responsible for writing data as records to a JSON.

Parameters:
  • owner (Transformer) – Transformer to which the GraphSink belongs

  • filename (str) – The filename to write to

  • format (str) – The file format (json)

  • compression (Optional[str]) – The compression type (gz)

  • kwargs (Any) – Any additional arguments

finalize() None[source]#

Finalize by creating a compressed file, if needed.

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) None[source]#

Write an edge record to JSON.

Parameters:

record (Dict) – An edge record

write_node(record: Dict) None[source]#

Write a node record to JSON.

Parameters:

record (Dict) – A node record

kgx.sink.jsonl_sink#

JsonlSink is responsible for writing a KGX formatted JSON Lines using the jsonlines library.

KGX writes two separate JSON Lines files - one for nodes and another for edges.

KGX JSON Lines Format Specification#

The JSON Lines format provides a simple and efficient way to represent KGX data where each line contains a single JSON object representing either a node or an edge. This format combines the advantages of JSON (flexible schema, native support for lists and nested objects) with the streaming capabilities of line-oriented formats.

File Structure#

  • {filename}_nodes.jsonl: Contains one node per line, each as a complete JSON object

  • {filename}_edges.jsonl: Contains one edge per line, each as a complete JSON object
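The two-file layout can be reproduced with the standard json module. The sketch below does not use the jsonlines library or JsonlSink itself, and `write_jsonl_pair` is a hypothetical helper.

```python
import json
import os
import tempfile
from typing import Dict, Iterable


def write_jsonl_pair(filename: str, nodes: Iterable[Dict], edges: Iterable[Dict]) -> Dict[str, str]:
    """Write nodes to {filename}_nodes.jsonl and edges to {filename}_edges.jsonl."""
    paths = {kind: f"{filename}_{kind}.jsonl" for kind in ("nodes", "edges")}
    for kind, records in (("nodes", nodes), ("edges", edges)):
        with open(paths[kind], "w", encoding="utf-8") as handle:
            for record in records:
                # One complete JSON object per line.
                handle.write(json.dumps(record) + "\n")
    return paths


base = os.path.join(tempfile.mkdtemp(), "graph")
paths = write_jsonl_pair(
    base,
    nodes=[{"id": "HGNC:11603", "category": ["biolink:Gene"]}],
    edges=[{"subject": "HGNC:11603", "predicate": "biolink:contributes_to", "object": "MONDO:0005002"}],
)
```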

Node Record Format#

Required Properties#

  • id (string): A CURIE that uniquely identifies the node in the graph

  • category (array of strings): List of Biolink categories for the node, from the NamedThing hierarchy

Common Optional Properties#

  • name (string): Human-readable name of the entity

  • description (string): Human-readable description of the entity

  • provided_by (array of strings): List of sources that provided this node

  • xref (array of strings): List of database cross-references as CURIEs

  • synonym (array of strings): List of alternative names for the entity
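A lightweight check of the two required node properties might look like the following. `node_problems` is a hypothetical helper, and testing for a colon is only a rough approximation of CURIE syntax, not a full validation against the Biolink Model.

```python
from typing import Dict, List


def node_problems(record: Dict) -> List[str]:
    """Return human-readable problems with the required node properties."""
    problems = []
    node_id = record.get("id")
    if not isinstance(node_id, str) or ":" not in node_id:
        problems.append("id must be a CURIE string")
    category = record.get("category")
    if (
        not isinstance(category, list)
        or not category
        or not all(isinstance(item, str) for item in category)
    ):
        problems.append("category must be a non-empty array of strings")
    return problems


ok = node_problems({"id": "HGNC:11603", "name": "TBX4", "category": ["biolink:Gene"]})
bad = node_problems({"id": "TBX4", "category": "biolink:Gene"})
```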

Edge Record Format#

Required Properties#

  • subject (string): CURIE of the source node

  • predicate (string): Biolink predicate representing the relationship type

  • object (string): CURIE of the target node

  • knowledge_level (string): Level of knowledge representation (observation, assertion, concept, statement) according to Biolink Model

  • agent_type (string): Autonomous agents for edges (informational, computational, biochemical, biological) according to Biolink Model

Common Optional Properties#

  • id (string): Unique identifier for the edge, often a UUID

  • relation (string): Relation CURIE from a formal relation ontology (e.g., RO)

  • category (array of strings): List of Biolink association categories

  • knowledge_source (array of strings): Sources of knowledge (deprecated: provided_by)

  • primary_knowledge_source (array of strings): Primary knowledge sources

  • aggregator_knowledge_source (array of strings): Knowledge aggregator sources

  • publications (array of strings): List of publication CURIEs supporting the edge
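The required edge properties can be checked before writing. In this sketch, `missing_edge_properties` is a hypothetical helper, and treating an empty string as missing is an assumption rather than a documented KGX rule.

```python
from typing import Dict, List

REQUIRED_EDGE_PROPERTIES = ("subject", "predicate", "object", "knowledge_level", "agent_type")


def missing_edge_properties(record: Dict) -> List[str]:
    """List required edge properties that are absent or empty."""
    return [key for key in REQUIRED_EDGE_PROPERTIES if not record.get(key)]


complete = missing_edge_properties(
    {
        "subject": "HGNC:11603",
        "predicate": "biolink:contributes_to",
        "object": "MONDO:0005002",
        "knowledge_level": "assertion",
        "agent_type": "computational",
    }
)
incomplete = missing_edge_properties(
    {
        "subject": "HGNC:11603",
        "predicate": "biolink:contributes_to",
        "object": "MONDO:0005002",
        "knowledge_level": "assertion",
        "agent_type": "",
    }
)
```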

Examples#

Node Example (nodes.jsonl):

Each line in a nodes.jsonl file represents a complete node record. Here are examples of different node types:

{
  "id": "HGNC:11603",
  "name": "TBX4",
  "category": [
    "biolink:Gene"
  ]
},
{
  "id": "MONDO:0005002",
  "name": "chronic obstructive pulmonary disease",
  "category": [
    "biolink:Disease"
  ]
},
{
  "id": "CHEBI:15365",
  "name": "acetaminophen",
  "category": [
    "biolink:SmallMolecule",
    "biolink:ChemicalEntity"
  ]
}

In the actual JSON Lines file, each record is on a single line, without the indentation and the separating commas shown above:

{"id":"HGNC:11603","name":"TBX4","category":["biolink:Gene"]}
{"id":"MONDO:0005002","name":"chronic obstructive pulmonary disease","category":["biolink:Disease"]}
{"id":"CHEBI:15365","name":"acetaminophen","category":["biolink:SmallMolecule","biolink:ChemicalEntity"]}
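The compact single-line form can be produced with the standard json module by passing separators without spaces. This only illustrates the serialization and makes no claim about how JsonlSink writes its output.

```python
import json

nodes = [
    {"id": "HGNC:11603", "name": "TBX4", "category": ["biolink:Gene"]},
    {"id": "MONDO:0005002", "name": "chronic obstructive pulmonary disease", "category": ["biolink:Disease"]},
]

# separators=(",", ":") removes the spaces that json.dumps inserts by default.
lines = [json.dumps(node, separators=(",", ":")) for node in nodes]
jsonl_text = "\n".join(lines) + "\n"
```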

Edge Example (edges.jsonl):

Each line in a jsonlines file represents a complete edge record. Here are examples of different edge types:

{
  "id": "a8575c4e-61a6-428a-bf09-fcb3e8d1644d",
  "subject": "HGNC:11603",
  "object": "MONDO:0005002",
  "predicate": "biolink:contributes_to",
  "knowledge_level": "assertion",
  "agent_type": "computational"
}
{
  "id": "urn:uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e",
  "subject": "HGNC:11603",
  "predicate": "biolink:contributes_to",
  "object": "MONDO:0005002",
  "category": [
    "biolink:GeneToDiseaseAssociation"
  ],
  "primary_knowledge_source": [
    "infores:agr"
  ],
  "aggregator_knowledge_source": [
    "infores:monarchinitiative"
  ],
  "publications": [
    "PMID:26634245",
    "PMID:26634244"
  ],
  "knowledge_level": "manual_assertion",
  "agent_type": ""
}
{
  "id": "c7d632b4-6708-4296-9cfe-44bc586d32c8",
  "subject": "CHEBI:15365",
  "predicate": "biolink:affects",
  "qualified_predicate": "biolink:causes",
  "object": "HGNC:11603",
  "object_aspect_qualifier": "biolink:expression",
  "object_direction_qualifier": "biolink:increased",
  "category": [
    "biolink:ChemicalAffectsGeneAssociation"
  ],
  "primary_knowledge_source": [
    "infores:ctd"
  ],
  "aggregator_knowledge_source": [
    "infores:monarchinitiative"
  ],
  "publications": [
    "PMID:12345678"
  ],
  "knowledge_level": "assertion",
  "agent_type": "computational"
}

In the actual JSON Lines file, each record is on a single line, without line breaks or indentation. For example:

{"id":"a8575c4e-61a6-428a-bf09-fcb3e8d1644d","subject":"HGNC:11603","object":"MONDO:0005002","predicate":"biolink:related_to","relation":"RO:0003304","knowledge_level":"assertion","agent_type":"computational"}
{"id":"urn:uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e","subject":"HGNC:11603","predicate":"biolink:contributes_to","object":"MONDO:0005002","relation":"RO:0003304","category":["biolink:GeneToDiseaseAssociation"],"primary_knowledge_source":["infores:gwas-catalog"],"publications":["PMID:26634245","PMID:26634244"],"knowledge_level":"observation","agent_type":"biological"}

Usage Notes#

  • All field values should follow the KGX specification and Biolink Model requirements

  • Arrays should be represented as JSON arrays (not pipe-delimited strings)

  • For large KGs, JSON Lines offers better streaming performance than monolithic JSON
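The streaming advantage comes from being able to parse one line at a time. A sketch using only the standard library follows; `iter_records` and `split_multivalued` are hypothetical helpers, and `io.StringIO` stands in for an open nodes file.

```python
import io
import json
from typing import Dict, Iterator, List, TextIO


def iter_records(handle: TextIO) -> Iterator[Dict]:
    """Yield one record per non-empty line without loading the whole file."""
    for line in handle:
        line = line.strip()
        if line:
            yield json.loads(line)


def split_multivalued(value: str) -> List[str]:
    """Turn a pipe-delimited string into a list suitable for a JSON array."""
    return value.split("|") if value else []


sample = io.StringIO(
    '{"id":"HGNC:11603","name":"TBX4","category":["biolink:Gene"]}\n'
    '{"id":"CHEBI:15365","name":"acetaminophen","category":["biolink:SmallMolecule","biolink:ChemicalEntity"]}\n'
)
ids = [record["id"] for record in iter_records(sample)]
categories = split_multivalued("biolink:SmallMolecule|biolink:ChemicalEntity")
```

Because `iter_records` is a generator, memory use stays bounded by the size of a single record rather than the size of the file.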

class kgx.sink.jsonl_sink.JsonlSink(owner, filename: str, format: str = 'jsonl', compression: Optional[str] = None, **kwargs: Any)[source]#

Bases: Sink

JsonlSink is responsible for writing data as records to JSON lines.

Parameters:
  • owner (Transformer) – Transformer to which the GraphSink belongs

  • filename (str) – The filename to write to

  • format (str) – The file format (jsonl)

  • compression (Optional[str]) – The compression type (gz)

  • kwargs (Any) – Any additional arguments

finalize() None[source]#

Perform any operations after writing the file.

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record: Dict) None[source]#

Write an edge record to JSON.

Parameters:

record (Dict) – An edge record

write_node(record: Dict) None[source]#

Write a node record to JSON.

Parameters:

record (Dict) – A node record

kgx.sink.neo_sink#

NeoSink is responsible for writing data to a local or remote Neo4j instance.

class kgx.sink.neo_sink.NeoSink(owner, uri: str, username: str, password: str, **kwargs: Any)[source]#

Bases: Sink

NeoSink is responsible for writing data as records to a Neo4j instance.

Parameters:
  • owner (Transformer) – Transformer to which the GraphSink belongs

  • uri (str) – The URI for the Neo4j instance. For example, http://localhost:7474

  • username (str) – The username

  • password (str) – The password

  • kwargs (Any) – Any additional arguments

static create_constraint_query(category: str) str[source]#

Create a Cypher CONSTRAINT query

Parameters:

category (str) – The category to create a constraint on

Returns:

The Cypher CONSTRAINT query

Return type:

str

create_constraints(categories: Union[set, list]) None[source]#

Create a unique constraint on node ‘id’ for all categories in Neo4j.

Parameters:

categories (Union[set, list]) – Set of categories

finalize() None[source]#

Write any remaining cached node and/or edge records.

static generate_unwind_edge_query(edge_predicate: str) str[source]#

Generate UNWIND cypher query for saving edges into Neo4j.

The query uses self.DEFAULT_NODE_CATEGORY to quickly look up the required subject and object nodes.

Parameters:

edge_predicate (str) – Edge label as string

Returns:

The UNWIND cypher query

Return type:

str

static generate_unwind_node_query(category: str) str[source]#

Generate UNWIND cypher query for saving nodes into Neo4j.

There should be a CONSTRAINT in Neo4j for self.DEFAULT_NODE_CATEGORY. The query uses self.DEFAULT_NODE_CATEGORY as the node label to increase speed for adding nodes. The query also sets label to self.DEFAULT_NODE_CATEGORY for any node to make sure that the CONSTRAINT applies.

Parameters:

category (str) – Node category

Returns:

The UNWIND cypher query

Return type:

str
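The kind of query described above might look like the following sketch. It is not the string that NeoSink generates: the `$nodes` parameter name, the value of `DEFAULT_NODE_CATEGORY`, and the `unwind_node_query` helper are all assumptions made for illustration.

```python
DEFAULT_NODE_CATEGORY = "biolink:NamedThing"  # assumed value, not taken from this page


def unwind_node_query(category: str) -> str:
    """Build a Cypher query that merges a batch of nodes passed as $nodes."""
    return (
        "UNWIND $nodes AS node\n"
        # Matching on the default label lets a uniqueness constraint on id apply.
        f"MERGE (n:`{DEFAULT_NODE_CATEGORY}` {{id: node.id}})\n"
        # Copy all properties and add the specific category as an extra label.
        f"ON CREATE SET n += node, n:`{category}`\n"
        f"ON MATCH SET n += node, n:`{category}`"
    )


query = unwind_node_query("biolink:Gene")
```

Sending one UNWIND query per batch avoids a database round trip per node.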

static sanitize_category(category: List) List[source]#

Sanitize category for use in UNWIND cypher clause. This method adds escape characters to each element in category list to ensure the category is processed correctly.

Parameters:

category (List) – Category

Returns:

Sanitized category list

Return type:

List
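One plausible form of this escaping is backtick quoting, which Cypher uses for labels that contain characters such as a colon. The sketch below assumes that approach; `backtick_quote` is a hypothetical helper and may differ from what sanitize_category actually does.

```python
from typing import List


def backtick_quote(category: List[str]) -> List[str]:
    """Wrap each label in backticks, doubling any backtick inside the label."""
    quoted = []
    for label in category:
        escaped = label.replace("`", "``")
        quoted.append(f"`{escaped}`")
    return quoted


labels = backtick_quote(["biolink:Gene", "biolink:NamedThing"])
# Labels joined with ":" can be appended to a node pattern such as (n:...).
label_clause = ":".join(labels)
```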

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record) None[source]#

Cache an edge record that is to be written to Neo4j. This method writes out the cache of edge records when the total number of cached records exceeds CACHE_SIZE.

Parameters:

record (Dict) – An edge record

write_node(record) None[source]#

Cache a node record that is to be written to Neo4j. This method writes out the cache of node records when the total number of cached records exceeds CACHE_SIZE.

Parameters:

record (Dict) – A node record
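The cache-then-flush behaviour of write_node, write_edge, and finalize can be sketched without a database. `BatchingWriter` is a hypothetical class, the `CACHE_SIZE` value is chosen only for demonstration, and the `batches` list stands in for queries sent to Neo4j.

```python
from typing import Dict, List


class BatchingWriter:
    """Hypothetical writer that buffers records and flushes them in batches."""

    CACHE_SIZE = 2  # demonstration value; the real NeoSink value is not stated here

    def __init__(self) -> None:
        self.cache: List[Dict] = []
        self.batches: List[List[Dict]] = []  # each entry represents one database write

    def write_node(self, record: Dict) -> None:
        self.cache.append(record)
        # Flush once the number of cached records exceeds CACHE_SIZE.
        if len(self.cache) > self.CACHE_SIZE:
            self._flush()

    def finalize(self) -> None:
        # Write whatever is still cached after the last record arrives.
        if self.cache:
            self._flush()

    def _flush(self) -> None:
        self.batches.append(self.cache)
        self.cache = []


writer = BatchingWriter()
for index in range(5):
    writer.write_node({"id": f"EX:{index}"})
writer.finalize()
batch_sizes = [len(batch) for batch in writer.batches]
```

Without the finalize call, the last two records would stay in the cache and never reach the store.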

kgx.sink.rdf_sink#

RdfSink is responsible for writing data as RDF N-Triples.

class kgx.sink.rdf_sink.RdfSink(owner, filename: str, format: str = 'nt', compression: Optional[bool] = None, reify_all_edges: bool = True, **kwargs: Any)[source]#

Bases: Sink

RdfSink is responsible for writing data as records to an RDF serialization.

Note

Currently only RDF N-Triples serialization is supported.

Parameters:
  • owner (Transformer) – Transformer to which the GraphSink belongs

  • filename (str) – The filename to write to

  • format (str) – The file format (nt)

  • compression (str) – The compression type (gz)

  • reify_all_edges (bool) – Whether or not to reify all the edges

  • kwargs (Any) – Any additional arguments

finalize() None[source]#

Perform any operations after writing the file.

Returns a Biolink Model element for a given predicate.

Parameters:

predicate (Any) – The CURIE of a predicate

Returns:

The corresponding Biolink Model element

Return type:

Optional[Element]

process_predicate(p: Optional[Union[URIRef, str]]) Tuple[source]#

Process a predicate where the method checks if there is a mapping in Biolink Model.

Parameters:

p (Optional[Union[URIRef, str]]) – The predicate

Returns:

A tuple that contains the Biolink CURIE (if available), the Biolink slot_uri CURIE (if available), the CURIE form of p, the reference of p

Return type:

Tuple

reify(u: str, v: str, data: Dict) Dict[source]#

Create a node representation of an edge.

Parameters:
  • u (str) – Subject

  • v (str) – Object

  • data (Dict) – Edge data

Returns:

The reified node

Return type:

Dict
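Reification turns an edge into a node that carries the subject, the object, and the edge properties. The sketch below shows the general idea only. `reify_edge`, the generated `urn:uuid:` identifier, and the default `biolink:Association` category are assumptions and may not match what RdfSink.reify produces.

```python
import uuid
from typing import Dict


def reify_edge(u: str, v: str, data: Dict) -> Dict:
    """Return a node-like dictionary that represents the edge from u to v."""
    node = dict(data)  # copy so the caller's edge data is left untouched
    node.setdefault("id", f"urn:uuid:{uuid.uuid4()}")
    node.setdefault("category", ["biolink:Association"])
    node["subject"] = u
    node["object"] = v
    return node


edge_data = {"predicate": "biolink:contributes_to", "publications": ["PMID:26634245"]}
reified = reify_edge("HGNC:11603", "MONDO:0005002", edge_data)
```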

set_property_types(m: Dict) None[source]#

Set export type for properties that are not in Biolink Model.

Parameters:

m (Dict) – A dictionary where the keys are property names and values are their corresponding types.

set_reverse_predicate_mapping(m: Dict) None[source]#

Set reverse predicate mappings.

Use this method to update mappings for predicates that are not in Biolink Model.

Parameters:

m (Dict) – A dictionary where the keys are property names and values are their corresponding IRI.

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

uriref(identifier: str) URIRef[source]#

Generate a rdflib.URIRef for a given string.

Parameters:

identifier (str) – Identifier as string.

Returns:

URIRef form of the input identifier

Return type:

rdflib.URIRef

write_edge(record: Dict) None[source]#

Write an edge record as triples.

Parameters:

record (Dict) – An edge record

write_node(record: Dict) None[source]#

Write a node record as triples.

Parameters:

record (Dict) – A node record
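To show what writing a node record as triples can mean, here is a sketch that emits N-Triples lines with plain string formatting. It does not use rdflib and is not the RdfSink implementation: the prefix map, the choice of rdf:type and rdfs:label as predicates, and the helper names are assumptions.

```python
from typing import Dict, List

# Assumed prefix expansions, for illustration only.
PREFIX_MAP = {
    "HGNC": "http://identifiers.org/hgnc/",
    "biolink": "https://w3id.org/biolink/vocab/",
}
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"


def expand(curie: str) -> str:
    """Expand a CURIE into a full IRI using PREFIX_MAP."""
    prefix, _, local = curie.partition(":")
    return PREFIX_MAP[prefix] + local


def escape_literal(value: str) -> str:
    """Escape the characters that N-Triples requires escaping in string literals."""
    return (
        value.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def node_to_ntriples(record: Dict) -> List[str]:
    """Return one N-Triples line per category, plus one for the name if present."""
    subject = f"<{expand(record['id'])}>"
    lines = [f"{subject} <{RDF_TYPE}> <{expand(c)}> ." for c in record.get("category", [])]
    if "name" in record:
        label = escape_literal(record["name"])
        lines.append(f'{subject} <{RDFS_LABEL}> "{label}" .')
    return lines


triples = node_to_ntriples({"id": "HGNC:11603", "name": "TBX4", "category": ["biolink:Gene"]})
```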

kgx.sink.parquet_sink#

ParquetSink is responsible for writing data as Parquet table files.

KGX writes two separate files - one for nodes and another for edges.

Sink for Parquet format.

class kgx.sink.parquet_sink.ParquetSink(owner, filename: str, **kwargs: Any)[source]#

Bases: Sink

A ParquetSink writes data to Parquet files.

Parameters:
  • owner (Transformer) – Transformer to which the ParquetSink belongs

  • filename (str) – Name of the Parquet file to write to

  • kwargs (Any) – Any additional arguments

finalize() None[source]#

Finalize writing the data to the underlying store.

set_reverse_prefix_map(m: Dict) None#

Update default reverse prefix map.

Parameters:

m (Dict) – A dictionary with IRI to prefix mappings

write_edge(record) None[source]#

Write an edge record to the underlying store.

Parameters:

record (Any) – An edge record

write_node(record) None[source]#

Write a node record to the underlying store.

Parameters:

record (Any) – A node record