From 1fc26127d532d6f44eb3262a62bd496894029444 Mon Sep 17 00:00:00 2001 From: yangdx Date: Mon, 28 Apr 2025 23:21:34 +0800 Subject: [PATCH] Fix linting --- lightrag/utils.py | 72 +++++++++++++++++++++++++++++++---------------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/lightrag/utils.py b/lightrag/utils.py index 77314053..da006647 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -271,18 +271,21 @@ def compute_mdhash_id(content: str, prefix: str = "") -> str: # Custom exception class class QueueFullError(Exception): """Raised when the queue is full and the wait times out""" + pass + def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): """ Enhanced priority-limited asynchronous function call decorator - + Args: max_size: Maximum number of concurrent calls max_queue_size: Maximum queue capacity to prevent memory overflow Returns: Decorator function """ + def final_decro(func): queue = asyncio.PriorityQueue(maxsize=max_queue_size) tasks = set() @@ -290,10 +293,10 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): counter = 0 shutdown_event = asyncio.Event() worker_health_check_task = None - + # Track active future objects for cleanup active_futures = weakref.WeakSet() - + # Worker function to process tasks in the queue async def worker(): """Worker that processes tasks in the priority queue""" @@ -302,18 +305,22 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): try: # Use timeout to get tasks, allowing periodic checking of shutdown signal try: - priority, count, future, args, kwargs = await asyncio.wait_for( - queue.get(), timeout=1.0 - ) + ( + priority, + count, + future, + args, + kwargs, + ) = await asyncio.wait_for(queue.get(), timeout=1.0) except asyncio.TimeoutError: # Timeout is just to check shutdown signal, continue to next iteration continue - + # If future is cancelled, skip execution if future.cancelled(): queue.task_done() continue - + try: # Execute function result = await func(*args, **kwargs) @@ -325,7 +332,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): future.cancel() logger.debug("limit_async: Task cancelled during execution") except Exception as e: - logger.error(f"limit_async: Error in decorated function: {str(e)}") + logger.error( + f"limit_async: Error in decorated function: {str(e)}" + ) if not future.done(): future.set_exception(e) finally: @@ -336,22 +345,24 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): await asyncio.sleep(0.1) # Prevent high CPU usage finally: logger.warning("limit_async: Worker exiting") - + async def health_check(): """Periodically check worker health status and recover""" try: while not shutdown_event.is_set(): await asyncio.sleep(5) # Check every 5 seconds - + async with lock: # Directly remove completed tasks from the tasks set tasks.difference_update({t for t in tasks if t.done()}) - + # Create new workers if active tasks less than max_size for better performance active_tasks_count = len(tasks) workers_needed = max_size - active_tasks_count if workers_needed > 0: - logger.info(f"limit_async: Creating {workers_needed} new workers") + logger.info( + f"limit_async: Creating {workers_needed} new workers" + ) for _ in range(workers_needed): task = asyncio.create_task(worker()) tasks.add(task) @@ -360,7 +371,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): logger.error(f"limit_async: Error in health check: {str(e)}") finally: logger.warning("limit_async: Health check task exiting") - + # Ensure worker tasks are started async def ensure_workers(): """Ensure worker tasks and health check are started""" @@ -373,20 +384,25 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): # Try to acquire the lock, wait up to 5 seconds lock_acquired = await asyncio.wait_for(lock.acquire(), timeout=5.0) except asyncio.TimeoutError: - logger.error("limit_async: Timeout acquiring lock in ensure_workers") + logger.error( + "limit_async: Timeout acquiring lock in ensure_workers" + ) # Even if acquiring the lock times out, continue trying to create workers try: # Start the health check task (if not already started) - if worker_health_check_task is None or worker_health_check_task.done(): + if ( + worker_health_check_task is None + or worker_health_check_task.done() + ): worker_health_check_task = asyncio.create_task(health_check()) # Directly remove completed tasks from the tasks set tasks.difference_update({t for t in tasks if t.done()}) - + # Calculate the number of active tasks active_tasks_count = len(tasks) - + # If active tasks count is less than max_size, create new workers workers_needed = max_size - active_tasks_count if workers_needed > 0: @@ -422,7 +438,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): try: await asyncio.wait_for(queue.join(), timeout=5.0) except asyncio.TimeoutError: - logger.warning("limit_async: Timeout waiting for queue to empty during shutdown") + logger.warning( + "limit_async: Timeout waiting for queue to empty during shutdown" + ) # Cancel all worker tasks for task in list(tasks): @@ -444,7 +462,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): logger.info("limit_async: Priority queue workers shutdown complete") @wraps(func) - async def wait_func(*args, _priority=10, _timeout=None, _queue_timeout=None, **kwargs): + async def wait_func( + *args, _priority=10, _timeout=None, _queue_timeout=None, **kwargs + ): """ Execute the function with priority-based concurrency control Args: @@ -479,10 +499,12 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): try: await asyncio.wait_for( queue.put((_priority, current_count, future, args, kwargs)), - timeout=_queue_timeout + timeout=_queue_timeout, ) except asyncio.TimeoutError: - raise QueueFullError(f"Queue full, timeout after {_queue_timeout} seconds") + raise QueueFullError( + f"Queue full, timeout after {_queue_timeout} seconds" + ) else: # No timeout, may wait indefinitely await queue.put((_priority, current_count, future, args, kwargs)) @@ -502,7 +524,9 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): # Cancel the future if not future.done(): future.cancel() - raise TimeoutError(f"limit_async: Task timed out after {_timeout} seconds") + raise TimeoutError( + f"limit_async: Task timed out after {_timeout} seconds" + ) else: # Wait for the result without timeout return await future @@ -514,7 +538,7 @@ def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000): wait_func.shutdown = shutdown return wait_func - + return final_decro