Merge branch 'main' into fix--postgres-impl

Author: zrguo
Date:   2025-03-17 15:42:09 +08:00
Committed by: GitHub

51 changed files with 1584 additions and 787 deletions


@@ -224,7 +224,7 @@ LightRAG supports binding to various LLM/Embedding backends:
Use the environment variable `LLM_BINDING` or the CLI argument `--llm-binding` to select the LLM backend type. Use the environment variable `EMBEDDING_BINDING` or the CLI argument `--embedding-binding` to select the embedding backend type.
### Entity Extraction Configuration
-* ENABLE_LLM_CACHE_FOR_EXTRACT: Enable LLM cache for entity extraction (default: false)
+* ENABLE_LLM_CACHE_FOR_EXTRACT: Enable LLM cache for entity extraction (default: true)
It's very common to set `ENABLE_LLM_CACHE_FOR_EXTRACT` to true in test environments to reduce the cost of LLM calls.


@@ -141,7 +141,7 @@ Start the LightRAG server using specified options:
lightrag-server --port 9621 --key sk-somepassword --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
```
-Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
+Replace the `port` number with your desired port number (default is 9621) and `your-secret-key` with a secure key.
## Conclusion


@@ -391,12 +391,24 @@ def create_app(args):
"update_status": update_status,
}
# Custom StaticFiles class to prevent caching of HTML files
class NoCacheStaticFiles(StaticFiles):
async def get_response(self, path: str, scope):
response = await super().get_response(path, scope)
if path.endswith(".html"):
response.headers["Cache-Control"] = (
"no-cache, no-store, must-revalidate"
)
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
return response
# Webui mount webui/index.html
static_dir = Path(__file__).parent / "webui"
static_dir.mkdir(exist_ok=True)
app.mount(
"/webui",
StaticFiles(directory=static_dir, html=True, check_dir=True),
NoCacheStaticFiles(directory=static_dir, html=True, check_dir=True),
name="webui",
)
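The class above is new in this commit. As a sanity check, a minimal sketch like the following (not part of the commit; it assumes `fastapi` and `httpx` are installed, and the file contents are illustrative) exercises the no-cache headers:

```python
from pathlib import Path

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.testclient import TestClient


class NoCacheStaticFiles(StaticFiles):
    async def get_response(self, path: str, scope):
        response = await super().get_response(path, scope)
        if path.endswith(".html"):
            # Force browsers and proxies to revalidate HTML entry points
            response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
            response.headers["Pragma"] = "no-cache"
            response.headers["Expires"] = "0"
        return response


app = FastAPI()
static_dir = Path("webui")
static_dir.mkdir(exist_ok=True)
(static_dir / "index.html").write_text("<html><body>ok</body></html>")
app.mount("/webui", NoCacheStaticFiles(directory=static_dir, html=True), name="webui")

client = TestClient(app)
assert client.get("/webui/index.html").headers["Cache-Control"] == "no-cache, no-store, must-revalidate"
```

Hashed asset files (the `index-*.js` bundles) keep default caching; only `.html` responses are marked stale, which is what lets a redeployed webui pick up new bundle names.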


@@ -3,6 +3,7 @@ ascii_colors
asyncpg
distro
fastapi
+graspologic>=3.4.1
httpcore
httpx
jiter


@@ -364,7 +364,7 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace:
     # Inject LLM cache configuration
     args.enable_llm_cache_for_extract = get_env_value(
-        "ENABLE_LLM_CACHE_FOR_EXTRACT", False, bool
+        "ENABLE_LLM_CACHE_FOR_EXTRACT", True, bool
     )

     # Select Document loading tool (DOCLING, DEFAULT)
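The effect of flipping this default is easiest to see with a stand-in for the project's `get_env_value` helper (a hedged sketch; the real implementation may differ):

```python
import os

def get_env_value(name: str, default, cast=str):
    """Stand-in for the project's helper; illustrative only."""
    raw = os.getenv(name)
    if raw is None:
        return default
    if cast is bool:
        # Accept common truthy spellings; everything else is false
        return raw.strip().lower() in ("1", "true", "yes", "on")
    return cast(raw)

# With the new default, extraction caching is on unless explicitly disabled:
assert get_env_value("ENABLE_LLM_CACHE_FOR_EXTRACT", True, bool) is True
os.environ["ENABLE_LLM_CACHE_FOR_EXTRACT"] = "false"
assert get_env_value("ENABLE_LLM_CACHE_FOR_EXTRACT", True, bool) is False
```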

File diff suppressed because one or more lines are too long (three files).


@@ -2,11 +2,14 @@
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate" />
<meta http-equiv="Pragma" content="no-cache" />
<meta http-equiv="Expires" content="0" />
<link rel="icon" type="image/svg+xml" href="./logo.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Lightrag</title>
<script type="module" crossorigin src="./assets/index-BlVvSIic.js"></script>
<link rel="stylesheet" crossorigin href="./assets/index-CH-3l4_Z.css">
<script type="module" crossorigin src="./assets/index-DwcJE583.js"></script>
<link rel="stylesheet" crossorigin href="./assets/index-BV5s8k-a.css">
</head>
<body>
<div id="root"></div>


@@ -432,19 +432,31 @@ class PGVectorStorage(BaseVectorStorage):
     def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_entity"]
+        source_id = item["source_id"]
+        if isinstance(source_id, str) and "<SEP>" in source_id:
+            chunk_ids = source_id.split("<SEP>")
+        else:
+            chunk_ids = [source_id]
+
         data: dict[str, Any] = {
             "workspace": self.db.workspace,
             "id": item["__id__"],
             "entity_name": item["entity_name"],
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
-            "chunk_id": item["source_id"],
+            "chunk_ids": chunk_ids,
             # TODO: add document_id
         }
         return upsert_sql, data
     def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_relationship"]
+        source_id = item["source_id"]
+        if isinstance(source_id, str) and "<SEP>" in source_id:
+            chunk_ids = source_id.split("<SEP>")
+        else:
+            chunk_ids = [source_id]
+
         data: dict[str, Any] = {
             "workspace": self.db.workspace,
             "id": item["__id__"],
@@ -452,7 +464,7 @@ class PGVectorStorage(BaseVectorStorage):
             "target_id": item["tgt_id"],
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
-            "chunk_id": item["source_id"],
+            "chunk_ids": chunk_ids,
             # TODO: add document_id
         }
         return upsert_sql, data
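Both upsert helpers now normalize `source_id` the same way. A small self-contained sketch of that normalization (illustrative inputs; `<SEP>` is the delimiter LightRAG uses to join multiple chunk ids):

```python
def to_chunk_ids(source_id: str) -> list[str]:
    # Mirrors the normalization above
    if isinstance(source_id, str) and "<SEP>" in source_id:
        return source_id.split("<SEP>")
    return [source_id]

assert to_chunk_ids("chunk-a<SEP>chunk-b") == ["chunk-a", "chunk-b"]
assert to_chunk_ids("chunk-a") == ["chunk-a"]
```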
@@ -755,7 +767,7 @@ class PGDocStatusStorage(DocStatusStorage):
         result = await self.db.query(sql, params, True)
         docs_by_status = {
             element["id"]: DocProcessingStatus(
-                content=result[0]["content"],
+                content=element["content"],
                 content_summary=element["content_summary"],
                 content_length=element["content_length"],
                 status=element["status"],
@@ -1531,7 +1543,7 @@ TABLES = {
                    content_vector VECTOR,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP,
-                   chunk_id VARCHAR(255) NULL,
+                   chunk_ids VARCHAR(255)[] NULL,
                    CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
                    )"""
     },
@@ -1545,7 +1557,7 @@ TABLES = {
                    content_vector VECTOR,
                    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    update_time TIMESTAMP,
-                   chunk_id VARCHAR(255) NULL,
+                   chunk_ids VARCHAR(255)[] NULL,
                    CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
                    )"""
     },
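These `CREATE TABLE` statements only apply to fresh databases. For an existing deployment, an in-place migration along these lines would be needed (a hedged sketch, not part of the commit; verify column names and types against your actual schema before running):

```python
# Hypothetical upgrade script for an existing LightRAG Postgres schema.
MIGRATION = """
ALTER TABLE LIGHTRAG_VDB_ENTITY DROP COLUMN IF EXISTS chunk_id;
ALTER TABLE LIGHTRAG_VDB_ENTITY ADD COLUMN IF NOT EXISTS chunk_ids VARCHAR(255)[] NULL;
ALTER TABLE LIGHTRAG_VDB_RELATION DROP COLUMN IF EXISTS chunk_id;
ALTER TABLE LIGHTRAG_VDB_RELATION ADD COLUMN IF NOT EXISTS chunk_ids VARCHAR(255)[] NULL;
"""
```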
@@ -1629,22 +1641,25 @@ SQL_TEMPLATES = {
                          update_time = CURRENT_TIMESTAMP
                         """,
     "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
-                      content_vector, chunk_id)
-                      VALUES ($1, $2, $3, $4, $5, $6)
+                      content_vector, chunk_ids)
+                      VALUES ($1, $2, $3, $4, $5, $6::varchar[])
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET entity_name=EXCLUDED.entity_name,
                       content=EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
+                      chunk_ids=EXCLUDED.chunk_ids,
                       update_time=CURRENT_TIMESTAMP
                      """,
     "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
-                      target_id, content, content_vector, chunk_id)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7)
+                      target_id, content, content_vector, chunk_ids)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[])
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET source_id=EXCLUDED.source_id,
                       target_id=EXCLUDED.target_id,
                       content=EXCLUDED.content,
-                      content_vector=EXCLUDED.content_vector, update_time = CURRENT_TIMESTAMP
+                      content_vector=EXCLUDED.content_vector,
+                      chunk_ids=EXCLUDED.chunk_ids,
+                      update_time = CURRENT_TIMESTAMP
                      """,
# SQL for VectorStorage
# "entities": """SELECT entity_name FROM
@@ -1695,8 +1710,8 @@ SQL_TEMPLATES = {
             FROM (
                 SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
                 FROM LIGHTRAG_VDB_RELATION r
+                JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
                 WHERE r.workspace=$1
-                AND r.chunk_id IN (SELECT chunk_id FROM relevant_chunks)
             ) filtered
             WHERE distance>$2
             ORDER BY distance DESC
@@ -1710,10 +1725,10 @@ SQL_TEMPLATES = {
         )
         SELECT entity_name FROM
             (
-                SELECT id, entity_name, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-                FROM LIGHTRAG_VDB_ENTITY
-                where workspace=$1
-                AND chunk_id IN (SELECT chunk_id FROM relevant_chunks)
+                SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
+                FROM LIGHTRAG_VDB_ENTITY e
+                JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
+                WHERE e.workspace=$1
             )
         WHERE distance>$2
         ORDER BY distance DESC
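In both queries, the scalar `chunk_id IN (...)` filter becomes a join against the array column: a row survives when any of its `chunk_ids` appears among the relevant chunks. The same semantics in plain Python (illustrative data, not the real storage types):

```python
relevant_chunks = {"chunk-a", "chunk-b"}  # ids produced by the CTE
rows = [
    {"id": "rel-1", "chunk_ids": ["chunk-a", "chunk-x"]},
    {"id": "rel-2", "chunk_ids": ["chunk-y"]},
]
kept = [r for r in rows if any(c in relevant_chunks for c in r["chunk_ids"])]
print([r["id"] for r in kept])  # ['rel-1']
```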


@@ -214,7 +214,7 @@ class LightRAG:
     llm_model_max_token_size: int = field(default=int(os.getenv("MAX_TOKENS", 32768)))
     """Maximum number of tokens allowed per LLM response."""

-    llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 16)))
+    llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
     """Maximum number of concurrent LLM calls."""

     llm_model_kwargs: dict[str, Any] = field(default_factory=dict)
@@ -238,7 +238,7 @@ class LightRAG:
     # Extensions
     # ---
-    max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 20)))
+    max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
     """Maximum number of parallel insert operations."""

     addon_params: dict[str, Any] = field(
@@ -553,6 +553,7 @@ class LightRAG:
         Args:
             input: Single document string or list of document strings
             split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
+                chunk_token_size, it will be split again by token size.
             split_by_character_only: if split_by_character_only is True, split the string by character only, when
                 split_by_character is None, this parameter is ignored.
             ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
@@ -574,6 +575,7 @@ class LightRAG:
         Args:
             input: Single document string or list of document strings
             split_by_character: if split_by_character is not None, split the string by character, if chunk longer than
+                chunk_token_size, it will be split again by token size.
             split_by_character_only: if split_by_character_only is True, split the string by character only, when
                 split_by_character is None, this parameter is ignored.
             ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
@@ -767,7 +769,6 @@ class LightRAG:
         async with pipeline_status_lock:
             # Ensure only one worker is processing documents
             if not pipeline_status.get("busy", False):
-                # First check whether there are documents that need to be processed
                 processing_docs, failed_docs, pending_docs = await asyncio.gather(
                     self.doc_status.get_docs_by_status(DocStatus.PROCESSING),
                     self.doc_status.get_docs_by_status(DocStatus.FAILED),
@@ -779,12 +780,10 @@ class LightRAG:
                 to_process_docs.update(failed_docs)
                 to_process_docs.update(pending_docs)

-                # If there are no documents to process, return and leave pipeline_status unchanged
                 if not to_process_docs:
                     logger.info("No documents to process")
                     return

-                # There are documents to process; update pipeline_status
                 pipeline_status.update(
                     {
                         "busy": True,
@@ -823,7 +822,7 @@ class LightRAG:
                 for i in range(0, len(to_process_docs), self.max_parallel_insert)
             ]

-            log_message = f"Number of batches to process: {len(docs_batches)}."
+            log_message = f"Processing {len(to_process_docs)} document(s) in {len(docs_batches)} batches"
             logger.info(log_message)

             # Update pipeline status with current batch information
@@ -832,140 +831,149 @@ class LightRAG:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
batches: list[Any] = []
# 3. iterate over batches
for batch_idx, docs_batch in enumerate(docs_batches):
# Update current batch in pipeline status (directly, as it's atomic)
pipeline_status["cur_batch"] += 1
async def batch(
batch_idx: int,
docs_batch: list[tuple[str, DocProcessingStatus]],
size_batch: int,
) -> None:
log_message = (
f"Start processing batch {batch_idx + 1} of {size_batch}."
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# 4. iterate over batch
for doc_id_processing_status in docs_batch:
doc_id, status_doc = doc_id_processing_status
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": doc_id,
}
for dp in self.chunking_func(
status_doc.content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
self.tiktoken_model_name,
)
async def process_document(
doc_id: str,
status_doc: DocProcessingStatus,
split_by_character: str | None,
split_by_character_only: bool,
pipeline_status: dict,
pipeline_status_lock: asyncio.Lock,
) -> None:
"""Process single document"""
try:
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": doc_id,
}
# Process document (text chunks and full docs) in parallel
# Create tasks with references for potential cancellation
doc_status_task = asyncio.create_task(
self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSING,
"updated_at": datetime.now().isoformat(),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
}
for dp in self.chunking_func(
status_doc.content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
self.tiktoken_model_name,
)
}
# Process document (text chunks and full docs) in parallel
# Create tasks with references for potential cancellation
doc_status_task = asyncio.create_task(
self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSING,
"updated_at": datetime.now().isoformat(),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
}
)
}
)
chunks_vdb_task = asyncio.create_task(
self.chunks_vdb.upsert(chunks)
)
chunks_vdb_task = asyncio.create_task(
self.chunks_vdb.upsert(chunks)
)
entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph(
chunks, pipeline_status, pipeline_status_lock
)
entity_relation_task = asyncio.create_task(
self._process_entity_relation_graph(
chunks, pipeline_status, pipeline_status_lock
)
)
full_docs_task = asyncio.create_task(
self.full_docs.upsert(
{doc_id: {"content": status_doc.content}}
)
full_docs_task = asyncio.create_task(
self.full_docs.upsert(
{doc_id: {"content": status_doc.content}}
)
)
text_chunks_task = asyncio.create_task(
self.text_chunks.upsert(chunks)
)
tasks = [
doc_status_task,
)
text_chunks_task = asyncio.create_task(
self.text_chunks.upsert(chunks)
)
tasks = [
doc_status_task,
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
]
await asyncio.gather(*tasks)
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
except Exception as e:
# Log error and update pipeline status
error_msg = f"Failed to process document {doc_id}: {str(e)}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(error_msg)
# Cancel other tasks as they are no longer meaningful
for task in [
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
]
try:
await asyncio.gather(*tasks)
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
except Exception as e:
# Log error and update pipeline status
error_msg = (
f"Failed to process document {doc_id}: {str(e)}"
)
logger.error(error_msg)
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(error_msg)
# Cancel other tasks as they are no longer meaningful
for task in [
chunks_vdb_task,
entity_relation_task,
full_docs_task,
text_chunks_task,
]:
if not task.done():
task.cancel()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
continue
log_message = (
f"Completed batch {batch_idx + 1} of {len(docs_batches)}."
]:
if not task.done():
task.cancel()
# Update document status to failed
await self.doc_status.upsert(
{
doc_id: {
"status": DocStatus.FAILED,
"error": str(e),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
}
}
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
batches.append(batch(batch_idx, docs_batch, len(docs_batches)))
# 3. iterate over batches
total_batches = len(docs_batches)
for batch_idx, docs_batch in enumerate(docs_batches):
current_batch = batch_idx + 1
log_message = (
f"Start processing batch {current_batch} of {total_batches}."
)
logger.info(log_message)
pipeline_status["cur_batch"] = current_batch
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
await asyncio.gather(*batches)
await self._insert_done()
doc_tasks = []
for doc_id, status_doc in docs_batch:
doc_tasks.append(
process_document(
doc_id,
status_doc,
split_by_character,
split_by_character_only,
pipeline_status,
pipeline_status_lock,
)
)
# Process documents in one batch parallelly
await asyncio.gather(*doc_tasks)
await self._insert_done()
log_message = f"Completed batch {current_batch} of {total_batches}."
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
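The net effect of the refactor: each document's work is wrapped in its own coroutine, so a failure in one document is caught and recorded without aborting the rest of its batch. A minimal sketch of the pattern (hypothetical workload, not LightRAG code):

```python
import asyncio

async def process_document(doc_id: str) -> None:
    try:
        if doc_id == "doc-1":
            raise RuntimeError("boom")
        print(f"{doc_id}: processed")
    except Exception as e:
        # Failure is recorded per document; siblings keep running
        print(f"{doc_id}: failed ({e})")

async def main() -> None:
    await asyncio.gather(*(process_document(d) for d in ["doc-0", "doc-1", "doc-2"]))

asyncio.run(main())
```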
@@ -1040,7 +1048,7 @@ class LightRAG:
             ]
             await asyncio.gather(*tasks)

-            log_message = "All Insert done"
+            log_message = "In memory DB persist to disk"
             logger.info(log_message)

             if pipeline_status is not None and pipeline_status_lock is not None:
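Together with the lowered `MAX_ASYNC` (16 to 4) and `MAX_PARALLEL_INSERT` (20 to 2) defaults, the pipeline now processes small batches of documents concurrently and persists after each batch. The batching math, as an illustrative sketch:

```python
docs = [f"doc-{i}" for i in range(5)]
max_parallel_insert = 2  # the new MAX_PARALLEL_INSERT default
batches = [
    docs[i : i + max_parallel_insert]
    for i in range(0, len(docs), max_parallel_insert)
]
print(batches)  # [['doc-0', 'doc-1'], ['doc-2', 'doc-3'], ['doc-4']]
```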