From ceae2eb92ddae51136160884a6fc79bb0f4c7b0d Mon Sep 17 00:00:00 2001
From: Mykola Chaban
Date: Thu, 13 Mar 2025 13:45:09 +0200
Subject: [PATCH 1/5] fixed issue with converting the AGE query to a
 dictionary; refactored how chunk ids are stored

---
 lightrag/kg/postgres_impl.py | 51 +++++++++++++++++++++++++-----------
 1 file changed, 35 insertions(+), 16 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 49d462f6..693ea037 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -432,19 +432,31 @@ class PGVectorStorage(BaseVectorStorage):
     def _upsert_entities(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_entity"]
+        source_id = item["source_id"]
+        if isinstance(source_id, str) and "<SEP>" in source_id:
+            chunk_ids = source_id.split("<SEP>")
+        else:
+            chunk_ids = [source_id]
+
         data: dict[str, Any] = {
             "workspace": self.db.workspace,
             "id": item["__id__"],
             "entity_name": item["entity_name"],
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
-            "chunk_id": item["source_id"],
+            "chunk_ids": chunk_ids,
             # TODO: add document_id
         }
         return upsert_sql, data

     def _upsert_relationships(self, item: dict[str, Any]) -> tuple[str, dict[str, Any]]:
         upsert_sql = SQL_TEMPLATES["upsert_relationship"]
+        source_id = item["source_id"]
+        if isinstance(source_id, str) and "<SEP>" in source_id:
+            chunk_ids = source_id.split("<SEP>")
+        else:
+            chunk_ids = [source_id]
+
         data: dict[str, Any] = {
             "workspace": self.db.workspace,
             "id": item["__id__"],
@@ -452,7 +464,7 @@ class PGVectorStorage(BaseVectorStorage):
             "target_id": item["tgt_id"],
             "content": item["content"],
             "content_vector": json.dumps(item["__vector__"].tolist()),
-            "chunk_id": item["source_id"],
+            "chunk_ids": chunk_ids,
             # TODO: add document_id
         }
         return upsert_sql, data
@@ -950,10 +962,14 @@ class PGGraphStorage(BaseGraphStorage):
                     vertices.get(edge["end_id"], {}),
                 )
             else:
-                if v is None or (v.count("{") < 1 and v.count("[") < 1):
+                if v is None:
                     d[k] = v
+                elif isinstance(v, str) and (v.count("{") < 1 and v.count("[") < 1):
+                    d[k] = v
+                elif isinstance(v, str):
+                    d[k] = json.loads(v)
                 else:
-                    d[k] = json.loads(v) if isinstance(v, str) else v
+                    d[k] = v

         return d

@@ -1556,7 +1572,7 @@ TABLES = {
             content_vector VECTOR,
             create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             update_time TIMESTAMP,
-            chunk_id VARCHAR(255) NULL,
+            chunk_ids VARCHAR(255)[] NULL,
             CONSTRAINT LIGHTRAG_VDB_ENTITY_PK PRIMARY KEY (workspace, id)
             )"""
     },
@@ -1570,7 +1586,7 @@ TABLES = {
             content_vector VECTOR,
             create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
             update_time TIMESTAMP,
-            chunk_id VARCHAR(255) NULL,
+            chunk_ids VARCHAR(255)[] NULL,
             CONSTRAINT LIGHTRAG_VDB_RELATION_PK PRIMARY KEY (workspace, id)
             )"""
     },
@@ -1654,22 +1670,25 @@ SQL_TEMPLATES = {
         update_time = CURRENT_TIMESTAMP
        """,
    "upsert_entity": """INSERT INTO LIGHTRAG_VDB_ENTITY (workspace, id, entity_name, content,
-                      content_vector, chunk_id)
-                      VALUES ($1, $2, $3, $4, $5, $6)
+                      content_vector, chunk_ids)
+                      VALUES ($1, $2, $3, $4, $5, $6::varchar[])
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET entity_name=EXCLUDED.entity_name,
                       content=EXCLUDED.content,
                       content_vector=EXCLUDED.content_vector,
+                      chunk_ids=EXCLUDED.chunk_ids,
                       update_time=CURRENT_TIMESTAMP
                      """,
    "upsert_relationship": """INSERT INTO LIGHTRAG_VDB_RELATION (workspace, id, source_id,
-                      target_id, content, content_vector, chunk_id)
-                      VALUES ($1, $2, $3, $4, $5, $6, $7)
+                      target_id, content, content_vector, chunk_ids)
+                      VALUES ($1, $2, $3, $4, $5, $6, $7::varchar[])
                       ON CONFLICT (workspace,id) DO UPDATE
                       SET source_id=EXCLUDED.source_id,
                       target_id=EXCLUDED.target_id,
                       content=EXCLUDED.content,
-                      content_vector=EXCLUDED.content_vector, update_time = CURRENT_TIMESTAMP
+                      content_vector=EXCLUDED.content_vector,
+                      chunk_ids=EXCLUDED.chunk_ids,
+                      update_time = CURRENT_TIMESTAMP
                      """,
     # SQL for VectorStorage
     # "entities": """SELECT entity_name FROM
@@ -1720,8 +1739,8 @@ SQL_TEMPLATES = {
         FROM
             (
                 SELECT r.id, r.source_id, r.target_id, 1 - (r.content_vector <=> '[{embedding_string}]'::vector) as distance
                 FROM LIGHTRAG_VDB_RELATION r
+                JOIN relevant_chunks c ON c.chunk_id = ANY(r.chunk_ids)
                 WHERE r.workspace=$1
-                AND r.chunk_id IN (SELECT chunk_id FROM relevant_chunks)
             ) filtered
         WHERE distance>$2
         ORDER BY distance DESC
@@ -1735,10 +1754,10 @@ SQL_TEMPLATES = {
         )
         SELECT entity_name FROM
             (
-                SELECT id, entity_name, 1 - (content_vector <=> '[{embedding_string}]'::vector) as distance
-                FROM LIGHTRAG_VDB_ENTITY
-                where workspace=$1
-                AND chunk_id IN (SELECT chunk_id FROM relevant_chunks)
+                SELECT e.id, e.entity_name, 1 - (e.content_vector <=> '[{embedding_string}]'::vector) as distance
+                FROM LIGHTRAG_VDB_ENTITY e
+                JOIN relevant_chunks c ON c.chunk_id = ANY(e.chunk_ids)
+                WHERE e.workspace=$1
             )
         WHERE distance>$2
         ORDER BY distance DESC
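Note: patch 1 turns the scalar chunk_id column into a chunk_ids varchar[] array and splits multi-chunk source ids on the graph field separator before binding. A minimal sketch of that normalization, assuming LightRAG's GRAPH_FIELD_SEP constant ("<SEP>") as the delimiter:

    GRAPH_FIELD_SEP = "<SEP>"  # assumed delimiter; the real constant lives in lightrag

    def split_chunk_ids(source_id):
        # Normalize a source_id field into the list bound to the $N::varchar[] parameter.
        if isinstance(source_id, str) and GRAPH_FIELD_SEP in source_id:
            return source_id.split(GRAPH_FIELD_SEP)
        return [source_id]

    assert split_chunk_ids("chunk-1<SEP>chunk-2") == ["chunk-1", "chunk-2"]
    assert split_chunk_ids("chunk-1") == ["chunk-1"]

The ::varchar[] cast in the VALUES clause helps the driver's list parameter coerce to the column's array type.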
From edc95126de20e1a53eed3d952775c6fd0d729525 Mon Sep 17 00:00:00 2001
From: jofoks
Date: Thu, 13 Mar 2025 11:30:52 -0700
Subject: [PATCH 2/5] Fixed some query parsing issues

---
 lightrag/kg/postgres_impl.py | 119 ++++++++++++++---------------------
 1 file changed, 47 insertions(+), 72 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 49d462f6..b350710c 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -950,10 +950,7 @@ class PGGraphStorage(BaseGraphStorage):
                     vertices.get(edge["end_id"], {}),
                 )
             else:
-                if v is None or (v.count("{") < 1 and v.count("[") < 1):
-                    d[k] = v
-                else:
-                    d[k] = json.loads(v) if isinstance(v, str) else v
+                d[k] = json.loads(v) if isinstance(v, str) and ("{" in v or "[" in v) else v

         return d

@@ -1395,9 +1392,7 @@ class PGGraphStorage(BaseGraphStorage):
         embed_func = self._node_embed_algorithms[algorithm]
         return await embed_func()

-    async def get_knowledge_graph(
-        self, node_label: str, max_depth: int = 5
-    ) -> KnowledgeGraph:
+    async def get_knowledge_graph(self, node_label: str, max_depth: int = 5) -> KnowledgeGraph:
         """
         Retrieve a subgraph containing the specified node and its neighbors up to the specified depth.

@@ -1410,29 +1405,22 @@ class PGGraphStorage(BaseGraphStorage):
         """
         MAX_GRAPH_NODES = 1000

+        # Build the query based on whether we want the full graph or a specific subgraph.
         if node_label == "*":
-            query = """SELECT * FROM cypher('%s', $$
-                        MATCH (n:Entity)
-                        OPTIONAL MATCH (n)-[r]->(m:Entity)
-                        RETURN n, r, m
-                        LIMIT %d
-                      $$) AS (n agtype, r agtype, m agtype)""" % (
-                self.graph_name,
-                MAX_GRAPH_NODES,
-            )
+            query = f"""SELECT * FROM cypher('{self.graph_name}', $$
+                        MATCH (n:Entity)
+                        OPTIONAL MATCH (n)-[r]->(m:Entity)
+                        RETURN n, r, m
+                        LIMIT {MAX_GRAPH_NODES}
+                      $$) AS (n agtype, r agtype, m agtype)"""
         else:
-            encoded_node_label = self._encode_graph_label(node_label.strip('"'))
-            query = """SELECT * FROM cypher('%s', $$
-                        MATCH (n:Entity {node_id: "%s"})
-                        OPTIONAL MATCH p = (n)-[*..%d]-(m)
-                        RETURN nodes(p) AS nodes, relationships(p) AS relationships
-                        LIMIT %d
-                      $$) AS (nodes agtype, relationships agtype)""" % (
-                self.graph_name,
-                encoded_node_label,
-                max_depth,
-                MAX_GRAPH_NODES,
-            )
+            encoded_label = self._encode_graph_label(node_label.strip('"'))
+            query = f"""SELECT * FROM cypher('{self.graph_name}', $$
+                        MATCH (n:Entity {{node_id: "{encoded_label}"}})
+                        OPTIONAL MATCH p = (n)-[*..{max_depth}]-(m)
+                        RETURN nodes(p) AS nodes, relationships(p) AS relationships
+                        LIMIT {MAX_GRAPH_NODES}
+                      $$) AS (nodes agtype, relationships agtype)"""

         results = await self._query(query)

@@ -1440,61 +1428,48 @@ class PGGraphStorage(BaseGraphStorage):
         edges = []
         unique_edge_ids = set()

-        for result in results:
-            if node_label == "*":
-                if result["n"]:
-                    node = result["n"]
-                    node_id = self._decode_graph_label(node["node_id"])
-                    if node_id not in nodes:
-                        nodes[node_id] = node
+        def add_node(node_data: dict):
+            node_id = self._decode_graph_label(node_data["node_id"])
+            if node_id not in nodes:
+                nodes[node_id] = node_data

-                if result["m"]:
-                    node = result["m"]
-                    node_id = self._decode_graph_label(node["node_id"])
-                    if node_id not in nodes:
-                        nodes[node_id] = node
-                if result["r"]:
-                    edge = result["r"]
-                    src_id = self._decode_graph_label(edge["start_id"])
-                    tgt_id = self._decode_graph_label(edge["end_id"])
-                    edges.append((src_id, tgt_id))
-            else:
-                if result["nodes"]:
-                    for node in result["nodes"]:
-                        node_id = self._decode_graph_label(node["node_id"])
-                        if node_id not in nodes:
-                            nodes[node_id] = node
+        def add_edge(edge_data: list):
+            src_id = self._decode_graph_label(edge_data[0]["node_id"])
+            tgt_id = self._decode_graph_label(edge_data[2]["node_id"])
+            edge_key = f"{src_id},{tgt_id}"
+            if edge_key not in unique_edge_ids:
+                unique_edge_ids.add(edge_key)
+                edges.append((edge_key, src_id, tgt_id, {"source": edge_data[0], "target": edge_data[2]}))

-                if result["relationships"]:
-                    for edge in result["relationships"]:  # src --DIRECTED--> target
-                        src_id = self._decode_graph_label(edge[0]["node_id"])
-                        tgt_id = self._decode_graph_label(edge[2]["node_id"])
-                        id = src_id + "," + tgt_id
-                        if id in unique_edge_ids:
-                            continue
-                        else:
-                            unique_edge_ids.add(id)
-                        edges.append(
-                            (id, src_id, tgt_id, {"source": edge[0], "target": edge[2]})
-                        )
+        # Process the query results.
+        if node_label == "*":
+            for result in results:
+                if result.get("n"):
+                    add_node(result["n"])
+                if result.get("m"):
+                    add_node(result["m"])
+                if result.get("r"):
+                    add_edge(result["r"])
+        else:
+            for result in results:
+                for node in result.get("nodes", []):
+                    add_node(node)
+                for edge in result.get("relationships", []):
+                    add_edge(edge)

+        # Construct and return the KnowledgeGraph.
         kg = KnowledgeGraph(
             nodes=[
-                KnowledgeGraphNode(
-                    id=node_id, labels=[node_id], properties=nodes[node_id]
-                )
-                for node_id in nodes
+                KnowledgeGraphNode(id=node_id, labels=[node_id], properties=node_data)
+                for node_id, node_data in nodes.items()
             ],
             edges=[
-                KnowledgeGraphEdge(
-                    id=id, type="DIRECTED", source=src, target=tgt, properties=props
-                )
-                for id, src, tgt, props in edges
+                KnowledgeGraphEdge(id=edge_id, type="DIRECTED", source=src, target=tgt, properties=props)
+                for edge_id, src, tgt, props in edges
             ],
         )

         return kg
-
     async def drop(self) -> None:
         """Drop the storage"""
         drop_sql = SQL_TEMPLATES["drop_vdb_entity"]
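Note: the condensed agtype coercion in patch 2 only JSON-parses string properties that contain a brace or bracket; None and non-string values pass through unchanged. A standalone sketch of the behavior:

    import json

    def coerce_agtype_value(v):
        # Parse strings that look like JSON objects/arrays; pass everything else through.
        return json.loads(v) if isinstance(v, str) and ("{" in v or "[" in v) else v

    assert coerce_agtype_value('{"a": 1}') == {"a": 1}
    assert coerce_agtype_value("[1, 2]") == [1, 2]
    assert coerce_agtype_value("plain text") == "plain text"
    assert coerce_agtype_value(None) is None

One caveat of this heuristic: a plain-text property that merely contains a bracket (for example "see [1]") would be fed to json.loads and raise a JSONDecodeError.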
From 4e58c7d7da236ce662633ea1fd7025b28b6d67be Mon Sep 17 00:00:00 2001
From: pengjunfeng11 <34857167+pengjunfeng11@users.noreply.github.com>
Date: Mon, 17 Mar 2025 10:47:17 +0800
Subject: [PATCH 3/5] Update postgres_impl.py

There is a bug when LightRAG runs against PostgreSQL 15: queries fail with

    PostgreSQL database error: subquery in FROM must have an alias
    HINT: For example, FROM (SELECT ...) [AS] foo.

The SQL templates contain unaliased subqueries in FROM, so each subquery
needs an explicit alias.
---
 lightrag/kg/postgres_impl.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 6a3e7c5f..99e4f5c4 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -1739,7 +1739,7 @@ SQL_TEMPLATES = {
         FROM LIGHTRAG_VDB_ENTITY
         where workspace=$1
         AND chunk_id IN (SELECT chunk_id FROM relevant_chunks)
-            )
+            ) as chunk_distances
         WHERE distance>$2
         ORDER BY distance DESC
         LIMIT $3
@@ -1756,7 +1756,7 @@ SQL_TEMPLATES = {
         FROM LIGHTRAG_DOC_CHUNKS
         where workspace=$1
         AND id IN (SELECT chunk_id FROM relevant_chunks)
-            )
+            ) as chunk_distances
         WHERE distance>$2
         ORDER BY distance DESC
         LIMIT $3
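Note: the alias requirement behind patch 3 comes from PostgreSQL itself. Releases up to 15 reject an unaliased subquery in FROM; PostgreSQL 16 made the alias optional, which is why the same templates can appear to work on newer servers. A minimal reproduction, assuming an asyncpg connection named conn:

    import asyncpg

    async def demo(conn: asyncpg.Connection) -> None:
        # Rejected on PostgreSQL <= 15: "subquery in FROM must have an alias"
        # await conn.fetch("SELECT x FROM (SELECT 1 AS x)")

        # Accepted once the subquery is aliased, as in the fixed templates:
        rows = await conn.fetch("SELECT x FROM (SELECT 1 AS x) AS chunk_distances")
        assert rows[0]["x"] == 1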
From 3df20ae78722ea9edc65a1219daf22fd75faa1b6 Mon Sep 17 00:00:00 2001
From: zrguo
Date: Mon, 17 Mar 2025 15:59:54 +0800
Subject: [PATCH 4/5] fix lint

---
 lightrag/kg/postgres_impl.py | 32 ++++++++++++++++++++++++++------
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py
index 2b6f10b2..a2e9be38 100644
--- a/lightrag/kg/postgres_impl.py
+++ b/lightrag/kg/postgres_impl.py
@@ -437,7 +437,7 @@ class PGVectorStorage(BaseVectorStorage):
             chunk_ids = source_id.split("<SEP>")
         else:
             chunk_ids = [source_id]
-
+
         data: dict[str, Any] = {
             "workspace": self.db.workspace,
             "id": item["__id__"],
@@ -456,7 +456,7 @@ class PGVectorStorage(BaseVectorStorage):
             chunk_ids = source_id.split("<SEP>")
         else:
             chunk_ids = [source_id]
-
+
         data: dict[str, Any] = {
             "workspace": self.db.workspace,
             "id": item["__id__"],
@@ -962,7 +962,11 @@ class PGGraphStorage(BaseGraphStorage):
                     vertices.get(edge["end_id"], {}),
                 )
             else:
-                d[k] = json.loads(v) if isinstance(v, str) and ("{" in v or "[" in v) else v
+                d[k] = (
+                    json.loads(v)
+                    if isinstance(v, str) and ("{" in v or "[" in v)
+                    else v
+                )

         return d

@@ -1404,7 +1408,9 @@ class PGGraphStorage(BaseGraphStorage):
         embed_func = self._node_embed_algorithms[algorithm]
         return await embed_func()

-    async def get_knowledge_graph(self, node_label: str, max_depth: int = 5) -> KnowledgeGraph:
+    async def get_knowledge_graph(
+        self, node_label: str, max_depth: int = 5
+    ) -> KnowledgeGraph:
         """
         Retrieve a subgraph containing the specified node and its neighbors up to the specified depth.
@@ -1451,7 +1457,14 @@ class PGGraphStorage(BaseGraphStorage):
             edge_key = f"{src_id},{tgt_id}"
             if edge_key not in unique_edge_ids:
                 unique_edge_ids.add(edge_key)
-                edges.append((edge_key, src_id, tgt_id, {"source": edge_data[0], "target": edge_data[2]}))
+                edges.append(
+                    (
+                        edge_key,
+                        src_id,
+                        tgt_id,
+                        {"source": edge_data[0], "target": edge_data[2]},
+                    )
+                )

         # Process the query results.
         if node_label == "*":
@@ -1476,12 +1489,19 @@ class PGGraphStorage(BaseGraphStorage):
                 for node_id, node_data in nodes.items()
             ],
             edges=[
-                KnowledgeGraphEdge(id=edge_id, type="DIRECTED", source=src, target=tgt, properties=props)
+                KnowledgeGraphEdge(
+                    id=edge_id,
+                    type="DIRECTED",
+                    source=src,
+                    target=tgt,
+                    properties=props,
+                )
                 for edge_id, src, tgt, props in edges
             ],
         )

         return kg

+
     async def drop(self) -> None:
         """Drop the storage"""
         drop_sql = SQL_TEMPLATES["drop_vdb_entity"]
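Note: after patches 2 and 4, get_knowledge_graph deduplicates nodes by their decoded id and edges by an "src,tgt" key, so parallel edges between the same pair of nodes collapse into one. A standalone sketch of that bookkeeping, with the AGE label decoding stubbed out:

    edges: list[tuple] = []
    unique_edge_ids: set[str] = set()

    def add_edge(src_id: str, tgt_id: str, props: dict) -> None:
        # Direction-sensitive key: (a, b) and (b, a) remain distinct edges.
        edge_key = f"{src_id},{tgt_id}"
        if edge_key not in unique_edge_ids:
            unique_edge_ids.add(edge_key)
            edges.append((edge_key, src_id, tgt_id, props))

    add_edge("a", "b", {})
    add_edge("a", "b", {})  # duplicate is dropped
    add_edge("b", "a", {})  # reverse direction is kept
    assert len(edges) == 2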
From 60dd13f17e43624cf0b6d5d2d4c63ed267957765 Mon Sep 17 00:00:00 2001
From: zrguo
Date: Mon, 17 Mar 2025 16:58:04 +0800
Subject: [PATCH 5/5] fix continue prompt format error

---
 lightrag/operate.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index 1815f308..d062ae73 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -404,7 +404,7 @@ async def extract_entities(
         language=language,
     )

-    continue_prompt = PROMPTS["entity_continue_extraction"]
+    continue_prompt = PROMPTS["entity_continue_extraction"].format(**context_base)
     if_loop_prompt = PROMPTS["entity_if_loop_extraction"]

     processed_chunks = 0
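Note: patch 5 formats the continue-extraction prompt with the same context_base used for the initial extraction prompt; without the .format call, literal placeholders such as {tuple_delimiter} leaked into the text sent to the model. A sketch with hypothetical prompt text and delimiter values (the real keys come from context_base upstream):

    # Hypothetical prompt text; only the placeholder mechanics matter here.
    PROMPTS = {
        "entity_continue_extraction": (
            "MANY entities were missed. Add them below using the same format:\n"
            "{tuple_delimiter}"
        )
    }
    context_base = {"tuple_delimiter": "<|>"}

    # Before the fix: the raw template, placeholders unresolved.
    assert "{tuple_delimiter}" in PROMPTS["entity_continue_extraction"]

    # After the fix: placeholders resolved against the shared context.
    fixed = PROMPTS["entity_continue_extraction"].format(**context_base)
    assert "{tuple_delimiter}" not in fixed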