Add process ID to log messages for better multi-process debugging clarity
- Add PID to KV and Neo4j storage logs
- Add PID to query context logs
- Improve KV data count logging for LLM cache
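
Note: the pattern applied throughout is to interpolate os.getpid() into each
log message so that, when several worker processes share the same storage
files, every log line can be attributed to the process that emitted it. A
minimal standalone sketch of the pattern (the logger setup, helper name, and
namespace string below are illustrative, not part of this commit):

    import logging
    import os

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def log_with_pid(message: str) -> None:
        # Prefix each message with the current process ID, mirroring the
        # f-string pattern this commit adds to the storage log calls.
        logger.info(f"Process {os.getpid()} {message}")

    log_with_pid("KV load some_namespace with 42 records")
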
@@ -40,7 +40,7 @@ class JsonDocStatusStorage(DocStatusStorage):
         async with self._storage_lock:
             self._data.update(loaded_data)
             logger.info(
-                f"Loaded document status storage with {len(loaded_data)} records"
+                f"Process {os.getpid()} doc status load {self.namespace} with {len(loaded_data)} records"
             )

     async def filter_keys(self, keys: set[str]) -> set[str]:
@@ -90,6 +90,7 @@ class JsonDocStatusStorage(DocStatusStorage):
             data_dict = (
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
+            logger.info(f"Process {os.getpid()} doc status writing {len(data_dict)} records to {self.namespace}")
             write_json(data_dict, self._file_name)

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:

@@ -35,13 +35,34 @@ class JsonKVStorage(BaseKVStorage):
         loaded_data = load_json(self._file_name) or {}
         async with self._storage_lock:
             self._data.update(loaded_data)
-            logger.info(f"Load KV {self.namespace} with {len(loaded_data)} data")
+
+            # Calculate data count based on namespace
+            if self.namespace.endswith("cache"):
+                # For cache namespaces, sum the cache entries across all cache types
+                data_count = sum(len(first_level_dict) for first_level_dict in loaded_data.values()
+                                 if isinstance(first_level_dict, dict))
+            else:
+                # For non-cache namespaces, use the original count method
+                data_count = len(loaded_data)
+
+            logger.info(f"Process {os.getpid()} KV load {self.namespace} with {data_count} records")

     async def index_done_callback(self) -> None:
         async with self._storage_lock:
             data_dict = (
                 dict(self._data) if hasattr(self._data, "_getvalue") else self._data
             )
+
+            # Calculate data count based on namespace
+            if self.namespace.endswith("cache"):
+                # For cache namespaces, sum the cache entries across all cache types
+                data_count = sum(len(first_level_dict) for first_level_dict in data_dict.values()
+                                 if isinstance(first_level_dict, dict))
+            else:
+                # For non-cache namespaces, use the original count method
+                data_count = len(data_dict)
+
+            logger.info(f"Process {os.getpid()} KV writing {data_count} records to {self.namespace}")
             write_json(data_dict, self._file_name)

     async def get_all(self) -> dict[str, Any]:
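
Note on the cache-specific count above: the branching implies that cache
namespaces store a two-level mapping (cache type -> {key: entry}), so a plain
len() on the top level would count cache types rather than cached entries. A
minimal sketch of the counting rule under that assumed layout (count_records
and the namespace strings are hypothetical, not from the commit):

    from typing import Any

    def count_records(namespace: str, data: dict[str, Any]) -> int:
        # Cache namespaces nest entries one level deep, so sum the sizes of
        # the inner dicts; all other namespaces are flat key -> value stores.
        if namespace.endswith("cache"):
            return sum(len(inner) for inner in data.values() if isinstance(inner, dict))
        return len(data)

    # One cache type holding two entries counts as 2, not 1:
    assert count_records("llm_cache", {"default": {"h1": {}, "h2": {}}}) == 2
    assert count_records("full_docs", {"doc-1": {}, "doc-2": {}}) == 2
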
@@ -73,11 +94,12 @@ class JsonKVStorage(BaseKVStorage):
         return set(keys) - set(self._data.keys())

     async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
-        logger.info(f"Inserting {len(data)} to {self.namespace}")
         if not data:
             return
         async with self._storage_lock:
             left_data = {k: v for k, v in data.items() if k not in self._data}
+            if left_data:
+                logger.info(f"Process {os.getpid()} KV inserting {len(left_data)} to {self.namespace}")
             self._data.update(left_data)

     async def delete(self, ids: list[str]) -> None:
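
Note on the upsert change above: the log line now fires only when at least
one genuinely new key arrives, and the filter makes this upsert insert-only,
so existing keys are never overwritten. Illustrating the left_data expression
from the hunk in isolation:

    existing = {"a": {"v": 1}}
    incoming = {"a": {"v": 2}, "b": {"v": 3}}

    # Only keys absent from the store survive the filter; "a" keeps its old
    # value, and when left_data is empty nothing is logged at all.
    left_data = {k: v for k, v in incoming.items() if k not in existing}
    assert left_data == {"b": {"v": 3}}
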
@@ -842,7 +842,7 @@ class Neo4JStorage(BaseGraphStorage):
                         seen_edges.add(edge_id)

             logger.info(
-                f"Subgraph query successful | Node count: {len(result.nodes)} | Edge count: {len(result.edges)}"
+                f"Process {os.getpid()} graph query returned: {len(result.nodes)} nodes, {len(result.edges)} edges"
             )
         finally:
             await result_set.consume()  # Ensure result set is consumed

@@ -3,6 +3,7 @@ from __future__ import annotations
 import asyncio
 import json
 import re
+import os
 from typing import Any, AsyncIterator
 from collections import Counter, defaultdict
@@ -1027,6 +1028,7 @@ async def _build_query_context(
     text_chunks_db: BaseKVStorage,
     query_param: QueryParam,
 ):
+    logger.info(f"Process {os.getpid()} building query context...")
     if query_param.mode == "local":
         entities_context, relations_context, text_units_context = await _get_node_data(
             ll_keywords,