Add graph_db_lock to esure consistency across multiple processes for node and edge edition jobs

This commit is contained in:
yangdx
2025-04-14 00:07:31 +08:00
parent 00c8394601
commit 6dd67748ca

View File

@@ -11,12 +11,18 @@ from functools import partial
from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal from typing import Any, AsyncIterator, Callable, Iterator, cast, final, Literal
import pandas as pd import pandas as pd
from .kg.shared_storage import get_graph_db_lock
from lightrag.kg import ( from lightrag.kg import (
STORAGES, STORAGES,
verify_storage_implementation, verify_storage_implementation,
) )
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
from .base import ( from .base import (
BaseGraphStorage, BaseGraphStorage,
BaseKVStorage, BaseKVStorage,
@@ -779,10 +785,6 @@ class LightRAG:
3. Process each chunk for entity and relation extraction 3. Process each chunk for entity and relation extraction
4. Update the document status 4. Update the document status
""" """
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
)
# Get pipeline status shared data and lock # Get pipeline status shared data and lock
pipeline_status = await get_namespace_data("pipeline_status") pipeline_status = await get_namespace_data("pipeline_status")
@@ -1431,8 +1433,10 @@ class LightRAG:
loop = always_get_an_event_loop() loop = always_get_an_event_loop()
return loop.run_until_complete(self.adelete_by_entity(entity_name)) return loop.run_until_complete(self.adelete_by_entity(entity_name))
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def adelete_by_entity(self, entity_name: str) -> None: async def adelete_by_entity(self, entity_name: str) -> None:
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
await self.entities_vdb.delete_entity(entity_name) await self.entities_vdb.delete_entity(entity_name)
await self.relationships_vdb.delete_entity_relation(entity_name) await self.relationships_vdb.delete_entity_relation(entity_name)
@@ -1469,7 +1473,6 @@ class LightRAG:
self.adelete_by_relation(source_entity, target_entity) self.adelete_by_relation(source_entity, target_entity)
) )
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None: async def adelete_by_relation(self, source_entity: str, target_entity: str) -> None:
"""Asynchronously delete a relation between two entities. """Asynchronously delete a relation between two entities.
@@ -1477,6 +1480,9 @@ class LightRAG:
source_entity: Name of the source entity source_entity: Name of the source entity
target_entity: Name of the target entity target_entity: Name of the target entity
""" """
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
# TODO: check if has_edge function works on reverse relation # TODO: check if has_edge function works on reverse relation
# Check if the relation exists # Check if the relation exists
@@ -1539,7 +1545,8 @@ class LightRAG:
""" """
return await self.doc_status.get_docs_by_status(status) return await self.doc_status.get_docs_by_status(status)
# TODO: Lock all KG relative DB to esure consistency across multiple processes # TODO: Deprecated (Deleting documents can cause hallucinations in RAG.)
# Document delete is not working properly for most of the storage implementations.
async def adelete_by_doc_id(self, doc_id: str) -> None: async def adelete_by_doc_id(self, doc_id: str) -> None:
"""Delete a document and all its related data """Delete a document and all its related data
@@ -1898,7 +1905,6 @@ class LightRAG:
"""Synchronous version of aclear_cache.""" """Synchronous version of aclear_cache."""
return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes)) return always_get_an_event_loop().run_until_complete(self.aclear_cache(modes))
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def aedit_entity( async def aedit_entity(
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
) -> dict[str, Any]: ) -> dict[str, Any]:
@@ -1914,6 +1920,9 @@ class LightRAG:
Returns: Returns:
Dictionary containing updated entity information Dictionary containing updated entity information
""" """
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
# 1. Get current entity information # 1. Get current entity information
node_exists = await self.chunk_entity_relation_graph.has_node(entity_name) node_exists = await self.chunk_entity_relation_graph.has_node(entity_name)
@@ -2111,7 +2120,6 @@ class LightRAG:
] ]
) )
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def aedit_relation( async def aedit_relation(
self, source_entity: str, target_entity: str, updated_data: dict[str, Any] self, source_entity: str, target_entity: str, updated_data: dict[str, Any]
) -> dict[str, Any]: ) -> dict[str, Any]:
@@ -2127,6 +2135,9 @@ class LightRAG:
Returns: Returns:
Dictionary containing updated relation information Dictionary containing updated relation information
""" """
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
# 1. Get current relation information # 1. Get current relation information
edge_exists = await self.chunk_entity_relation_graph.has_edge( edge_exists = await self.chunk_entity_relation_graph.has_edge(
@@ -2245,6 +2256,9 @@ class LightRAG:
Returns: Returns:
Dictionary containing created entity information Dictionary containing created entity information
""" """
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
# Check if entity already exists # Check if entity already exists
existing_node = await self.chunk_entity_relation_graph.has_node(entity_name) existing_node = await self.chunk_entity_relation_graph.has_node(entity_name)
@@ -2325,6 +2339,9 @@ class LightRAG:
Returns: Returns:
Dictionary containing created relation information Dictionary containing created relation information
""" """
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
# Check if both entities exist # Check if both entities exist
source_exists = await self.chunk_entity_relation_graph.has_node( source_exists = await self.chunk_entity_relation_graph.has_node(
@@ -2426,7 +2443,6 @@ class LightRAG:
self.acreate_relation(source_entity, target_entity, relation_data) self.acreate_relation(source_entity, target_entity, relation_data)
) )
# TODO: Lock all KG relative DB to esure consistency across multiple processes
async def amerge_entities( async def amerge_entities(
self, self,
source_entities: list[str], source_entities: list[str],
@@ -2454,6 +2470,9 @@ class LightRAG:
Returns: Returns:
Dictionary containing the merged entity information Dictionary containing the merged entity information
""" """
graph_db_lock = get_graph_db_lock(enable_logging=False)
# Use graph database lock to ensure atomic graph and vector db operations
async with graph_db_lock:
try: try:
# Default merge strategy # Default merge strategy
default_strategy = { default_strategy = {