diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py
index 445008ec..e683b30b 100644
--- a/lightrag/api/routers/document_routes.py
+++ b/lightrag/api/routers/document_routes.py
@@ -443,6 +443,7 @@ async def pipeline_index_texts(rag: LightRAG, texts: List[str]):
     await rag.apipeline_process_enqueue_documents()
 
 
+# TODO: deprecate after /insert_file is removed
 async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
     """Save the uploaded file to a temporary location
 
@@ -645,6 +646,7 @@ def create_document_routes(
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))
 
+    # TODO: deprecated, use /upload instead
     @router.post(
         "/file", response_model=InsertResponse, dependencies=[Depends(combined_auth)]
     )
@@ -688,6 +690,7 @@ def create_document_routes(
             logger.error(traceback.format_exc())
             raise HTTPException(status_code=500, detail=str(e))
 
+    # TODO: deprecated, use /upload instead
     @router.post(
         "/file_batch",
         response_model=InsertResponse,
@@ -758,26 +761,151 @@ def create_document_routes(
         """
         Clear all documents from the RAG system.
 
-        This endpoint deletes all text chunks, entities vector database, and relationships
-        vector database, effectively clearing all documents from the RAG system.
+        This endpoint deletes all documents, entities, relationships, and files
+        from the system. It uses the storage drop methods to properly clean up
+        all data and removes all files from the input directory.
 
         Returns:
             InsertResponse: A response object containing the status and message.
 
         Raises:
             HTTPException: If an error occurs during the clearing process (500).
+                Note: if the pipeline is busy, the endpoint returns an
+                InsertResponse with status="error" instead of raising.
         """
+        from lightrag.kg.shared_storage import get_namespace_data, get_pipeline_status_lock
+
+        # Get pipeline status and lock
+        pipeline_status = await get_namespace_data("pipeline_status")
+        pipeline_status_lock = get_pipeline_status_lock()
+
+        # Check the busy flag and claim the pipeline under the lock
+        async with pipeline_status_lock:
+            if pipeline_status.get("busy", False):
+                return InsertResponse(
+                    status="error",
+                    message="Cannot clear documents while pipeline is busy",
+                )
+            # Mark the pipeline busy so concurrent jobs are rejected
+            pipeline_status["busy"] = True
+            pipeline_status["job_name"] = "Clearing Documents"
+            pipeline_status["latest_message"] = "Starting document clearing process"
+            if "history_messages" in pipeline_status:
+                pipeline_status["history_messages"].append(
+                    "Starting document clearing process"
+                )
+
         try:
-            rag.text_chunks = []
-            rag.entities_vdb = None
-            rag.relationships_vdb = None
+            # Use the storage drop methods to clear all data
+            storages = [
+                rag.text_chunks,
+                rag.full_docs,
+                rag.entities_vdb,
+                rag.relationships_vdb,
+                rag.chunks_vdb,
+                rag.chunk_entity_relation_graph,
+                rag.doc_status,
+            ]
+
+            # Log storage drop start
+            if "history_messages" in pipeline_status:
+                pipeline_status["history_messages"].append(
+                    "Starting to drop storage components"
+                )
+
+            # Keep the task list aligned with the storages it was built from so
+            # each gather result can be matched back to its storage by index
+            dropped_storages = [storage for storage in storages if storage is not None]
+            drop_tasks = [storage.drop() for storage in dropped_storages]
+
+            # Wait for all drop tasks to complete
+            drop_results = await asyncio.gather(*drop_tasks, return_exceptions=True)
+
+            # Check for errors and log results
+            errors = []
+            storage_success_count = 0
+            storage_error_count = 0
+
+            for storage, result in zip(dropped_storages, drop_results):
+                storage_name = storage.__class__.__name__
+                if isinstance(result, Exception):
+                    error_msg = f"Error dropping {storage_name}: {str(result)}"
+                    errors.append(error_msg)
+                    logger.error(error_msg)
+                    storage_error_count += 1
+                else:
+                    logger.info(f"Successfully dropped {storage_name}")
+                    storage_success_count += 1
+
+            # Log storage drop results
+            if "history_messages" in pipeline_status:
+                if storage_error_count > 0:
+                    pipeline_status["history_messages"].append(
+                        f"Dropped {storage_success_count} storage components with {storage_error_count} errors"
+                    )
+                else:
+                    pipeline_status["history_messages"].append(
+                        f"Successfully dropped all {storage_success_count} storage components"
+                    )
+
+            # Log file deletion start
+            if "history_messages" in pipeline_status:
+                pipeline_status["history_messages"].append(
+                    "Starting to delete files in input directory"
+                )
+
+            # Delete all files in input_dir
+            deleted_files_count = 0
+            file_errors_count = 0
+
+            for file_path in doc_manager.input_dir.glob("**/*"):
+                if file_path.is_file():
+                    try:
+                        file_path.unlink()
+                        deleted_files_count += 1
+                    except Exception as e:
+                        logger.error(f"Error deleting file {file_path}: {str(e)}")
+                        file_errors_count += 1
+
+            # Record file deletion failures even when no history log is kept,
+            # so the final status still reflects them
+            if file_errors_count > 0:
+                errors.append(f"Failed to delete {file_errors_count} files")
+
+            # Log file deletion results
+            if "history_messages" in pipeline_status:
+                if file_errors_count > 0:
+                    pipeline_status["history_messages"].append(
+                        f"Deleted {deleted_files_count} files with {file_errors_count} errors"
+                    )
+                else:
+                    pipeline_status["history_messages"].append(
+                        f"Successfully deleted {deleted_files_count} files"
+                    )
+
+            # Prepare final result message
+            if errors:
+                final_message = f"Cleared documents with some errors. Deleted {deleted_files_count} files."
+                status = "partial_success"
+            else:
+                final_message = f"All documents cleared successfully. Deleted {deleted_files_count} files."
+                status = "success"
+
+            # Log final result
+            if "history_messages" in pipeline_status:
+                pipeline_status["history_messages"].append(final_message)
+
             return InsertResponse(
-                status="success", message="All documents cleared successfully"
+                status=status,
+                message=final_message,
             )
         except Exception as e:
-            logger.error(f"Error DELETE /documents: {str(e)}")
+            error_msg = f"Error clearing documents: {str(e)}"
+            logger.error(error_msg)
             logger.error(traceback.format_exc())
+            if "history_messages" in pipeline_status:
+                pipeline_status["history_messages"].append(error_msg)
             raise HTTPException(status_code=500, detail=str(e))
+        finally:
+            # Reset busy status after completion
+            async with pipeline_status_lock:
+                pipeline_status["busy"] = False
+                completion_msg = "Document clearing process completed"
+                pipeline_status["latest_message"] = completion_msg
+                if "history_messages" in pipeline_status:
+                    pipeline_status["history_messages"].append(completion_msg)
 
     @router.get(
         "/pipeline_status",
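
Reviewer note: a minimal sketch of how a client might exercise the new clearing behavior and its three result states. It assumes the handler is exposed as DELETE /documents, that the server listens on http://localhost:9621, and that no auth header is needed; all three are assumptions about the deployment, not guaranteed by this diff (combined_auth may require credentials).

```python
# Hypothetical client sketch for the updated DELETE /documents endpoint.
# Assumed: base URL http://localhost:9621, route DELETE /documents, no auth.
import asyncio

import httpx


async def clear_all_documents() -> None:
    async with httpx.AsyncClient(base_url="http://localhost:9621") as client:
        resp = await client.delete("/documents")
        resp.raise_for_status()
        body = resp.json()
        # The endpoint now distinguishes three outcomes:
        #   "success"         - all storages dropped and all files deleted
        #   "partial_success" - cleared, but some drops or deletions failed
        #   "error"           - pipeline was busy; nothing was cleared
        if body["status"] == "error":
            print(f"Busy, retry later: {body['message']}")
        else:
            print(f"{body['status']}: {body['message']}")


if __name__ == "__main__":
    asyncio.run(clear_all_documents())
```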