feat: 增强知识图谱关系的时序性支持

- 为关系和向量数据增加时间戳支持,记录知识获取的时间
- 优化混合查询策略,同时考虑语义相关性和时间顺序
- 增强提示词模板,指导LLM在处理冲突信息时考虑时间因素
This commit is contained in:
Magic_yuan
2024-12-29 15:37:34 +08:00
parent 4c950cf4ce
commit 7b91dc7fd8
2 changed files with 40 additions and 26 deletions

View File

@@ -129,9 +129,7 @@ async def _handle_single_relationship_extraction(
description=edge_description, description=edge_description,
keywords=edge_keywords, keywords=edge_keywords,
source_id=edge_source_id, source_id=edge_source_id,
metadata={ metadata={"created_at": time.time()},
"created_at": time.time()
}
) )
@@ -451,7 +449,7 @@ async def extract_entities(
+ dp["description"], + dp["description"],
"metadata": { "metadata": {
"created_at": dp.get("metadata", {}).get("created_at", time.time()) "created_at": dp.get("metadata", {}).get("created_at", time.time())
} },
} }
for dp in all_relationships_data for dp in all_relationships_data
} }
@@ -740,11 +738,20 @@ async def _get_node_data(
entities_context = list_of_list_to_csv(entites_section_list) entities_context = list_of_list_to_csv(entites_section_list)
relations_section_list = [ relations_section_list = [
["id", "source", "target", "description", "keywords", "weight", "rank", "created_at"] [
"id",
"source",
"target",
"description",
"keywords",
"weight",
"rank",
"created_at",
]
] ]
for i, e in enumerate(use_relations): for i, e in enumerate(use_relations):
created_at = e.get("created_at", "未知") created_at = e.get("created_at", "UNKNOWN")
# 转换时间戳为可读格式 # Convert timestamp to readable format
if isinstance(created_at, (int, float)): if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at)) created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
relations_section_list.append( relations_section_list.append(
@@ -756,7 +763,7 @@ async def _get_node_data(
e["keywords"], e["keywords"],
e["weight"], e["weight"],
e["rank"], e["rank"],
created_at created_at,
] ]
) )
relations_context = list_of_list_to_csv(relations_section_list) relations_context = list_of_list_to_csv(relations_section_list)
@@ -894,8 +901,6 @@ async def _get_edge_data(
if not len(results): if not len(results):
return "", "", "" return "", "", ""
# 从 KV 存储中获取完整的关系信息
edge_ids = [r["id"] for r in results]
edge_datas = await asyncio.gather( edge_datas = await asyncio.gather(
*[knowledge_graph_inst.get_edge(r["src_id"], r["tgt_id"]) for r in results] *[knowledge_graph_inst.get_edge(r["src_id"], r["tgt_id"]) for r in results]
) )
@@ -907,11 +912,11 @@ async def _get_edge_data(
) )
edge_datas = [ edge_datas = [
{ {
"src_id": k["src_id"], "src_id": k["src_id"],
"tgt_id": k["tgt_id"], "tgt_id": k["tgt_id"],
"rank": d, "rank": d,
"created_at": k.get("__created_at__", None), # 从 KV 存储中获取时间元数据 "created_at": k.get("__created_at__", None), # 从 KV 存储中获取时间元数据
**v **v,
} }
for k, v, d in zip(results, edge_datas, edge_degree) for k, v, d in zip(results, edge_datas, edge_degree)
if v is not None if v is not None
@@ -936,11 +941,20 @@ async def _get_edge_data(
) )
relations_section_list = [ relations_section_list = [
["id", "source", "target", "description", "keywords", "weight", "rank", "created_at"] [
"id",
"source",
"target",
"description",
"keywords",
"weight",
"rank",
"created_at",
]
] ]
for i, e in enumerate(edge_datas): for i, e in enumerate(edge_datas):
created_at = e.get("created_at", "未知") created_at = e.get("created_at", "Unknown")
# 转换时间戳为可读格式 # Convert timestamp to readable format
if isinstance(created_at, (int, float)): if isinstance(created_at, (int, float)):
created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at)) created_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(created_at))
relations_section_list.append( relations_section_list.append(
@@ -952,7 +966,7 @@ async def _get_edge_data(
e["keywords"], e["keywords"],
e["weight"], e["weight"],
e["rank"], e["rank"],
created_at created_at,
] ]
) )
relations_context = list_of_list_to_csv(relations_section_list) relations_context = list_of_list_to_csv(relations_section_list)
@@ -1287,10 +1301,10 @@ async def mix_kg_vector_query(
valid_chunks = [] valid_chunks = []
for chunk, result in zip(chunks, results): for chunk, result in zip(chunks, results):
if chunk is not None and "content" in chunk: if chunk is not None and "content" in chunk:
# 合并 chunk 内容和时间元数据 # Merge chunk content and time metadata
chunk_with_time = { chunk_with_time = {
"content": chunk["content"], "content": chunk["content"],
"created_at": result.get("created_at", None) "created_at": result.get("created_at", None),
} }
valid_chunks.append(chunk_with_time) valid_chunks.append(chunk_with_time)
@@ -1306,7 +1320,7 @@ async def mix_kg_vector_query(
if not maybe_trun_chunks: if not maybe_trun_chunks:
return None return None
# 在内容中包含时间信息 # Include time information in content
formatted_chunks = [] formatted_chunks = []
for c in maybe_trun_chunks: for c in maybe_trun_chunks:
chunk_text = c["content"] chunk_text = c["content"]

View File

@@ -88,7 +88,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
if not len(data): if not len(data):
logger.warning("You insert an empty data to vector DB") logger.warning("You insert an empty data to vector DB")
return [] return []
current_time = time.time() current_time = time.time()
list_data = [ list_data = [
{ {
@@ -137,11 +137,11 @@ class NanoVectorDBStorage(BaseVectorStorage):
) )
results = [ results = [
{ {
**dp, **dp,
"id": dp["__id__"], "id": dp["__id__"],
"distance": dp["__metrics__"], "distance": dp["__metrics__"],
"created_at": dp.get("__created_at__") "created_at": dp.get("__created_at__"),
} }
for dp in results for dp in results
] ]
return results return results