Merge branch 'main' into graph-viewer-webui

ArnoChen
2025-02-10 13:56:25 +08:00
12 changed files with 344 additions and 225 deletions


@@ -1,28 +1,10 @@
import asyncio
import os
from tqdm.asyncio import tqdm as tqdm_async
from dataclasses import asdict, dataclass, field
from datetime import datetime
from functools import partial
from typing import Any, Callable, Coroutine, Optional, Type, Union, cast
from .operate import (
chunking_by_token_size,
extract_entities,
extract_keywords_only,
kg_query,
kg_query_with_keywords,
mix_kg_vector_query,
naive_query,
)
from typing import Any, Callable, Optional, Type, Union, cast
from .utils import (
EmbeddingFunc,
compute_mdhash_id,
limit_async_func_call,
convert_response_to_json,
logger,
set_logger,
)
from .base import (
BaseGraphStorage,
BaseKVStorage,
@@ -33,10 +15,25 @@ from .base import (
QueryParam,
StorageNameSpace,
)
from .namespace import NameSpace, make_namespace
from .operate import (
chunking_by_token_size,
extract_entities,
extract_keywords_only,
kg_query,
kg_query_with_keywords,
mix_kg_vector_query,
naive_query,
)
from .prompt import GRAPH_FIELD_SEP
from .utils import (
EmbeddingFunc,
compute_mdhash_id,
convert_response_to_json,
limit_async_func_call,
logger,
set_logger,
)
STORAGES = {
"NetworkXStorage": ".kg.networkx_impl",
@@ -62,12 +59,12 @@ STORAGES = {
"GremlinStorage": ".kg.gremlin_impl",
"PGDocStatusStorage": ".kg.postgres_impl",
"FaissVectorDBStorage": ".kg.faiss_impl",
"QdrantVectorDBStorage": ".kg.qdrant_impl",
}
def lazy_external_import(module_name: str, class_name: str):
"""Lazily import a class from an external module based on the package of the caller."""
# Get the caller's module and package
import inspect
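
The rest of lazy_external_import is not shown in this hunk. A minimal sketch of the technique its docstring describes, assuming the common importlib/inspect pattern rather than this repository's exact code:

    import importlib
    import inspect

    def lazy_external_import(module_name: str, class_name: str):
        """Return a factory that imports class_name from module_name only when first called."""
        # Resolve relative module names (e.g. ".kg.networkx_impl") against the caller's package.
        caller_module = inspect.getmodule(inspect.stack()[1][0])
        package = caller_module.__package__ if caller_module else None

        def import_class(*args, **kwargs):
            module = importlib.import_module(module_name, package=package)
            cls = getattr(module, class_name)
            return cls(*args, **kwargs)

        return import_class
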
@@ -113,7 +110,7 @@ def always_get_an_event_loop() -> asyncio.AbstractEventLoop:
@dataclass
class LightRAG:
working_dir: str = field(
default_factory=lambda: f"./lightrag_cache_{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}"
default_factory=lambda: f'./lightrag_cache_{datetime.now().strftime("%Y-%m-%d-%H:%M:%S")}'
)
# Default not to use embedding cache
embedding_cache_config: dict = field(
@@ -412,7 +409,7 @@ class LightRAG:
doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-")
new_docs = {doc_key: {"content": full_text.strip()}}
_add_doc_keys = await self.full_docs.filter_keys([doc_key])
_add_doc_keys = await self.full_docs.filter_keys(set(doc_key))
new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
if not len(new_docs):
logger.warning("This document is already in the storage.")
@@ -421,7 +418,7 @@ class LightRAG:
update_storage = True
logger.info(f"[New Docs] inserting {len(new_docs)} docs")
inserting_chunks = {}
inserting_chunks: dict[str, Any] = {}
for chunk_text in text_chunks:
chunk_text_stripped = chunk_text.strip()
chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
@@ -431,37 +428,22 @@ class LightRAG:
"full_doc_id": doc_key,
}
_add_chunk_keys = await self.text_chunks.filter_keys(
list(inserting_chunks.keys())
)
doc_ids = set(inserting_chunks.keys())
add_chunk_keys = await self.text_chunks.filter_keys(doc_ids)
inserting_chunks = {
k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys
k: v for k, v in inserting_chunks.items() if k in add_chunk_keys
}
if not len(inserting_chunks):
logger.warning("All chunks are already in the storage.")
return
logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
await self.chunks_vdb.upsert(inserting_chunks)
logger.info("[Entity Extraction]...")
maybe_new_kg = await extract_entities(
inserting_chunks,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entity_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
global_config=asdict(self),
)
if maybe_new_kg is None:
logger.warning("No new entities and relationships found")
return
else:
self.chunk_entity_relation_graph = maybe_new_kg
await self.full_docs.upsert(new_docs)
await self.text_chunks.upsert(inserting_chunks)
tasks = [
self.chunks_vdb.upsert(inserting_chunks),
self._process_entity_relation_graph(inserting_chunks),
self.full_docs.upsert(new_docs),
self.text_chunks.upsert(inserting_chunks),
]
await asyncio.gather(*tasks)
finally:
if update_storage:
@@ -496,15 +478,12 @@ class LightRAG:
}
# 3. Filter out already processed documents
add_doc_keys: set[str] = set()
# Get doc IDs
in_process_keys = list(new_docs.keys())
# Get IDs of docs that are already in progress
excluded_ids = await self.doc_status.get_by_ids(in_process_keys)
# Exclude docs that are already in process
add_doc_keys = new_docs.keys() - excluded_ids
# Filter out the excluded docs
new_docs = {k: v for k, v in new_docs.items() if k in add_doc_keys}
all_new_doc_ids = set(new_docs.keys())
# Exclude IDs of documents that are already in progress
unique_new_doc_ids = await self.doc_status.filter_keys(all_new_doc_ids)
# Filter new_docs to only include documents with unique IDs
new_docs = {doc_id: new_docs[doc_id] for doc_id in unique_new_doc_ids}
if not new_docs:
logger.info("All documents have been processed or are duplicates")
@@ -535,47 +514,32 @@ class LightRAG:
# Fetch failed documents
failed_docs = await self.doc_status.get_failed_docs()
to_process_docs.update(failed_docs)
pending_docs = await self.doc_status.get_pending_docs()
to_process_docs.update(pending_docs)
pendings_docs = await self.doc_status.get_pending_docs()
to_process_docs.update(pendings_docs)
if not to_process_docs:
logger.info("All documents have been processed or are duplicates")
return
to_process_docs_ids = list(to_process_docs.keys())
# Get already processed documents (text chunks and full docs)
text_chunks_processed_doc_ids = await self.text_chunks.filter_keys(
to_process_docs_ids
)
full_docs_processed_doc_ids = await self.full_docs.filter_keys(
to_process_docs_ids
)
# 2. split docs into chunks, insert chunks, update doc status
batch_size = self.addon_params.get("insert_batch_size", 10)
batch_docs_list = [
docs_batches = [
list(to_process_docs.items())[i : i + batch_size]
for i in range(0, len(to_process_docs), batch_size)
]
logger.info(f"Number of batches to process: {len(docs_batches)}.")
# 3. iterate over batches
tasks: dict[str, list[Coroutine[Any, Any, None]]] = {}
for batch_idx, ids_doc_processing_status in tqdm_async(
enumerate(batch_docs_list),
desc="Process Batches",
):
for batch_idx, docs_batch in enumerate(docs_batches):
# 4. iterate over batch
for id_doc_processing_status in tqdm_async(
ids_doc_processing_status,
desc=f"Process Batch {batch_idx}",
):
id_doc, status_doc = id_doc_processing_status
for doc_id_processing_status in docs_batch:
doc_id, status_doc = doc_id_processing_status
# Update status in processing
doc_status_id = compute_mdhash_id(status_doc.content, prefix="doc-")
await self.doc_status.upsert(
{
id_doc: {
doc_status_id: {
"status": DocStatus.PROCESSING,
"updated_at": datetime.now().isoformat(),
"content_summary": status_doc.content_summary,
@@ -588,7 +552,7 @@ class LightRAG:
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": id_doc_processing_status,
"full_doc_id": doc_id,
}
for dp in self.chunking_func(
status_doc.content,
@@ -600,50 +564,39 @@ class LightRAG:
)
}
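
chunks maps a content hash of each chunk to its text plus the ID of the parent document; the splitting itself is delegated to self.chunking_func (presumably chunking_by_token_size, per the imports above). A generic sketch of token-window chunking with overlap, independent of that function's real signature:

    def chunk_by_tokens(
        tokens: list[int], max_tokens: int = 1024, overlap: int = 128
    ) -> list[list[int]]:
        # Slide a window of max_tokens over the token stream, stepping by
        # (max_tokens - overlap) so neighbouring chunks share `overlap` tokens.
        step = max_tokens - overlap
        return [tokens[i : i + max_tokens] for i in range(0, len(tokens), step)]
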
# Ensure chunk insertion and graph processing happen sequentially, not in parallel
await self.chunks_vdb.upsert(chunks)
await self._process_entity_relation_graph(chunks)
tasks[id_doc] = []
# Check if the full document was already processed
if id_doc not in full_docs_processed_doc_ids:
tasks[id_doc].append(
self.full_docs.upsert({id_doc: {"content": status_doc.content}})
)
# Check if the chunks for this doc were already processed
if id_doc not in text_chunks_processed_doc_ids:
tasks[id_doc].append(self.text_chunks.upsert(chunks))
# Process document (text chunks and full docs) in parallel
for id_doc_processing_status, task in tasks.items():
try:
await asyncio.gather(*task)
await self.doc_status.upsert(
{
id_doc_processing_status: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"updated_at": datetime.now().isoformat(),
}
tasks = [
self.chunks_vdb.upsert(chunks),
self._process_entity_relation_graph(chunks),
self.full_docs.upsert({doc_id: {"content": status_doc.content}}),
self.text_chunks.upsert(chunks),
]
try:
await asyncio.gather(*tasks)
await self.doc_status.upsert(
{
doc_status_id: {
"status": DocStatus.PROCESSED,
"chunks_count": len(chunks),
"updated_at": datetime.now().isoformat(),
}
)
await self._insert_done()
}
)
await self._insert_done()
except Exception as e:
logger.error(
f"Failed to process document {id_doc_processing_status}: {str(e)}"
)
await self.doc_status.upsert(
{
id_doc_processing_status: {
"status": DocStatus.FAILED,
"error": str(e),
"updated_at": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Failed to process document {doc_id}: {str(e)}")
await self.doc_status.upsert(
{
doc_status_id: {
"status": DocStatus.FAILED,
"error": str(e),
"updated_at": datetime.now().isoformat(),
}
)
continue
}
)
continue
logger.info(f"Completed batch {batch_idx + 1} of {len(docs_batches)}.")
async def _process_entity_relation_graph(self, chunk: dict[str, Any]) -> None:
try:
@@ -656,8 +609,9 @@ class LightRAG:
global_config=asdict(self),
)
if new_kg is None:
logger.info("No entities or relationships extracted!")
logger.info("No new entities or relationships extracted.")
else:
logger.info("New entities or relationships extracted.")
self.chunk_entity_relation_graph = new_kg
except Exception as e:
@@ -895,7 +849,6 @@ class LightRAG:
1. Extract keywords from the 'query' using the new function in operate.py.
2. Then run the standard aquery() flow with the final prompt (formatted_question).
"""
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.aquery_with_separate_keyword_extraction(query, prompt, param)
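
query_with_separate_keyword_extraction is the synchronous wrapper; always_get_an_event_loop (named in the earlier hunk header) typically follows the standard "reuse the thread's loop or create one" pattern. A sketch of that pattern, not necessarily this file's exact implementation:

    import asyncio

    def always_get_an_event_loop() -> asyncio.AbstractEventLoop:
        try:
            # Reuse the event loop already bound to this thread, if any.
            return asyncio.get_event_loop()
        except RuntimeError:
            # No loop in this thread yet: create and register a new one.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop
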
@@ -908,7 +861,6 @@ class LightRAG:
1. Calls extract_keywords_only to get HL/LL keywords from 'query'.
2. Then calls kg_query(...) or naive_query(...), etc. as the main query, while also injecting the newly extracted keywords if needed.
"""
# ---------------------
# STEP 1: Keyword Extraction
# ---------------------
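
The method body is cut off after this banner. Based on the docstring above, step 1 calls extract_keywords_only for the high-level/low-level keywords and step 2 feeds them into the normal query path; a heavily hedged sketch (argument names and exact signatures are assumptions, not taken from this diff):

    # Sketch only -- extract_keywords_only's real signature may differ.
    hl_keywords, ll_keywords = await extract_keywords_only(
        query, param, asdict(self)
    )

    # STEP 2 (sketch): inject the keywords before running the usual query flow,
    # e.g. by setting them on the QueryParam passed to kg_query_with_keywords.
    param.hl_keywords = hl_keywords
    param.ll_keywords = ll_keywords
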