Improved graph storage documentation and methods

- Added detailed docstrings for graph methods
- Added bulk node/edge removal methods
This commit is contained in:
yangdx
2025-04-11 18:34:03 +08:00
parent 1e79257976
commit c084358dc9

View File

@@ -12,7 +12,6 @@ from typing import (
TypeVar, TypeVar,
Callable, Callable,
) )
import numpy as np
from .utils import EmbeddingFunc from .utils import EmbeddingFunc
from .types import KnowledgeGraph from .types import KnowledgeGraph
@@ -281,63 +280,164 @@ class BaseGraphStorage(StorageNameSpace, ABC):
@abstractmethod @abstractmethod
async def has_node(self, node_id: str) -> bool: async def has_node(self, node_id: str) -> bool:
"""Check if an edge exists in the graph.""" """Check if a node exists in the graph.
Args:
node_id: The ID of the node to check
Returns:
True if the node exists, False otherwise
"""
@abstractmethod @abstractmethod
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool: async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
"""Get the degree of a node.""" """Check if an edge exists between two nodes.
Args:
source_node_id: The ID of the source node
target_node_id: The ID of the target node
Returns:
True if the edge exists, False otherwise
"""
@abstractmethod @abstractmethod
async def node_degree(self, node_id: str) -> int: async def node_degree(self, node_id: str) -> int:
"""Get the degree of an edge.""" """Get the degree (number of connected edges) of a node.
Args:
node_id: The ID of the node
Returns:
The number of edges connected to the node
"""
@abstractmethod @abstractmethod
async def edge_degree(self, src_id: str, tgt_id: str) -> int: async def edge_degree(self, src_id: str, tgt_id: str) -> int:
"""Get a node by its id.""" """Get the total degree of an edge (sum of degrees of its source and target nodes).
Args:
src_id: The ID of the source node
tgt_id: The ID of the target node
Returns:
The sum of the degrees of the source and target nodes
"""
@abstractmethod @abstractmethod
async def get_node(self, node_id: str) -> dict[str, str] | None: async def get_node(self, node_id: str) -> dict[str, str] | None:
"""Get node by its label identifier, return only node properties""" """Get node by its ID, returning only node properties.
Args:
node_id: The ID of the node to retrieve
Returns:
A dictionary of node properties if found, None otherwise
"""
@abstractmethod @abstractmethod
async def get_edge( async def get_edge(
self, source_node_id: str, target_node_id: str self, source_node_id: str, target_node_id: str
) -> dict[str, str] | None: ) -> dict[str, str] | None:
"""Get edge properties between two nodes""" """Get edge properties between two nodes.
Args:
source_node_id: The ID of the source node
target_node_id: The ID of the target node
Returns:
A dictionary of edge properties if found, None otherwise
"""
@abstractmethod @abstractmethod
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None: async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
"""Upsert a node into the graph.""" """Get all edges connected to a node.
Args:
source_node_id: The ID of the node to get edges for
Returns:
A list of (source_id, target_id) tuples representing edges,
or None if the node doesn't exist
"""
@abstractmethod @abstractmethod
async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None: async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
"""Upsert an edge into the graph.""" """Insert a new node or update an existing node in the graph.
Important notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. Only one process should be updating the storage at a time before index_done_callback,
KG-storage-log should be used to avoid data corruption
Args:
node_id: The ID of the node to insert or update
node_data: A dictionary of node properties
"""
@abstractmethod @abstractmethod
async def upsert_edge( async def upsert_edge(
self, source_node_id: str, target_node_id: str, edge_data: dict[str, str] self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
) -> None: ) -> None:
"""Insert a new edge or update an existing edge in the graph.
Important notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback
2. Only one process should be updating the storage at a time before index_done_callback,
KG-storage-log should be used to avoid data corruption
Args:
source_node_id: The ID of the source node
target_node_id: The ID of the target node
edge_data: A dictionary of edge properties
"""
@abstractmethod
async def delete_node(self, node_id: str) -> None:
"""Delete a node from the graph. """Delete a node from the graph.
Important notes for in-memory storage: Important notes for in-memory storage:
1. Changes will be persisted to disk during the next index_done_callback 1. Changes will be persisted to disk during the next index_done_callback
2. Only one process should be updating the storage at a time before index_done_callback, 2. Only one process should be updating the storage at a time before index_done_callback,
KG-storage-log should be used to avoid data corruption KG-storage-log should be used to avoid data corruption
Args:
node_id: The ID of the node to delete
""" """
@abstractmethod @abstractmethod
async def delete_node(self, node_id: str) -> None: async def remove_nodes(self, nodes: list[str]):
"""Embed nodes using an algorithm.""" """Delete multiple nodes
Important notes:
1. Changes will be persisted to disk during the next index_done_callback
2. Only one process should be updating the storage at a time before index_done_callback,
KG-storage-log should be used to avoid data corruption
Args:
nodes: List of node IDs to be deleted
"""
@abstractmethod @abstractmethod
async def embed_nodes( async def remove_edges(self, edges: list[tuple[str, str]]):
self, algorithm: str """Delete multiple edges
) -> tuple[np.ndarray[Any, Any], list[str]]:
"""Get all labels in the graph.""" Important notes:
1. Changes will be persisted to disk during the next index_done_callback
2. Only one process should be updating the storage at a time before index_done_callback,
KG-storage-log should be used to avoid data corruption
Args:
edges: List of edges to be deleted, each edge is a (source, target) tuple
"""
@abstractmethod @abstractmethod
async def get_all_labels(self) -> list[str]: async def get_all_labels(self) -> list[str]:
"""Get a knowledge graph of a node.""" """Get all labels in the graph.
Returns:
A list of all node labels in the graph, sorted alphabetically
"""
@abstractmethod @abstractmethod
async def get_knowledge_graph( async def get_knowledge_graph(