diff --git a/lightrag/base.py b/lightrag/base.py index c7f77e0b..0a98c2d5 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -165,6 +165,7 @@ class DocStatus(str, Enum): @dataclass class DocProcessingStatus: """Document processing status data structure""" + content: str """Original content of the document""" content_summary: str diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 3bfa2649..79eecef7 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -535,18 +535,22 @@ class LightRAG: # Fetch failed documents failed_docs = await self.doc_status.get_failed_docs() to_process_docs.update(failed_docs) - + pending_docs = await self.doc_status.get_pending_docs() to_process_docs.update(pending_docs) - + if not to_process_docs: logger.info("All documents have been processed or are duplicates") - return + return to_process_docs_ids = list(to_process_docs.keys()) # Get allready processed documents (text chunks and full docs) - text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(to_process_docs_ids) - full_docs_processed_doc_ids = await self.full_docs.filter_keys(to_process_docs_ids) + text_chunks_processed_doc_ids = await self.text_chunks.filter_keys( + to_process_docs_ids + ) + full_docs_processed_doc_ids = await self.full_docs.filter_keys( + to_process_docs_ids + ) # 2. split docs into chunks, insert chunks, update doc status batch_size = self.addon_params.get("insert_batch_size", 10) @@ -568,7 +572,7 @@ class LightRAG: ): # Update status in processing id_doc, status_doc = id_doc_processing_status - + await self.doc_status.upsert( { id_doc: { @@ -604,9 +608,7 @@ class LightRAG: # Check if document already processed the doc if id_doc not in full_docs_processed_doc_ids: tasks[id_doc].append( - self.full_docs.upsert( - {id_doc: {"content": status_doc.content}} - ) + self.full_docs.upsert({id_doc: {"content": status_doc.content}}) ) # Check if chunks already processed the doc @@ -629,7 +631,9 @@ class LightRAG: await self._insert_done() except Exception as e: - logger.error(f"Failed to process document {id_doc_processing_status}: {str(e)}") + logger.error( + f"Failed to process document {id_doc_processing_status}: {str(e)}" + ) await self.doc_status.upsert( { id_doc_processing_status: {