Merge pull request #892 from PiochU19/main
add support for providing IDs on document insert
README.md | 14 ++++++++++++++
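The change is easiest to see from the caller's side. Below is a minimal usage sketch of the new `ids` parameter and the validation this PR adds; `rag` is assumed to be an already configured `LightRAG` instance (construction omitted).

```python
# Hypothetical usage; `rag` is an assumed, fully configured LightRAG instance.
rag.insert("TEXT1", ids=["ID_FOR_TEXT1"])                              # single document
rag.insert(["TEXT1", "TEXT2"], ids=["ID_FOR_TEXT1", "ID_FOR_TEXT2"])   # batch

# After this change, mismatched or duplicate IDs are rejected:
rag.insert(["TEXT1", "TEXT2"], ids=["ONLY_ONE_ID"])   # ValueError: Number of IDs must match the number of documents
rag.insert(["TEXT1", "TEXT2"], ids=["SAME", "SAME"])   # ValueError: IDs must be unique
```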
@@ -545,6 +545,20 @@ The `insert_batch_size` parameter in `addon_params` controls how many documents
 
 </details>
 
+<details>
+<summary> <b> Insert with ID </b></summary>
+
+If you want to provide your own IDs for your documents, number of documents and number of IDs must be the same.
+
+```python
+# Insert single text, and provide ID for it
+rag.insert("TEXT1", ids=["ID_FOR_TEXT1"])
+
+# Insert multiple texts, and provide IDs for them
+rag.insert(["TEXT1", "TEXT2",...], ids=["ID_FOR_TEXT1", "ID_FOR_TEXT2"])
+```
+
+</details>
 
 <details>
 <summary><b>Incremental Insert</b></summary>
@@ -1,8 +1,8 @@
 from __future__ import annotations
 
 import asyncio
-import os
 import configparser
+import os
 from dataclasses import asdict, dataclass, field
 from datetime import datetime
 from functools import partial
@@ -41,11 +41,11 @@ from .utils import (
     always_get_an_event_loop,
     compute_mdhash_id,
     convert_response_to_json,
+    encode_string_by_tiktoken,
     lazy_external_import,
     limit_async_func_call,
     logger,
     set_logger,
-    encode_string_by_tiktoken,
 )
 from .types import KnowledgeGraph
 
@@ -479,6 +479,7 @@ class LightRAG:
         input: str | list[str],
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
+        ids: list[str] | None = None,
     ) -> None:
         """Sync Insert documents with checkpoint support
 
@@ -487,10 +488,11 @@ class LightRAG:
             split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
             split_by_character_only: if split_by_character_only is True, split the string by character only, when
             split_by_character is None, this parameter is ignored.
+            ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
         """
         loop = always_get_an_event_loop()
         loop.run_until_complete(
-            self.ainsert(input, split_by_character, split_by_character_only)
+            self.ainsert(input, split_by_character, split_by_character_only, ids)
         )
 
     async def ainsert(
@@ -498,6 +500,7 @@ class LightRAG:
         input: str | list[str],
         split_by_character: str | None = None,
         split_by_character_only: bool = False,
+        ids: list[str] | None = None,
     ) -> None:
         """Async Insert documents with checkpoint support
 
@@ -506,8 +509,9 @@ class LightRAG:
             split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
             split_by_character_only: if split_by_character_only is True, split the string by character only, when
             split_by_character is None, this parameter is ignored.
+            ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
         """
-        await self.apipeline_enqueue_documents(input)
+        await self.apipeline_enqueue_documents(input, ids)
         await self.apipeline_process_enqueue_documents(
             split_by_character, split_by_character_only
         )
@@ -564,24 +568,51 @@ class LightRAG:
         if update_storage:
             await self._insert_done()
 
-    async def apipeline_enqueue_documents(self, input: str | list[str]) -> None:
+    async def apipeline_enqueue_documents(
+        self, input: str | list[str], ids: list[str] | None
+    ) -> None:
         """
         Pipeline for Processing Documents
 
-        1. Remove duplicate contents from the list
-        2. Generate document IDs and initial status
-        3. Filter out already processed documents
-        4. Enqueue document in status
+        1. Validate ids if provided or generate MD5 hash IDs
+        2. Remove duplicate contents
+        3. Generate document initial status
+        4. Filter out already processed documents
+        5. Enqueue document in status
         """
         if isinstance(input, str):
             input = [input]
 
-        # 1. Remove duplicate contents from the list
-        unique_contents = list(set(doc.strip() for doc in input))
+        # 1. Validate ids if provided or generate MD5 hash IDs
+        if ids is not None:
+            # Check if the number of IDs matches the number of documents
+            if len(ids) != len(input):
+                raise ValueError("Number of IDs must match the number of documents")
 
-        # 2. Generate document IDs and initial status
+            # Check if IDs are unique
+            if len(ids) != len(set(ids)):
+                raise ValueError("IDs must be unique")
+
+            # Generate contents dict of IDs provided by user and documents
+            contents = {id_: doc.strip() for id_, doc in zip(ids, input)}
+        else:
+            # Generate contents dict of MD5 hash IDs and documents
+            contents = {
+                compute_mdhash_id(doc.strip(), prefix="doc-"): doc.strip()
+                for doc in input
+            }
+
+        # 2. Remove duplicate contents
+        unique_contents = {
+            id_: content
+            for content, id_ in {
+                content: id_ for id_, content in contents.items()
+            }.items()
+        }
+
+        # 3. Generate document initial status
         new_docs: dict[str, Any] = {
-            compute_mdhash_id(content, prefix="doc-"): {
+            id_: {
                 "content": content,
                 "content_summary": self._get_content_summary(content),
                 "content_length": len(content),
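The deduplication in step 2 keeps one entry per distinct content; when duplicate contents arrive under different user-provided IDs, the double dict inversion means the last ID wins. A standalone, plain-dict sketch of just that comprehension (no LightRAG instance involved):

```python
# Plain-dict demonstration of the "# 2. Remove duplicate contents" step above.
contents = {
    "ID_A": "same text",
    "ID_B": "same text",    # duplicate content under a different user ID
    "ID_C": "other text",
}

# Invert to {content: id_} (later IDs overwrite earlier ones for duplicate
# content), then invert back to {id_: content}.
unique_contents = {
    id_: content
    for content, id_ in {
        content: id_ for id_, content in contents.items()
    }.items()
}

print(unique_contents)
# {'ID_B': 'same text', 'ID_C': 'other text'}
```

When no `ids` are supplied, the keys come from `compute_mdhash_id(doc.strip(), prefix="doc-")`, so identical documents already collapse to the same key before this step runs.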
@@ -589,10 +620,10 @@ class LightRAG:
                 "created_at": datetime.now().isoformat(),
                 "updated_at": datetime.now().isoformat(),
             }
-            for content in unique_contents
+            for id_, content in unique_contents.items()
         }
 
-        # 3. Filter out already processed documents
+        # 4. Filter out already processed documents
         # Get docs ids
         all_new_doc_ids = set(new_docs.keys())
         # Exclude IDs of documents that are already in progress
@@ -604,7 +635,7 @@ class LightRAG:
             logger.info("No new unique documents were found.")
             return
 
-        # 4. Store status document
+        # 5. Store status document
         await self.doc_status.upsert(new_docs)
         logger.info(f"Stored {len(new_docs)} new unique documents")
 
@@ -661,8 +692,6 @@ class LightRAG:
             # 4. iterate over batch
             for doc_id_processing_status in docs_batch:
                 doc_id, status_doc = doc_id_processing_status
-                # Update status in processing
-                doc_status_id = compute_mdhash_id(status_doc.content, prefix="doc-")
                 # Generate chunks from document
                 chunks: dict[str, Any] = {
                     compute_mdhash_id(dp["content"], prefix="chunk-"): {
@@ -682,7 +711,7 @@ class LightRAG:
                 tasks = [
                     self.doc_status.upsert(
                         {
-                            doc_status_id: {
+                            doc_id: {
                                 "status": DocStatus.PROCESSING,
                                 "updated_at": datetime.now().isoformat(),
                                 "content": status_doc.content,
@@ -703,7 +732,7 @@ class LightRAG:
                     await asyncio.gather(*tasks)
                     await self.doc_status.upsert(
                         {
-                            doc_status_id: {
+                            doc_id: {
                                 "status": DocStatus.PROCESSED,
                                 "chunks_count": len(chunks),
                                 "content": status_doc.content,
@@ -718,7 +747,7 @@ class LightRAG:
                     logger.error(f"Failed to process document {doc_id}: {str(e)}")
                     await self.doc_status.upsert(
                         {
-                            doc_status_id: {
+                            doc_id: {
                                 "status": DocStatus.FAILED,
                                 "error": str(e),
                                 "content": status_doc.content,