diff --git a/amarillo/models/User.py b/amarillo/models/User.py index efd20be..fa9a358 100644 --- a/amarillo/models/User.py +++ b/amarillo/models/User.py @@ -1,5 +1,7 @@ -from typing import Optional -from pydantic import ConfigDict, BaseModel, Field +from typing import Annotated, Optional, List, Union +from pydantic import ConfigDict, BaseModel, Field, constr + +MyUrlsType = constr(regex="^[a-z]$") class User(BaseModel): #TODO: add attributes admin, permissions, fullname, email @@ -22,6 +24,11 @@ class User(BaseModel): min_length=8, max_length=256, examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"]) + permissions: Optional[List[str]] = Field([], + description="The permissions of this user, a list of strings in the format or ", + max_length=256, + # pattern=r'^[a-zA-Z0-9]+(:[a-zA-Z]+)?$', #TODO + examples=["ride2go:read", "all:read", "admin", "geojson"]) model_config = ConfigDict(json_schema_extra={ "title": "Agency Configuration", "description": "Configuration for an agency.", diff --git a/amarillo/routers/carpool.py b/amarillo/routers/carpool.py index 4a70c85..8310189 100644 --- a/amarillo/routers/carpool.py +++ b/amarillo/routers/carpool.py @@ -73,7 +73,7 @@ async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(ge "description": "Carpool or agency not found"}, }, ) -async def delete_carpool(agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(get_current_agency)): +async def delete_carpool(agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(get_current_agency) ): await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id) logger.info(f"Delete trip {agency_id}:{carpool_id}.") diff --git a/amarillo/services/oauth2.py b/amarillo/services/oauth2.py index 77332ec..b71ffdc 100644 --- a/amarillo/services/oauth2.py +++ b/amarillo/services/oauth2.py @@ -8,6 +8,7 @@ from fastapi import Depends, HTTPException, Header, status, APIRouter from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from pydantic import BaseModel +from amarillo.models.User import User from amarillo.services.passwords import verify_password from amarillo.utils.container import container from amarillo.services.agencies import AgencyService @@ -30,6 +31,7 @@ class Token(BaseModel): token_type: str class TokenData(BaseModel): + #TODO: rename to user_id agency_id: Union[str, None] = None oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) @@ -58,7 +60,13 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt +#TODO: function verify_permission(user, permission) + +#TODO: rename to get_current_user, agency_from_api_key -> user_from_api_key async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)): + return (await get_current_user(token, agency_from_api_key)).user_id + +async def get_current_user(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)) -> User: if token: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -73,13 +81,16 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap token_data = TokenData(agency_id=agency_id) except JWTError: raise credentials_exception - user = token_data.agency_id - if user is None: + user_id = token_data.agency_id + if user_id is None: raise credentials_exception - return user + + user_service : UserService = container['users'] + return user_service.get_user(user_id) elif agency_from_api_key: logger.info(f"API Key provided: {agency_from_api_key}") - return agency_from_api_key + user_service : UserService = container['users'] + return user_service.get_user(agency_from_api_key) else: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -88,6 +99,18 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap ) raise credentials_exception +# TODO: use verify_permission("admin", user) + +def verify_permission(permission: str, user: User): + # permission_exception = + + if user.permissions is None or permission not in user.permissions: raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=f"User '{user}' does not have the permission '{permission}'", + headers={"WWW-Authenticate": "Bearer"}, + ) + + async def verify_admin(agency: str = Depends(get_current_agency)): #TODO: maybe separate error for when admin credentials are invalid vs valid but not admin?