From 52df5ff6459a5a01b86c3e19132cc0b57ffe6fa3 Mon Sep 17 00:00:00 2001 From: orbisai0security Date: Tue, 21 Apr 2026 11:24:31 +0530 Subject: [PATCH] fix: V-001 security vulnerability Automated security fix generated by Orbis Security AI --- .../autogenstudio/web/auth/manager.py | 56 +++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/python/packages/autogen-studio/autogenstudio/web/auth/manager.py b/python/packages/autogen-studio/autogenstudio/web/auth/manager.py index ab16e0432d0a..a1864514c91d 100644 --- a/python/packages/autogen-studio/autogenstudio/web/auth/manager.py +++ b/python/packages/autogen-studio/autogenstudio/web/auth/manager.py @@ -12,6 +12,13 @@ from .models import AuthConfig, User from .providers import AuthProvider, FirebaseAuthProvider, GithubAuthProvider, MSALAuthProvider, NoAuthProvider +# Allowed JWT algorithms — explicitly pin to HS256 only to prevent algorithm confusion +# and reject the 'none' algorithm attack vector. +_ALLOWED_JWT_ALGORITHMS = ["HS256"] + +# Minimum acceptable JWT secret length to resist brute-force attacks. +_MIN_JWT_SECRET_LENGTH = 32 + class AuthManager: """ @@ -23,6 +30,11 @@ def __init__(self, config: AuthConfig): """Initialize the auth manager with configuration.""" self.config = config self.provider = self._create_provider() + if config.jwt_secret and len(config.jwt_secret) < _MIN_JWT_SECRET_LENGTH: + logger.warning( + "JWT secret is shorter than the recommended minimum of " + f"{_MIN_JWT_SECRET_LENGTH} characters; consider using a longer secret." + ) logger.info(f"Initialized auth manager with provider: {config.type}") def _create_provider(self) -> AuthProvider: @@ -47,17 +59,41 @@ def create_token(self, user: User) -> str: logger.warning("JWT secret not configured, using insecure token") return "dummy_token_" + user.id - expiry = datetime.now(timezone.utc) + timedelta(minutes=self.config.token_expiry_minutes) + now = datetime.now(timezone.utc) + expiry = now + timedelta(minutes=self.config.token_expiry_minutes) payload = { "sub": user.id, "name": user.name, "email": user.email, "provider": user.provider, "roles": user.roles, + "iat": now, "exp": expiry, } return jwt.encode(payload, self.config.jwt_secret, algorithm="HS256") + def _decode_token(self, token: str) -> Dict[str, Any]: + """Decode and validate a JWT token with strict options. + + Raises InvalidTokenException on any validation failure. + """ + if not self.config.jwt_secret: + raise InvalidTokenException() + + decode_options: Dict[str, Any] = { + "verify_signature": True, + "require": ["sub", "exp", "iat"], + "verify_exp": True, + "verify_iat": True, + } + + return jwt.decode( + token, + self.config.jwt_secret, + algorithms=_ALLOWED_JWT_ALGORITHMS, + options=decode_options, + ) + async def authenticate_request(self, request: Request) -> User: """Authenticate a request and return user information.""" # Check if path should be excluded from auth @@ -78,12 +114,12 @@ async def authenticate_request(self, request: Request) -> User: try: if not self.config.jwt_secret: - # For development with no JWT secret - logger.warning("JWT secret not configured, accepting all tokens") - return User(id="guestuser@gmail.com", name="Default User", provider="none") + # JWT secret is mandatory for authenticated providers; refuse the request. + logger.error("JWT secret not configured but authentication is required") + raise InvalidTokenException() - # Decode and validate JWT - payload = jwt.decode(token, self.config.jwt_secret, algorithms=["HS256"]) + # Decode and validate JWT with strict algorithm pinning and claim checks. + payload = self._decode_token(token) # Create User object from token payload return User( @@ -104,13 +140,13 @@ async def authenticate_request(self, request: Request) -> User: def is_valid_token(self, token: str) -> bool: """Check if a JWT token is valid.""" if not self.config.jwt_secret: - return True # No validation in dev mode + return False # Refuse validation when no secret is configured try: - jwt.decode(token, self.config.jwt_secret, algorithms=["HS256"]) + self._decode_token(token) return True - except jwt.ExpiredSignatureError: - logger.warning("Token has expired") + except (jwt.ExpiredSignatureError, jwt.InvalidTokenError, InvalidTokenException): + logger.warning("Token validation failed") return False @classmethod