Refactor direct client/graph access to reduce redundant get calls in vector/graph ops
This commit is contained in:
@@ -97,8 +97,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
for i, d in enumerate(list_data):
|
for i, d in enumerate(list_data):
|
||||||
d["__vector__"] = embeddings[i]
|
d["__vector__"] = embeddings[i]
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
client = self._get_client()
|
results = self._get_client().upsert(datas=list_data)
|
||||||
results = client.upsert(datas=list_data)
|
|
||||||
return results
|
return results
|
||||||
else:
|
else:
|
||||||
# sometimes the embedding is not returned correctly. just log it.
|
# sometimes the embedding is not returned correctly. just log it.
|
||||||
@@ -112,8 +111,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
embedding = embedding[0]
|
embedding = embedding[0]
|
||||||
|
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
client = self._get_client()
|
results = self._get_client().query(
|
||||||
results = client.query(
|
|
||||||
query=embedding,
|
query=embedding,
|
||||||
top_k=top_k,
|
top_k=top_k,
|
||||||
better_than_threshold=self.cosine_better_than_threshold,
|
better_than_threshold=self.cosine_better_than_threshold,
|
||||||
@@ -131,8 +129,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def client_storage(self):
|
def client_storage(self):
|
||||||
client = self._get_client()
|
return getattr(self._get_client(), "_NanoVectorDB__storage")
|
||||||
return getattr(client, "_NanoVectorDB__storage")
|
|
||||||
|
|
||||||
async def delete(self, ids: list[str]):
|
async def delete(self, ids: list[str]):
|
||||||
"""Delete vectors with specified IDs
|
"""Delete vectors with specified IDs
|
||||||
@@ -142,8 +139,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
client = self._get_client()
|
self._get_client().delete(ids)
|
||||||
client.delete(ids)
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Successfully deleted {len(ids)} vectors from {self.namespace}"
|
f"Successfully deleted {len(ids)} vectors from {self.namespace}"
|
||||||
)
|
)
|
||||||
@@ -158,10 +154,9 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
)
|
)
|
||||||
|
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
client = self._get_client()
|
|
||||||
# Check if the entity exists
|
# Check if the entity exists
|
||||||
if client.get([entity_id]):
|
if self._get_client().get([entity_id]):
|
||||||
client.delete([entity_id])
|
self._get_client().delete([entity_id])
|
||||||
logger.debug(f"Successfully deleted entity {entity_name}")
|
logger.debug(f"Successfully deleted entity {entity_name}")
|
||||||
else:
|
else:
|
||||||
logger.debug(f"Entity {entity_name} not found in storage")
|
logger.debug(f"Entity {entity_name} not found in storage")
|
||||||
@@ -171,8 +166,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
async def delete_entity_relation(self, entity_name: str) -> None:
|
async def delete_entity_relation(self, entity_name: str) -> None:
|
||||||
try:
|
try:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
client = self._get_client()
|
storage = getattr(self._get_client(), "_NanoVectorDB__storage")
|
||||||
storage = getattr(client, "_NanoVectorDB__storage")
|
|
||||||
relations = [
|
relations = [
|
||||||
dp
|
dp
|
||||||
for dp in storage["data"]
|
for dp in storage["data"]
|
||||||
@@ -184,7 +178,7 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
ids_to_delete = [relation["__id__"] for relation in relations]
|
ids_to_delete = [relation["__id__"] for relation in relations]
|
||||||
|
|
||||||
if ids_to_delete:
|
if ids_to_delete:
|
||||||
client.delete(ids_to_delete)
|
self._get_client().delete(ids_to_delete)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Deleted {len(ids_to_delete)} relations for {entity_name}"
|
f"Deleted {len(ids_to_delete)} relations for {entity_name}"
|
||||||
)
|
)
|
||||||
@@ -195,5 +189,4 @@ class NanoVectorDBStorage(BaseVectorStorage):
|
|||||||
|
|
||||||
async def index_done_callback(self) -> None:
|
async def index_done_callback(self) -> None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
client = self._get_client()
|
self._get_client().save()
|
||||||
client.save()
|
|
||||||
|
@@ -115,65 +115,54 @@ class NetworkXStorage(BaseGraphStorage):
|
|||||||
|
|
||||||
async def index_done_callback(self) -> None:
|
async def index_done_callback(self) -> None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
NetworkXStorage.write_nx_graph(self._get_graph(), self._graphml_xml_file)
|
||||||
NetworkXStorage.write_nx_graph(graph, self._graphml_xml_file)
|
|
||||||
|
|
||||||
async def has_node(self, node_id: str) -> bool:
|
async def has_node(self, node_id: str) -> bool:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
return self._get_graph().has_node(node_id)
|
||||||
return graph.has_node(node_id)
|
|
||||||
|
|
||||||
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
|
async def has_edge(self, source_node_id: str, target_node_id: str) -> bool:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
return self._get_graph().has_edge(source_node_id, target_node_id)
|
||||||
return graph.has_edge(source_node_id, target_node_id)
|
|
||||||
|
|
||||||
async def get_node(self, node_id: str) -> dict[str, str] | None:
|
async def get_node(self, node_id: str) -> dict[str, str] | None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
return self._get_graph().nodes.get(node_id)
|
||||||
return graph.nodes.get(node_id)
|
|
||||||
|
|
||||||
async def node_degree(self, node_id: str) -> int:
|
async def node_degree(self, node_id: str) -> int:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
return self._get_graph().degree(node_id)
|
||||||
return graph.degree(node_id)
|
|
||||||
|
|
||||||
async def edge_degree(self, src_id: str, tgt_id: str) -> int:
|
async def edge_degree(self, src_id: str, tgt_id: str) -> int:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
return self._get_graph().degree(src_id) + self._get_graph().degree(tgt_id)
|
||||||
return graph.degree(src_id) + graph.degree(tgt_id)
|
|
||||||
|
|
||||||
async def get_edge(
|
async def get_edge(
|
||||||
self, source_node_id: str, target_node_id: str
|
self, source_node_id: str, target_node_id: str
|
||||||
) -> dict[str, str] | None:
|
) -> dict[str, str] | None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
return self._get_graph().edges.get((source_node_id, target_node_id))
|
||||||
return graph.edges.get((source_node_id, target_node_id))
|
|
||||||
|
|
||||||
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
|
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]] | None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
if self._get_graph().has_node(source_node_id):
|
||||||
if graph.has_node(source_node_id):
|
return list(self._get_graph().edges(source_node_id))
|
||||||
return list(graph.edges(source_node_id))
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
|
async def upsert_node(self, node_id: str, node_data: dict[str, str]) -> None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
self._get_graph().add_node(node_id, **node_data)
|
||||||
graph.add_node(node_id, **node_data)
|
|
||||||
|
|
||||||
async def upsert_edge(
|
async def upsert_edge(
|
||||||
self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
|
self, source_node_id: str, target_node_id: str, edge_data: dict[str, str]
|
||||||
) -> None:
|
) -> None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
self._get_graph().add_edge(source_node_id, target_node_id, **edge_data)
|
||||||
graph.add_edge(source_node_id, target_node_id, **edge_data)
|
|
||||||
|
|
||||||
async def delete_node(self, node_id: str) -> None:
|
async def delete_node(self, node_id: str) -> None:
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
if self._get_graph().has_node(node_id):
|
||||||
if graph.has_node(node_id):
|
self._get_graph().remove_node(node_id)
|
||||||
graph.remove_node(node_id)
|
|
||||||
logger.debug(f"Node {node_id} deleted from the graph.")
|
logger.debug(f"Node {node_id} deleted from the graph.")
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Node {node_id} not found in the graph for deletion.")
|
logger.warning(f"Node {node_id} not found in the graph for deletion.")
|
||||||
@@ -227,9 +216,8 @@ class NetworkXStorage(BaseGraphStorage):
|
|||||||
[label1, label2, ...] # Alphabetically sorted label list
|
[label1, label2, ...] # Alphabetically sorted label list
|
||||||
"""
|
"""
|
||||||
with self._storage_lock:
|
with self._storage_lock:
|
||||||
graph = self._get_graph()
|
|
||||||
labels = set()
|
labels = set()
|
||||||
for node in graph.nodes():
|
for node in self._get_graph().nodes():
|
||||||
labels.add(str(node)) # Add node id as a label
|
labels.add(str(node)) # Add node id as a label
|
||||||
|
|
||||||
# Return sorted list
|
# Return sorted list
|
||||||
|
Reference in New Issue
Block a user