Merge pull request #837 from ParisNeo/main
Fixes and Enhancements for PostgreSQL and JSON Document Storage
This commit is contained in:
@@ -130,7 +130,7 @@ Replace placeholders like `your_role_name`, `your_password`, and `your_database`
|
|||||||
Start the LightRAG server using specified options:
|
Start the LightRAG server using specified options:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
lightrag-server --port 9626 --key sk-SL1 --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
|
lightrag-server --port 9621 --key sk-somepassword --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
|
||||||
```
|
```
|
||||||
|
|
||||||
Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
|
Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
|
||||||
|
@@ -68,3 +68,7 @@ class JsonDocStatusStorage(DocStatusStorage):
|
|||||||
for doc_id in doc_ids:
|
for doc_id in doc_ids:
|
||||||
self._data.pop(doc_id, None)
|
self._data.pop(doc_id, None)
|
||||||
await self.index_done_callback()
|
await self.index_done_callback()
|
||||||
|
|
||||||
|
async def drop(self) -> None:
|
||||||
|
"""Drop the storage"""
|
||||||
|
self._data.clear()
|
||||||
|
@@ -263,8 +263,8 @@ class PGKVStorage(BaseKVStorage):
|
|||||||
exist_keys = [key["id"] for key in res]
|
exist_keys = [key["id"] for key in res]
|
||||||
else:
|
else:
|
||||||
exist_keys = []
|
exist_keys = []
|
||||||
data = set([s for s in keys if s not in exist_keys])
|
new_keys = set([s for s in keys if s not in exist_keys])
|
||||||
return data
|
return new_keys
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"PostgreSQL database error: {e}")
|
logger.error(f"PostgreSQL database error: {e}")
|
||||||
print(sql)
|
print(sql)
|
||||||
@@ -301,6 +301,11 @@ class PGKVStorage(BaseKVStorage):
|
|||||||
# PG handles persistence automatically
|
# PG handles persistence automatically
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def drop(self) -> None:
|
||||||
|
"""Drop the storage"""
|
||||||
|
drop_sql = SQL_TEMPLATES["drop_all"]
|
||||||
|
await self.db.execute(drop_sql)
|
||||||
|
|
||||||
|
|
||||||
@final
|
@final
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -432,16 +437,26 @@ class PGVectorStorage(BaseVectorStorage):
|
|||||||
@dataclass
|
@dataclass
|
||||||
class PGDocStatusStorage(DocStatusStorage):
|
class PGDocStatusStorage(DocStatusStorage):
|
||||||
async def filter_keys(self, keys: set[str]) -> set[str]:
|
async def filter_keys(self, keys: set[str]) -> set[str]:
|
||||||
"""Return keys that don't exist in storage"""
|
"""Filter out duplicated content"""
|
||||||
keys = ",".join([f"'{_id}'" for _id in keys])
|
sql = SQL_TEMPLATES["filter_keys"].format(
|
||||||
sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace='{self.db.workspace}' AND id IN ({keys})"
|
table_name=namespace_to_table_name(self.namespace),
|
||||||
result = await self.db.query(sql, multirows=True)
|
ids=",".join([f"'{id}'" for id in keys]),
|
||||||
# The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
|
)
|
||||||
if result is None:
|
params = {"workspace": self.db.workspace}
|
||||||
return set(keys)
|
try:
|
||||||
|
res = await self.db.query(sql, params, multirows=True)
|
||||||
|
if res:
|
||||||
|
exist_keys = [key["id"] for key in res]
|
||||||
else:
|
else:
|
||||||
existed = set([element["id"] for element in result])
|
exist_keys = []
|
||||||
return set(keys) - existed
|
new_keys = set([s for s in keys if s not in exist_keys])
|
||||||
|
print(f"keys: {keys}")
|
||||||
|
print(f"new_keys: {new_keys}")
|
||||||
|
return new_keys
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"PostgreSQL database error: {e}")
|
||||||
|
print(sql)
|
||||||
|
print(params)
|
||||||
|
|
||||||
async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
|
async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
|
||||||
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"
|
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"
|
||||||
@@ -483,7 +498,7 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
|
sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
|
||||||
params = {"workspace": self.db.workspace, "status": status.value}
|
params = {"workspace": self.db.workspace, "status": status.value}
|
||||||
result = await self.db.query(sql, params, True)
|
result = await self.db.query(sql, params, True)
|
||||||
return {
|
docs_by_status = {
|
||||||
element["id"]: DocProcessingStatus(
|
element["id"]: DocProcessingStatus(
|
||||||
content=result[0]["content"],
|
content=result[0]["content"],
|
||||||
content_summary=element["content_summary"],
|
content_summary=element["content_summary"],
|
||||||
@@ -495,6 +510,7 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
)
|
)
|
||||||
for element in result
|
for element in result
|
||||||
}
|
}
|
||||||
|
return docs_by_status
|
||||||
|
|
||||||
async def index_done_callback(self) -> None:
|
async def index_done_callback(self) -> None:
|
||||||
# PG handles persistence automatically
|
# PG handles persistence automatically
|
||||||
@@ -531,6 +547,11 @@ class PGDocStatusStorage(DocStatusStorage):
|
|||||||
)
|
)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
async def drop(self) -> None:
|
||||||
|
"""Drop the storage"""
|
||||||
|
drop_sql = SQL_TEMPLATES["drop_doc_full"]
|
||||||
|
await self.db.execute(drop_sql)
|
||||||
|
|
||||||
|
|
||||||
class PGGraphQueryException(Exception):
|
class PGGraphQueryException(Exception):
|
||||||
"""Exception for the AGE queries."""
|
"""Exception for the AGE queries."""
|
||||||
@@ -1012,6 +1033,13 @@ class PGGraphStorage(BaseGraphStorage):
|
|||||||
) -> KnowledgeGraph:
|
) -> KnowledgeGraph:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def drop(self) -> None:
|
||||||
|
"""Drop the storage"""
|
||||||
|
drop_sql = SQL_TEMPLATES["drop_vdb_entity"]
|
||||||
|
await self.db.execute(drop_sql)
|
||||||
|
drop_sql = SQL_TEMPLATES["drop_vdb_relation"]
|
||||||
|
await self.db.execute(drop_sql)
|
||||||
|
|
||||||
|
|
||||||
NAMESPACE_TABLE_MAP = {
|
NAMESPACE_TABLE_MAP = {
|
||||||
NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
|
NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
|
||||||
@@ -1194,4 +1222,27 @@ SQL_TEMPLATES = {
|
|||||||
FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
|
FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
|
||||||
WHERE distance>$2 ORDER BY distance DESC LIMIT $3
|
WHERE distance>$2 ORDER BY distance DESC LIMIT $3
|
||||||
""",
|
""",
|
||||||
|
# DROP tables
|
||||||
|
"drop_all": """
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
|
||||||
|
""",
|
||||||
|
"drop_doc_full": """
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
|
||||||
|
""",
|
||||||
|
"drop_doc_chunks": """
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
|
||||||
|
""",
|
||||||
|
"drop_llm_cache": """
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
|
||||||
|
""",
|
||||||
|
"drop_vdb_entity": """
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
|
||||||
|
""",
|
||||||
|
"drop_vdb_relation": """
|
||||||
|
DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
|
||||||
|
""",
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user