Merge pull request #1103 from HKUDS/citation

Adds support for handling file paths in document processing, so that chunks, entities, and relationships can cite the files they came from.
Authored by zrguo on 2025-03-18 15:24:55 +08:00, committed by GitHub
5 changed files with 217 additions and 47 deletions

View File

@@ -257,6 +257,8 @@ class DocProcessingStatus:
"""First 100 chars of document content, used for preview"""
content_length: int
"""Total length of document"""
file_path: str
"""File path of the document"""
status: DocStatus
"""Current processing status"""
created_at: str
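
For orientation, a minimal sketch of the updated dataclass, limited to the fields visible in this hunk (the real class declares additional fields such as chunks_count and updated_at, which appear further down in this diff):

```python
from dataclasses import dataclass

@dataclass
class DocProcessingStatus:
    content_summary: str   # first 100 chars of document content, used for preview
    content_length: int    # total length of document
    file_path: str         # file path of the document (new in this PR)
    status: "DocStatus"    # current processing status
    created_at: str        # ISO-format creation timestamp
```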

View File

@@ -423,6 +423,7 @@ class PGVectorStorage(BaseVectorStorage):
"full_doc_id": item["full_doc_id"],
"content": item["content"],
"content_vector": json.dumps(item["__vector__"].tolist()),
"file_path": item["file_path"],
}
except Exception as e:
logger.error(f"Error to prepare upsert,\nsql: {e}\nitem: {item}")
@@ -445,6 +446,7 @@ class PGVectorStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": json.dumps(item["__vector__"].tolist()),
"chunk_ids": chunk_ids,
"file_path": item["file_path"],
# TODO: add document_id
}
return upsert_sql, data
@@ -465,6 +467,7 @@ class PGVectorStorage(BaseVectorStorage):
"content": item["content"],
"content_vector": json.dumps(item["__vector__"].tolist()),
"chunk_ids": chunk_ids,
"file_path": item["file_path"],
# TODO: add document_id
}
return upsert_sql, data
@@ -740,6 +743,7 @@ class PGDocStatusStorage(DocStatusStorage):
chunks_count=result[0]["chunks_count"],
created_at=result[0]["created_at"],
updated_at=result[0]["updated_at"],
file_path=result[0]["file_path"],
)
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
@@ -774,6 +778,7 @@ class PGDocStatusStorage(DocStatusStorage):
created_at=element["created_at"],
updated_at=element["updated_at"],
chunks_count=element["chunks_count"],
file_path=element["file_path"],
)
for element in result
}
@@ -793,14 +798,15 @@ class PGDocStatusStorage(DocStatusStorage):
if not data:
return
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status)
values($1,$2,$3,$4,$5,$6,$7)
sql = """insert into LIGHTRAG_DOC_STATUS(workspace,id,content,content_summary,content_length,chunks_count,status,file_path)
values($1,$2,$3,$4,$5,$6,$7,$8)
on conflict(id,workspace) do update set
content = EXCLUDED.content,
content_summary = EXCLUDED.content_summary,
content_length = EXCLUDED.content_length,
chunks_count = EXCLUDED.chunks_count,
status = EXCLUDED.status,
file_path = EXCLUDED.file_path,
updated_at = CURRENT_TIMESTAMP"""
for k, v in data.items():
# chunks_count is optional
@@ -814,6 +820,7 @@ class PGDocStatusStorage(DocStatusStorage):
"content_length": v["content_length"],
"chunks_count": v["chunks_count"] if "chunks_count" in v else -1,
"status": v["status"],
"file_path": v["file_path"],
},
)
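
As a usage sketch, the widened statement now expects eight positional values; something like the following asyncpg-style call would satisfy it (LightRAG's actual `db.execute` wrapper, which consumes the dict built above, is outside this diff):

```python
import asyncpg  # illustrative driver; the project's own DB wrapper is not shown here

async def upsert_doc_status(conn: asyncpg.Connection, sql: str,
                            workspace: str, doc_id: str, v: dict) -> None:
    # Positional args map onto $1..$8 of the upsert above; file_path is the new $8.
    await conn.execute(
        sql,
        workspace,                  # $1 workspace
        doc_id,                     # $2 id
        v["content"],               # $3 content
        v["content_summary"],       # $4 content_summary
        v["content_length"],        # $5 content_length
        v.get("chunks_count", -1),  # $6 chunks_count (optional, -1 when absent)
        v["status"],                # $7 status
        v["file_path"],             # $8 file_path (new)
    )
```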
@@ -1549,6 +1556,7 @@ TABLES = {
tokens INTEGER,
content TEXT,
content_vector VECTOR,
file_path VARCHAR(256),
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
@@ -1564,6 +1572,7 @@ TABLES = {
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
chunk_id TEXT NULL,
file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
)"""
},
@@ -1578,6 +1587,7 @@ TABLES = {
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP,
chunk_id TEXT NULL,
file_path TEXT NULL,
CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
)"""
},
@@ -1602,6 +1612,7 @@ TABLES = {
content_length int4 NULL,
chunks_count int4 NULL,
status varchar(64) NULL,
file_path TEXT NULL,
created_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
updated_at timestamp DEFAULT CURRENT_TIMESTAMP NULL,
CONSTRAINT LIGHTRAG_DOC_STATUS_PK PRIMARY KEY (workspace, id)
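
One operational note: the TABLES DDL above only takes effect when a table is created from scratch, so deployments that predate this PR need the new columns added by hand. A hedged migration sketch (the column names and types mirror the DDL above; the statements themselves are not part of this PR):

```python
# Hypothetical one-off migration for databases created before this PR.
MIGRATION_STATEMENTS = [
    "ALTER TABLE LIGHTRAG_DOC_CHUNKS ADD COLUMN IF NOT EXISTS file_path VARCHAR(256)",
    "ALTER TABLE LIGHTRAG_VDB_ENTITY ADD COLUMN IF NOT EXISTS file_path TEXT NULL",
    "ALTER TABLE LIGHTRAG_VDB_RELATION ADD COLUMN IF NOT EXISTS file_path TEXT NULL",
    "ALTER TABLE LIGHTRAG_DOC_STATUS ADD COLUMN IF NOT EXISTS file_path TEXT NULL",
]

async def migrate(conn) -> None:
    # Idempotent thanks to IF NOT EXISTS (PostgreSQL 9.6+).
    for stmt in MIGRATION_STATEMENTS:
        await conn.execute(stmt)
```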
@@ -1650,35 +1661,38 @@ SQL_TEMPLATES = {
update_time = CURRENT_TIMESTAMP
""",
"upsert_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector)
VALUES ($1, $2, $3, $4, $5, $6, $7)
chunk_order_index, full_doc_id, content, content_vector, file_path)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index,
full_doc_id=EXCLUDED.full_doc_id,
content = EXCLUDED.content,
content_vector=EXCLUDED.content_vector,
file_path=EXCLUDED.file_path,
update_time = CURRENT_TIMESTAMP
""",
"upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
content_vector, chunk_ids)
VALUES ($1, $2, $3, $4, $5, $6::varchar[])
content_vector, chunk_ids, file_path)
VALUES ($1, $2, $3, $4, $5, $6::varchar[], $7)
ON CONFLICT (workspace,id) DO UPDATE
SET entity_name=EXCLUDED.entity_name,
content=EXCLUDED.content,
content_vector=EXCLUDED.content_vector,
chunk_ids=EXCLUDED.chunk_ids,
file_path=EXCLUDED.file_path,
update_time=CURRENT_TIMESTAMP
""",
"upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
target_id, content, content_vector, chunk_ids)
VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[])
target_id, content, content_vector, chunk_ids, file_path)
VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[], $8)
ON CONFLICT (workspace,id) DO UPDATE
SET source_id=EXCLUDED.source_id,
target_id=EXCLUDED.target_id,
content=EXCLUDED.content,
content_vector=EXCLUDED.content_vector,
chunk_ids=EXCLUDED.chunk_ids,
file_path=EXCLUDED.file_path,
update_time = CURRENT_TIMESTAMP
""",
# SQL for VectorStorage
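
To pin down the parameter contract, a sketch of invoking the upsert_entity template above: chunk_ids is cast server-side via $6::varchar[], while file_path travels as plain text in $7. The workspace/id/entity_name keys are assumed to be filled in above the visible payload hunk:

```python
import asyncpg  # illustrative driver; LightRAG's own DB wrapper is not shown in this diff

async def run_upsert_entity(conn: asyncpg.Connection, sql: str, item: dict) -> None:
    # Positional args follow the $1..$7 slots of the upsert_entity template above.
    await conn.execute(
        sql,
        item["workspace"],       # $1 (assumed key, outside the visible hunk)
        item["id"],              # $2 (assumed key)
        item["entity_name"],     # $3 (assumed key)
        item["content"],         # $4
        item["content_vector"],  # $5, JSON-encoded vector per the payload prep above
        item["chunk_ids"],       # $6, cast server-side to varchar[]
        item["file_path"],       # $7, the new citation column
    )
```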

View File

@@ -389,20 +389,21 @@ class LightRAG:
self.namespace_prefix, NameSpace.VECTOR_STORE_ENTITIES
),
embedding_func=self.embedding_func,
meta_fields={"entity_name", "source_id", "content"},
meta_fields={"entity_name", "source_id", "content", "file_path"},
)
self.relationships_vdb: BaseVectorStorage = self.vector_db_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.VECTOR_STORE_RELATIONSHIPS
),
embedding_func=self.embedding_func,
meta_fields={"src_id", "tgt_id", "source_id", "content"},
meta_fields={"src_id", "tgt_id", "source_id", "content", "file_path"},
)
self.chunks_vdb: BaseVectorStorage = self.vector_db_storage_cls( # type: ignore
namespace=make_namespace(
self.namespace_prefix, NameSpace.VECTOR_STORE_CHUNKS
),
embedding_func=self.embedding_func,
meta_fields={"full_doc_id", "content", "file_path"},
)
# Initialize document status storage
@@ -547,6 +548,7 @@ class LightRAG:
split_by_character: str | None = None,
split_by_character_only: bool = False,
ids: str | list[str] | None = None,
file_paths: str | list[str] | None = None,
) -> None:
"""Sync Insert documents with checkpoint support
@@ -557,10 +559,13 @@ class LightRAG:
split_by_character_only: if split_by_character_only is True, split the string by character only, when
split_by_character is None, this parameter is ignored.
ids: single string of the document ID or list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: single string of the file path or list of file paths, used for citation
"""
loop = always_get_an_event_loop()
loop.run_until_complete(
self.ainsert(input, split_by_character, split_by_character_only, ids)
self.ainsert(
input, split_by_character, split_by_character_only, ids, file_paths
)
)
async def ainsert(
@@ -569,6 +574,7 @@ class LightRAG:
split_by_character: str | None = None,
split_by_character_only: bool = False,
ids: str | list[str] | None = None,
file_paths: str | list[str] | None = None,
) -> None:
"""Async Insert documents with checkpoint support
@@ -579,8 +585,9 @@ class LightRAG:
split_by_character_only: if split_by_character_only is True, split the string by character only, when
split_by_character is None, this parameter is ignored.
ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: list of file paths corresponding to each document, used for citation
"""
await self.apipeline_enqueue_documents(input, ids)
await self.apipeline_enqueue_documents(input, ids, file_paths)
await self.apipeline_process_enqueue_documents(
split_by_character, split_by_character_only
)
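
The net effect on the public API: callers can attach one path per document. A minimal usage sketch (file names illustrative; embedding and LLM setup omitted):

```python
from lightrag import LightRAG

# Embedding/LLM configuration omitted; see the repository README for full initialization.
rag = LightRAG(working_dir="./rag_storage")

# One path per document (or a single string for a single document).
# The paths surface later as "(File: ...)" citations in the References section.
rag.insert(
    ["First document text ...", "Second document text ..."],
    file_paths=["docs/report_2025.txt", "docs/meeting_notes.md"],
)
```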
@@ -654,7 +661,10 @@ class LightRAG:
await self._insert_done()
async def apipeline_enqueue_documents(
self, input: str | list[str], ids: list[str] | None = None
self,
input: str | list[str],
ids: list[str] | None = None,
file_paths: str | list[str] | None = None,
) -> None:
"""
Pipeline for Processing Documents
@@ -664,11 +674,30 @@ class LightRAG:
3. Generate document initial status
4. Filter out already processed documents
5. Enqueue document in status
Args:
input: Single document string or list of document strings
ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated
file_paths: list of file paths corresponding to each document, used for citation
"""
if isinstance(input, str):
input = [input]
if isinstance(ids, str):
ids = [ids]
if isinstance(file_paths, str):
file_paths = [file_paths]
# If file_paths is provided, ensure it matches the number of documents
if file_paths is not None:
if len(file_paths) != len(input):
raise ValueError(
"Number of file paths must match the number of documents"
)
else:
# If no file paths provided, use placeholder
file_paths = ["unknown_source"] * len(input)
# 1. Validate ids if provided or generate MD5 hash IDs
if ids is not None:
@@ -681,32 +710,59 @@ class LightRAG:
raise ValueError("IDs must be unique")
# Generate contents dict of IDs provided by user and documents
contents = {id_: doc for id_, doc in zip(ids, input)}
contents = {
id_: {"content": doc, "file_path": path}
for id_, doc, path in zip(ids, input, file_paths)
}
else:
# Clean input text and remove duplicates
input = list(set(clean_text(doc) for doc in input))
# Generate contents dict of MD5 hash IDs and documents
contents = {compute_mdhash_id(doc, prefix="doc-"): doc for doc in input}
cleaned_input = [
(clean_text(doc), path) for doc, path in zip(input, file_paths)
]
unique_content_with_paths = {}
# Keep track of unique content and their paths
for content, path in cleaned_input:
if content not in unique_content_with_paths:
unique_content_with_paths[content] = path
# Generate contents dict of MD5 hash IDs and documents with paths
contents = {
compute_mdhash_id(content, prefix="doc-"): {
"content": content,
"file_path": path,
}
for content, path in unique_content_with_paths.items()
}
# 2. Remove duplicate contents
unique_contents = {
id_: content
for content, id_ in {
content: id_ for id_, content in contents.items()
}.items()
unique_contents = {}
for id_, content_data in contents.items():
content = content_data["content"]
file_path = content_data["file_path"]
if content not in unique_contents:
unique_contents[content] = (id_, file_path)
# Reconstruct contents with unique content
contents = {
id_: {"content": content, "file_path": file_path}
for content, (id_, file_path) in unique_contents.items()
}
# 3. Generate document initial status
new_docs: dict[str, Any] = {
id_: {
"content": content,
"content_summary": get_content_summary(content),
"content_length": len(content),
"status": DocStatus.PENDING,
"content": content_data["content"],
"content_summary": get_content_summary(content_data["content"]),
"content_length": len(content_data["content"]),
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"file_path": content_data[
"file_path"
], # Store file path in document status
}
for id_, content in unique_contents.items()
for id_, content_data in contents.items()
}
# 4. Filter out already processed documents
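
One subtlety worth illustrating: deduplication is keyed on cleaned content, so when identical text arrives under two different paths, only the first path survives. A toy demonstration (import locations are assumptions):

```python
# Import paths are assumptions; both helpers appear in the diff above.
from lightrag.utils import clean_text, compute_mdhash_id

docs = ["same text", "same text"]   # duplicate content ...
paths = ["a.txt", "b.txt"]          # ... arriving under two paths

unique_content_with_paths: dict[str, str] = {}
for doc, path in zip(docs, paths):
    content = clean_text(doc)
    # First writer wins: "same text" keeps "a.txt"; "b.txt" is dropped.
    unique_content_with_paths.setdefault(content, path)

# Both inputs collapse to a single MD5-keyed document entry.
doc_id = compute_mdhash_id("same text", prefix="doc-")
print(doc_id, unique_content_with_paths)
```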
@@ -841,11 +897,15 @@ class LightRAG:
) -> None:
"""Process single document"""
try:
# Get file path from status document
file_path = getattr(status_doc, "file_path", "unknown_source")
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
"full_doc_id": doc_id,
"file_path": file_path, # Add file path to each chunk
}
for dp in self.chunking_func(
status_doc.content,
@@ -856,6 +916,7 @@ class LightRAG:
self.tiktoken_model_name,
)
}
# Process document (text chunks and full docs) in parallel
# Create tasks with references for potential cancellation
doc_status_task = asyncio.create_task(
@@ -863,11 +924,13 @@ class LightRAG:
{
doc_id: {
"status": DocStatus.PROCESSING,
"updated_at": datetime.now().isoformat(),
"chunks_count": len(chunks),
"content": status_doc.content,
"content_summary": status_doc.content_summary,
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
}
}
)
@@ -906,6 +969,7 @@ class LightRAG:
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
}
}
)
@@ -937,6 +1001,7 @@ class LightRAG:
"content_length": status_doc.content_length,
"created_at": status_doc.created_at,
"updated_at": datetime.now().isoformat(),
"file_path": file_path,
}
}
)
@@ -1063,7 +1128,10 @@ class LightRAG:
loop.run_until_complete(self.ainsert_custom_kg(custom_kg, full_doc_id))
async def ainsert_custom_kg(
self, custom_kg: dict[str, Any], full_doc_id: str = None
self,
custom_kg: dict[str, Any],
full_doc_id: str = None,
file_path: str = "custom_kg",
) -> None:
update_storage = False
try:
@@ -1093,6 +1161,7 @@ class LightRAG:
"full_doc_id": full_doc_id
if full_doc_id is not None
else source_id,
"file_path": file_path, # Add file path
"status": DocStatus.PROCESSED,
}
all_chunks_data[chunk_id] = chunk_entry
@@ -1197,6 +1266,7 @@ class LightRAG:
"source_id": dp["source_id"],
"description": dp["description"],
"entity_type": dp["entity_type"],
"file_path": file_path, # Add file path
}
for dp in all_entities_data
}
@@ -1212,6 +1282,7 @@ class LightRAG:
"keywords": dp["keywords"],
"description": dp["description"],
"weight": dp["weight"],
"file_path": file_path, # Add file path
}
for dp in all_relationships_data
}
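
For custom knowledge graphs, a single path is applied to every chunk, entity, and relationship, defaulting to "custom_kg". A hedged usage sketch with the payload abbreviated to the fields this diff touches:

```python
async def ingest_manual_kg(rag) -> None:
    custom_kg = {
        # Shape abbreviated; the full schema also includes entities
        # and relationships lists, per the repository docs.
        "chunks": [
            {"content": "Alpha Corp acquired Beta Ltd in 2024.", "source_id": "doc-1"},
        ],
    }
    # file_path defaults to "custom_kg" when omitted; the name here is illustrative.
    await rag.ainsert_custom_kg(custom_kg, file_path="manual_kg/acquisitions.txt")
```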
@@ -2220,7 +2291,6 @@ class LightRAG:
"""Synchronously create a new entity.
Creates a new entity in the knowledge graph and adds it to the vector database.
Args:
entity_name: Name of the new entity
entity_data: Dictionary containing entity attributes, e.g. {"description": "description", "entity_type": "type"}

View File

@@ -138,6 +138,7 @@ async def _handle_entity_relation_summary(
async def _handle_single_entity_extraction(
record_attributes: list[str],
chunk_key: str,
file_path: str = "unknown_source",
):
if len(record_attributes) < 4 or record_attributes[0] != '"entity"':
return None
@@ -171,13 +172,14 @@ async def _handle_single_entity_extraction(
entity_type=entity_type,
description=entity_description,
source_id=chunk_key,
metadata={"created_at": time.time()},
metadata={"created_at": time.time(), "file_path": file_path},
)
async def _handle_single_relationship_extraction(
record_attributes: list[str],
chunk_key: str,
file_path: str = "unknown_source",
):
if len(record_attributes) < 5 or record_attributes[0] != '"relationship"':
return None
@@ -199,7 +201,7 @@ async def _handle_single_relationship_extraction(
description=edge_description,
keywords=edge_keywords,
source_id=edge_source_id,
metadata={"created_at": time.time()},
metadata={"created_at": time.time(), "file_path": file_path},
)
@@ -213,6 +215,7 @@ async def _merge_nodes_then_upsert(
already_entity_types = []
already_source_ids = []
already_description = []
already_file_paths = []
already_node = await knowledge_graph_inst.get_node(entity_name)
if already_node is not None:
@@ -220,6 +223,11 @@ async def _merge_nodes_then_upsert(
already_source_ids.extend(
split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
)
if already_node.get("file_path") is not None:
already_file_paths.extend(
split_string_by_multi_markers(
already_node["file_path"], [GRAPH_FIELD_SEP]
)
)
already_description.append(already_node["description"])
entity_type = sorted(
@@ -235,6 +243,10 @@ async def _merge_nodes_then_upsert(
source_id = GRAPH_FIELD_SEP.join(
set([dp["source_id"] for dp in nodes_data] + already_source_ids)
)
file_path = GRAPH_FIELD_SEP.join(
set([dp["metadata"]["file_path"] for dp in nodes_data] + already_file_paths)
)
print(f"file_path: {file_path}")
description = await _handle_entity_relation_summary(
entity_name, description, global_config
)
@@ -243,6 +255,7 @@ async def _merge_nodes_then_upsert(
entity_type=entity_type,
description=description,
source_id=source_id,
file_path=file_path,
)
await knowledge_graph_inst.upsert_node(
entity_name,
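
Since an entity may be mentioned in chunks from several files, the merged paths are packed into one string with GRAPH_FIELD_SEP, mirroring the source_id handling; split_string_by_multi_markers recovers them on the next merge. A round-trip sketch, with plain str.split standing in for that helper:

```python
GRAPH_FIELD_SEP = "<SEP>"  # assumed value, as defined in lightrag's prompt constants

merged = GRAPH_FIELD_SEP.join({"docs/a.txt", "docs/b.txt"})  # one string on the node
recovered = merged.split(GRAPH_FIELD_SEP)                    # re-split at merge time
assert set(recovered) == {"docs/a.txt", "docs/b.txt"}
```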
@@ -263,6 +276,7 @@ async def _merge_edges_then_upsert(
already_source_ids = []
already_description = []
already_keywords = []
already_file_paths = []
if await knowledge_graph_inst.has_edge(src_id, tgt_id):
already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
@@ -279,6 +293,14 @@ async def _merge_edges_then_upsert(
)
)
# Get file_path with empty string default if missing or None
if already_edge.get("file_path") is not None:
already_file_paths.extend(
split_string_by_multi_markers(
already_edge["metadata"]["file_path"], [GRAPH_FIELD_SEP]
)
)
# Get description with empty string default if missing or None
if already_edge.get("description") is not None:
already_description.append(already_edge["description"])
@@ -315,6 +337,16 @@ async def _merge_edges_then_upsert(
+ already_source_ids
)
)
file_path = GRAPH_FIELD_SEP.join(
set(
[
dp["metadata"]["file_path"]
for dp in edges_data
if dp.get("metadata", {}).get("file_path")
]
+ already_file_paths
)
)
for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)):
@@ -325,6 +357,7 @@ async def _merge_edges_then_upsert(
"source_id": source_id,
"description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
},
)
description = await _handle_entity_relation_summary(
@@ -338,6 +371,7 @@ async def _merge_edges_then_upsert(
description=description,
keywords=keywords,
source_id=source_id,
file_path=file_path,
),
)
@@ -347,6 +381,7 @@ async def _merge_edges_then_upsert(
description=description,
keywords=keywords,
source_id=source_id,
file_path=file_path,
)
return edge_data
@@ -456,11 +491,14 @@ async def extract_entities(
else:
return await use_llm_func(input_text)
async def _process_extraction_result(result: str, chunk_key: str):
async def _process_extraction_result(
result: str, chunk_key: str, file_path: str = "unknown_source"
):
"""Process a single extraction result (either initial or gleaning)
Args:
result (str): The extraction result to process
chunk_key (str): The chunk key for source tracking
file_path (str): The file path for citation
Returns:
tuple: (nodes_dict, edges_dict) containing the extracted entities and relationships
"""
@@ -482,14 +520,14 @@ async def extract_entities(
)
if_entities = await _handle_single_entity_extraction(
record_attributes, chunk_key
record_attributes, chunk_key, file_path
)
if if_entities is not None:
maybe_nodes[if_entities["entity_name"]].append(if_entities)
continue
if_relation = await _handle_single_relationship_extraction(
record_attributes, chunk_key
record_attributes, chunk_key, file_path
)
if if_relation is not None:
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
@@ -508,6 +546,8 @@ async def extract_entities(
chunk_key = chunk_key_dp[0]
chunk_dp = chunk_key_dp[1]
content = chunk_dp["content"]
# Get file path from chunk data or use default
file_path = chunk_dp.get("file_path", "unknown_source")
# Get initial extraction
hint_prompt = entity_extract_prompt.format(
@@ -517,9 +557,9 @@ async def extract_entities(
final_result = await _user_llm_func_with_cache(hint_prompt)
history = pack_user_ass_to_openai_messages(hint_prompt, final_result)
# Process initial extraction
# Process initial extraction with file path
maybe_nodes, maybe_edges = await _process_extraction_result(
final_result, chunk_key
final_result, chunk_key, file_path
)
# Process additional gleaning results
@@ -530,9 +570,9 @@ async def extract_entities(
history += pack_user_ass_to_openai_messages(continue_prompt, glean_result)
# Process gleaning result separately
# Process gleaning result separately with file path
glean_nodes, glean_edges = await _process_extraction_result(
glean_result, chunk_key
glean_result, chunk_key, file_path
)
# Merge results
@@ -637,8 +677,10 @@ async def extract_entities(
"entity_type": dp["entity_type"],
"content": f"{dp['entity_name']}\n{dp['description']}",
"source_id": dp["source_id"],
"file_path": dp.get("file_path", "unknown_source"),
"metadata": {
"created_at": dp.get("metadata", {}).get("created_at", time.time())
"created_at": dp.get("created_at", time.time()),
"file_path": dp.get("file_path", "unknown_source"),
},
}
for dp in all_entities_data
@@ -653,8 +695,10 @@ async def extract_entities(
"keywords": dp["keywords"],
"content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
"source_id": dp["source_id"],
"file_path": dp.get("file_path", "unknown_source"),
"metadata": {
"created_at": dp.get("metadata", {}).get("created_at", time.time())
"created_at": dp.get("created_at", time.time()),
"file_path": dp.get("file_path", "unknown_source"),
},
}
for dp in all_relationships_data
@@ -1232,12 +1276,20 @@ async def _get_node_data(
"description",
"rank",
"created_at",
"file_path",
]
]
for i, n in enumerate(node_datas):
created_at = n.get("created_at", "UNKNOWN")
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# Get file path from metadata or directly from node data
file_path = n.get("file_path", "unknown_source")
if not file_path or file_path == "unknown_source":
# Try to get from metadata
file_path = n.get("metadata", {}).get("file_path", "unknown_source")
entites_section_list.append(
[
i,
@@ -1246,6 +1298,7 @@ async def _get_node_data(
n.get("description", "UNKNOWN"),
n["rank"],
created_at,
file_path,
]
)
entities_context = list_of_list_to_csv(entites_section_list)
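
The two-step lookup above, top-level file_path first and metadata as fallback, recurs four times across _get_node_data and _get_edge_data; purely as an illustration, it could be captured in a helper like this (hypothetical, not part of the PR):

```python
def resolve_file_path(record: dict) -> str:
    """Return the citation path for a node/edge record, preferring the
    top-level key and falling back to metadata. Hypothetical helper."""
    file_path = record.get("file_path", "unknown_source")
    if not file_path or file_path == "unknown_source":
        file_path = record.get("metadata", {}).get("file_path", "unknown_source")
    return file_path
```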
@@ -1260,6 +1313,7 @@ async def _get_node_data(
"weight",
"rank",
"created_at",
"file_path",
]
]
for i, e in enumerate(use_relations):
@@ -1267,6 +1321,13 @@ async def _get_node_data(
# Convert timestamp to readable format
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# Get file path from metadata or directly from edge data
file_path = e.get("file_path", "unknown_source")
if not file_path or file_path == "unknown_source":
# Try to get from metadata
file_path = e.get("metadata", {}).get("file_path", "unknown_source")
relations_section_list.append(
[
i,
@@ -1277,6 +1338,7 @@ async def _get_node_data(
e["weight"],
e["rank"],
created_at,
file_path,
]
)
relations_context = list_of_list_to_csv(relations_section_list)
@@ -1492,6 +1554,7 @@ async def _get_edge_data(
"weight",
"rank",
"created_at",
"file_path",
]
]
for i, e in enumerate(edge_datas):
@@ -1499,6 +1562,13 @@ async def _get_edge_data(
# Convert timestamp to readable format
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# Get file path from metadata or directly from edge data
file_path = e.get("file_path", "unknown_source")
if not file_path or file_path == "unknown_source":
# Try to get from metadata
file_path = e.get("metadata", {}).get("file_path", "unknown_source")
relations_section_list.append(
[
i,
@@ -1509,16 +1579,26 @@ async def _get_edge_data(
e["weight"],
e["rank"],
created_at,
file_path,
]
)
relations_context = list_of_list_to_csv(relations_section_list)
entites_section_list = [["id", "entity", "type", "description", "rank"]]
entites_section_list = [
["id", "entity", "type", "description", "rank", "created_at", "file_path"]
]
for i, n in enumerate(use_entities):
created_at = e.get("created_at", "Unknown")
created_at = n.get("created_at", "Unknown")
# Convert timestamp to readable format
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
# Get file path from metadata or directly from node data
file_path = n.get("file_path", "unknown_source")
if not file_path or file_path == "unknown_source":
# Try to get from metadata
file_path = n.get("metadata", {}).get("file_path", "unknown_source")
entites_section_list.append(
[
i,
@@ -1527,6 +1607,7 @@ async def _get_edge_data(
n.get("description", "UNKNOWN"),
n["rank"],
created_at,
file_path,
]
)
entities_context = list_of_list_to_csv(entites_section_list)
@@ -1882,13 +1963,14 @@ async def kg_query_with_keywords(
len_of_prompts = len(encode_string_by_tiktoken(query + sys_prompt))
logger.debug(f"[kg_query_with_keywords]Prompt Tokens: {len_of_prompts}")
# 6. Generate response
response = await use_model_func(
query,
system_prompt=sys_prompt,
stream=query_param.stream,
)
# 清理响应内容
# Clean up response content
if isinstance(response, str) and len(response) > len(sys_prompt):
response = (
response.replace(sys_prompt, "")

View File

@@ -61,7 +61,7 @@ Text:
```
while Alex clenched his jaw, the buzz of frustration dull against the backdrop of Taylor's authoritarian certainty. It was this competitive undercurrent that kept him alert, the sense that his and Jordan's shared commitment to discovery was an unspoken rebellion against Cruz's narrowing vision of control and order.
Then Taylor did something unexpected. They paused beside Jordan and, for a moment, observed the device with something akin to reverence. If this tech can be understood..." Taylor said, their voice quieter, "It could change the game for us. For all of us.
Then Taylor did something unexpected. They paused beside Jordan and, for a moment, observed the device with something akin to reverence. "If this tech can be understood..." Taylor said, their voice quieter, "It could change the game for us. For all of us."
The underlying dismissal earlier seemed to falter, replaced by a glimpse of reluctant respect for the gravity of what lay in their hands. Jordan looked up, and for a fleeting heartbeat, their eyes locked with Taylor's, a wordless clash of wills softening into an uneasy truce.
@@ -92,7 +92,7 @@ Among the hardest hit, Nexon Technologies saw its stock plummet by 7.8% after re
Meanwhile, commodity markets reflected a mixed sentiment. Gold futures rose by 1.5%, reaching $2,080 per ounce, as investors sought safe-haven assets. Crude oil prices continued their rally, climbing to $87.60 per barrel, supported by supply constraints and strong demand.
Financial experts are closely watching the Federal Reserves next move, as speculation grows over potential rate hikes. The upcoming policy announcement is expected to influence investor confidence and overall market stability.
Financial experts are closely watching the Federal Reserve's next move, as speculation grows over potential rate hikes. The upcoming policy announcement is expected to influence investor confidence and overall market stability.
```
Output:
@@ -222,6 +222,7 @@ When handling relationships with timestamps:
- Use markdown formatting with appropriate section headings
- Please respond in the same language as the user's question.
- Ensure the response maintains continuity with the conversation history.
- List up to 5 most important reference sources at the end under a "References" section, clearly indicating whether each source is from Knowledge Graph (KG) or Vector Data (DC), and including the file path if available, in the following format: [KG/DC] Source content (File: file_path)
- If you don't know the answer, just say so.
- Do not make anything up. Do not include information not provided by the Knowledge Base."""
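
To make the mandated format concrete, an illustrative response tail (sources invented from this file's own market example; the file name is hypothetical):

```
### References
- [KG] Nexon Technologies stock plummeted by 7.8% after disappointing earnings (File: market_report.txt)
- [DC] Gold futures rose by 1.5%, reaching $2,080 per ounce (File: market_report.txt)
```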
@@ -319,6 +320,7 @@ When handling content with timestamps:
- Use markdown formatting with appropriate section headings
- Please respond in the same language as the user's question.
- Ensure the response maintains continuity with the conversation history.
- List up to 5 most important reference sources at the end under a "References" section, clearly indicating whether each source is from Knowledge Graph (KG) or Vector Data (DC), and including the file path if available, in the following format: [KG/DC] Source content (File: file_path)
- If you don't know the answer, just say so.
- Do not include information not provided by the Document Chunks."""
@@ -378,8 +380,8 @@ When handling information with timestamps:
- Use markdown formatting with appropriate section headings
- Please respond in the same language as the user's question.
- Ensure the response maintains continuity with the conversation history.
- Organize answer in sesctions focusing on one main point or aspect of the answer
- Organize answer in sections focusing on one main point or aspect of the answer
- Use clear and descriptive section titles that reflect the content
- List up to 5 most important reference sources at the end under "References" sesction. Clearly indicating whether each source is from Knowledge Graph (KG) or Vector Data (DC), in the following format: [KG/DC] Source content
- List up to 5 most important reference sources at the end under a "References" section, clearly indicating whether each source is from Knowledge Graph (KG) or Vector Data (DC), and including the file path if available, in the following format: [KG/DC] Source content (File: file_path)
- If you don't know the answer, just say so. Do not make anything up.
- Do not include information not provided by the Data Sources."""