From efdc8a2d26e029cc24902ee7a0df6c9ce1cef7c1 Mon Sep 17 00:00:00 2001
From: Yannick Stephan
Date: Wed, 19 Feb 2025 23:53:25 +0100
Subject: [PATCH] multi batches

---
 lightrag/lightrag.py | 144 +++++++++++++++++++++++--------------------
 1 file changed, 78 insertions(+), 66 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 91aacddf..fff53623 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -792,85 +792,97 @@ class LightRAG:
         ]
         logger.info(f"Number of batches to process: {len(docs_batches)}.")
+
+        batches: list[Any] = []
         # 3. iterate over batches
         for batch_idx, docs_batch in enumerate(docs_batches):
-            logger.info(
-                f"Start processing batch {batch_idx + 1} of {len(docs_batches)}."
-            )
-            # 4. iterate over batch
-            for doc_id_processing_status in docs_batch:
-                doc_id, status_doc = doc_id_processing_status
-                # Update status in processing
-                doc_status_id = compute_mdhash_id(status_doc.content, prefix="doc-")
-                await self.doc_status.upsert(
-                    {
-                        doc_status_id: {
-                            "status": DocStatus.PROCESSING,
-                            "updated_at": datetime.now().isoformat(),
-                            "content": status_doc.content,
-                            "content_summary": status_doc.content_summary,
-                            "content_length": status_doc.content_length,
-                            "created_at": status_doc.created_at,
-                        }
-                    }
-                )
-                # Generate chunks from document
-                chunks: dict[str, Any] = {
-                    compute_mdhash_id(dp["content"], prefix="chunk-"): {
-                        **dp,
-                        "full_doc_id": doc_id,
-                    }
-                    for dp in self.chunking_func(
-                        status_doc.content,
-                        split_by_character,
-                        split_by_character_only,
-                        self.chunk_overlap_token_size,
-                        self.chunk_token_size,
-                        self.tiktoken_model_name,
-                    )
-                }
-                # Process document (text chunks and full docs) in parallel
-                tasks = [
-                    self.chunks_vdb.upsert(chunks),
-                    self._process_entity_relation_graph(chunks),
-                    self.full_docs.upsert({doc_id: {"content": status_doc.content}}),
-                    self.text_chunks.upsert(chunks),
-                    self.doc_status.upsert(
-                        {
-                            doc_status_id: {
-                                "status": DocStatus.PROCESSED,
-                                "chunks_count": len(chunks),
-                                "content": status_doc.content,
-                                "content_summary": status_doc.content_summary,
-                                "content_length": status_doc.content_length,
-                                "created_at": status_doc.created_at,
-                                "updated_at": datetime.now().isoformat(),
-                            }
-                        }
-                    ),
-                ]
-                try:
-                    await asyncio.gather(*tasks)
-                    await self._insert_done()
-
-                except Exception as e:
-                    logger.error(f"Failed to process document {doc_id}: {str(e)}")
+            async def batch(
+                batch_idx: int,
+                docs_batch: list[tuple[str, DocProcessingStatus]],
+                size_batch: int,
+            ) -> None:
+                logger.info(f"Start processing batch {batch_idx + 1} of {size_batch}.")
+                # 4. iterate over batch
+                for doc_id_processing_status in docs_batch:
+                    doc_id, status_doc = doc_id_processing_status
+                    # Update status in processing
+                    doc_status_id = compute_mdhash_id(status_doc.content, prefix="doc-")
                     await self.doc_status.upsert(
                         {
                             doc_status_id: {
-                                "status": DocStatus.FAILED,
-                                "error": str(e),
+                                "status": DocStatus.PROCESSING,
+                                "updated_at": datetime.now().isoformat(),
                                 "content": status_doc.content,
                                 "content_summary": status_doc.content_summary,
                                 "content_length": status_doc.content_length,
                                 "created_at": status_doc.created_at,
-                                "updated_at": datetime.now().isoformat(),
                             }
                         }
                     )
-                    continue
-            logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
+                    # Generate chunks from document
+                    chunks: dict[str, Any] = {
+                        compute_mdhash_id(dp["content"], prefix="chunk-"): {
+                            **dp,
+                            "full_doc_id": doc_id,
+                        }
+                        for dp in self.chunking_func(
+                            status_doc.content,
+                            split_by_character,
+                            split_by_character_only,
+                            self.chunk_overlap_token_size,
+                            self.chunk_token_size,
+                            self.tiktoken_model_name,
+                        )
+                    }
+
+                    # Process document (text chunks and full docs) in parallel
+                    tasks = [
+                        self.chunks_vdb.upsert(chunks),
+                        self._process_entity_relation_graph(chunks),
+                        self.full_docs.upsert(
+                            {doc_id: {"content": status_doc.content}}
+                        ),
+                        self.text_chunks.upsert(chunks),
+                        self.doc_status.upsert(
+                            {
+                                doc_status_id: {
+                                    "status": DocStatus.PROCESSED,
+                                    "chunks_count": len(chunks),
+                                    "content": status_doc.content,
+                                    "content_summary": status_doc.content_summary,
+                                    "content_length": status_doc.content_length,
+                                    "created_at": status_doc.created_at,
+                                    "updated_at": datetime.now().isoformat(),
+                                }
+                            }
+                        ),
+                    ]
+                    try:
+                        await asyncio.gather(*tasks)
+
+                    except Exception as e:
+                        logger.error(f"Failed to process document {doc_id}: {str(e)}")
+                        await self.doc_status.upsert(
+                            {
+                                doc_status_id: {
+                                    "status": DocStatus.FAILED,
+                                    "error": str(e),
+                                    "content": status_doc.content,
+                                    "content_summary": status_doc.content_summary,
+                                    "content_length": status_doc.content_length,
+                                    "created_at": status_doc.created_at,
+                                    "updated_at": datetime.now().isoformat(),
+                                }
+                            }
+                        )
+                        continue
+                logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
+
+            batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
+
+        await asyncio.gather(*batches)
+        await self._insert_done()
 
     async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
         try:
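
Note: the change above makes document ingestion process its batches concurrently instead of sequentially. Each batch body is wrapped in a local async def batch(...) coroutine, one coroutine per batch is collected into the batches list, and a single await asyncio.gather(*batches) runs them side by side; _insert_done() is now awaited once after all batches finish rather than once per document. Below is a minimal, self-contained sketch of that pattern, not LightRAG code: process_doc, the sample doc IDs, and the sleep are illustrative stand-ins for the real status upserts, chunking, and graph extraction.

import asyncio
from typing import Any


async def process_doc(doc_id: str) -> None:
    # Stand-in for the real per-document work; only simulates latency here.
    await asyncio.sleep(0.1)


async def process_batch(
    batch_idx: int, docs_batch: list[str], size_batch: int
) -> None:
    # Documents within one batch are still handled sequentially, mirroring
    # the inner `for doc_id_processing_status in docs_batch` loop.
    print(f"Start processing batch {batch_idx + 1} of {size_batch}.")
    for doc_id in docs_batch:
        try:
            await process_doc(doc_id)
        except Exception as e:
            # Mirror the patch: log the failure and move on to the next doc.
            print(f"Failed to process document {doc_id}: {e}")
            continue
    print(f"Completed batch {batch_idx + 1} of {size_batch}.")


async def main() -> None:
    docs = [f"doc-{i}" for i in range(6)]
    batch_size = 2
    docs_batches = [
        docs[i : i + batch_size] for i in range(0, len(docs), batch_size)
    ]

    # As in the patch: build one coroutine per batch, then run all batches
    # concurrently with a single gather instead of awaiting each in turn.
    batches: list[Any] = [
        process_batch(batch_idx, docs_batch, len(docs_batches))
        for batch_idx, docs_batch in enumerate(docs_batches)
    ]
    await asyncio.gather(*batches)


asyncio.run(main())

With three batches of two documents each, all three "Start processing" lines print together and the run takes roughly the time of one batch (about 0.2s) instead of three batches back to back (about 0.6s).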