diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 3816b360..13f7d5d1 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -438,16 +438,26 @@ class PGVectorStorage(BaseVectorStorage): @dataclass class PGDocStatusStorage(DocStatusStorage): async def filter_keys(self, keys: set[str]) -> set[str]: - """Return keys that don't exist in storage""" - keys = ",".join([f"'{_id}'" for _id in keys]) - sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace='{self.db.workspace}' AND id IN ({keys})" - result = await self.db.query(sql, multirows=True) - # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...]. - if result is None: - return set(keys) - else: - existed = set([element["id"] for element in result]) - return set(keys) - existed + """Filter out duplicated content""" + sql = SQL_TEMPLATES["filter_keys"].format( + table_name=namespace_to_table_name(self.namespace), + ids=",".join([f"'{id}'" for id in keys]), + ) + params = {"workspace": self.db.workspace} + try: + res = await self.db.query(sql, params, multirows=True) + if res: + exist_keys = [key["id"] for key in res] + else: + exist_keys = [] + new_keys = set([s for s in keys if s not in exist_keys]) + print(f"keys: {keys}") + print(f"new_keys: {new_keys}") + return new_keys + except Exception as e: + logger.error(f"PostgreSQL database error: {e}") + print(sql) + print(params) async def get_by_id(self, id: str) -> Union[dict[str, Any], None]: sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"