From c854aabde09b569e15721212a31a285206a1e07f Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 9 Mar 2025 15:25:10 +0800
Subject: [PATCH] Add process ID to log messages for better multi-process
 debugging clarity

- Add PID to KV and Neo4j storage logs
- Add PID to query context logs
- Improve KV data count logging for llm cache
---
 lightrag/kg/json_doc_status_impl.py |  3 ++-
 lightrag/kg/json_kv_impl.py         | 28 +++++++++++++++++++++++++---
 lightrag/kg/neo4j_impl.py           |  2 +-
 lightrag/operate.py                 |  2 ++
 4 files changed, 30 insertions(+), 5 deletions(-)

diff --git a/lightrag/kg/json_doc_status_impl.py b/lightrag/kg/json_doc_status_impl.py
index 824bd052..e05c04f6 100644
--- a/lightrag/kg/json_doc_status_impl.py
+++ b/lightrag/kg/json_doc_status_impl.py
@@ -40,7 +40,7 @@ class JsonDocStatusStorage(DocStatusStorage):
         async with self._storage_lock:
             self._data.update(loaded_data)
             logger.info(
-                f"Loaded document status storage with {len(loaded_data)} records"
+                f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
             )
 
     async def filter_keys(self, keys: set[str]) -> set[str]:
@@ -90,6 +90,7 @@ class JsonDocStatusStorage(DocStatusStorage):
             data_dict = (
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
+            logger.info(f"Process {os.getpid()} doc status writing {len(data_dict)} records to {self.namespace}")
             write_json(data_dict, self._file_name)
 
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py
index 96217d4b..c0aa81b2 100644
--- a/lightrag/kg/json_kv_impl.py
+++ b/lightrag/kg/json_kv_impl.py
@@ -35,13 +35,34 @@ class JsonKVStorage(BaseKVStorage):
         loaded_data = load_json(self._file_name) or {}
         async with self._storage_lock:
             self._data.update(loaded_data)
-            logger.info(f"Load KV {self.namespace} with {len(loaded_data)} data")
+
+            # Calculate data count based on namespace
+            if self.namespace.endswith("cache"):
+                # For cache namespaces, sum the cache entries across all cache types
+                data_count = sum(len(first_level_dict) for first_level_dict in loaded_data.values()
+                                 if isinstance(first_level_dict, dict))
+            else:
+                # For non-cache namespaces, use the original count method
+                data_count = len(loaded_data)
+
+            logger.info(f"Process {os.getpid()} KV load {self.namespace} with {data_count} records")
 
     async def index_done_callback(self) -> None:
         async with self._storage_lock:
             data_dict = (
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
+
+            # Calculate data count based on namespace
+            if self.namespace.endswith("cache"):
+                # For cache namespaces, sum the cache entries across all cache types
+                data_count = sum(len(first_level_dict) for first_level_dict in data_dict.values()
+                                 if isinstance(first_level_dict, dict))
+            else:
+                # For non-cache namespaces, use the original count method
+                data_count = len(data_dict)
+
+            logger.info(f"Process {os.getpid()} KV writing {data_count} records to {self.namespace}")
             write_json(data_dict, self._file_name)
 
     async def get_all(self) -> dict[str, Any]:
@@ -73,12 +94,13 @@ class JsonKVStorage(BaseKVStorage):
         return set(keys) - set(self._data.keys())
 
     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return
         async with self._storage_lock:
             left_data = {k: v for k, v in data.items() if k not in self._data}
-            self._data.update(left_data)
+            if left_data:
+                logger.info(f"Process {os.getpid()} KV inserting {len(left_data)} to {self.namespace}")
+                self._data.update(left_data)
 
     async def delete(self, ids: list[str]) -> None:
         async with self._storage_lock:
diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py
index d0841eec..8d5a1a55 100644
--- a/lightrag/kg/neo4j_impl.py
+++ b/lightrag/kg/neo4j_impl.py
@@ -842,7 +842,7 @@ class Neo4JStorage(BaseGraphStorage):
                     seen_edges.add(edge_id)
 
             logger.info(
-                f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
+                f"Process {os.getpid()} graph query returned {len(result.nodes)} nodes, {len(result.edges)} edges"
             )
         finally:
             await result_set.consume()  # Ensure result set is consumed
diff --git a/lightrag/operate.py b/lightrag/operate.py
index ce686feb..d16e170c 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 import json
 import re
+import os
 from typing import Any, AsyncIterator
 from collections import Counter, defaultdict
 
@@ -1027,6 +1028,7 @@ async def _build_query_context(
     text_chunks_db: BaseKVStorage,
     query_param: QueryParam,
 ):
+    logger.info(f"Process {os.getpid()} building query context...")
     if query_param.mode == "local":
         entities_context, relations_context, text_units_context = await _get_node_data(
             ll_keywords,
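
Note for reviewers (not part of the patch): the two JsonKVStorage hunks count records
differently for cache namespaces because an LLM cache is stored two levels deep, as
{cache_type: {args_hash: entry}}, so a flat len() would report the number of cache
types rather than the number of cached entries. The sketch below illustrates that
logic in isolation; the function name record_count and the sample data are
hypothetical, chosen only for this example.

import os

def record_count(namespace: str, data: dict) -> int:
    # Mirrors the patched counting logic: for cache namespaces, sum the
    # entries of each inner dict; otherwise count top-level keys.
    if namespace.endswith("cache"):
        return sum(
            len(inner) for inner in data.values() if isinstance(inner, dict)
        )
    return len(data)

# Hypothetical LLM cache content: 2 cache types holding 3 cached responses.
llm_cache = {
    "default": {"hash-a": {"return": "..."}, "hash-b": {"return": "..."}},
    "global": {"hash-c": {"return": "..."}},
}

assert record_count("llm_response_cache", llm_cache) == 3  # entries, not types
assert record_count("full_docs", {"doc-1": {}, "doc-2": {}}) == 2

# With the PID prefix added by this patch, each worker's log line reports
# the meaningful per-process count:
print(f"Process {os.getpid()} KV load llm_response_cache with "
      f"{record_count('llm_response_cache', llm_cache)} records")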