Improved document enqueue logic with existence checks.
- Added return status to `apipeline_enqueue_documents` - Enhanced logging for duplicate documents
This commit is contained in:
@@ -1381,15 +1381,15 @@ def create_app(args):
|
|||||||
|
|
||||||
# Insert into the RAG queue
|
# Insert into the RAG queue
|
||||||
if content:
|
if content:
|
||||||
await rag.apipeline_enqueue_documents(content)
|
has_new_docs = await rag.apipeline_enqueue_documents(content)
|
||||||
logging.info(
|
if has_new_docs:
|
||||||
f"Successfully processed and enqueued file: {file_path.name}"
|
logging.info(f"Successfully processed and enqueued file: {file_path.name}")
|
||||||
)
|
else:
|
||||||
|
logging.info(f"File content already exists, skipping: {file_path.name}")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logging.error(
|
logging.error(f"No content could be extracted from file: {file_path.name}")
|
||||||
f"No content could be extracted from file: {file_path.name}"
|
return False
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(
|
logging.error(
|
||||||
|
@@ -653,7 +653,7 @@ class LightRAG:
|
|||||||
if update_storage:
|
if update_storage:
|
||||||
await self._insert_done()
|
await self._insert_done()
|
||||||
|
|
||||||
async def apipeline_enqueue_documents(self, input: str | list[str]):
|
async def apipeline_enqueue_documents(self, input: str | list[str]) -> bool:
|
||||||
"""
|
"""
|
||||||
Pipeline for Processing Documents
|
Pipeline for Processing Documents
|
||||||
|
|
||||||
@@ -691,11 +691,12 @@ class LightRAG:
|
|||||||
|
|
||||||
if not new_docs:
|
if not new_docs:
|
||||||
logger.info("No new unique documents were found.")
|
logger.info("No new unique documents were found.")
|
||||||
return
|
return False
|
||||||
|
|
||||||
# 4. Store status document
|
# 4. Store status document
|
||||||
await self.doc_status.upsert(new_docs)
|
await self.doc_status.upsert(new_docs)
|
||||||
logger.info(f"Stored {len(new_docs)} new unique documents")
|
logger.info(f"Stored {len(new_docs)} new unique documents")
|
||||||
|
return True
|
||||||
|
|
||||||
async def apipeline_process_enqueue_documents(
|
async def apipeline_process_enqueue_documents(
|
||||||
self,
|
self,
|
||||||
|
Reference in New Issue
Block a user