From e822f35c898c03243781b046e6685b42a57b19a8 Mon Sep 17 00:00:00 2001
From: zrguo
Date: Fri, 7 Mar 2025 14:39:06 +0800
Subject: [PATCH] Fix edit entity and relation bugs

---
 lightrag/kg/chroma_impl.py         | 40 ++++++++++++++++++
 lightrag/kg/faiss_impl.py          | 21 ++++++++++
 lightrag/kg/milvus_impl.py         | 25 ++++++++++++
 lightrag/kg/mongo_impl.py          | 30 ++++++++++++++
 lightrag/kg/nano_vector_db_impl.py | 20 +++++++++
 lightrag/kg/oracle_impl.py         | 35 ++++++++++++++++
 lightrag/kg/postgres_impl.py       | 28 +++++++++++++
 lightrag/kg/qdrant_impl.py         | 44 ++++++++++++++++++++
 lightrag/kg/tidb_impl.py           | 53 ++++++++++++++++++++++++
 lightrag/lightrag.py               | 39 ++++++++++++++++--
 10 files changed, 332 insertions(+), 3 deletions(-)

diff --git a/lightrag/kg/chroma_impl.py b/lightrag/kg/chroma_impl.py
index ea4b31a1..6b521180 100644
--- a/lightrag/kg/chroma_impl.py
+++ b/lightrag/kg/chroma_impl.py
@@ -229,3 +229,43 @@ class ChromaVectorDBStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
             raise
+
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        try:
+            # Get all records from the collection
+            # Since ChromaDB doesn't directly support prefix search on IDs,
+            # we'll get all records and filter in Python
+            results = self._collection.get(
+                include=["metadatas", "documents", "embeddings"]
+            )
+
+            matching_records = []
+
+            # Filter records where ID starts with the prefix
+            for i, record_id in enumerate(results["ids"]):
+                if record_id.startswith(prefix):
+                    matching_records.append(
+                        {
+                            "id": record_id,
+                            "content": results["documents"][i],
+                            "vector": results["embeddings"][i],
+                            **results["metadatas"][i],
+                        }
+                    )
+
+            logger.debug(
+                f"Found {len(matching_records)} records with prefix '{prefix}'"
+            )
+            return matching_records
+
+        except Exception as e:
+            logger.error(f"Error during prefix search in ChromaDB: {str(e)}")
+            raise
diff --git a/lightrag/kg/faiss_impl.py b/lightrag/kg/faiss_impl.py
index 940ba73d..ab036e6f 100644
--- a/lightrag/kg/faiss_impl.py
+++ b/lightrag/kg/faiss_impl.py
@@ -371,3 +371,24 @@ class FaissVectorDBStorage(BaseVectorStorage):
                 return False  # Return error
 
         return True  # Return success
+
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        matching_records = []
+
+        # Only the metadata is needed here, so iterate the values directly
+        for meta in self._id_to_meta.values():
+            if "__id__" in meta and meta["__id__"].startswith(prefix):
+                # Create a copy of all metadata and add "id" field
+                record = {**meta, "id": meta["__id__"]}
+                matching_records.append(record)
+
+        logger.debug(f"Found {len(matching_records)} records with prefix '{prefix}'")
+        return matching_records
diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py
index 7242f03d..f3a6fcc4 100644
--- a/lightrag/kg/milvus_impl.py
+++ b/lightrag/kg/milvus_impl.py
@@ -206,3 +206,28 @@ class MilvusVectorDBStorage(BaseVectorStorage):
 
         except Exception as e:
             logger.error(f"Error while deleting vectors from {self.namespace}: {e}")
+
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        try:
+            # Use Milvus query with expression to find IDs with the given prefix
+            expression = f'id like "{prefix}%"'
+            results = self._client.query(
+                collection_name=self.namespace,
+                filter=expression,
+                output_fields=list(self.meta_fields) + ["id"],
+            )
+
+            logger.debug(f"Found {len(results)} records with prefix '{prefix}'")
+            return results
+
+        except Exception as e:
+            logger.error(f"Error searching for records with prefix '{prefix}': {e}")
+            return []
diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py
index c2957502..f2ab6ae0 100644
--- a/lightrag/kg/mongo_impl.py
+++ b/lightrag/kg/mongo_impl.py
@@ -1045,6 +1045,36 @@ class MongoVectorDBStorage(BaseVectorStorage):
         except PyMongoError as e:
             logger.error(f"Error deleting relations for {entity_name}: {str(e)}")
 
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        import re  # local import: the module's import block is outside this hunk
+
+        try:
+            # Anchor at the start of _id and escape the prefix so that regex
+            # metacharacters in it (".", "*", "(" ...) are matched literally
+            pattern = f"^{re.escape(prefix)}"
+            cursor = self._data.find({"_id": {"$regex": pattern}})
+            matching_records = await cursor.to_list(length=None)
+
+            # Format results
+            results = [{**doc, "id": doc["_id"]} for doc in matching_records]
+
+            logger.debug(
+                f"Found {len(results)} records with prefix '{prefix}' in {self.namespace}"
+            )
+            return results
+
+        except PyMongoError as e:
+            logger.error(f"Error searching by prefix in {self.namespace}: {str(e)}")
+            return []
+
 
 async def get_or_create_collection(db: AsyncIOMotorDatabase, collection_name: str):
     collection_names = await db.list_collection_names()
diff --git a/lightrag/kg/nano_vector_db_impl.py b/lightrag/kg/nano_vector_db_impl.py
index 07c800de..07ccd566 100644
--- a/lightrag/kg/nano_vector_db_impl.py
+++ b/lightrag/kg/nano_vector_db_impl.py
@@ -236,3 +236,23 @@ class NanoVectorDBStorage(BaseVectorStorage):
                 return False  # Return error
 
         return True  # Return success
+
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        storage = await self.client_storage
+        matching_records = []
+
+        # Search for records with IDs starting with the prefix
+        for record in storage["data"]:
+            if "__id__" in record and record["__id__"].startswith(prefix):
+                matching_records.append({**record, "id": record["__id__"]})
+
+        logger.debug(f"Found {len(matching_records)} records with prefix '{prefix}'")
+        return matching_records
diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py
index d105aa54..eda3ca63 100644
--- a/lightrag/kg/oracle_impl.py
+++ b/lightrag/kg/oracle_impl.py
@@ -494,6 +494,41 @@ class OracleVectorDBStorage(BaseVectorStorage):
             logger.error(f"Error deleting relations for entity {entity_name}: {e}")
             raise
 
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        try:
+            # Determine the appropriate table based on namespace
+            table_name = namespace_to_table_name(self.namespace)
+
+            # Create SQL query to find records with IDs starting with prefix
+            search_sql = f"""
+                SELECT * FROM {table_name}
+                WHERE workspace = :workspace
+                AND id LIKE :prefix_pattern
+                ORDER BY id
+            """
+
+            params = {"workspace": self.db.workspace, "prefix_pattern": f"{prefix}%"}
+
+            # Execute query and get results
+            results = await self.db.query(search_sql, params, multirows=True)
+
+            logger.debug(
+                f"Found {len(results) if results else 0} records with prefix '{prefix}'"
+            )
+            return results or []
+
+        except Exception as e:
+            logger.error(f"Error searching records with prefix '{prefix}': {e}")
+            return []
+
 
 @final
 @dataclass
diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 54a59f5d..644c47cd 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -575,6 +575,34 @@ class PGVectorStorage(BaseVectorStorage):
         except Exception as e:
             logger.error(f"Error deleting relations for entity {entity_name}: {e}")
 
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        table_name = namespace_to_table_name(self.namespace)
+        if not table_name:
+            logger.error(f"Unknown namespace for prefix search: {self.namespace}")
+            return []
+
+        search_sql = f"SELECT * FROM {table_name} WHERE workspace=$1 AND id LIKE $2"
+        params = {"workspace": self.db.workspace, "prefix": f"{prefix}%"}
+
+        try:
+            results = await self.db.query(search_sql, params, multirows=True)
+            logger.debug(f"Found {len(results)} records with prefix '{prefix}'")
+
+            # Convert each row to a plain dict; "id" is already one of the
+            # selected columns, so no extra copy of it is required
+            return [dict(record) for record in results]
+        except Exception as e:
+            logger.error(f"Error during prefix search for '{prefix}': {e}")
+            return []
+
 
 @final
 @dataclass
diff --git a/lightrag/kg/qdrant_impl.py b/lightrag/kg/qdrant_impl.py
index c7d346e6..5049bc5c 100644
--- a/lightrag/kg/qdrant_impl.py
+++ b/lightrag/kg/qdrant_impl.py
@@ -233,3 +233,47 @@ class QdrantVectorDBStorage(BaseVectorStorage):
                 logger.debug(f"No relations found for entity {entity_name}")
         except Exception as e:
             logger.error(f"Error deleting relations for {entity_name}: {e}")
+
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        try:
+            # models.MatchText only takes `text` (it has no `prefix` option),
+            # so page through the collection with scroll() and apply the
+            # prefix test in Python, as the ChromaDB implementation does
+            formatted_results = []
+            offset = None
+            while True:
+                # scroll() returns one page of points plus the offset of the
+                # next page (None once the collection is exhausted)
+                points, offset = self._client.scroll(
+                    collection_name=self.namespace,
+                    with_payload=True,
+                    with_vectors=False,
+                    limit=1000,
+                    offset=offset,
+                )
+                for point in points:
+                    payload = point.payload or {}
+                    # Qdrant point IDs are integers or UUIDs, so the prefix is
+                    # tested against the record ID kept in the payload "id"
+                    # field (assumes upsert stores it there - TODO confirm)
+                    if str(payload.get("id", "")).startswith(prefix):
+                        formatted_results.append(dict(payload))
+                if offset is None:
+                    break
+
+            logger.debug(
+                f"Found {len(formatted_results)} records with prefix '{prefix}'"
+            )
+            return formatted_results
+
+        except Exception as e:
+            logger.error(f"Error searching for prefix '{prefix}': {e}")
+            return []
diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py
index 684c30d7..7af9b48a 100644
--- a/lightrag/kg/tidb_impl.py
+++ b/lightrag/kg/tidb_impl.py
@@ -414,6 +414,43 @@ class TiDBVectorDBStorage(BaseVectorStorage):
         # Ti handles persistence automatically
         pass
 
+    async def search_by_prefix(self, prefix: str) -> list[dict[str, Any]]:
+        """Search for records with IDs starting with a specific prefix.
+
+        Args:
+            prefix: The prefix to search for in record IDs
+
+        Returns:
+            List of records with matching ID prefixes
+        """
+        # Pick the shared SQL template for this namespace instead of repeating
+        # the same statements inline
+        if self.namespace == NameSpace.VECTOR_STORE_ENTITIES:
+            sql_template = SQL_TEMPLATES["search_entity_by_prefix"]
+        elif self.namespace == NameSpace.VECTOR_STORE_RELATIONSHIPS:
+            sql_template = SQL_TEMPLATES["search_relationship_by_prefix"]
+        elif self.namespace == NameSpace.VECTOR_STORE_CHUNKS:
+            sql_template = SQL_TEMPLATES["search_chunk_by_prefix"]
+        else:
+            logger.warning(
+                f"Namespace {self.namespace} not supported for prefix search"
+            )
+            return []
+
+        # Add prefix pattern parameter with % for SQL LIKE
+        prefix_pattern = f"{prefix}%"
+        params = {"prefix_pattern": prefix_pattern, "workspace": self.db.workspace}
+
+        try:
+            results = await self.db.query(sql_template, params=params, multirows=True)
+            logger.debug(
+                f"Found {len(results) if results else 0} records with prefix '{prefix}'"
+            )
+            return results if results else []
+        except Exception as e:
+            logger.error(f"Error searching records with prefix '{prefix}': {e}")
+            return []
+
 
 @final
 @dataclass
@@ -968,4 +1005,20 @@ SQL_TEMPLATES = {
         WHERE (source_name = :source AND target_name = :target)
         AND workspace = :workspace
     """,
+    # Search by prefix SQL templates
+    "search_entity_by_prefix": """
+        SELECT entity_id as id, name as entity_name, entity_type, description, content
+        FROM LIGHTRAG_GRAPH_NODES
+        WHERE entity_id LIKE :prefix_pattern AND workspace = :workspace
+    """,
+    "search_relationship_by_prefix": """
+        SELECT relation_id as id, source_name as src_id, target_name as tgt_id, keywords, description, content
+        FROM LIGHTRAG_GRAPH_EDGES
+        WHERE relation_id LIKE :prefix_pattern AND workspace = :workspace
+    """,
+    "search_chunk_by_prefix": """
+        SELECT chunk_id as id, content, tokens, chunk_order_index, full_doc_id
+        FROM LIGHTRAG_DOC_CHUNKS
+        WHERE chunk_id LIKE :prefix_pattern AND workspace = :workspace
+    """,
 }
diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index b06520fc..41216825 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -2044,6 +2044,9 @@ class LightRAG:
                 # Delete old entity record from vector database
                 old_entity_id = compute_mdhash_id(entity_name, prefix="ent-")
                 await self.entities_vdb.delete([old_entity_id])
+                logger.info(
+                    f"Deleted old entity '{entity_name}' and its vector embedding from database"
+                )
 
                 # Update relationship vector representations
                 for src, tgt, edge_data in relations_to_update:
@@ -2171,6 +2174,15 @@ class LightRAG:
                     f"Relation from '{source_entity}' to '{target_entity}' does not exist"
                 )
 
+            # Important: First delete the old relation record from the vector database
+            old_relation_id = compute_mdhash_id(
+                source_entity + target_entity, prefix="rel-"
+            )
+            await self.relationships_vdb.delete([old_relation_id])
+            logger.info(
+                f"Deleted old relation record from vector database for relation {source_entity} -> {target_entity}"
+            )
+
             # 2. Update relation information in the graph
             new_edge_data = {**edge_data, **updated_data}
             await self.chunk_entity_relation_graph.upsert_edge(
@@ -2669,12 +2681,33 @@ class LightRAG:
 
             # 9. Delete source entities
             for entity_name in source_entities:
-                # Delete entity node
+                # Delete entity node from knowledge graph
                 await self.chunk_entity_relation_graph.delete_node(entity_name)
-                # Delete record from vector database
+
+                # Delete entity record from vector database
                 entity_id = compute_mdhash_id(entity_name, prefix="ent-")
                 await self.entities_vdb.delete([entity_id])
-                logger.info(f"Deleted source entity '{entity_name}'")
+
+                # Also ensure any relationships specific to this entity are deleted from vector DB
+                # This is a safety check, as these should have been transformed to the target entity already
+                # NOTE(review): relation IDs are built elsewhere in this patch
+                # as compute_mdhash_id(src + tgt, prefix="rel-"), so a hash of
+                # the entity name alone looks unlikely to prefix any of them -
+                # confirm this lookup can actually match stale relations
+                entity_relation_prefix = compute_mdhash_id(entity_name, prefix="rel-")
+                relations_with_entity = await self.relationships_vdb.search_by_prefix(
+                    entity_relation_prefix
+                )
+                if relations_with_entity:
+                    relation_ids = [r["id"] for r in relations_with_entity]
+                    await self.relationships_vdb.delete(relation_ids)
+                    logger.info(
+                        f"Deleted {len(relation_ids)} relation records for entity '{entity_name}' from vector database"
+                    )
+
+                logger.info(
+                    f"Deleted source entity '{entity_name}' and its vector embedding from database"
+                )
 
             # 10. Save changes
             await self._merge_entities_done()