feat(auth): implement auto guest mode and enhance token system

- Add role-based token system with metadata support
- Implement automatic guest mode for unconfigured authentication
- Create new /auth-status endpoint for authentication status checking
- Modify frontend to auto-detect auth status and bypass login when appropriate
- Add guest mode indicator in site header for better UX

This change allows users to automatically access the system without manual
login when authentication is not configured, while maintaining secure
authentication when credentials are properly set up.
This commit is contained in:
yangdx
2025-03-18 02:56:02 +08:00
parent 056b58af75
commit f8440c8f80
11 changed files with 460 additions and 45 deletions

View File

@@ -6,8 +6,10 @@ from pydantic import BaseModel
class TokenPayload(BaseModel):
sub: str
exp: datetime
sub: str # Username
exp: datetime # Expiration time
role: str = "user" # User role, default is regular user
metadata: dict = {} # Additional metadata
class AuthHandler:
@@ -15,13 +17,55 @@ class AuthHandler:
self.secret = os.getenv("TOKEN_SECRET", "4f85ds4f56dsf46")
self.algorithm = "HS256"
self.expire_hours = int(os.getenv("TOKEN_EXPIRE_HOURS", 4))
self.guest_expire_hours = int(os.getenv("GUEST_TOKEN_EXPIRE_HOURS", 2)) # Guest token default expiration time
def create_token(self, username: str) -> str:
expire = datetime.utcnow() + timedelta(hours=self.expire_hours)
payload = TokenPayload(sub=username, exp=expire)
def create_token(self, username: str, role: str = "user", custom_expire_hours: int = None, metadata: dict = None) -> str:
"""
Create JWT token
Args:
username: Username
role: User role, default is "user", guest is "guest"
custom_expire_hours: Custom expiration time (hours), if None use default value
metadata: Additional metadata
Returns:
str: Encoded JWT token
"""
# Choose default expiration time based on role
if custom_expire_hours is None:
if role == "guest":
expire_hours = self.guest_expire_hours
else:
expire_hours = self.expire_hours
else:
expire_hours = custom_expire_hours
expire = datetime.utcnow() + timedelta(hours=expire_hours)
# Create payload
payload = TokenPayload(
sub=username,
exp=expire,
role=role,
metadata=metadata or {}
)
return jwt.encode(payload.dict(), self.secret, algorithm=self.algorithm)
def validate_token(self, token: str) -> str:
def validate_token(self, token: str) -> dict:
"""
Validate JWT token
Args:
token: JWT token
Returns:
dict: Dictionary containing user information
Raises:
HTTPException: If token is invalid or expired
"""
try:
payload = jwt.decode(token, self.secret, algorithms=[self.algorithm])
expire_timestamp = payload["exp"]
@@ -31,7 +75,14 @@ class AuthHandler:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired"
)
return payload["sub"]
# Return complete payload instead of just username
return {
"username": payload["sub"],
"role": payload.get("role", "user"),
"metadata": payload.get("metadata", {}),
"exp": expire_time
}
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"

View File

@@ -341,25 +341,66 @@ def create_app(args):
ollama_api = OllamaAPI(rag, top_k=args.top_k)
app.include_router(ollama_api.router, prefix="/api")
@app.get("/auth-status", dependencies=[Depends(optional_api_key)])
async def get_auth_status():
"""Get authentication status and guest token if auth is not configured"""
username = os.getenv("AUTH_USERNAME")
password = os.getenv("AUTH_PASSWORD")
if not (username and password):
# Authentication not configured, return guest token
guest_token = auth_handler.create_token(
username="guest",
role="guest",
metadata={"auth_mode": "disabled"}
)
return {
"auth_configured": False,
"access_token": guest_token,
"token_type": "bearer",
"auth_mode": "disabled",
"message": "Authentication is disabled. Using guest access."
}
return {
"auth_configured": True,
"auth_mode": "enabled"
}
@app.post("/login", dependencies=[Depends(optional_api_key)])
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
username = os.getenv("AUTH_USERNAME")
password = os.getenv("AUTH_PASSWORD")
if not (username and password):
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail="Authentication not configured",
# Authentication not configured, return guest token
guest_token = auth_handler.create_token(
username="guest",
role="guest",
metadata={"auth_mode": "disabled"}
)
return {
"access_token": guest_token,
"token_type": "bearer",
"auth_mode": "disabled",
"message": "Authentication is disabled. Using guest access."
}
if form_data.username != username or form_data.password != password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect credentials"
)
# Regular user login
user_token = auth_handler.create_token(
username=username,
role="user",
metadata={"auth_mode": "enabled"}
)
return {
"access_token": auth_handler.create_token(username),
"access_token": user_token,
"token_type": "bearer",
"auth_mode": "enabled"
}
@app.get("/health", dependencies=[Depends(optional_api_key)])

View File

@@ -9,7 +9,7 @@ import sys
import logging
from ascii_colors import ASCIIColors
from lightrag.api import __api_version__
from fastapi import HTTPException, Security, Depends, Request
from fastapi import HTTPException, Security, Depends, Request, status
from dotenv import load_dotenv
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from starlette.status import HTTP_403_FORBIDDEN
@@ -35,7 +35,8 @@ ollama_server_infos = OllamaServerInfos()
def get_auth_dependency():
whitelist = os.getenv("WHITELIST_PATHS", "").split(",")
# Set default whitelist paths
whitelist = os.getenv("WHITELIST_PATHS", "/login,/health").split(",")
async def dependency(
request: Request,
@@ -44,10 +45,41 @@ def get_auth_dependency():
if request.url.path in whitelist:
return
if not (os.getenv("AUTH_USERNAME") and os.getenv("AUTH_PASSWORD")):
# Check if authentication is configured
auth_configured = bool(os.getenv("AUTH_USERNAME") and os.getenv("AUTH_PASSWORD"))
# If authentication is not configured, accept any token including guest tokens
if not auth_configured:
if token: # If token is provided, still validate it
try:
# Validate token but don't raise exception
token_info = auth_handler.validate_token(token)
# Check if it's a guest token
if token_info.get("role") != "guest":
# Non-guest tokens are not valid when auth is not configured
pass
except Exception as e:
# Ignore validation errors but log them
print(f"Token validation error (ignored): {str(e)}")
return
auth_handler.validate_token(token)
# If authentication is configured, validate the token and reject guest tokens
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Token required"
)
token_info = auth_handler.validate_token(token)
# Reject guest tokens when authentication is configured
if token_info.get("role") == "guest":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required. Guest access not allowed when authentication is configured."
)
# At this point, we have a valid non-guest token
return
return dependency