Fix linting

This commit is contained in:
yangdx
2025-02-28 21:35:04 +08:00
parent c37b1e8aa7
commit c973498c34
8 changed files with 136 additions and 130 deletions

View File

@@ -274,6 +274,7 @@ class LightRAG:
from lightrag.kg.shared_storage import (
initialize_share_data,
)
initialize_share_data()
if not os.path.exists(self.working_dir):
@@ -671,44 +672,45 @@ class LightRAG:
4. Update the document status
"""
from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
# Get pipeline status shared data and lock
pipeline_status = get_namespace_data("pipeline_status")
storage_lock = get_storage_lock()
# Check if another process is already processing the queue
process_documents = False
with storage_lock:
# Ensure only one worker is processing documents
if not pipeline_status.get("busy", False):
# No other process is busy, we can process documents
# Get the current history_messages list
# Clear history_messages in place so it stays the same shared list object
current_history = pipeline_status.get("history_messages", [])
# Clear the list contents while keeping the same list object
if hasattr(current_history, "clear"):
current_history.clear()
pipeline_status.update({
"busy": True,
"job_name": "indexing files",
"job_start": datetime.now().isoformat(),
"docs": 0,
"batchs": 0,
"cur_batch": 0,
"request_pending": False, # Clear any previous request
"latest_message": "",
# Keep using the same list object
"history_messages": current_history,
})
pipeline_status.update(
{
"busy": True,
"job_name": "indexing files",
"job_start": datetime.now().isoformat(),
"docs": 0,
"batchs": 0,
"cur_batch": 0,
"request_pending": False, # Clear any previous request
"latest_message": "",
"history_messages": current_history, # keep it as a shared list object
}
)
process_documents = True
else:
# Another process is busy, just set request flag and return
pipeline_status["request_pending"] = True
logger.info("Another process is already processing the document queue. Request queued.")
logger.info(
"Another process is already processing the document queue. Request queued."
)
if not process_documents:
return
try:
# Process documents until no more documents or requests
while True:
@@ -734,7 +736,7 @@ class LightRAG:
# Update pipeline status with document count (with lock)
with storage_lock:
pipeline_status["docs"] = len(to_process_docs)
# 2. split docs into chunks, insert chunks, update doc status
docs_batches = [
list(to_process_docs.items())[i : i + self.max_parallel_insert]
@@ -742,11 +744,8 @@ class LightRAG:
]
# Update pipeline status with batch information (directly, as it's atomic)
pipeline_status.update({
"batchs": len(docs_batches),
"cur_batch": 0
})
pipeline_status.update({"batchs": len(docs_batches), "cur_batch": 0})
log_message = f"Number of batches to process: {len(docs_batches)}."
logger.info(log_message)
pipeline_status["latest_message"] = log_message
@@ -757,13 +756,15 @@ class LightRAG:
for batch_idx, docs_batch in enumerate(docs_batches):
# Update current batch in pipeline status (directly, as it's atomic)
pipeline_status["cur_batch"] = batch_idx + 1
async def batch(
batch_idx: int,
docs_batch: list[tuple[str, DocProcessingStatus]],
size_batch: int,
) -> None:
log_message = f"Start processing batch {batch_idx + 1} of {size_batch}."
log_message = (
f"Start processing batch {batch_idx + 1} of {size_batch}."
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
@@ -822,7 +823,9 @@ class LightRAG:
}
)
except Exception as e:
logger.error(f"Failed to process document {doc_id}: {str(e)}")
logger.error(
f"Failed to process document {doc_id}: {str(e)}"
)
await self.doc_status.upsert(
{
doc_id: {
@@ -837,7 +840,9 @@ class LightRAG:
}
)
continue
log_message = f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
log_message = (
f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
@@ -846,7 +851,7 @@ class LightRAG:
await asyncio.gather(*batches)
await self._insert_done()
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
with storage_lock:
@@ -854,15 +859,15 @@ class LightRAG:
if has_pending_request:
# Clear the request flag before checking for more documents
pipeline_status["request_pending"] = False
if not has_pending_request:
break
log_message = "Processing additional documents due to pending request"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
finally:
# Always reset busy status when done or if an exception occurs (with lock)
with storage_lock:
@@ -901,12 +906,13 @@ class LightRAG:
if storage_inst is not None
]
await asyncio.gather(*tasks)
log_message = "All Insert done"
logger.info(log_message)
# Get pipeline_status and update latest_message and history_messages
from lightrag.kg.shared_storage import get_namespace_data
pipeline_status = get_namespace_data("pipeline_status")
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)