From 7d6389801513b8d28677948f9fcfd62f8f4e0b04 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 19:21:49 +0100 Subject: [PATCH 01/15] fixed bugs --- lightrag/base.py | 2 +- lightrag/kg/json_kv_impl.py | 2 +- lightrag/kg/jsondocstatus_impl.py | 49 ++++++++++++++-------- lightrag/kg/mongo_impl.py | 2 +- lightrag/kg/postgres_impl.py | 2 +- lightrag/kg/redis_impl.py | 2 +- lightrag/lightrag.py | 69 ++++++++++++++----------------- 7 files changed, 67 insertions(+), 61 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index 7a3b4f5f..9b3e5f00 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -90,7 +90,7 @@ class BaseKVStorage(StorageNameSpace): async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: raise NotImplementedError - async def filter_keys(self, data: list[str]) -> set[str]: + async def filter_keys(self, data: set[str]) -> set[str]: """return un-exist keys""" raise NotImplementedError diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index ff184dbd..c61d088d 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -38,7 +38,7 @@ class JsonKVStorage(BaseKVStorage): for id in ids ] - async def filter_keys(self, data: list[str]) -> set[str]: + async def filter_keys(self, data: set[str]) -> set[str]: return set([s for s in data if s not in self._data]) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: diff --git a/lightrag/kg/jsondocstatus_impl.py b/lightrag/kg/jsondocstatus_impl.py index 31aa836a..179b17a3 100644 --- a/lightrag/kg/jsondocstatus_impl.py +++ b/lightrag/kg/jsondocstatus_impl.py @@ -52,17 +52,16 @@ import os from dataclasses import dataclass from typing import Any, Union -from lightrag.utils import ( - logger, - load_json, - write_json, -) - from lightrag.base import ( - DocStatus, DocProcessingStatus, + DocStatus, DocStatusStorage, ) +from lightrag.utils import ( + load_json, + logger, + write_json, +) @dataclass @@ -75,15 +74,17 @@ class JsonDocStatusStorage(DocStatusStorage): self._data: dict[str, Any] = load_json(self._file_name) or {} logger.info(f"Loaded document status storage with {len(self._data)} records") - async def filter_keys(self, data: list[str]) -> set[str]: + async def filter_keys(self, data: set[str]) -> set[str]: """Return keys that should be processed (not in storage or not successfully processed)""" - return set( - [ - k - for k in data - if k not in self._data or self._data[k]["status"] != DocStatus.PROCESSED - ] - ) + return {k for k, _ in self._data.items() if k in data} + + async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: + result: list[dict[str, Any]] = [] + for id in ids: + data = self._data.get(id, None) + if data: + result.append(data) + return result async def get_status_counts(self) -> dict[str, int]: """Get counts of documents in each status""" @@ -94,11 +95,19 @@ class JsonDocStatusStorage(DocStatusStorage): async def get_failed_docs(self) -> dict[str, DocProcessingStatus]: """Get all failed documents""" - return {k: v for k, v in self._data.items() if v["status"] == DocStatus.FAILED} + return { + k: DocProcessingStatus(**v) + for k, v in self._data.items() + if v["status"] == DocStatus.FAILED + } async def get_pending_docs(self) -> dict[str, DocProcessingStatus]: """Get all pending documents""" - return {k: v for k, v in self._data.items() if v["status"] == DocStatus.PENDING} + return { + k: DocProcessingStatus(**v) + for k, v in self._data.items() + if v["status"] == DocStatus.PENDING + } async def index_done_callback(self): """Save data to file after indexing""" @@ -118,7 +127,11 @@ class JsonDocStatusStorage(DocStatusStorage): async def get(self, doc_id: str) -> Union[DocProcessingStatus, None]: """Get document status by ID""" - return self._data.get(doc_id) + data = self._data.get(doc_id) + if data: + return DocProcessingStatus(**data) + else: + return None async def delete(self, doc_ids: list[str]): """Delete document status by IDs""" diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 35902d37..1294a26a 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -35,7 +35,7 @@ class MongoKVStorage(BaseKVStorage): async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: return list(self._data.find({"_id": {"$in": ids}})) - async def filter_keys(self, data: list[str]) -> set[str]: + async def filter_keys(self, data: set[str]) -> set[str]: existing_ids = [ str(x["_id"]) for x in self._data.find({"_id": {"$in": data}}, {"_id": 1}) ] diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 77fe6198..63df869e 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -421,7 +421,7 @@ class PGDocStatusStorage(DocStatusStorage): def __post_init__(self): pass - async def filter_keys(self, data: list[str]) -> set[str]: + async def filter_keys(self, data: set[str]) -> set[str]: """Return keys that don't exist in storage""" keys = ",".join([f"'{_id}'" for _id in data]) sql = ( diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index 05da41b7..ef95d6db 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -32,7 +32,7 @@ class RedisKVStorage(BaseKVStorage): results = await pipe.execute() return [json.loads(result) if result else None for result in results] - async def filter_keys(self, data: list[str]) -> set[str]: + async def filter_keys(self, data: set[str]) -> set[str]: pipe = self._redis.pipeline() for key in data: pipe.exists(f"{self.namespace}:{key}") diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 5d00c508..3bd3cc8f 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1,28 +1,11 @@ import asyncio import os -from tqdm.asyncio import tqdm as tqdm_async +from collections.abc import Coroutine from dataclasses import asdict, dataclass, field from datetime import datetime from functools import partial -from typing import Any, Callable, Coroutine, Optional, Type, Union, cast -from .operate import ( - chunking_by_token_size, - extract_entities, - extract_keywords_only, - kg_query, - kg_query_with_keywords, - mix_kg_vector_query, - naive_query, -) +from typing import Any, Callable, Optional, Type, Union, cast -from .utils import ( - EmbeddingFunc, - compute_mdhash_id, - limit_async_func_call, - convert_response_to_json, - logger, - set_logger, -) from .base import ( BaseGraphStorage, BaseKVStorage, @@ -33,10 +16,25 @@ from .base import ( QueryParam, StorageNameSpace, ) - from .namespace import NameSpace, make_namespace - +from .operate import ( + chunking_by_token_size, + extract_entities, + extract_keywords_only, + kg_query, + kg_query_with_keywords, + mix_kg_vector_query, + naive_query, +) from .prompt import GRAPH_FIELD_SEP +from .utils import ( + EmbeddingFunc, + compute_mdhash_id, + convert_response_to_json, + limit_async_func_call, + logger, + set_logger, +) STORAGES = { "NetworkXStorage": ".kg.networkx_impl", @@ -67,7 +65,6 @@ STORAGES = { def lazy_external_import(module_name: str, class_name: str): """Lazily import a class from an external module based on the package of the caller.""" - # Get the caller's module and package import inspect @@ -113,7 +110,7 @@ def always_get_an_event_loop() -> asyncio.AbstractEventLoop: @dataclass class LightRAG: working_dir: str = field( - default_factory=lambda: f"./lightrag_cache_{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}" + default_factory=lambda: f'./lightrag_cache_{datetime.now().strftime("%Y-%m-%d-%H:%M:%S")}' ) # Default not to use embedding cache embedding_cache_config: dict = field( @@ -496,15 +493,15 @@ class LightRAG: } # 3. Filter out already processed documents - add_doc_keys: set[str] = set() + new_doc_keys: set[str] = set() # Get docs ids - in_process_keys = list(new_docs.keys()) + in_process_keys = set(new_docs.keys()) # Get in progress docs ids - excluded_ids = await self.doc_status.get_by_ids(in_process_keys) + excluded_ids = await self.doc_status.filter_keys(list(in_process_keys)) # Exclude already in process - add_doc_keys = new_docs.keys() - excluded_ids + new_doc_keys = in_process_keys - excluded_ids # Filter - new_docs = {k: v for k, v in new_docs.items() if k in add_doc_keys} + new_docs = {doc_id: new_docs[doc_id] for doc_id in new_doc_keys} if not new_docs: logger.info("All documents have been processed or are duplicates") @@ -562,15 +559,12 @@ class LightRAG: # 3. iterate over batches tasks: dict[str, list[Coroutine[Any, Any, None]]] = {} - for batch_idx, ids_doc_processing_status in tqdm_async( - enumerate(batch_docs_list), - desc="Process Batches", - ): + + logger.info(f"Number of batches to process: {len(batch_docs_list)}.") + + for batch_idx, ids_doc_processing_status in enumerate(batch_docs_list): # 4. iterate over batch - for id_doc_processing_status in tqdm_async( - ids_doc_processing_status, - desc=f"Process Batch {batch_idx}", - ): + for id_doc_processing_status in ids_doc_processing_status: id_doc, status_doc = id_doc_processing_status # Update status in processing await self.doc_status.upsert( @@ -644,6 +638,7 @@ class LightRAG: } ) continue + logger.info(f"Completed batch {batch_idx + 1} of {len(batch_docs_list)}.") async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None: try: @@ -895,7 +890,6 @@ class LightRAG: 1. Extract keywords from the 'query' using new function in operate.py. 2. Then run the standard aquery() flow with the final prompt (formatted_question). """ - loop = always_get_an_event_loop() return loop.run_until_complete( self.aquery_with_separate_keyword_extraction(query, prompt, param) @@ -908,7 +902,6 @@ class LightRAG: 1. Calls extract_keywords_only to get HL/LL keywords from 'query'. 2. Then calls kg_query(...) or naive_query(...), etc. as the main query, while also injecting the newly extracted keywords if needed. """ - # --------------------- # STEP 1: Keyword Extraction # --------------------- From 55cfb4dab17b52d307ded3edd10cc9c1103d6056 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 19:24:41 +0100 Subject: [PATCH 02/15] fixed typo --- lightrag/lightrag.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 3bd3cc8f..6ff283c6 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -493,15 +493,14 @@ class LightRAG: } # 3. Filter out already processed documents - new_doc_keys: set[str] = set() # Get docs ids - in_process_keys = set(new_docs.keys()) - # Get in progress docs ids - excluded_ids = await self.doc_status.filter_keys(list(in_process_keys)) - # Exclude already in process - new_doc_keys = in_process_keys - excluded_ids - # Filter - new_docs = {doc_id: new_docs[doc_id] for doc_id in new_doc_keys} + all_new_doc_ids = set(new_docs.keys()) + # Retrieve IDs that are already being processed + existing_ids = await self.doc_status.filter_keys(all_new_doc_ids) + # Exclude IDs of documents that are already in progress + unique_new_doc_ids = all_new_doc_ids - existing_ids + # Filter new_docs to only include documents with unique IDs + new_docs = {doc_id: new_docs[doc_id] for doc_id in unique_new_doc_ids} if not new_docs: logger.info("All documents have been processed or are duplicates") From 6480ddee5ddaae84f47328295e0d4051d810e6b0 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 19:51:05 +0100 Subject: [PATCH 03/15] cleaned code --- lightrag/base.py | 28 +++++++++++++++------------- lightrag/kg/json_kv_impl.py | 18 +++++++++--------- lightrag/kg/jsondocstatus_impl.py | 2 +- lightrag/kg/mongo_impl.py | 19 ++++++++++--------- lightrag/kg/oracle_impl.py | 24 ++++++++++++++---------- lightrag/kg/postgres_impl.py | 30 ++++++++++++++++-------------- lightrag/kg/redis_impl.py | 4 ++-- lightrag/kg/tidb_impl.py | 21 ++++++++++----------- 8 files changed, 77 insertions(+), 69 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index 9b3e5f00..1a7f9c2e 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -1,24 +1,26 @@ -from enum import Enum import os from dataclasses import dataclass, field +from enum import Enum from typing import ( + Any, + Literal, Optional, TypedDict, - Union, - Literal, TypeVar, - Any, + Union, ) import numpy as np - from .utils import EmbeddingFunc -TextChunkSchema = TypedDict( - "TextChunkSchema", - {"tokens": int, "content": str, "full_doc_id": str, "chunk_order_index": int}, -) + +class TextChunkSchema(TypedDict): + tokens: int + content: str + full_doc_id: str + chunk_order_index: int + T = TypeVar("T") @@ -57,11 +59,11 @@ class StorageNameSpace: global_config: dict[str, Any] async def index_done_callback(self): - """commit the storage operations after indexing""" + """Commit the storage operations after indexing""" pass async def query_done_callback(self): - """commit the storage operations after querying""" + """Commit the storage operations after querying""" pass @@ -84,14 +86,14 @@ class BaseVectorStorage(StorageNameSpace): class BaseKVStorage(StorageNameSpace): embedding_func: EmbeddingFunc - async def get_by_id(self, id: str) -> dict[str, Any]: + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: raise NotImplementedError async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: raise NotImplementedError async def filter_keys(self, data: set[str]) -> set[str]: - """return un-exist keys""" + """Return un-exist keys""" raise NotImplementedError async def upsert(self, data: dict[str, Any]) -> None: diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index c61d088d..e545c650 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -1,16 +1,16 @@ import asyncio import os from dataclasses import dataclass -from typing import Any +from typing import Any, Union -from lightrag.utils import ( - logger, - load_json, - write_json, -) from lightrag.base import ( BaseKVStorage, ) +from lightrag.utils import ( + load_json, + logger, + write_json, +) @dataclass @@ -25,8 +25,8 @@ class JsonKVStorage(BaseKVStorage): async def index_done_callback(self): write_json(self._data, self._file_name) - async def get_by_id(self, id: str) -> dict[str, Any]: - return self._data.get(id, {}) + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: + return self._data.get(id) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: return [ @@ -39,7 +39,7 @@ class JsonKVStorage(BaseKVStorage): ] async def filter_keys(self, data: set[str]) -> set[str]: - return set([s for s in data if s not in self._data]) + return data - set(self._data.keys()) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: left_data = {k: v for k, v in data.items() if k not in self._data} diff --git a/lightrag/kg/jsondocstatus_impl.py b/lightrag/kg/jsondocstatus_impl.py index 179b17a3..2ff06d3a 100644 --- a/lightrag/kg/jsondocstatus_impl.py +++ b/lightrag/kg/jsondocstatus_impl.py @@ -76,7 +76,7 @@ class JsonDocStatusStorage(DocStatusStorage): async def filter_keys(self, data: set[str]) -> set[str]: """Return keys that should be processed (not in storage or not successfully processed)""" - return {k for k, _ in self._data.items() if k in data} + return set(k for k in data if k not in self._data) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 1294a26a..4f919ecd 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -1,8 +1,9 @@ import os -from tqdm.asyncio import tqdm as tqdm_async from dataclasses import dataclass -import pipmaster as pm + import numpy as np +import pipmaster as pm +from tqdm.asyncio import tqdm as tqdm_async if not pm.is_installed("pymongo"): pm.install("pymongo") @@ -10,13 +11,14 @@ if not pm.is_installed("pymongo"): if not pm.is_installed("motor"): pm.install("motor") -from pymongo import MongoClient -from motor.motor_asyncio import AsyncIOMotorClient -from typing import Any, Union, List, Tuple +from typing import Any, List, Tuple, Union -from ..utils import logger -from ..base import BaseKVStorage, BaseGraphStorage +from motor.motor_asyncio import AsyncIOMotorClient +from pymongo import MongoClient + +from ..base import BaseGraphStorage, BaseKVStorage from ..namespace import NameSpace, is_namespace +from ..utils import logger @dataclass @@ -29,7 +31,7 @@ class MongoKVStorage(BaseKVStorage): self._data = database.get_collection(self.namespace) logger.info(f"Use MongoDB as KV {self.namespace}") - async def get_by_id(self, id: str) -> dict[str, Any]: + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: return self._data.find_one({"_id": id}) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: @@ -170,7 +172,6 @@ class MongoGraphStorage(BaseGraphStorage): But typically for a direct edge, we might just do a find_one. Below is a demonstration approach. """ - # We can do a single-hop graphLookup (maxDepth=0 or 1). # Then check if the target_node appears among the edges array. pipeline = [ diff --git a/lightrag/kg/oracle_impl.py b/lightrag/kg/oracle_impl.py index b648c9bc..ca6bcfb2 100644 --- a/lightrag/kg/oracle_impl.py +++ b/lightrag/kg/oracle_impl.py @@ -1,27 +1,28 @@ -import os +import array import asyncio +import os # import html # import os from dataclasses import dataclass from typing import Any, Union + import numpy as np -import array import pipmaster as pm if not pm.is_installed("oracledb"): pm.install("oracledb") -from ..utils import logger +import oracledb + from ..base import ( BaseGraphStorage, BaseKVStorage, BaseVectorStorage, ) from ..namespace import NameSpace, is_namespace - -import oracledb +from ..utils import logger class OracleDB: @@ -107,7 +108,7 @@ class OracleDB: "SELECT id FROM GRAPH_TABLE (lightrag_graph MATCH (a) COLUMNS (a.id)) fetch first row only" ) else: - await self.query("SELECT 1 FROM {k}".format(k=k)) + await self.query(f"SELECT 1 FROM {k}") except Exception as e: logger.error(f"Failed to check table {k} in Oracle database") logger.error(f"Oracle database error: {e}") @@ -181,8 +182,8 @@ class OracleKVStorage(BaseKVStorage): ################ QUERY METHODS ################ - async def get_by_id(self, id: str) -> dict[str, Any]: - """get doc_full data based on id.""" + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: + """Get doc_full data based on id.""" SQL = SQL_TEMPLATES["get_by_id_" + self.namespace] params = {"workspace": self.db.workspace, "id": id} # print("get_by_id:"+SQL) @@ -191,7 +192,10 @@ class OracleKVStorage(BaseKVStorage): res = {} for row in array_res: res[row["id"]] = row - return res + if res: + return res + else: + return None else: return await self.db.query(SQL, params) @@ -209,7 +213,7 @@ class OracleKVStorage(BaseKVStorage): return None async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: - """get doc_chunks data based on id""" + """Get doc_chunks data based on id""" SQL = SQL_TEMPLATES["get_by_ids_" + self.namespace].format( ids=",".join([f"'{id}'" for id in ids]) ) diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 63df869e..d319f6f9 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -4,34 +4,35 @@ import json import os import time from dataclasses import dataclass -from typing import Union, List, Dict, Set, Any, Tuple -import numpy as np +from typing import Any, Dict, List, Set, Tuple, Union +import numpy as np import pipmaster as pm if not pm.is_installed("asyncpg"): pm.install("asyncpg") -import asyncpg import sys -from tqdm.asyncio import tqdm as tqdm_async + +import asyncpg from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_exponential, ) +from tqdm.asyncio import tqdm as tqdm_async -from ..utils import logger from ..base import ( + BaseGraphStorage, BaseKVStorage, BaseVectorStorage, - DocStatusStorage, - DocStatus, DocProcessingStatus, - BaseGraphStorage, + DocStatus, + DocStatusStorage, ) from ..namespace import NameSpace, is_namespace +from ..utils import logger if sys.platform.startswith("win"): import asyncio.windows_events @@ -82,7 +83,7 @@ class PostgreSQLDB: async def check_tables(self): for k, v in TABLES.items(): try: - await self.query("SELECT 1 FROM {k} LIMIT 1".format(k=k)) + await self.query(f"SELECT 1 FROM {k} LIMIT 1") except Exception as e: logger.error(f"Failed to check table {k} in PostgreSQL database") logger.error(f"PostgreSQL database error: {e}") @@ -183,7 +184,7 @@ class PGKVStorage(BaseKVStorage): ################ QUERY METHODS ################ - async def get_by_id(self, id: str) -> dict[str, Any]: + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: """Get doc_full data by id.""" sql = SQL_TEMPLATES["get_by_id_" + self.namespace] params = {"workspace": self.db.workspace, "id": id} @@ -192,9 +193,10 @@ class PGKVStorage(BaseKVStorage): res = {} for row in array_res: res[row["id"]] = row - return res + return res if res else None else: - return await self.db.query(sql, params) + response = await self.db.query(sql, params) + return response if response else None async def get_by_mode_and_id(self, mode: str, id: str) -> Union[dict, None]: """Specifically for llm_response_cache.""" @@ -435,12 +437,12 @@ class PGDocStatusStorage(DocStatusStorage): existed = set([element["id"] for element in result]) return set(data) - existed - async def get_by_id(self, id: str) -> dict[str, Any]: + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2" params = {"workspace": self.db.workspace, "id": id} result = await self.db.query(sql, params, True) if result is None or result == []: - return {} + return None else: return DocProcessingStatus( content=result[0]["content"], diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index ef95d6db..e97a6afc 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -1,5 +1,5 @@ import os -from typing import Any +from typing import Any, Union from tqdm.asyncio import tqdm as tqdm_async from dataclasses import dataclass import pipmaster as pm @@ -21,7 +21,7 @@ class RedisKVStorage(BaseKVStorage): self._redis = Redis.from_url(redis_url, decode_responses=True) logger.info(f"Use Redis as KV {self.namespace}") - async def get_by_id(self, id): + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: data = await self._redis.get(f"{self.namespace}:{id}") return json.loads(data) if data else None diff --git a/lightrag/kg/tidb_impl.py b/lightrag/kg/tidb_impl.py index 1f454639..d9eeb2dd 100644 --- a/lightrag/kg/tidb_impl.py +++ b/lightrag/kg/tidb_impl.py @@ -14,12 +14,12 @@ if not pm.is_installed("sqlalchemy"): from sqlalchemy import create_engine, text from tqdm import tqdm -from ..base import BaseVectorStorage, BaseKVStorage, BaseGraphStorage -from ..utils import logger +from ..base import BaseGraphStorage, BaseKVStorage, BaseVectorStorage from ..namespace import NameSpace, is_namespace +from ..utils import logger -class TiDB(object): +class TiDB: def __init__(self, config, **kwargs): self.host = config.get("host", None) self.port = config.get("port", None) @@ -108,12 +108,12 @@ class TiDBKVStorage(BaseKVStorage): ################ QUERY METHODS ################ - async def get_by_id(self, id: str) -> dict[str, Any]: + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: """Fetch doc_full data by id.""" SQL = SQL_TEMPLATES["get_by_id_" + self.namespace] params = {"id": id} - # print("get_by_id:"+SQL) - return await self.db.query(SQL, params) + response = await self.db.query(SQL, params) + return response if response else None # Query by id async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: @@ -178,7 +178,7 @@ class TiDBKVStorage(BaseKVStorage): "tokens": item["tokens"], "chunk_order_index": item["chunk_order_index"], "full_doc_id": item["full_doc_id"], - "content_vector": f"{item['__vector__'].tolist()}", + "content_vector": f'{item["__vector__"].tolist()}', "workspace": self.db.workspace, } ) @@ -222,8 +222,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): ) async def query(self, query: str, top_k: int) -> list[dict]: - """search from tidb vector""" - + """Search from tidb vector""" embeddings = await self.embedding_func([query]) embedding = embeddings[0] @@ -286,7 +285,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): "id": item["id"], "name": item["entity_name"], "content": item["content"], - "content_vector": f"{item['content_vector'].tolist()}", + "content_vector": f'{item["content_vector"].tolist()}', "workspace": self.db.workspace, } # update entity_id if node inserted by graph_storage_instance before @@ -308,7 +307,7 @@ class TiDBVectorDBStorage(BaseVectorStorage): "source_name": item["src_id"], "target_name": item["tgt_id"], "content": item["content"], - "content_vector": f"{item['content_vector'].tolist()}", + "content_vector": f'{item["content_vector"].tolist()}', "workspace": self.db.workspace, } # update relation_id if node inserted by graph_storage_instance before From 62115b836fedd6322bb46d29d25095275716f71a Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 19:56:12 +0100 Subject: [PATCH 04/15] cleaned set --- lightrag/kg/json_kv_impl.py | 2 +- lightrag/kg/jsondocstatus_impl.py | 2 +- lightrag/lightrag.py | 13 ++++++------- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index e545c650..f5df0833 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -39,7 +39,7 @@ class JsonKVStorage(BaseKVStorage): ] async def filter_keys(self, data: set[str]) -> set[str]: - return data - set(self._data.keys()) + return set(self._data.keys()).difference(data) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: left_data = {k: v for k, v in data.items() if k not in self._data} diff --git a/lightrag/kg/jsondocstatus_impl.py b/lightrag/kg/jsondocstatus_impl.py index 2ff06d3a..35d31fba 100644 --- a/lightrag/kg/jsondocstatus_impl.py +++ b/lightrag/kg/jsondocstatus_impl.py @@ -76,7 +76,7 @@ class JsonDocStatusStorage(DocStatusStorage): async def filter_keys(self, data: set[str]) -> set[str]: """Return keys that should be processed (not in storage or not successfully processed)""" - return set(k for k in data if k not in self._data) + return set(self._data.keys()).difference(data) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 6ff283c6..bf395e29 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -409,7 +409,7 @@ class LightRAG: doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-") new_docs = {doc_key: {"content": full_text.strip()}} - _add_doc_keys = await self.full_docs.filter_keys([doc_key]) + _add_doc_keys = await self.full_docs.filter_keys(set(doc_key)) new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys} if not len(new_docs): logger.warning("This document is already in the storage.") @@ -418,7 +418,7 @@ class LightRAG: update_storage = True logger.info(f"[New Docs] inserting {len(new_docs)} docs") - inserting_chunks = {} + inserting_chunks: dict[str, Any] = {} for chunk_text in text_chunks: chunk_text_stripped = chunk_text.strip() chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-") @@ -428,11 +428,10 @@ class LightRAG: "full_doc_id": doc_key, } - _add_chunk_keys = await self.text_chunks.filter_keys( - list(inserting_chunks.keys()) - ) + doc_ids = set(inserting_chunks.keys()) + add_chunk_keys = await self.text_chunks.filter_keys(doc_ids) inserting_chunks = { - k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys + k: v for k, v in inserting_chunks.items() if k in add_chunk_keys } if not len(inserting_chunks): logger.warning("All chunks are already in the storage.") @@ -539,7 +538,7 @@ class LightRAG: logger.info("All documents have been processed or are duplicates") return - to_process_docs_ids = list(to_process_docs.keys()) + to_process_docs_ids = set(to_process_docs.keys()) # Get allready processed documents (text chunks and full docs) text_chunks_processed_doc_ids = await self.text_chunks.filter_keys( From d70d7ff20e875402b2259a9614ad29b465b8eb1a Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 20:05:59 +0100 Subject: [PATCH 05/15] added at call check --- lightrag/lightrag.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index bf395e29..bf03447e 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -538,16 +538,6 @@ class LightRAG: logger.info("All documents have been processed or are duplicates") return - to_process_docs_ids = set(to_process_docs.keys()) - - # Get allready processed documents (text chunks and full docs) - text_chunks_processed_doc_ids = await self.text_chunks.filter_keys( - to_process_docs_ids - ) - full_docs_processed_doc_ids = await self.full_docs.filter_keys( - to_process_docs_ids - ) - # 2. split docs into chunks, insert chunks, update doc status batch_size = self.addon_params.get("insert_batch_size", 10) batch_docs_list = [ @@ -597,14 +587,15 @@ class LightRAG: await self._process_entity_relation_graph(chunks) tasks[id_doc] = [] + # Check if document already processed the doc - if id_doc not in full_docs_processed_doc_ids: + if await self.full_docs.get_by_id(id_doc) is None: tasks[id_doc].append( self.full_docs.upsert({id_doc: {"content": status_doc.content}}) ) # Check if chunks already processed the doc - if id_doc not in text_chunks_processed_doc_ids: + if await self.text_chunks.get_by_id(id_doc) is None: tasks[id_doc].append(self.text_chunks.upsert(chunks)) # Process document (text chunks and full docs) in parallel From 07e3d2b24fe4d4affc10a6411f3f6081633dedec Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 20:11:35 +0100 Subject: [PATCH 06/15] fixed typo --- lightrag/lightrag.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index bf03447e..a20d4606 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -553,11 +553,11 @@ class LightRAG: for batch_idx, ids_doc_processing_status in enumerate(batch_docs_list): # 4. iterate over batch for id_doc_processing_status in ids_doc_processing_status: - id_doc, status_doc = id_doc_processing_status + doc_id, status_doc = id_doc_processing_status # Update status in processing await self.doc_status.upsert( { - id_doc: { + doc_id: { "status": DocStatus.PROCESSING, "updated_at": datetime.now().isoformat(), "content_summary": status_doc.content_summary, @@ -586,25 +586,25 @@ class LightRAG: await self.chunks_vdb.upsert(chunks) await self._process_entity_relation_graph(chunks) - tasks[id_doc] = [] + tasks[doc_id] = [] # Check if document already processed the doc - if await self.full_docs.get_by_id(id_doc) is None: - tasks[id_doc].append( - self.full_docs.upsert({id_doc: {"content": status_doc.content}}) + if await self.full_docs.get_by_id(doc_id) is None: + tasks[doc_id].append( + self.full_docs.upsert({doc_id: {"content": status_doc.content}}) ) # Check if chunks already processed the doc - if await self.text_chunks.get_by_id(id_doc) is None: - tasks[id_doc].append(self.text_chunks.upsert(chunks)) + if await self.text_chunks.get_by_id(doc_id) is None: + tasks[doc_id].append(self.text_chunks.upsert(chunks)) # Process document (text chunks and full docs) in parallel - for id_doc_processing_status, task in tasks.items(): + for task_doc_id, task in tasks.items(): try: await asyncio.gather(*task) await self.doc_status.upsert( { - id_doc_processing_status: { + task_doc_id: { "status": DocStatus.PROCESSED, "chunks_count": len(chunks), "updated_at": datetime.now().isoformat(), @@ -615,11 +615,11 @@ class LightRAG: except Exception as e: logger.error( - f"Failed to process document {id_doc_processing_status}: {str(e)}" + f"Failed to process document {task_doc_id}: {str(e)}" ) await self.doc_status.upsert( { - id_doc_processing_status: { + task_doc_id: { "status": DocStatus.FAILED, "error": str(e), "updated_at": datetime.now().isoformat(), From 5e3100221ccdf7968d87e0b5c8660700c73eea3f Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 20:18:38 +0100 Subject: [PATCH 07/15] cleaned code --- lightrag/lightrag.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index a20d4606..178d8a49 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -540,7 +540,7 @@ class LightRAG: # 2. split docs into chunks, insert chunks, update doc status batch_size = self.addon_params.get("insert_batch_size", 10) - batch_docs_list = [ + docs_batches = [ list(to_process_docs.items())[i : i + batch_size] for i in range(0, len(to_process_docs), batch_size) ] @@ -548,12 +548,12 @@ class LightRAG: # 3. iterate over batches tasks: dict[str, list[Coroutine[Any, Any, None]]] = {} - logger.info(f"Number of batches to process: {len(batch_docs_list)}.") + logger.info(f"Number of batches to process: {len(docs_batches)}.") - for batch_idx, ids_doc_processing_status in enumerate(batch_docs_list): + for batch_idx, docs_batch in enumerate(docs_batches): # 4. iterate over batch - for id_doc_processing_status in ids_doc_processing_status: - doc_id, status_doc = id_doc_processing_status + for doc_id_processing_status in docs_batch: + doc_id, status_doc = doc_id_processing_status # Update status in processing await self.doc_status.upsert( { @@ -570,7 +570,7 @@ class LightRAG: chunks: dict[str, Any] = { compute_mdhash_id(dp["content"], prefix="chunk-"): { **dp, - "full_doc_id": id_doc_processing_status, + "full_doc_id": doc_id, } for dp in self.chunking_func( status_doc.content, @@ -627,7 +627,7 @@ class LightRAG: } ) continue - logger.info(f"Completed batch {batch_idx + 1} of {len(batch_docs_list)}.") + logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.") async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None: try: From 2b99637584679ae1b69882172021c4441624b06b Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 20:41:18 +0100 Subject: [PATCH 08/15] improved paralle --- lightrag/lightrag.py | 90 ++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 178d8a49..1fcda5df 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -524,15 +524,25 @@ class LightRAG: 3. Process each chunk for entity and relation extraction 4. Update the document status """ + + async def insert_full_doc(doc_id: str, content: str): + # Check if document is already processed + doc = await self.full_docs.get_by_id(doc_id) + if not doc: + await self.full_docs.upsert({doc_id: {"content": content}}) + + async def insert_doc_status(doc_id: str, chunks: dict[str, Any]): + # Check if chunks are already processed + doc = await self.text_chunks.get_by_id(doc_id) + if not doc: + await self.text_chunks.upsert(chunks) + # 1. get all pending and failed documents to_process_docs: dict[str, DocProcessingStatus] = {} # Fetch failed documents - failed_docs = await self.doc_status.get_failed_docs() - to_process_docs.update(failed_docs) - - pending_docs = await self.doc_status.get_pending_docs() - to_process_docs.update(pending_docs) + to_process_docs.update(await self.doc_status.get_failed_docs()) + to_process_docs.update(await self.doc_status.get_pending_docs()) if not to_process_docs: logger.info("All documents have been processed or are duplicates") @@ -545,11 +555,10 @@ class LightRAG: for i in range(0, len(to_process_docs), batch_size) ] - # 3. iterate over batches - tasks: dict[str, list[Coroutine[Any, Any, None]]] = {} - logger.info(f"Number of batches to process: {len(docs_batches)}.") + # 3. iterate over batches + tasks: dict[str, list[Coroutine[Any, Any, None]]] = {} for batch_idx, docs_batch in enumerate(docs_batches): # 4. iterate over batch for doc_id_processing_status in docs_batch: @@ -586,47 +595,35 @@ class LightRAG: await self.chunks_vdb.upsert(chunks) await self._process_entity_relation_graph(chunks) - tasks[doc_id] = [] - - # Check if document already processed the doc - if await self.full_docs.get_by_id(doc_id) is None: - tasks[doc_id].append( - self.full_docs.upsert({doc_id: {"content": status_doc.content}}) - ) - - # Check if chunks already processed the doc - if await self.text_chunks.get_by_id(doc_id) is None: - tasks[doc_id].append(self.text_chunks.upsert(chunks)) - # Process document (text chunks and full docs) in parallel - for task_doc_id, task in tasks.items(): - try: - await asyncio.gather(*task) - await self.doc_status.upsert( - { - task_doc_id: { - "status": DocStatus.PROCESSED, - "chunks_count": len(chunks), - "updated_at": datetime.now().isoformat(), - } + tasks = [] + tasks.append(insert_full_doc(doc_id, status_doc.content)) + tasks.append(insert_doc_status(doc_id, chunks)) + try: + await asyncio.gather(*tasks) + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.PROCESSED, + "chunks_count": len(chunks), + "updated_at": datetime.now().isoformat(), } - ) - await self._insert_done() + } + ) + await self._insert_done() - except Exception as e: - logger.error( - f"Failed to process document {task_doc_id}: {str(e)}" - ) - await self.doc_status.upsert( - { - task_doc_id: { - "status": DocStatus.FAILED, - "error": str(e), - "updated_at": datetime.now().isoformat(), - } + except Exception as e: + logger.error(f"Failed to process document {doc_id}: {str(e)}") + await self.doc_status.upsert( + { + doc_id: { + "status": DocStatus.FAILED, + "error": str(e), + "updated_at": datetime.now().isoformat(), } - ) - continue + } + ) + continue logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.") async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None: @@ -640,8 +637,9 @@ class LightRAG: global_config=asdict(self), ) if new_kg is None: - logger.info("No entities or relationships extracted!") + logger.info("No new entities or relationships extracted.") else: + logger.info("New entities or relationships extracted.") self.chunk_entity_relation_graph = new_kg except Exception as e: From 3e49ced307af5dcaf39ea18b4e44add5a401bf28 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:03:14 +0100 Subject: [PATCH 09/15] Improved paralle --- lightrag/lightrag.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 1fcda5df..440dbf78 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -541,8 +541,10 @@ class LightRAG: to_process_docs: dict[str, DocProcessingStatus] = {} # Fetch failed documents - to_process_docs.update(await self.doc_status.get_failed_docs()) - to_process_docs.update(await self.doc_status.get_pending_docs()) + failed_docs = await self.doc_status.get_failed_docs() + to_process_docs.update(failed_docs) + pendings_docs = await self.doc_status.get_pending_docs() + to_process_docs.update(pendings_docs) if not to_process_docs: logger.info("All documents have been processed or are duplicates") @@ -558,15 +560,15 @@ class LightRAG: logger.info(f"Number of batches to process: {len(docs_batches)}.") # 3. iterate over batches - tasks: dict[str, list[Coroutine[Any, Any, None]]] = {} for batch_idx, docs_batch in enumerate(docs_batches): # 4. iterate over batch for doc_id_processing_status in docs_batch: doc_id, status_doc = doc_id_processing_status # Update status in processing + doc_status_id = compute_mdhash_id(status_doc.content, prefix="doc-") await self.doc_status.upsert( { - doc_id: { + doc_status_id: { "status": DocStatus.PROCESSING, "updated_at": datetime.now().isoformat(), "content_summary": status_doc.content_summary, @@ -591,32 +593,32 @@ class LightRAG: ) } - # Ensure chunk insertion and graph processing happen sequentially, not in parallel - await self.chunks_vdb.upsert(chunks) - await self._process_entity_relation_graph(chunks) - # Process document (text chunks and full docs) in parallel - tasks = [] + tasks: list[Coroutine[Any, Any, None]] = [] + tasks.append(self.chunks_vdb.upsert(chunks)) + tasks.append(self._process_entity_relation_graph(chunks)) tasks.append(insert_full_doc(doc_id, status_doc.content)) tasks.append(insert_doc_status(doc_id, chunks)) + try: await asyncio.gather(*tasks) await self.doc_status.upsert( { - doc_id: { + doc_status_id: { "status": DocStatus.PROCESSED, "chunks_count": len(chunks), "updated_at": datetime.now().isoformat(), } } ) + logger.error(f"Complete {doc_id}") await self._insert_done() except Exception as e: logger.error(f"Failed to process document {doc_id}: {str(e)}") await self.doc_status.upsert( { - doc_id: { + doc_status_id: { "status": DocStatus.FAILED, "error": str(e), "updated_at": datetime.now().isoformat(), From 068924b59f6bce7c43163603644b52866f5f0090 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:12:39 +0100 Subject: [PATCH 10/15] updated type --- lightrag/kg/jsondocstatus_impl.py | 12 ++---------- lightrag/lightrag.py | 4 ++-- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/lightrag/kg/jsondocstatus_impl.py b/lightrag/kg/jsondocstatus_impl.py index 35d31fba..1750b202 100644 --- a/lightrag/kg/jsondocstatus_impl.py +++ b/lightrag/kg/jsondocstatus_impl.py @@ -122,16 +122,8 @@ class JsonDocStatusStorage(DocStatusStorage): self._data.update(data) await self.index_done_callback() - async def get_by_id(self, id: str) -> dict[str, Any]: - return self._data.get(id, {}) - - async def get(self, doc_id: str) -> Union[DocProcessingStatus, None]: - """Get document status by ID""" - data = self._data.get(doc_id) - if data: - return DocProcessingStatus(**data) - else: - return None + async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: + return self._data.get(id) async def delete(self, doc_ids: list[str]): """Delete document status by IDs""" diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 440dbf78..40cc8978 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -531,7 +531,7 @@ class LightRAG: if not doc: await self.full_docs.upsert({doc_id: {"content": content}}) - async def insert_doc_status(doc_id: str, chunks: dict[str, Any]): + async def insert_text_chunks(doc_id: str, chunks: dict[str, Any]): # Check if chunks are already processed doc = await self.text_chunks.get_by_id(doc_id) if not doc: @@ -598,7 +598,7 @@ class LightRAG: tasks.append(self.chunks_vdb.upsert(chunks)) tasks.append(self._process_entity_relation_graph(chunks)) tasks.append(insert_full_doc(doc_id, status_doc.content)) - tasks.append(insert_doc_status(doc_id, chunks)) + tasks.append(insert_text_chunks(doc_id, chunks)) try: await asyncio.gather(*tasks) From f37f2c755538eaee95173f40fc419302078c05bb Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:17:09 +0100 Subject: [PATCH 11/15] improved parallele --- lightrag/lightrag.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 40cc8978..ce029afe 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -494,10 +494,8 @@ class LightRAG: # 3. Filter out already processed documents # Get docs ids all_new_doc_ids = set(new_docs.keys()) - # Retrieve IDs that are already being processed - existing_ids = await self.doc_status.filter_keys(all_new_doc_ids) # Exclude IDs of documents that are already in progress - unique_new_doc_ids = all_new_doc_ids - existing_ids + unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids) # Filter new_docs to only include documents with unique IDs new_docs = {doc_id: new_docs[doc_id] for doc_id in unique_new_doc_ids} From 9f6621454ec05ef7ac50d615c60d6d9d2fedfbab Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:24:13 +0100 Subject: [PATCH 12/15] fixed ids --- lightrag/kg/json_kv_impl.py | 2 +- lightrag/kg/jsondocstatus_impl.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index f5df0833..cfd67367 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -39,7 +39,7 @@ class JsonKVStorage(BaseKVStorage): ] async def filter_keys(self, data: set[str]) -> set[str]: - return set(self._data.keys()).difference(data) + return set(data) - set(self._data.keys()) async def upsert(self, data: dict[str, dict[str, Any]]) -> None: left_data = {k: v for k, v in data.items() if k not in self._data} diff --git a/lightrag/kg/jsondocstatus_impl.py b/lightrag/kg/jsondocstatus_impl.py index 1750b202..675cf643 100644 --- a/lightrag/kg/jsondocstatus_impl.py +++ b/lightrag/kg/jsondocstatus_impl.py @@ -48,8 +48,8 @@ Usage: """ -import os from dataclasses import dataclass +import os from typing import Any, Union from lightrag.base import ( @@ -76,7 +76,7 @@ class JsonDocStatusStorage(DocStatusStorage): async def filter_keys(self, data: set[str]) -> set[str]: """Return keys that should be processed (not in storage or not successfully processed)""" - return set(self._data.keys()).difference(data) + return set(data) - set(self._data.keys()) async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: result: list[dict[str, Any]] = [] From 07f31606d223e84593b9bd2ae174662c3d7c98a0 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:34:38 +0100 Subject: [PATCH 13/15] cleaned code --- lightrag/lightrag.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index ce029afe..5a6adfad 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -609,7 +609,6 @@ class LightRAG: } } ) - logger.error(f"Complete {doc_id}") await self._insert_done() except Exception as e: From a47e09c69e0770e1eb86a24f88bde47bf193bd7a Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:42:04 +0100 Subject: [PATCH 14/15] improving ainsert_custom_chunks paralelism --- lightrag/lightrag.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 5a6adfad..5f711335 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -437,27 +437,13 @@ class LightRAG: logger.warning("All chunks are already in the storage.") return - logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks") - - await self.chunks_vdb.upsert(inserting_chunks) - - logger.info("[Entity Extraction]...") - maybe_new_kg = await extract_entities( - inserting_chunks, - knowledge_graph_inst=self.chunk_entity_relation_graph, - entity_vdb=self.entities_vdb, - relationships_vdb=self.relationships_vdb, - global_config=asdict(self), - ) - - if maybe_new_kg is None: - logger.warning("No new entities and relationships found") - return - else: - self.chunk_entity_relation_graph = maybe_new_kg - - await self.full_docs.upsert(new_docs) - await self.text_chunks.upsert(inserting_chunks) + tasks = [ + self.chunks_vdb.upsert(inserting_chunks), + self._process_entity_relation_graph(inserting_chunks), + self.full_docs.upsert(new_docs), + self.text_chunks.upsert(inserting_chunks), + ] + await asyncio.gather(*tasks) finally: if update_storage: From bfd280450abb5ce36d6f30a93e0b02b5776cfdc3 Mon Sep 17 00:00:00 2001 From: Yannick Stephan Date: Sun, 9 Feb 2025 21:48:19 +0100 Subject: [PATCH 15/15] cleaned code --- lightrag/lightrag.py | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 5f711335..d92b5ea4 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -1,6 +1,5 @@ import asyncio import os -from collections.abc import Coroutine from dataclasses import asdict, dataclass, field from datetime import datetime from functools import partial @@ -508,19 +507,6 @@ class LightRAG: 3. Process each chunk for entity and relation extraction 4. Update the document status """ - - async def insert_full_doc(doc_id: str, content: str): - # Check if document is already processed - doc = await self.full_docs.get_by_id(doc_id) - if not doc: - await self.full_docs.upsert({doc_id: {"content": content}}) - - async def insert_text_chunks(doc_id: str, chunks: dict[str, Any]): - # Check if chunks are already processed - doc = await self.text_chunks.get_by_id(doc_id) - if not doc: - await self.text_chunks.upsert(chunks) - # 1. get all pending and failed documents to_process_docs: dict[str, DocProcessingStatus] = {} @@ -578,12 +564,12 @@ class LightRAG: } # Process document (text chunks and full docs) in parallel - tasks: list[Coroutine[Any, Any, None]] = [] - tasks.append(self.chunks_vdb.upsert(chunks)) - tasks.append(self._process_entity_relation_graph(chunks)) - tasks.append(insert_full_doc(doc_id, status_doc.content)) - tasks.append(insert_text_chunks(doc_id, chunks)) - + tasks = [ + self.chunks_vdb.upsert(chunks), + self._process_entity_relation_graph(chunks), + self.full_docs.upsert({doc_id: {"content": status_doc.content}}), + self.text_chunks.upsert(chunks), + ] try: await asyncio.gather(*tasks) await self.doc_status.upsert(