Convert parallel queries to serial execution

Author: yangdx
Date: 2025-04-16 17:55:49 +08:00
parent 2c7e8a5526
commit 0afe35a9fd
2 changed files with 27 additions and 23 deletions
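The core change: two awaitable queries that were previously scheduled concurrently with asyncio.gather are now awaited one after the other. A minimal sketch of the before/after pattern, using two hypothetical coroutines as stand-ins for _get_node_data and _get_edge_data:

import asyncio

async def fetch_nodes() -> list[str]:
    # stand-in for an entity/node query such as _get_node_data
    await asyncio.sleep(0.1)
    return ["node"]

async def fetch_edges() -> list[str]:
    # stand-in for a relationship/edge query such as _get_edge_data
    await asyncio.sleep(0.1)
    return ["edge"]

async def parallel() -> tuple[list[str], list[str]]:
    # before this commit: both queries run concurrently on the event loop
    nodes, edges = await asyncio.gather(fetch_nodes(), fetch_edges())
    return nodes, edges

async def serial() -> tuple[list[str], list[str]]:
    # after this commit: the second query starts only once the first completes
    nodes = await fetch_nodes()
    edges = await fetch_edges()
    return nodes, edges

if __name__ == "__main__":
    print(asyncio.run(parallel()))
    print(asyncio.run(serial()))

The serial form trades latency (roughly the sum of the two query times rather than the maximum) for having only one query in flight against the database at any moment.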

View File

@@ -142,6 +142,9 @@ class PostgreSQLDB:
         with_age: bool = False,
         graph_name: str | None = None,
     ) -> dict[str, Any] | None | list[dict[str, Any]]:
+        # start_time = time.time()
+        # logger.info(f"PostgreSQL, Querying:\n{sql}")
         async with self.pool.acquire() as connection:  # type: ignore
             if with_age and graph_name:
                 await self.configure_age(connection, graph_name)  # type: ignore
@@ -166,6 +169,11 @@ class PostgreSQLDB:
                         data = dict(zip(columns, rows[0]))
                     else:
                         data = None
+                # query_time = time.time() - start_time
+                # logger.info(f"PostgreSQL, Query result len: {len(data)}")
+                # logger.info(f"PostgreSQL, Query execution time: {query_time:.4f}s")
                 return data
             except Exception as e:
                 logger.error(f"PostgreSQL database, error:{e}")
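The only change in this first file is commented-out timing instrumentation; nothing executes differently. If those lines were enabled, the effect would be roughly this sketch (a simplified, illustrative stand-in for PostgreSQLDB.query, assuming an asyncpg pool):

import time

async def timed_fetch(pool, sql: str):
    # illustrative only: times a single query the way the commented-out
    # start_time / query_time lines would if uncommented
    start_time = time.time()
    async with pool.acquire() as connection:
        rows = await connection.fetch(sql)
    query_time = time.time() - start_time
    print(f"PostgreSQL, Query execution time: {query_time:.4f}s")
    return rows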

View File

@@ -1255,21 +1255,19 @@ async def _build_query_context(
             query_param,
         )
     else:  # hybrid mode
-        ll_data, hl_data = await asyncio.gather(
-            _get_node_data(
-                ll_keywords,
-                knowledge_graph_inst,
-                entities_vdb,
-                text_chunks_db,
-                query_param,
-            ),
-            _get_edge_data(
-                hl_keywords,
-                knowledge_graph_inst,
-                relationships_vdb,
-                text_chunks_db,
-                query_param,
-            ),
-        )
+        ll_data = await _get_node_data(
+            ll_keywords,
+            knowledge_graph_inst,
+            entities_vdb,
+            text_chunks_db,
+            query_param,
+        )
+        hl_data = await _get_edge_data(
+            hl_keywords,
+            knowledge_graph_inst,
+            relationships_vdb,
+            text_chunks_db,
+            query_param,
+        )
 
     (
@@ -1351,13 +1349,11 @@ async def _get_node_data(
         if n is not None
     ]  # what is this text_chunks_db doing. dont remember it in airvx. check the diagram.
     # get entitytext chunk
-    use_text_units, use_relations = await asyncio.gather(
-        _find_most_related_text_unit_from_entities(
-            node_datas, query_param, text_chunks_db, knowledge_graph_inst
-        ),
-        _find_most_related_edges_from_entities(
-            node_datas, query_param, knowledge_graph_inst
-        ),
-    )
+    use_text_units = await _find_most_related_text_unit_from_entities(
+        node_datas, query_param, text_chunks_db, knowledge_graph_inst
+    )
+    use_relations = await _find_most_related_edges_from_entities(
+        node_datas, query_param, knowledge_graph_inst
+    )
 
     len_node_datas = len(node_datas)
@@ -1502,7 +1498,7 @@ async def _find_most_related_text_unit_from_entities(
                 all_text_units_lookup[c_id] = index
                 tasks.append((c_id, index, this_edges))
 
-    # Process in batches of 25 tasks at a time to avoid overwhelming resources
+    # Process in batches tasks at a time to avoid overwhelming resources
     batch_size = 5
     results = []
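The last hunk only reworks the comment above batch_size = 5; the batching itself is untouched. The idea is bounded concurrency: gather at most batch_size coroutines per round instead of launching all of them at once. A small self-contained sketch of that pattern (run_in_batches and fetch are illustrative names, not from the repo):

import asyncio
from typing import Any, Awaitable, Callable

async def run_in_batches(
    items: list[Any],
    fetch: Callable[[Any], Awaitable[Any]],
    batch_size: int = 5,
) -> list[Any]:
    # Process `items` in slices of `batch_size` so that at most
    # `batch_size` fetches are in flight at the same time.
    results: list[Any] = []
    for i in range(0, len(items), batch_size):
        batch = items[i : i + batch_size]
        results.extend(await asyncio.gather(*(fetch(item) for item in batch)))
    return results

async def demo() -> None:
    async def fetch(i: int) -> int:
        # placeholder for an individual chunk lookup
        await asyncio.sleep(0.01)
        return i * i

    print(await run_in_batches(list(range(12)), fetch, batch_size=5))

if __name__ == "__main__":
    asyncio.run(demo())

With batch_size = 5 and 12 items, this runs three rounds of gather (5, 5, and 2 tasks).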