diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index f9196cf3..985fd329 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -563,29 +563,29 @@ class LightRAG:
             pending_doc_ids[i : i + batch_size]
             for i in range(0, len(pending_doc_ids), batch_size)
         ]
         batch_len = len(batch_docs_list) + 1
+        # 3. iterate over batches
         tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
         for batch_idx, doc_ids in enumerate(batch_docs_list):
-
-            doc_status: dict[str, Any] = {
-                "status": DocStatus.PROCESSING,
-                "updated_at": datetime.now().isoformat(),
-            }
+            # 4. iterate over batch
             for doc_id in tqdm_async(
                 doc_ids,
                 desc=f"Level 1 - Batch {batch_idx} / {batch_len}",
             ):
-                doc = await self.doc_status.get_by_id(doc_id)
-                doc_status.update(
+                # Update status in processing
+                status_doc = await self.doc_status.get_by_id(doc_id)
+                await self.doc_status.upsert(
                     {
-                        "content_summary": doc["content_summary"],
-                        "content_length": doc["content_length"],
-                        "created_at": doc["created_at"],
+                        doc_id: {
+                            "status": DocStatus.PROCESSING,
+                            "updated_at": datetime.now().isoformat(),
+                            "content_summary": status_doc["content_summary"],
+                            "content_length": status_doc["content_length"],
+                            "created_at": status_doc["created_at"],
+                        }
                     }
                 )
-                await self.doc_status.upsert({doc_id: doc_status})
-
                 # Generate chunks from document
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
@@ -593,7 +593,7 @@ class LightRAG:
                         "full_doc_id": doc_id,
                     }
                     for dp in self.chunking_func(
-                        doc["content"],
+                        status_doc["content"],
                         split_by_character,
                         split_by_character_only,
                         self.chunk_overlap_token_size,
@@ -601,57 +601,47 @@ class LightRAG:
                         self.tiktoken_model_name,
                     )
                 }
-                try:
-                    # If fails it's failed on full doc and text chunks upset
-                    if doc["status"] != DocStatus.FAILED:
-                        # Ensure chunk insertion and graph processing happen sequentially
-                        await self._process_entity_relation_graph(chunks)
-                        await self.chunks_vdb.upsert(chunks)
-                except Exception as e:
-                    doc_status.update(
-                        {
-                            "status": DocStatus.PENDING,
-                            "error": str(e),
-                            "updated_at": datetime.now().isoformat(),
-                        }
-                    )
-                    await self.doc_status.upsert({doc_id: doc_status})
+
+                # Ensure chunk insertion and graph processing happen sequentially, not in parallel
+                await self._process_entity_relation_graph(chunks)
+                await self.chunks_vdb.upsert(chunks)
+                # Check if document already processed the doc
                 if doc_id not in full_docs_processed_doc_ids:
                     tasks[doc_id].append(
-                        self.full_docs.upsert({doc_id: {"content": doc["content"]}})
+                        self.full_docs.upsert({doc_id: {"content": status_doc["content"]}})
                     )
-
+
+                # check if chunks already processed the doc
                 if doc_id not in text_chunks_processed_doc_ids:
                     tasks[doc_id].append(self.text_chunks.upsert(chunks))

         for doc_id, task in tasks.items():
             try:
                 await asyncio.gather(*task)
-
-                # Update document status
-                doc_status.update(
+                await self.doc_status.upsert(
                     {
-                        "status": DocStatus.PROCESSED,
-                        "chunks_count": len(chunks),
-                        "updated_at": datetime.now().isoformat(),
+                        doc_id: {
+                            "status": DocStatus.PROCESSED,
+                            "chunks_count": len(chunks),
+                            "updated_at": datetime.now().isoformat(),
+                        }
                     }
                 )
-                await self.doc_status.upsert({doc_id: doc_status})
                 await self._insert_done()
             except Exception as e:
-                # Update status with failed information
-                doc_status.update(
-                    {
-                        "status": DocStatus.FAILED,
-                        "error": str(e),
-                        "updated_at": datetime.now().isoformat(),
-                    }
-                )
-                await self.doc_status.upsert({doc_id: doc_status})
                 logger.error(
                     f"Failed to process document {doc_id}: {str(e)}\n{traceback.format_exc()}"
+                )
+                await self.doc_status.upsert(
+                    {
+                        doc_id: {
+                            "status": DocStatus.FAILED,
+                            "error": str(e),
+                            "updated_at": datetime.now().isoformat(),
+                        }
+                    }
                 )
                 continue
@@ -674,102 +664,6 @@ class LightRAG:
             logger.error("Failed to extract entities and relationships")
             raise e

-    # async def apipeline_process_extract_graph(self):
-    #     """
-    #     Process pending or failed chunks to extract entities and relationships.
-
-    #     This method retrieves all chunks that are currently marked as pending or have previously failed.
-    #     It then extracts entities and relationships from each chunk and updates the status accordingly.
-
-    #     Steps:
-    #     1. Retrieve all pending and failed chunks.
-    #     2. For each chunk, attempt to extract entities and relationships.
-    #     3. Update the chunk's status to processed if successful, or failed if an error occurs.
-
-    #     Raises:
-    #         Exception: If there is an error during the extraction process.
-
-    #     Returns:
-    #         None
-    #     """
-    #     # 1. get all pending and failed chunks
-    #     to_process_doc_keys: list[str] = []
-
-    #     # Process failes
-    #     to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
-    #     if to_process_docs:
-    #         to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-    #     # Process Pending
-    #     to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
-    #     if to_process_docs:
-    #         to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
-
-    #     if not to_process_doc_keys:
-    #         logger.info("All documents have been processed or are duplicates")
-    #         return
-
-    #     # Process documents in batches
-    #     batch_size = self.addon_params.get("insert_batch_size", 10)
-
-    #     semaphore = asyncio.Semaphore(
-    #         batch_size
-    #     )  # Control the number of tasks that are processed simultaneously
-
-    #     async def process_chunk(chunk_id: str):
-    #         async with semaphore:
-    #             chunks: dict[str, Any] = {
-    #                 i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
-    #             }
-    #             async def _process_chunk(chunk_id: str):
-    #                 chunks: dict[str, Any] = {
-    #                     i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
-    #                 }
-
-    #             # Extract and store entities and relationships
-    #             try:
-    #                 maybe_new_kg = await extract_entities(
-    #                     chunks,
-    #                     knowledge_graph_inst=self.chunk_entity_relation_graph,
-    #                     entity_vdb=self.entities_vdb,
-    #                     relationships_vdb=self.relationships_vdb,
-    #                     llm_response_cache=self.llm_response_cache,
-    #                     global_config=asdict(self),
-    #                 )
-    #                 if maybe_new_kg is None:
-    #                     logger.warning("No entities or relationships extracted!")
-    #                 # Update status to processed
-    #                 await self.text_chunks.upsert(chunks)
-    #                 await self.doc_status.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
-    #             except Exception as e:
-    #                 logger.error("Failed to extract entities and relationships")
-    #                 # Mark as failed if any step fails
-    #                 await self.doc_status.upsert({chunk_id: {"status": DocStatus.FAILED}})
-    #                 raise e
-
-    #     with tqdm_async(
-    #         total=len(to_process_doc_keys),
-    #         desc="\nLevel 1 - Processing chunks",
-    #         unit="chunk",
-    #         position=0,
-    #     ) as progress:
-    #         tasks: list[asyncio.Task[None]] = []
-    #         for chunk_id in to_process_doc_keys:
-    #             task = asyncio.create_task(process_chunk(chunk_id))
-    #             tasks.append(task)
-
-    #         for future in asyncio.as_completed(tasks):
-    #             await future
-    #             progress.update(1)
-    #             progress.set_postfix(
-    #                 {
-    #                     "LLM call": statistic_data["llm_call"],
-    #                     "LLM cache": statistic_data["llm_cache"],
-    #                 }
-    #             )
-
-    # # Ensure all indexes are updated after each document
-
     async def _insert_done(self):
         tasks = []
         for storage_inst in [