From 168232803b5b496ed70235832ba44fa0f7a1d29f Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 13:08:57 +0800 Subject: [PATCH 1/7] Optimized the document processing pipeline - Enqueue all files to pipeline before starting process when scanning input folder - Changed parallel enqueue to sequential to prevent CPU overload by heavy file extraction jobs --- lightrag/api/routers/document_routes.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 7b6f11c1..a26216d2 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -405,7 +405,7 @@ async def pipeline_index_file(rag: LightRAG, file_path: Path): async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]): - """Index multiple files concurrently + """Index multiple files sequentially to avoid high CPU load Args: rag: LightRAG instance @@ -416,12 +416,12 @@ async def pipeline_index_files(rag: LightRAG, file_paths: List[Path]): try: enqueued = False - if len(file_paths) == 1: - enqueued = await pipeline_enqueue_file(rag, file_paths[0]) - else: - tasks = [pipeline_enqueue_file(rag, path) for path in file_paths] - enqueued = any(await asyncio.gather(*tasks)) + # Process files sequentially + for file_path in file_paths: + if await pipeline_enqueue_file(rag, file_path): + enqueued = True + # Process the queue only if at least one file was successfully enqueued if enqueued: await rag.apipeline_process_enqueue_documents() except Exception as e: @@ -472,11 +472,8 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): total_files = len(new_files) logger.info(f"Found {total_files} new files to index.") - for idx, file_path in enumerate(new_files): - try: - await pipeline_index_file(rag, file_path) - except Exception as e: - logger.error(f"Error indexing file {file_path}: {str(e)}") + if new_files: + await 
pipeline_index_files(rag, new_files) except Exception as e: logger.error(f"Error during scanning process: {str(e)}") From 67eee2d2d5c2c4a32cdc7f344f1d9fb9bf6fe851 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 13:27:12 +0800 Subject: [PATCH 2/7] Prioritize OS environment variables over .env file to improve Docker compatibility for the server --- lightrag/api/auth.py | 2 ++ lightrag/api/lightrag_server.py | 2 +- lightrag/api/run_with_gunicorn.py | 2 +- lightrag/api/utils_api.py | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lightrag/api/auth.py b/lightrag/api/auth.py index 78a1da1a..1e92d1b9 100644 --- a/lightrag/api/auth.py +++ b/lightrag/api/auth.py @@ -3,7 +3,9 @@ from datetime import datetime, timedelta import jwt from fastapi import HTTPException, status from pydantic import BaseModel +from dotenv import load_dotenv +load_dotenv() class TokenPayload(BaseModel): sub: str # Username diff --git a/lightrag/api/lightrag_server.py b/lightrag/api/lightrag_server.py index 4d4896fc..6c8d11f1 100644 --- a/lightrag/api/lightrag_server.py +++ b/lightrag/api/lightrag_server.py @@ -49,7 +49,7 @@ from .auth import auth_handler # Load environment variables # Updated to use the .env that is inside the current folder # This update allows the user to put a different.env file for each lightrag folder -load_dotenv(".env", override=True) +load_dotenv() # Initialize config parser config = configparser.ConfigParser() diff --git a/lightrag/api/run_with_gunicorn.py b/lightrag/api/run_with_gunicorn.py index cf9b3b91..00821acb 100644 --- a/lightrag/api/run_with_gunicorn.py +++ b/lightrag/api/run_with_gunicorn.py @@ -13,7 +13,7 @@ from dotenv import load_dotenv # Updated to use the .env that is inside the current folder # This update allows the user to put a different.env file for each lightrag folder -load_dotenv(".env") +load_dotenv() def check_and_install_dependencies(): diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index 
9a8aaf57..53ea3ae8 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -16,7 +16,7 @@ from starlette.status import HTTP_403_FORBIDDEN from .auth import auth_handler # Load environment variables -load_dotenv(override=True) +load_dotenv() global_args = {"main_args": None} From 0761af19c68210ef2f7fcac909adc255fea26d84 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 13:41:37 +0800 Subject: [PATCH 3/7] Files are now processed in batches in auto scan --- lightrag/api/routers/document_routes.py | 23 +++++++++++++++++++++-- lightrag/api/utils_api.py | 3 +++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index a26216d2..248505ba 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -472,11 +472,30 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): total_files = len(new_files) logger.info(f"Found {total_files} new files to index.") - if new_files: - await pipeline_index_files(rag, new_files) + if not new_files: + return + + # Get MAX_PARALLEL_INSERT from global_args + max_parallel = global_args["max_parallel_insert"] + # Calculate batch size as 2 * MAX_PARALLEL_INSERT + batch_size = 2 * max_parallel + + # Process files in batches + for i in range(0, total_files, batch_size): + batch_files = new_files[i:i+batch_size] + batch_num = i // batch_size + 1 + total_batches = (total_files + batch_size - 1) // batch_size + + logger.info(f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files") + await pipeline_index_files(rag, batch_files) + + # Log progress + processed = min(i + batch_size, total_files) + logger.info(f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)") except Exception as e: logger.error(f"Error during scanning process: {str(e)}") + logger.error(traceback.format_exc()) def create_document_routes( diff --git 
a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index 53ea3ae8..53da3e34 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -365,6 +365,9 @@ def parse_args(is_uvicorn_mode: bool = False) -> argparse.Namespace: "LIGHTRAG_VECTOR_STORAGE", DefaultRAGStorageConfig.VECTOR_STORAGE ) + # Get MAX_PARALLEL_INSERT from environment + global_args["max_parallel_insert"] = get_env_value("MAX_PARALLEL_INSERT", 2, int) + # Handle openai-ollama special case if args.llm_binding == "openai-ollama": args.llm_binding = "openai" From 1f6d230da1322fb9b55798a12b273cc00601364c Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 15:26:23 +0800 Subject: [PATCH 4/7] Increased default timeout values for Gunicorn configuration - Doubled default timeout to 300 seconds --- lightrag/api/gunicorn_config.py | 2 +- lightrag/api/run_with_gunicorn.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lightrag/api/gunicorn_config.py b/lightrag/api/gunicorn_config.py index 23e46807..cb6c3d83 100644 --- a/lightrag/api/gunicorn_config.py +++ b/lightrag/api/gunicorn_config.py @@ -29,7 +29,7 @@ preload_app = True worker_class = "uvicorn.workers.UvicornWorker" # Other Gunicorn configurations -timeout = int(os.getenv("TIMEOUT", 150)) # Default 150s to match run_with_gunicorn.py +timeout = int(os.getenv("TIMEOUT", 150 * 2)) # Default 150s *2 to match run_with_gunicorn.py keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s # Logging configuration diff --git a/lightrag/api/run_with_gunicorn.py b/lightrag/api/run_with_gunicorn.py index 00821acb..126d772d 100644 --- a/lightrag/api/run_with_gunicorn.py +++ b/lightrag/api/run_with_gunicorn.py @@ -140,7 +140,7 @@ def main(): # Timeout configuration prioritizes command line arguments gunicorn_config.timeout = ( - args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150)) + args.timeout if args.timeout else int(os.getenv("TIMEOUT", 150 * 2)) ) # Keepalive configuration From 
20d65ae55491eff365f73b92ab9d79807101823f Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 16:08:23 +0800 Subject: [PATCH 5/7] feat(shared_storage): prevent event loop blocking in multiprocess mode Add auxiliary async locks in multiprocess mode to prevent event loop blocking --- lightrag/kg/shared_storage.py | 87 ++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 2 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 736887a6..d5006d62 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -41,6 +41,9 @@ _pipeline_status_lock: Optional[LockType] = None _graph_db_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None +# async locks for coroutine synchronization in multiprocess mode +_async_locks: Optional[Dict[str, asyncio.Lock]] = None + class UnifiedLock(Generic[T]): """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock""" @@ -51,12 +54,14 @@ class UnifiedLock(Generic[T]): is_async: bool, name: str = "unnamed", enable_logging: bool = True, + async_lock: Optional[asyncio.Lock] = None, ): self._lock = lock self._is_async = is_async self._pid = os.getpid() # for debug only self._name = name # for debug only self._enable_logging = enable_logging # for debug only + self._async_lock = async_lock # auxiliary lock for coroutine synchronization async def __aenter__(self) -> "UnifiedLock[T]": try: @@ -64,16 +69,35 @@ class UnifiedLock(Generic[T]): f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging, ) + + # If in multiprocess mode and async lock exists, acquire it first + if not self._is_async and self._async_lock is not None: + direct_log( + f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'", + enable_output=self._enable_logging, + ) + await self._async_lock.acquire() + direct_log( + f"== Lock == Process {self._pid}: Async lock for 
'{self._name}' acquired", + enable_output=self._enable_logging, + ) + + # Then acquire the main lock if self._is_async: await self._lock.acquire() else: self._lock.acquire() + direct_log( f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})", enable_output=self._enable_logging, ) return self except Exception as e: + # If main lock acquisition fails, release the async lock if it was acquired + if not self._is_async and self._async_lock is not None and self._async_lock.locked(): + self._async_lock.release() + direct_log( f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", @@ -82,15 +106,29 @@ class UnifiedLock(Generic[T]): raise async def __aexit__(self, exc_type, exc_val, exc_tb): + main_lock_released = False try: direct_log( f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging, ) + + # Release main lock first if self._is_async: self._lock.release() else: self._lock.release() + + main_lock_released = True + + # Then release async lock if in multiprocess mode + if not self._is_async and self._async_lock is not None: + direct_log( + f"== Lock == Process {self._pid}: Releasing async lock for '{self._name}'", + enable_output=self._enable_logging, + ) + self._async_lock.release() + direct_log( f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", enable_output=self._enable_logging, @@ -101,6 +139,27 @@ class UnifiedLock(Generic[T]): level="ERROR", enable_output=self._enable_logging, ) + + # If main lock release failed but async lock hasn't been released, try to release it + if not main_lock_released and not self._is_async and self._async_lock is not None: + try: + direct_log( + f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure", + level="WARNING", + enable_output=self._enable_logging, + ) + self._async_lock.release() + direct_log( + f"== Lock == 
Process {self._pid}: Successfully released async lock after main lock failure", + enable_output=self._enable_logging, + ) + except Exception as inner_e: + direct_log( + f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}", + level="ERROR", + enable_output=self._enable_logging, + ) + raise def __enter__(self) -> "UnifiedLock[T]": @@ -151,51 +210,61 @@ class UnifiedLock(Generic[T]): def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" + async_lock = _async_locks.get("internal_lock") if is_multiprocess else None return UnifiedLock( lock=_internal_lock, is_async=not is_multiprocess, name="internal_lock", enable_logging=enable_logging, + async_lock=async_lock, ) def get_storage_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" + async_lock = _async_locks.get("storage_lock") if is_multiprocess else None return UnifiedLock( lock=_storage_lock, is_async=not is_multiprocess, name="storage_lock", enable_logging=enable_logging, + async_lock=async_lock, ) def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock: """return unified storage lock for data consistency""" + async_lock = _async_locks.get("pipeline_status_lock") if is_multiprocess else None return UnifiedLock( lock=_pipeline_status_lock, is_async=not is_multiprocess, name="pipeline_status_lock", enable_logging=enable_logging, + async_lock=async_lock, ) def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock: """return unified graph database lock for ensuring atomic operations""" + async_lock = _async_locks.get("graph_db_lock") if is_multiprocess else None return UnifiedLock( lock=_graph_db_lock, is_async=not is_multiprocess, name="graph_db_lock", enable_logging=enable_logging, + async_lock=async_lock, ) def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock: """return unified data initialization lock for 
ensuring atomic data initialization""" + async_lock = _async_locks.get("data_init_lock") if is_multiprocess else None return UnifiedLock( lock=_data_init_lock, is_async=not is_multiprocess, name="data_init_lock", enable_logging=enable_logging, + async_lock=async_lock, ) @@ -229,7 +298,8 @@ def initialize_share_data(workers: int = 1): _shared_dicts, \ _init_flags, \ _initialized, \ - _update_flags + _update_flags, \ + _async_locks # Check if already initialized if _initialized: @@ -251,6 +321,16 @@ def initialize_share_data(workers: int = 1): _shared_dicts = _manager.dict() _init_flags = _manager.dict() _update_flags = _manager.dict() + + # Initialize async locks for multiprocess mode + _async_locks = { + "internal_lock": asyncio.Lock(), + "storage_lock": asyncio.Lock(), + "pipeline_status_lock": asyncio.Lock(), + "graph_db_lock": asyncio.Lock(), + "data_init_lock": asyncio.Lock(), + } + direct_log( f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})" ) @@ -264,6 +344,7 @@ def initialize_share_data(workers: int = 1): _shared_dicts = {} _init_flags = {} _update_flags = {} + _async_locks = None # No need for async locks in single process mode direct_log(f"Process {os.getpid()} Shared-Data created for Single Process") # Mark as initialized @@ -458,7 +539,8 @@ def finalize_share_data(): _shared_dicts, \ _init_flags, \ _initialized, \ - _update_flags + _update_flags, \ + _async_locks # Check if already initialized if not _initialized: @@ -523,5 +605,6 @@ def finalize_share_data(): _graph_db_lock = None _data_init_lock = None _update_flags = None + _async_locks = None direct_log(f"Process {os.getpid()} storage data finalization complete") From 53396e4d822c766119b6c3e826274f66a971eed6 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 16:56:47 +0800 Subject: [PATCH 6/7] Fixlinting --- lightrag/api/auth.py | 1 + lightrag/api/gunicorn_config.py | 4 ++- lightrag/api/routers/document_routes.py | 16 ++++++----- 
lightrag/kg/shared_storage.py | 36 +++++++++++++++---------- 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/lightrag/api/auth.py b/lightrag/api/auth.py index 1e92d1b9..5d9b00ac 100644 --- a/lightrag/api/auth.py +++ b/lightrag/api/auth.py @@ -7,6 +7,7 @@ from dotenv import load_dotenv load_dotenv() + class TokenPayload(BaseModel): sub: str # Username exp: datetime # Expiration time diff --git a/lightrag/api/gunicorn_config.py b/lightrag/api/gunicorn_config.py index cb6c3d83..0aef108e 100644 --- a/lightrag/api/gunicorn_config.py +++ b/lightrag/api/gunicorn_config.py @@ -29,7 +29,9 @@ preload_app = True worker_class = "uvicorn.workers.UvicornWorker" # Other Gunicorn configurations -timeout = int(os.getenv("TIMEOUT", 150 * 2)) # Default 150s *2 to match run_with_gunicorn.py +timeout = int( + os.getenv("TIMEOUT", 150 * 2) +) # Default 150s *2 to match run_with_gunicorn.py keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s # Logging configuration diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 248505ba..e0c8f545 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -479,19 +479,23 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager): max_parallel = global_args["max_parallel_insert"] # Calculate batch size as 2 * MAX_PARALLEL_INSERT batch_size = 2 * max_parallel - + # Process files in batches for i in range(0, total_files, batch_size): - batch_files = new_files[i:i+batch_size] + batch_files = new_files[i : i + batch_size] batch_num = i // batch_size + 1 total_batches = (total_files + batch_size - 1) // batch_size - - logger.info(f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files") + + logger.info( + f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files" + ) await pipeline_index_files(rag, batch_files) - + # Log progress processed = min(i + batch_size, total_files) - 
logger.info(f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)") + logger.info( + f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)" + ) except Exception as e: logger.error(f"Error during scanning process: {str(e)}") diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index d5006d62..e26645c8 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -69,7 +69,7 @@ class UnifiedLock(Generic[T]): f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging, ) - + # If in multiprocess mode and async lock exists, acquire it first if not self._is_async and self._async_lock is not None: direct_log( @@ -81,13 +81,13 @@ class UnifiedLock(Generic[T]): f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired", enable_output=self._enable_logging, ) - + # Then acquire the main lock if self._is_async: await self._lock.acquire() else: self._lock.acquire() - + direct_log( f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})", enable_output=self._enable_logging, @@ -95,9 +95,13 @@ class UnifiedLock(Generic[T]): return self except Exception as e: # If main lock acquisition fails, release the async lock if it was acquired - if not self._is_async and self._async_lock is not None and self._async_lock.locked(): + if ( + not self._is_async + and self._async_lock is not None + and self._async_lock.locked() + ): self._async_lock.release() - + direct_log( f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}", level="ERROR", @@ -112,15 +116,15 @@ class UnifiedLock(Generic[T]): f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", enable_output=self._enable_logging, ) - + # Release main lock first if self._is_async: self._lock.release() else: self._lock.release() - + main_lock_released = True - + # Then release 
async lock if in multiprocess mode if not self._is_async and self._async_lock is not None: direct_log( @@ -128,7 +132,7 @@ class UnifiedLock(Generic[T]): enable_output=self._enable_logging, ) self._async_lock.release() - + direct_log( f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})", enable_output=self._enable_logging, @@ -139,9 +143,13 @@ class UnifiedLock(Generic[T]): level="ERROR", enable_output=self._enable_logging, ) - + # If main lock release failed but async lock hasn't been released, try to release it - if not main_lock_released and not self._is_async and self._async_lock is not None: + if ( + not main_lock_released + and not self._is_async + and self._async_lock is not None + ): try: direct_log( f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure", @@ -159,7 +167,7 @@ class UnifiedLock(Generic[T]): level="ERROR", enable_output=self._enable_logging, ) - + raise def __enter__(self) -> "UnifiedLock[T]": @@ -321,7 +329,7 @@ def initialize_share_data(workers: int = 1): _shared_dicts = _manager.dict() _init_flags = _manager.dict() _update_flags = _manager.dict() - + # Initialize async locks for multiprocess mode _async_locks = { "internal_lock": asyncio.Lock(), @@ -330,7 +338,7 @@ def initialize_share_data(workers: int = 1): "graph_db_lock": asyncio.Lock(), "data_init_lock": asyncio.Lock(), } - + direct_log( f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})" ) From ce05b436b988bc3be7b1a096cae8596fbc2cb30a Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 21 Mar 2025 17:11:55 +0800 Subject: [PATCH 7/7] Refactor splash screen display configuration layout. 
--- lightrag/api/utils_api.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lightrag/api/utils_api.py b/lightrag/api/utils_api.py index 53da3e34..25136bd2 100644 --- a/lightrag/api/utils_api.py +++ b/lightrag/api/utils_api.py @@ -444,8 +444,8 @@ def display_splash_screen(args: argparse.Namespace) -> None: ASCIIColors.yellow(f"{args.log_level}") ASCIIColors.white(" ├─ Verbose Debug: ", end="") ASCIIColors.yellow(f"{args.verbose}") - ASCIIColors.white(" ├─ Timeout: ", end="") - ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}") + ASCIIColors.white(" ├─ History Turns: ", end="") + ASCIIColors.yellow(f"{args.history_turns}") ASCIIColors.white(" └─ API Key: ", end="") ASCIIColors.yellow("Set" if args.key else "Not Set") @@ -462,8 +462,10 @@ def display_splash_screen(args: argparse.Namespace) -> None: ASCIIColors.yellow(f"{args.llm_binding}") ASCIIColors.white(" ├─ Host: ", end="") ASCIIColors.yellow(f"{args.llm_binding_host}") - ASCIIColors.white(" └─ Model: ", end="") + ASCIIColors.white(" ├─ Model: ", end="") ASCIIColors.yellow(f"{args.llm_model}") + ASCIIColors.white(" └─ Timeout: ", end="") + ASCIIColors.yellow(f"{args.timeout if args.timeout else 'None (infinite)'}") # Embedding Configuration ASCIIColors.magenta("\n📊 Embedding Configuration:") @@ -478,8 +480,10 @@ def display_splash_screen(args: argparse.Namespace) -> None: # RAG Configuration ASCIIColors.magenta("\n⚙️ RAG Configuration:") - ASCIIColors.white(" ├─ Max Async Operations: ", end="") + ASCIIColors.white(" ├─ Max Async for LLM: ", end="") ASCIIColors.yellow(f"{args.max_async}") + ASCIIColors.white(" ├─ Max Parallel Insert: ", end="") + ASCIIColors.yellow(f"{global_args['max_parallel_insert']}") ASCIIColors.white(" ├─ Max Tokens: ", end="") ASCIIColors.yellow(f"{args.max_tokens}") ASCIIColors.white(" ├─ Max Embed Tokens: ", end="") @@ -488,8 +492,6 @@ def display_splash_screen(args: argparse.Namespace) -> None: 
ASCIIColors.yellow(f"{args.chunk_size}") ASCIIColors.white(" ├─ Chunk Overlap Size: ", end="") ASCIIColors.yellow(f"{args.chunk_overlap_size}") - ASCIIColors.white(" ├─ History Turns: ", end="") - ASCIIColors.yellow(f"{args.history_turns}") ASCIIColors.white(" ├─ Cosine Threshold: ", end="") ASCIIColors.yellow(f"{args.cosine_threshold}") ASCIIColors.white(" ├─ Top-K: ", end="")