Add pipeline status control for concurrent document indexing processes

• Add shared pipeline status namespace
• Implement concurrent process control
• Add request queuing for pending jobs
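The control flow these bullets describe can be sketched as follows — a minimal, self-contained model using a plain dict and threading.Lock in place of LightRAG's shared-storage namespace and storage lock (run_pipeline and process_batch are illustrative names, not the project's API):

import threading

pipeline_status = {"busy": False, "request_pending": False}
status_lock = threading.Lock()

def run_pipeline(process_batch):
    # Check-and-set under the lock: only one caller becomes the indexer.
    with status_lock:
        if pipeline_status["busy"]:
            # Someone else holds the pipeline; leave a note and return at once.
            pipeline_status["request_pending"] = True
            return
        pipeline_status["busy"] = True
        pipeline_status["request_pending"] = False
    try:
        while True:
            process_batch()  # one pass over the pending-document queue
            # Drain any request that arrived while this pass was running.
            with status_lock:
                if not pipeline_status["request_pending"]:
                    break
                pipeline_status["request_pending"] = False
    finally:
        # Always release the pipeline, even on error.
        with status_lock:
            pipeline_status["busy"] = False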

Author: yangdx
Date:   2025-02-28 11:52:42 +08:00
Parent: feaa7ce69d
Commit: b2da69b7f1

2 changed files with 176 additions and 104 deletions


@@ -81,6 +81,18 @@ def initialize_share_data(workers: int = 1):
     # Mark as initialized
     _initialized = True
 
+    # Initialize pipeline status for document indexing control
+    pipeline_namespace = get_namespace_data("pipeline_status")
+    pipeline_namespace.update({
+        "busy": False,  # Control concurrent processes
+        "job_name": "Default Job",  # Current job name (indexing files/indexing texts)
+        "job_start": None,  # Job start time
+        "docs": 0,  # Total number of documents to be indexed
+        "batchs": 0,  # Number of batches for processing documents
+        "cur_batch": 0,  # Current processing batch
+        "request_pending": False,  # Flag for pending request for processing
+    })
+
 
 def try_initialize_namespace(namespace: str) -> bool:
     """


@@ -273,8 +273,6 @@ class LightRAG:
         from lightrag.kg.shared_storage import (
             initialize_share_data,
-            try_initialize_namespace,
-            get_namespace_data,
         )
 
         initialize_share_data()
@@ -672,8 +670,39 @@ class LightRAG:
         3. Process each chunk for entity and relation extraction
         4. Update the document status
         """
+        from lightrag.kg.shared_storage import get_namespace_data, get_storage_lock
+
+        # Get pipeline status shared data and lock
+        pipeline_status = get_namespace_data("pipeline_status")
+        storage_lock = get_storage_lock()
+
+        # Check if another process is already processing the queue
+        process_documents = False
+        with storage_lock:
+            if not pipeline_status.get("busy", False):
+                # No other process is busy, we can process documents
+                pipeline_status.update({
+                    "busy": True,
+                    "job_name": "indexing files",
+                    "job_start": datetime.now().isoformat(),
+                    "docs": 0,
+                    "batchs": 0,
+                    "cur_batch": 0,
+                    "request_pending": False,  # Clear any previous request
+                })
+                process_documents = True
+            else:
+                # Another process is busy, just set request flag and return
+                pipeline_status["request_pending"] = True
+                logger.info("Another process is already processing the document queue. Request queued.")
+
+        if not process_documents:
+            return
+
+        try:
+            # Process documents until no more documents or requests
+            while True:
                 # 1. Get all pending, failed, and abnormally terminated processing documents.
+                # Run the asynchronous status retrievals in parallel using asyncio.gather
                 processing_docs, failed_docs, pending_docs = await asyncio.gather(
                     self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                     self.doc_status.get_docs_by_status(DocStatus.FAILED),
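
Note the lock discipline in this hunk: storage_lock guards only the brief check-and-set on the shared flags, never the indexing work itself. A second caller that finds busy=True therefore returns immediately with request_pending set, instead of blocking behind a long-running job, and the winning process picks the queued request up at the end of its loop (see the final hunk below), so no enqueue is silently dropped.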
@@ -687,7 +716,11 @@ class LightRAG:
                 if not to_process_docs:
                     logger.info("All documents have been processed or are duplicates")
-                    return
+                    break
+
+                # Update pipeline status with document count (with lock)
+                with storage_lock:
+                    pipeline_status["docs"] = len(to_process_docs)
 
                 # 2. split docs into chunks, insert chunks, update doc status
                 docs_batches = [
@@ -695,11 +728,19 @@ class LightRAG:
                     for i in range(0, len(to_process_docs), self.max_parallel_insert)
                 ]
 
+                # Update pipeline status with batch information (directly, as it's atomic)
+                pipeline_status.update({
+                    "batchs": len(docs_batches),
+                    "cur_batch": 0,
+                })
+
                 logger.info(f"Number of batches to process: {len(docs_batches)}.")
 
                 batches: list[Any] = []
 
                 # 3. iterate over batches
                 for batch_idx, docs_batch in enumerate(docs_batches):
+                    # Update current batch in pipeline status (directly, as it's atomic)
+                    pipeline_status["cur_batch"] = batch_idx + 1
                     async def batch(
                         batch_idx: int,
@@ -784,6 +825,25 @@ class LightRAG:
                 await asyncio.gather(*batches)
                 await self._insert_done()
 
+                # Check if there's a pending request to process more documents (with lock)
+                has_pending_request = False
+                with storage_lock:
+                    has_pending_request = pipeline_status.get("request_pending", False)
+                    if has_pending_request:
+                        # Clear the request flag before checking for more documents
+                        pipeline_status["request_pending"] = False
+
+                if not has_pending_request:
+                    break
+
+                logger.info("Processing additional documents due to pending request")
+
+        finally:
+            # Always reset busy status when done or if an exception occurs (with lock)
+            with storage_lock:
+                pipeline_status["busy"] = False
+            logger.info("Document processing pipeline completed")
+
 
     async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
         try:
             await extract_entities(
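
To see the queuing behavior end to end, the run_pipeline sketch from the commit message above can be driven from two threads (again, illustrative names only; the real pipeline is async and uses the shared-storage lock):

import threading
import time

def slow_batch():
    time.sleep(0.2)  # stands in for one pass over the document queue

# With near-simultaneous starts, one thread wins the busy flag and indexes;
# the other sets request_pending and returns immediately. The winner then
# runs one extra pass to drain the queued request before resetting busy.
workers = [threading.Thread(target=run_pipeline, args=(slow_batch,)) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()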