Fix linting

Author: yangdx
Date: 2025-03-21 16:56:47 +08:00
Parent: 20d65ae554
Commit: 53396e4d82
4 changed files with 36 additions and 21 deletions


@@ -7,6 +7,7 @@ from dotenv import load_dotenv
load_dotenv()

class TokenPayload(BaseModel):
    sub: str  # Username
    exp: datetime  # Expiration time
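For readers outside the diff context: TokenPayload is a Pydantic model for JWT claims, so validation happens at construction time. A minimal usage sketch, assuming Pydantic v2; the field values are illustrative, not part of the commit:

from datetime import datetime, timedelta, timezone

from pydantic import BaseModel


class TokenPayload(BaseModel):
    sub: str  # Username
    exp: datetime  # Expiration time


# Illustrative values only; real payloads come from a decoded JWT
payload = TokenPayload(sub="alice", exp=datetime.now(timezone.utc) + timedelta(hours=1))
print(payload.model_dump())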


@@ -29,7 +29,9 @@ preload_app = True
worker_class = "uvicorn.workers.UvicornWorker" worker_class = "uvicorn.workers.UvicornWorker"
# Other Gunicorn configurations # Other Gunicorn configurations
timeout = int(os.getenv("TIMEOUT", 150 * 2)) # Default 150s *2 to match run_with_gunicorn.py timeout = int(
os.getenv("TIMEOUT", 150 * 2)
) # Default 150s *2 to match run_with_gunicorn.py
keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s keepalive = int(os.getenv("KEEPALIVE", 5)) # Default 5s
# Logging configuration # Logging configuration
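One detail worth noting in the hunk above: os.getenv returns a str when the variable is set but passes the default through unchanged, so the int() wrapper normalizes both cases, and the unset default works out to 300 seconds. A standalone sketch of the same pattern:

import os

# int() accepts both the str value from the environment and the int default
timeout = int(os.getenv("TIMEOUT", 150 * 2))  # 300 when TIMEOUT is unset
keepalive = int(os.getenv("KEEPALIVE", 5))  # 5 when KEEPALIVE is unset
print(timeout, keepalive)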


@@ -479,19 +479,23 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
        max_parallel = global_args["max_parallel_insert"]
        # Calculate batch size as 2 * MAX_PARALLEL_INSERT
        batch_size = 2 * max_parallel

        # Process files in batches
        for i in range(0, total_files, batch_size):
-            batch_files = new_files[i:i+batch_size]
+            batch_files = new_files[i : i + batch_size]
            batch_num = i // batch_size + 1
            total_batches = (total_files + batch_size - 1) // batch_size

-            logger.info(f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files")
+            logger.info(
+                f"Processing batch {batch_num}/{total_batches} with {len(batch_files)} files"
+            )
            await pipeline_index_files(rag, batch_files)

            # Log progress
            processed = min(i + batch_size, total_files)
-            logger.info(f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)")
+            logger.info(
+                f"Processed {processed}/{total_files} files ({processed/total_files*100:.1f}%)"
+            )

    except Exception as e:
        logger.error(f"Error during scanning process: {str(e)}")


@@ -69,7 +69,7 @@ class UnifiedLock(Generic[T]):
f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})", f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
enable_output=self._enable_logging, enable_output=self._enable_logging,
) )
# If in multiprocess mode and async lock exists, acquire it first # If in multiprocess mode and async lock exists, acquire it first
if not self._is_async and self._async_lock is not None: if not self._is_async and self._async_lock is not None:
direct_log( direct_log(
@@ -81,13 +81,13 @@ class UnifiedLock(Generic[T]):
f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired", f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
enable_output=self._enable_logging, enable_output=self._enable_logging,
) )
# Then acquire the main lock # Then acquire the main lock
if self._is_async: if self._is_async:
await self._lock.acquire() await self._lock.acquire()
else: else:
self._lock.acquire() self._lock.acquire()
direct_log( direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})", f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
enable_output=self._enable_logging, enable_output=self._enable_logging,
@@ -95,9 +95,13 @@ class UnifiedLock(Generic[T]):
            return self
        except Exception as e:
            # If main lock acquisition fails, release the async lock if it was acquired
-            if not self._is_async and self._async_lock is not None and self._async_lock.locked():
+            if (
+                not self._is_async
+                and self._async_lock is not None
+                and self._async_lock.locked()
+            ):
                self._async_lock.release()

            direct_log(
                f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
                level="ERROR",
@@ -112,15 +116,15 @@ class UnifiedLock(Generic[T]):
f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})", f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (async={self._is_async})",
enable_output=self._enable_logging, enable_output=self._enable_logging,
) )
# Release main lock first # Release main lock first
if self._is_async: if self._is_async:
self._lock.release() self._lock.release()
else: else:
self._lock.release() self._lock.release()
main_lock_released = True main_lock_released = True
# Then release async lock if in multiprocess mode # Then release async lock if in multiprocess mode
if not self._is_async and self._async_lock is not None: if not self._is_async and self._async_lock is not None:
direct_log( direct_log(
@@ -128,7 +132,7 @@ class UnifiedLock(Generic[T]):
                    enable_output=self._enable_logging,
                )
                self._async_lock.release()

            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
                enable_output=self._enable_logging,
@@ -139,9 +143,13 @@ class UnifiedLock(Generic[T]):
level="ERROR", level="ERROR",
enable_output=self._enable_logging, enable_output=self._enable_logging,
) )
# If main lock release failed but async lock hasn't been released, try to release it # If main lock release failed but async lock hasn't been released, try to release it
if not main_lock_released and not self._is_async and self._async_lock is not None: if (
not main_lock_released
and not self._is_async
and self._async_lock is not None
):
try: try:
direct_log( direct_log(
f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure", f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
@@ -159,7 +167,7 @@ class UnifiedLock(Generic[T]):
level="ERROR", level="ERROR",
enable_output=self._enable_logging, enable_output=self._enable_logging,
) )
raise raise
def __enter__(self) -> "UnifiedLock[T]": def __enter__(self) -> "UnifiedLock[T]":
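Taken together, these hunks preserve a two-stage ordering: acquire the per-process asyncio lock before the cross-process lock, release in the reverse order, and fall back to releasing the asyncio lock when the other stage fails. A simplified, self-contained sketch of that pattern, not the LightRAG implementation (threading.Lock stands in for the multiprocess lock):

import asyncio
import threading


class TwoStageLock:
    # Stage 1 serializes coroutines in this process; stage 2 stands in
    # for the cross-process lock guarded by the same ordering.
    def __init__(self) -> None:
        self._async_gate = asyncio.Lock()
        self._main_lock = threading.Lock()

    async def __aenter__(self) -> "TwoStageLock":
        await self._async_gate.acquire()
        try:
            self._main_lock.acquire()
        except BaseException:
            self._async_gate.release()  # undo stage 1 before propagating
            raise
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        try:
            self._main_lock.release()
        finally:
            self._async_gate.release()  # release stage 1 even if stage 2 failed


async def main() -> None:
    async with TwoStageLock():
        print("both stages held")


asyncio.run(main())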
@@ -321,7 +329,7 @@ def initialize_share_data(workers: int = 1):
        _shared_dicts = _manager.dict()
        _init_flags = _manager.dict()
        _update_flags = _manager.dict()

        # Initialize async locks for multiprocess mode
        _async_locks = {
            "internal_lock": asyncio.Lock(),
@@ -330,7 +338,7 @@ def initialize_share_data(workers: int = 1):
"graph_db_lock": asyncio.Lock(), "graph_db_lock": asyncio.Lock(),
"data_init_lock": asyncio.Lock(), "data_init_lock": asyncio.Lock(),
} }
direct_log( direct_log(
f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})" f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
) )
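The dict above gives each shared resource its own named asyncio.Lock, created once at startup. A minimal sketch of the lookup-by-name usage pattern (lock names match the diff; everything else is illustrative):

import asyncio


async def main() -> None:
    # One Lock per shared resource, keyed by name
    async_locks = {
        "internal_lock": asyncio.Lock(),
        "graph_db_lock": asyncio.Lock(),
        "data_init_lock": asyncio.Lock(),
    }

    # Look up the lock by name and hold it for the critical section
    async with async_locks["graph_db_lock"]:
        print("holding graph_db_lock")


asyncio.run(main())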