Implement custom chunking feature
This commit is contained in:
@@ -458,6 +458,72 @@ class LightRAG:
|
|||||||
# Ensure all indexes are updated after each document
|
# Ensure all indexes are updated after each document
|
||||||
await self._insert_done()
|
await self._insert_done()
|
||||||
|
|
||||||
|
def insert_custom_chunks(self, full_text: str, text_chunks: list[str]):
|
||||||
|
loop = always_get_an_event_loop()
|
||||||
|
return loop.run_until_complete(self.ainsert_custom_chunks(full_text, text_chunks))
|
||||||
|
|
||||||
|
async def ainsert_custom_chunks(self, full_text: str, text_chunks: list[str]):
|
||||||
|
|
||||||
|
update_storage = False
|
||||||
|
try:
|
||||||
|
doc_key = compute_mdhash_id(full_text.strip(), prefix="doc-")
|
||||||
|
new_docs = {
|
||||||
|
doc_key: {"content": full_text.strip()}
|
||||||
|
}
|
||||||
|
|
||||||
|
_add_doc_keys = await self.full_docs.filter_keys([doc_key])
|
||||||
|
new_docs = {k: v for k, v in new_docs.items() if k in _add_doc_keys}
|
||||||
|
if not len(new_docs):
|
||||||
|
logger.warning("This document is already in the storage.")
|
||||||
|
return
|
||||||
|
|
||||||
|
update_storage = True
|
||||||
|
logger.info(f"[New Docs] inserting {len(new_docs)} docs")
|
||||||
|
|
||||||
|
inserting_chunks = {}
|
||||||
|
for chunk_text in text_chunks:
|
||||||
|
chunk_text_stripped = chunk_text.strip()
|
||||||
|
chunk_key = compute_mdhash_id(chunk_text_stripped, prefix="chunk-")
|
||||||
|
|
||||||
|
inserting_chunks[chunk_key] = {
|
||||||
|
"content": chunk_text_stripped,
|
||||||
|
"full_doc_id": doc_key,
|
||||||
|
}
|
||||||
|
|
||||||
|
_add_chunk_keys = await self.text_chunks.filter_keys(list(inserting_chunks.keys()))
|
||||||
|
inserting_chunks = {
|
||||||
|
k: v for k, v in inserting_chunks.items() if k in _add_chunk_keys
|
||||||
|
}
|
||||||
|
if not len(inserting_chunks):
|
||||||
|
logger.warning("All chunks are already in the storage.")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(f"[New Chunks] inserting {len(inserting_chunks)} chunks")
|
||||||
|
|
||||||
|
await self.chunks_vdb.upsert(inserting_chunks)
|
||||||
|
|
||||||
|
logger.info("[Entity Extraction]...")
|
||||||
|
maybe_new_kg = await extract_entities(
|
||||||
|
inserting_chunks,
|
||||||
|
knowledge_graph_inst=self.chunk_entity_relation_graph,
|
||||||
|
entity_vdb=self.entities_vdb,
|
||||||
|
relationships_vdb=self.relationships_vdb,
|
||||||
|
global_config=asdict(self),
|
||||||
|
)
|
||||||
|
|
||||||
|
if maybe_new_kg is None:
|
||||||
|
logger.warning("No new entities and relationships found")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
self.chunk_entity_relation_graph = maybe_new_kg
|
||||||
|
|
||||||
|
await self.full_docs.upsert(new_docs)
|
||||||
|
await self.text_chunks.upsert(inserting_chunks)
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if update_storage:
|
||||||
|
await self._insert_done()
|
||||||
|
|
||||||
async def _insert_done(self):
|
async def _insert_done(self):
|
||||||
tasks = []
|
tasks = []
|
||||||
for storage_inst in [
|
for storage_inst in [
|
||||||
|
Reference in New Issue
Block a user