Use verify_permission in routes
Some checks failed
Amarillo/amarillo-gitea/amarillo-core/pipeline/head There was a failure building this commit

This commit is contained in:
Csaba 2024-04-22 14:49:42 +02:00
parent c8acc46382
commit a04397f59d
6 changed files with 43 additions and 73 deletions

View file

@ -21,7 +21,7 @@ class User(BaseModel):
min_length=8,
max_length=256,
examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"])
permissions: Optional[List[Annotated[str, Field(pattern=r'^[a-z0-9]+(:[a-z]+)?$')]]] = Field([],
permissions: Optional[List[Annotated[str, Field(pattern=r'^[a-z0-9-]+(:[a-z]+)?$')]]] = Field([],
description="The permissions of this user, a list of strings in the format <agency:operation> or <operation>",
max_length=256,
# pattern=r'^[a-zA-Z0-9]+(:[a-zA-Z]+)?$', #TODO

View file

@ -5,8 +5,8 @@ from typing import List
from fastapi import APIRouter, HTTPException, status, Depends
from amarillo.models.Carpool import Carpool, Agency
from amarillo.routers.users import verify_permission_for_same_agency_or_admin
from amarillo.services.oauth2 import get_current_agency
from amarillo.models.User import User
from amarillo.services.oauth2 import get_current_user, verify_permission
# TODO should move this to service
from amarillo.routers.carpool import store_carpool, delete_agency_carpools_older_than
from amarillo.services.agencies import AgencyService
@ -33,7 +33,7 @@ router = APIRouter(
status.HTTP_404_NOT_FOUND: {"description": "Agency not found"},
},
)
async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_agency)) -> Agency:
async def get_agency(agency_id: str, requesting_user: User = Depends(get_current_user)) -> Agency:
agencies: AgencyService = container['agencies']
agency = agencies.get_agency(agency_id)
agency_exists = agency is not None
@ -62,8 +62,8 @@ async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_ag
status.HTTP_500_INTERNAL_SERVER_ERROR: {
"description": "Import error"}
})
async def sync(agency_id: str, requesting_agency_id: str = Depends(get_current_agency)) -> List[Carpool]:
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id)
async def sync(agency_id: str, requesting_user: User = Depends(get_current_user)) -> List[Carpool]:
verify_permission(f"{agency_id}:sync")
if agency_id == "ride2go":
import_function = import_ride2go

View file

@ -9,8 +9,8 @@ from fastapi import APIRouter, Body, HTTPException, status, Depends
from datetime import datetime
from amarillo.models.Carpool import Carpool
from amarillo.routers.users import verify_permission_for_same_agency_or_admin
from amarillo.services.oauth2 import get_current_agency
from amarillo.models.User import User
from amarillo.services.oauth2 import get_current_user, verify_permission
from amarillo.tests.sampledata import examples
@ -33,8 +33,8 @@ router = APIRouter(
})
async def post_carpool(carpool: Carpool = Body(..., examples=examples),
requesting_agency_id: str = Depends(get_current_agency)) -> Carpool:
await verify_permission_for_same_agency_or_admin(carpool.agency, requesting_agency_id)
requesting_user: User = Depends(get_current_user)) -> Carpool:
verify_permission(f"{carpool.agency}:write", requesting_user)
logger.info(f"POST trip {carpool.agency}:{carpool.id}.")
await assert_agency_exists(carpool.agency)
@ -54,7 +54,9 @@ async def post_carpool(carpool: Carpool = Body(..., examples=examples),
status.HTTP_404_NOT_FOUND: {"description": "Carpool not found"},
},
)
async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(get_current_agency)) -> Carpool:
async def get_carpool(agency_id: str, carpool_id: str, requesting_user: User = Depends(get_current_user)) -> Carpool:
verify_permission(f"{agency_id}:read", requesting_user)
logger.info(f"Get trip {agency_id}:{carpool_id}.")
await assert_agency_exists(agency_id)
await assert_carpool_exists(agency_id, carpool_id)
@ -73,8 +75,8 @@ async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(ge
"description": "Carpool or agency not found"},
},
)
async def delete_carpool(agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(get_current_agency) ):
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id)
async def delete_carpool(agency_id: str, carpool_id: str, requesting_user: User = Depends(get_current_user)):
verify_permission(f"{agency_id}:write", requesting_user)
logger.info(f"Delete trip {agency_id}:{carpool_id}.")
await assert_agency_exists(agency_id)

View file

@ -5,7 +5,7 @@ from fastapi import APIRouter, HTTPException, status, Header, Depends
from amarillo.models.User import User
from amarillo.services.users import UserService
from amarillo.services.oauth2 import get_current_agency, verify_admin
from amarillo.services.oauth2 import get_current_user, verify_permission
from amarillo.services.config import config
from amarillo.utils.container import container
@ -21,22 +21,6 @@ router = APIRouter(
include_in_schema = config.env != 'PROD'
# TODO Return code 403 Unauthoized (in response_status_codes as well...)
async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body, agency_id_from_api_key):
"""Verifies that an agency is accessing something it owns or the user is admin
The agency_id is part of some paths, or when not in the path it is in the body, e.g. in PUT /carpool.
This function encapsulates the formula 'working with own stuff, or admin'.
"""
is_permitted = agency_id_in_path_or_body == agency_id_from_api_key or agency_id_from_api_key == "admin"
if not is_permitted:
message = f"Working with {agency_id_in_path_or_body} resources is not permitted for {agency_id_from_api_key}."
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
@router.get("/",
include_in_schema=include_in_schema,
operation_id="getUserIdsWhichHaveAConfiguration",
@ -44,7 +28,7 @@ async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body,
response_model=List[str],
description="Returns the user_ids but not the details.",
status_code=status.HTTP_200_OK)
async def get_user_ids(admin_api_key: str = Depends(get_current_agency)) -> [str]:
async def get_user_ids(requesting_user: User = Depends(get_current_user)) -> [str]:
return container['users'].get_user_ids()
@ -52,7 +36,8 @@ async def get_user_ids(admin_api_key: str = Depends(get_current_agency)) -> [str
include_in_schema=include_in_schema,
operation_id="postNewUserConf",
summary="Post a new User")
async def post_user_conf(user_conf: User, admin_api_key: str = Depends(verify_admin)):
async def post_user_conf(user_conf: User, requesting_user: User = Depends(get_current_user)):
verify_permission("admin", requesting_user)
user_service: UserService = container['users']
user_service.add(user_conf)
@ -64,13 +49,13 @@ async def post_user_conf(user_conf: User, admin_api_key: str = Depends(verify_ad
summary="Delete configuration of a user. Returns true if the token for the user existed, "
"false if it didn't exist."
)
async def delete_user(user_id: str, requesting_user_id: str = Depends(get_current_agency)):
agency_may_delete_own = requesting_user_id == user_id
admin_may_delete_everything = requesting_user_id == "admin"
is_permitted = agency_may_delete_own or admin_may_delete_everything
async def delete_user(user_id: str, requesting_user: User = Depends(get_current_user)):
user_may_delete_own = requesting_user.user_id == user_id
admin_may_delete_everything = "admin" in requesting_user.permissions
is_permitted = user_may_delete_own or admin_may_delete_everything
if not is_permitted:
message = f"The API key for {requesting_user_id} can not delete the configuration for {user_id}"
message = f"User '{requesting_user.user_id} can not delete the configuration for {user_id}"
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)

View file

@ -32,23 +32,22 @@ class Token(BaseModel):
token_type: str
class TokenData(BaseModel):
#TODO: rename to user_id
agency_id: Union[str, None] = None
user_id: Union[str, None] = None
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)):
if X_API_Key == None: return None
return await verify_api_key(X_API_Key)
def authenticate_agency(agency_id: str, password: str):
def authenticate_user(user_id: str, password: str):
user_service : UserService = container['users']
user_conf = user_service.user_id_to_user_conf.get(agency_id, None)
user_conf = user_service.user_id_to_user_conf.get(user_id, None)
if not user_conf:
return False
if not verify_password(password, user_conf.password):
return False
return agency_id
return user_id
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
@ -62,11 +61,7 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
return encoded_jwt
#TODO: rename to get_current_user, agency_from_api_key -> user_from_api_key
async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)):
return (await get_current_user(token, agency_from_api_key)).user_id
async def get_current_user(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)) -> User:
async def get_current_user(token: str = Depends(oauth2_scheme), user_from_api_key: str = Depends(verify_optional_api_key)) -> User:
if token:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -75,22 +70,22 @@ async def get_current_user(token: str = Depends(oauth2_scheme), agency_from_api_
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
agency_id: str = payload.get("sub")
if agency_id is None:
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
token_data = TokenData(agency_id=agency_id)
token_data = TokenData(user_id=user_id)
except JWTError:
raise credentials_exception
user_id = token_data.agency_id
user_id = token_data.user_id
if user_id is None:
raise credentials_exception
user_service : UserService = container['users']
return user_service.get_user(user_id)
elif agency_from_api_key:
logger.info(f"API Key provided: {agency_from_api_key}")
elif user_from_api_key:
logger.info(f"API Key provided: {user_from_api_key}")
user_service : UserService = container['users']
return user_service.get_user(agency_from_api_key)
return user_service.get_user(user_from_api_key)
else:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -99,9 +94,6 @@ async def get_current_user(token: str = Depends(oauth2_scheme), agency_from_api_
)
raise credentials_exception
# TODO: use verify_permission("admin", user)
def verify_permission(permission: str, user: User):
def permissions_exception():
@ -121,6 +113,7 @@ def verify_permission(permission: str, user: User):
return
#permission is in agency:operation format
def permission_matches(permission, user_permission):
prescribed_agency, prescribed_operation = permission.split(":")
given_agency, given_operation = user_permission.split(":")
@ -132,17 +125,6 @@ def verify_permission(permission: str, user: User):
raise permissions_exception()
async def verify_admin(agency: str = Depends(get_current_agency)):
#TODO: maybe separate error for when admin credentials are invalid vs valid but not admin?
if(agency != "admin"):
message="This operation requires admin privileges"
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
return "admin"
# noinspection PyPep8Naming
# X_API_Key is upper case for OpenAPI
async def verify_api_key(X_API_Key: str = Header(...)):
@ -154,7 +136,7 @@ async def verify_api_key(X_API_Key: str = Header(...)):
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
agency = authenticate_agency(form_data.username, form_data.password)
agency = authenticate_user(form_data.username, form_data.password)
if not agency:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -170,7 +152,7 @@ async def login_for_access_token(
# TODO: eventually remove this
@router.get("/users/me/", response_model=Agency)
async def read_users_me(
current_agency: Annotated[Agency, Depends(get_current_agency)]
current_agency: Annotated[Agency, Depends(get_current_user)]
):
agency_service : AgencyService = container['agencies']
return agency_service.get_agency(agency_id=current_agency)
@ -178,6 +160,6 @@ async def read_users_me(
# TODO: eventually remove this
@router.get("/users/me/items/")
async def read_own_items(
current_agency: Annotated[str, Depends(get_current_agency)]
current_agency: Annotated[str, Depends(get_current_user)]
):
return [{"item_id": "Foo", "owner": current_agency}]

View file

@ -3,11 +3,11 @@ import pytest
from amarillo.services.oauth2 import verify_permission
from amarillo.models.User import User
test_user = User(user_id="test", password="testpassword", permissions=["all:read", "mfdz:write", "ride2go:all", "metrics"])
test_user = User(user_id="test", password="testpassword", permissions=["all:read", "mfdz:write", "ride2go:all", "gtfs"])
admin_user = User(user_id="admin", password="testpassword", permissions=["admin"])
def test_operation():
verify_permission("metrics", test_user)
verify_permission("gtfs", test_user)
with pytest.raises(HTTPException):
verify_permission("geojson", test_user)
@ -25,6 +25,7 @@ def test_agency_permission():
def test_admin():
verify_permission("admin", admin_user)
verify_permission("gtfs", admin_user)
verify_permission("all:all", admin_user)
verify_permission("mvv:all", admin_user)
verify_permission("mfdz:read", admin_user)