From bf89dc18b7eed06cc7bdef3aff30f4e982c6d6cf Mon Sep 17 00:00:00 2001
From: Yannick Stephan
Date: Sun, 9 Feb 2025 13:03:50 +0100
Subject: [PATCH] fix the document processing pipeline

---
 lightrag/lightrag.py | 113 ++++++++++++++++++++++++++-----------------
 1 file changed, 68 insertions(+), 45 deletions(-)

diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py
index 2fbba5ac..e6a0f50f 100644
--- a/lightrag/lightrag.py
+++ b/lightrag/lightrag.py
@@ -497,11 +497,8 @@ class LightRAG:
 
         # 3. Filter out already processed documents
         add_doc_keys: set[str] = set()
-        for doc_id in new_docs.keys():
-            current_doc = await self.doc_status.get_by_id(doc_id)
-            if not current_doc or current_doc["status"] == DocStatus.FAILED:
-                add_doc_keys.add(doc_id)
-
+        excluded_ids = await self.doc_status.all_keys()
+        add_doc_keys = new_docs.keys() - excluded_ids
         new_docs = {k: v for k, v in new_docs.items() if k in add_doc_keys}
 
         if not new_docs:
@@ -509,7 +506,7 @@
             return
 
         # 4. Store original document
-        await self.full_docs.upsert(new_docs)
+        await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
     async def apipeline_process_chunks(
@@ -533,21 +530,22 @@
         to_process_doc_keys: list[str] = []
 
         # Process failes
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.FAILED)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
 
         # Process Pending
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.PENDING)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
 
         if not to_process_doc_keys:
             logger.info("All documents have been processed or are duplicates")
             return
-
-        full_docs_ids = await self.full_docs.get_by_ids(to_process_doc_keys)
-        new_docs = {doc["id"]: doc for doc in full_docs_ids or []}
+
+        # Keep only doc ids whose chunks are not yet stored in text_chunks
+        new_docs_ids = await self.text_chunks.filter_keys(to_process_doc_keys)
+        new_docs = await self.doc_status.get_by_ids(list(new_docs_ids))
 
         if not new_docs:
            logger.info("All documents have been processed or are duplicates")
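
The rewritten filter relies on dict key views behaving as sets, so a single bulk key fetch replaces one status lookup per document. A minimal standalone sketch of the idea (filter_new_docs and stored_ids are illustrative names, not lightrag API):

    # Dict key views support set difference, so deduplication against the
    # set of already-stored ids needs no per-document round trip.
    def filter_new_docs(new_docs: dict, stored_ids: set) -> dict:
        add_doc_keys = new_docs.keys() - stored_ids
        return {k: v for k, v in new_docs.items() if k in add_doc_keys}

    assert filter_new_docs({"doc-1": "a", "doc-2": "b"}, {"doc-1"}) == {"doc-2": "b"}

One behavioral difference worth noting: the old loop re-admitted documents whose status was FAILED, while the key-set difference excludes any id already present in doc_status regardless of its status.
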
@@ -555,12 +553,10 @@
 
         # 2. split docs into chunks, insert chunks, update doc status
         batch_size = self.addon_params.get("insert_batch_size", 10)
-        for i in range(0, len(new_docs), batch_size):
-            batch_docs = dict(list(new_docs.items())[i : i + batch_size])
-
-            for doc_id, doc in tqdm_async(
-                batch_docs.items(), desc=f"Processing batch {i // batch_size + 1}"
-            ):
+        batch_docs_list = [new_docs[i : i + batch_size] for i in range(0, len(new_docs), batch_size)]
+        for i, el in enumerate(batch_docs_list):
+            items = ((k, v) for d in el for k, v in d.items())
+            for doc_id, doc in tqdm_async(items, desc=f"Level 1 - Splitting docs in batch {i + 1}"):
                 doc_status: dict[str, Any] = {
                     "content_summary": doc["content_summary"],
                     "content_length": doc["content_length"],
                     "status": DocStatus.PROCESSING,
                     "created_at": doc["created_at"],
                     "updated_at": datetime.now().isoformat(),
                 }
@@ -570,7 +566,7 @@
                 try:
                     await self.doc_status.upsert({doc_id: doc_status})
-
+
                     # Generate chunks from document
                     chunks: dict[str, Any] = {
                         compute_mdhash_id(dp["content"], prefix="chunk-"): {
                             **dp,
                             "full_doc_id": doc_id,
                         }
                         for dp in chunking_by_token_size(
                             doc["content"],
                             overlap_token_size=self.chunk_overlap_token_size,
                             max_token_size=self.chunk_token_size,
                             tiktoken_model=self.tiktoken_model_name,
                         )
                     }
 
                     # Update status with chunks information
+
+                    await self._process_entity_relation_graph(chunks)
+                    await self.chunks_vdb.upsert(chunks)
+                    await self.text_chunks.upsert(chunks)
                     doc_status.update(
                         {
+                            "status": DocStatus.PROCESSED,
                             "chunks_count": len(chunks),
                             "updated_at": datetime.now().isoformat(),
                         }
-                    )
-                    await self.chunks_vdb.upsert(chunks)
+                    )
                     await self.doc_status.upsert({doc_id: doc_status})
                 except Exception as e:
+                    # Update status with failed information
                     doc_status.update(
                         {
                             "status": DocStatus.FAILED,
                             "error": str(e),
                             "updated_at": datetime.now().isoformat(),
                         }
@@ -611,6 +612,26 @@
                     )
                     continue
+
+    async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
+        try:
+            new_kg = await extract_entities(
+                chunk,
+                knowledge_graph_inst=self.chunk_entity_relation_graph,
+                entity_vdb=self.entities_vdb,
+                relationships_vdb=self.relationships_vdb,
+                llm_response_cache=self.llm_response_cache,
+                global_config=asdict(self),
+            )
+            if new_kg is None:
+                logger.info("No entities or relationships extracted!")
+            else:
+                self.chunk_entity_relation_graph = new_kg
+
+        except Exception as e:
+            logger.error("Failed to extract entities and relationships")
+            raise e
+
     async def apipeline_process_extract_graph(self):
         """
         Process pending or failed chunks to extract entities and relationships.
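
The new batch loop slices the pending docs into fixed-size groups and flattens each group back into (doc_id, doc) pairs for the progress bar. A compact sketch of the slicing scheme with plain lists (make_batches is an illustrative helper, not part of the codebase):

    from typing import Any

    # Cut a list of {doc_id: doc} mappings into batches of at most
    # batch_size entries, preserving order.
    def make_batches(docs: list[dict[str, Any]], batch_size: int = 10) -> list[list[dict[str, Any]]]:
        return [docs[i : i + batch_size] for i in range(0, len(docs), batch_size)]

    batches = make_batches([{f"doc-{n}": {"content": "..."}} for n in range(25)])
    assert [len(b) for b in batches] == [10, 10, 5]
    # Flatten one batch back into (doc_id, doc) pairs, as the loop above does:
    pairs = [(k, v) for d in batches[0] for k, v in d.items()]
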
@@ -633,12 +654,12 @@
         to_process_doc_keys: list[str] = []
 
         # Process failes
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.FAILED)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.FAILED)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
 
         # Process Pending
-        to_process_docs = await self.full_docs.get_by_status(status=DocStatus.PENDING)
+        to_process_docs = await self.doc_status.get_by_status(status=DocStatus.PENDING)
         if to_process_docs:
             to_process_doc_keys.extend([doc["id"] for doc in to_process_docs])
 
@@ -658,29 +679,31 @@
             chunks: dict[str, Any] = {
                 i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
             }
-            # Extract and store entities and relationships
-            try:
-                maybe_new_kg = await extract_entities(
-                    chunks,
-                    knowledge_graph_inst=self.chunk_entity_relation_graph,
-                    entity_vdb=self.entities_vdb,
-                    relationships_vdb=self.relationships_vdb,
-                    llm_response_cache=self.llm_response_cache,
-                    global_config=asdict(self),
-                )
-                if maybe_new_kg is None:
-                    logger.warning("No entities or relationships extracted!")
-                # Update status to processed
-                await self.text_chunks.upsert(
-                    {chunk_id: {"status": DocStatus.PROCESSED}}
-                )
-            except Exception as e:
-                logger.error("Failed to extract entities and relationships")
-                # Mark as failed if any step fails
-                await self.text_chunks.upsert(
-                    {chunk_id: {"status": DocStatus.FAILED}}
-                )
-                raise e
+        async def _process_chunk(chunk_id: str):
+            chunks: dict[str, Any] = {
+                i["id"]: i for i in await self.text_chunks.get_by_ids([chunk_id])
+            }
+
+            # Extract and store entities and relationships
+            try:
+                maybe_new_kg = await extract_entities(
+                    chunks,
+                    knowledge_graph_inst=self.chunk_entity_relation_graph,
+                    entity_vdb=self.entities_vdb,
+                    relationships_vdb=self.relationships_vdb,
+                    llm_response_cache=self.llm_response_cache,
+                    global_config=asdict(self),
+                )
+                if maybe_new_kg is None:
+                    logger.warning("No entities or relationships extracted!")
+                # Update status to processed
+                await self.text_chunks.upsert(chunks)
+                await self.doc_status.upsert({chunk_id: {"status": DocStatus.PROCESSED}})
+            except Exception as e:
+                logger.error("Failed to extract entities and relationships")
+                # Mark as failed if any step fails
+                await self.doc_status.upsert({chunk_id: {"status": DocStatus.FAILED}})
+                raise e
 
         with tqdm_async(
             total=len(to_process_doc_keys),
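
The patch is cut off just as the rewritten loop is about to drive _process_chunk under a tqdm progress bar. The per-chunk status pattern the nested coroutine enables is roughly the following sketch, where process and mark stand in for the real extract_entities and doc_status.upsert calls:

    import asyncio

    # Run one coroutine per chunk; record success or failure in a status
    # store so failed chunks are picked up again on the next pipeline pass.
    async def run_all(chunk_ids, process, mark):
        async def _one(cid):
            try:
                await process(cid)
                await mark(cid, "processed")
            except Exception:
                await mark(cid, "failed")
                raise

        # return_exceptions=True keeps one failing chunk from cancelling the rest
        return await asyncio.gather(*(_one(c) for c in chunk_ids), return_exceptions=True)
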