feat(api): Enhance document clearing functionality

- Use storage drop methods to properly clean up all data
- Add file deletion from input directory
- Add pipeline status checking and locking mechanism
- Improve error handling with detailed logging and pipeline message tracking
This commit is contained in:
yangdx
2025-03-30 16:30:41 +08:00
parent 53ab5e015c
commit 949a3904a9

View File

@@ -443,6 +443,7 @@ async def pipeline_index_texts(rag: LightRAG, texts: List[str]):
await rag.apipeline_process_enqueue_documents() await rag.apipeline_process_enqueue_documents()
# TODO: deprecate after /insert_file is removed
async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path: async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
"""Save the uploaded file to a temporary location """Save the uploaded file to a temporary location
@@ -645,6 +646,7 @@ def create_document_routes(
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# TODO: deprecated, use /upload instead
@router.post( @router.post(
"/file", response_model=InsertResponse, dependencies=[Depends(combined_auth)] "/file", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
) )
@@ -688,6 +690,7 @@ def create_document_routes(
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# TODO: deprecated, use /upload instead
@router.post( @router.post(
"/file_batch", "/file_batch",
response_model=InsertResponse, response_model=InsertResponse,
@@ -758,26 +761,151 @@ def create_document_routes(
""" """
Clear all documents from the RAG system. Clear all documents from the RAG system.
This endpoint deletes all text chunks, entities vector database, and relationships This endpoint deletes all documents, entities, relationships, and files from the system.
vector database, effectively clearing all documents from the RAG system. It uses the storage drop methods to properly clean up all data and removes all files
from the input directory.
Returns: Returns:
InsertResponse: A response object containing the status and message. InsertResponse: A response object containing the status and message.
Raises: Raises:
HTTPException: If an error occurs during the clearing process (500). HTTPException: If an error occurs during the clearing process (500) or if
the pipeline is busy (400).
""" """
try: from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
rag.text_chunks = []
rag.entities_vdb = None # Get pipeline status and lock
rag.relationships_vdb = None pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Check and set status with lock
async with pipeline_status_lock:
if pipeline_status.get("busy", False):
return InsertResponse( return InsertResponse(
status="success", message="All documents cleared successfully" status="error",
message="Cannot clear documents while pipeline is busy"
)
# Set busy to true
pipeline_status["busy"] = True
pipeline_status["job_name"] = "Clearing Documents"
pipeline_status["latest_message"] = "Starting document clearing process"
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append("Starting document clearing process")
try:
# Use drop method to clear all data
drop_tasks = []
storages = [
rag.text_chunks,
rag.full_docs,
rag.entities_vdb,
rag.relationships_vdb,
rag.chunks_vdb,
rag.chunk_entity_relation_graph,
rag.doc_status
]
# Log storage drop start
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append("Starting to drop storage components")
for storage in storages:
if storage is not None:
drop_tasks.append(storage.drop())
# Wait for all drop tasks to complete
drop_results = await asyncio.gather(*drop_tasks, return_exceptions=True)
# Check for errors and log results
errors = []
storage_success_count = 0
storage_error_count = 0
for i, result in enumerate(drop_results):
storage_name = storages[i].__class__.__name__
if isinstance(result, Exception):
error_msg = f"Error dropping {storage_name}: {str(result)}"
errors.append(error_msg)
logger.error(error_msg)
storage_error_count += 1
else:
logger.info(f"Successfully dropped {storage_name}")
storage_success_count += 1
# Log storage drop results
if "history_messages" in pipeline_status:
if storage_error_count > 0:
pipeline_status["history_messages"].append(
f"Dropped {storage_success_count} storage components with {storage_error_count} errors"
)
else:
pipeline_status["history_messages"].append(
f"Successfully dropped all {storage_success_count} storage components"
)
# Log file deletion start
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append("Starting to delete files in input directory")
# Delete all files in input_dir
deleted_files_count = 0
file_errors_count = 0
for file_path in doc_manager.input_dir.glob("**/*"):
if file_path.is_file():
try:
file_path.unlink()
deleted_files_count += 1
except Exception as e:
logger.error(f"Error deleting file {file_path}: {str(e)}")
file_errors_count += 1
# Log file deletion results
if "history_messages" in pipeline_status:
if file_errors_count > 0:
pipeline_status["history_messages"].append(
f"Deleted {deleted_files_count} files with {file_errors_count} errors"
)
errors.append(f"Failed to delete {file_errors_count} files")
else:
pipeline_status["history_messages"].append(
f"Successfully deleted {deleted_files_count} files"
)
# Prepare final result message
final_message = ""
if errors:
final_message = f"Cleared documents with some errors. Deleted {deleted_files_count} files."
status = "partial_success"
else:
final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
status = "success"
# Log final result
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(final_message)
# Return response based on results
return InsertResponse(
status=status,
message=final_message
) )
except Exception as e: except Exception as e:
logger.error(f"Error DELETE /documents: {str(e)}") error_msg = f"Error clearing documents: {str(e)}"
logger.error(error_msg)
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(error_msg)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
finally:
# Reset busy status after completion
async with pipeline_status_lock:
pipeline_status["busy"] = False
completion_msg = "Document clearing process completed"
pipeline_status["latest_message"] = completion_msg
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].append(completion_msg)
@router.get( @router.get(
"/pipeline_status", "/pipeline_status",