feat: 增强知识图谱关系的时序性支持

- 为关系和向量数据增加时间戳支持,记录知识获取的时间
- 优化混合查询策略,同时考虑语义相关性和时间顺序
- 增强提示词模板,指导LLM在处理冲突信息时考虑时间因素
This commit is contained in:
Magic_yuan
2024-12-29 15:25:57 +08:00
parent b8b371f65a
commit 4c950cf4ce
3 changed files with 76 additions and 10 deletions

View File

@@ -30,6 +30,7 @@ from .base import (
QueryParam,
)
from .prompt import GRAPH_FIELD_SEP, PROMPTS
import time
def chunking_by_token_size(
@@ -128,6 +129,9 @@ async def _handle_single_relationship_extraction(
description=edge_description,
keywords=edge_keywords,
source_id=edge_source_id,
metadata={
"created_at": time.time()
}
)
@@ -445,6 +449,9 @@ async def extract_entities(
+ dp["src_id"]
+ dp["tgt_id"]
+ dp["description"],
"metadata": {
"created_at": dp.get("metadata", {}).get("created_at", time.time())
}
}
for dp in all_relationships_data
}
@@ -733,9 +740,13 @@ async def _get_node_data(
entities_context = list_of_list_to_csv(entites_section_list)
relations_section_list = [
["id", "source", "target", "description", "keywords", "weight", "rank"]
["id", "source", "target", "description", "keywords", "weight", "rank", "created_at"]
]
for i, e in enumerate(use_relations):
created_at = e.get("created_at", "未知")
# 转换时间戳为可读格式
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
relations_section_list.append(
[
i,
@@ -745,6 +756,7 @@ async def _get_node_data(
e["keywords"],
e["weight"],
e["rank"],
created_at
]
)
relations_context = list_of_list_to_csv(relations_section_list)
@@ -882,6 +894,8 @@ async def _get_edge_data(
if not len(results):
return "", "", ""
# 从 KV 存储中获取完整的关系信息
edge_ids = [r["id"] for r in results]
edge_datas = await asyncio.gather(
*[knowledge_graph_inst.get_edge(r["src_id"], r["tgt_id"]) for r in results]
)
@@ -892,7 +906,13 @@ async def _get_edge_data(
*[knowledge_graph_inst.edge_degree(r["src_id"], r["tgt_id"]) for r in results]
)
edge_datas = [
{"src_id": k["src_id"], "tgt_id": k["tgt_id"], "rank": d, **v}
{
"src_id": k["src_id"],
"tgt_id": k["tgt_id"],
"rank": d,
"created_at": k.get("__created_at__", None), # 从 KV 存储中获取时间元数据
**v
}
for k, v, d in zip(results, edge_datas, edge_degree)
if v is not None
]
@@ -916,9 +936,13 @@ async def _get_edge_data(
)
relations_section_list = [
["id", "source", "target", "description", "keywords", "weight", "rank"]
["id", "source", "target", "description", "keywords", "weight", "rank", "created_at"]
]
for i, e in enumerate(edge_datas):
created_at = e.get("created_at", "未知")
# 转换时间戳为可读格式
if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
relations_section_list.append(
[
i,
@@ -928,6 +952,7 @@ async def _get_edge_data(
e["keywords"],
e["weight"],
e["rank"],
created_at
]
)
relations_context = list_of_list_to_csv(relations_section_list)
@@ -1259,9 +1284,15 @@ async def mix_kg_vector_query(
chunks_ids = [r["id"] for r in results]
chunks = await text_chunks_db.get_by_ids(chunks_ids)
valid_chunks = [
chunk for chunk in chunks if chunk is not None and "content" in chunk
]
valid_chunks = []
for chunk, result in zip(chunks, results):
if chunk is not None and "content" in chunk:
# 合并 chunk 内容和时间元数据
chunk_with_time = {
"content": chunk["content"],
"created_at": result.get("created_at", None)
}
valid_chunks.append(chunk_with_time)
if not valid_chunks:
return None
@@ -1275,7 +1306,15 @@ async def mix_kg_vector_query(
if not maybe_trun_chunks:
return None
return "\n--New Chunk--\n".join([c["content"] for c in maybe_trun_chunks])
# 在内容中包含时间信息
formatted_chunks = []
for c in maybe_trun_chunks:
chunk_text = c["content"]
if c["created_at"]:
chunk_text = f"[Created at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(c['created_at']))}]\n{chunk_text}"
formatted_chunks.append(chunk_text)
return "\n--New Chunk--\n".join(formatted_chunks)
except Exception as e:
logger.error(f"Error in get_vector_context: {e}")
return None