Use OAuth2 to authorize endpoints
Some checks failed
Amarillo/amarillo-gitea/amarillo-core/pipeline/head There was a failure building this commit

This commit is contained in:
Csaba 2024-03-01 15:09:52 +01:00
parent acd06b522a
commit 206f93ddde
5 changed files with 32 additions and 32 deletions

View file

@ -5,7 +5,8 @@ from typing import List
from fastapi import APIRouter, HTTPException, status, Depends
from amarillo.models.Carpool import Carpool, Agency
from amarillo.routers.agencyconf import verify_api_key, verify_admin_api_key, verify_permission_for_same_agency_or_admin
from amarillo.routers.agencyconf import verify_permission_for_same_agency_or_admin
from amarillo.services.oauth2 import get_current_agency
# TODO should move this to service
from amarillo.routers.carpool import store_carpool, delete_agency_carpools_older_than
from amarillo.services.agencies import AgencyService
@ -32,7 +33,7 @@ router = APIRouter(
status.HTTP_404_NOT_FOUND: {"description": "Agency not found"},
},
)
async def get_agency(agency_id: str, admin_api_key: str = Depends(verify_api_key)) -> Agency:
async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_agency)) -> Agency:
agencies: AgencyService = container['agencies']
agency = agencies.get_agency(agency_id)
agency_exists = agency is not None
@ -61,7 +62,7 @@ async def get_agency(agency_id: str, admin_api_key: str = Depends(verify_api_key
status.HTTP_500_INTERNAL_SERVER_ERROR: {
"description": "Import error"}
})
async def sync(agency_id: str, requesting_agency_id: str = Depends(verify_api_key)) -> List[Carpool]:
async def sync(agency_id: str, requesting_agency_id: str = Depends(get_current_agency)) -> List[Carpool]:
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id)
if agency_id == "ride2go":

View file

@ -5,6 +5,7 @@ from fastapi import APIRouter, HTTPException, status, Header, Depends
from amarillo.models.AgencyConf import AgencyConf
from amarillo.services.agencyconf import AgencyConfService
from amarillo.services.oauth2 import get_current_agency, verify_admin
from amarillo.services.config import config
from amarillo.utils.container import container
@ -20,24 +21,6 @@ router = APIRouter(
include_in_schema = config.env != 'PROD'
# noinspection PyPep8Naming
# X_API_Key is upper case for OpenAPI
async def verify_admin_api_key(X_API_Key: str = Header(...)):
if X_API_Key != config.admin_token:
message="X-API-Key header invalid"
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
return "admin"
# noinspection PyPep8Naming
# X_API_Key is upper case for OpenAPI
async def verify_api_key(X_API_Key: str = Header(...)):
agency_conf_service: AgencyConfService = container['agencyconf']
return agency_conf_service.check_api_key(X_API_Key)
# TODO Return code 403 Unauthoized (in response_status_codes as well...)
async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body, agency_id_from_api_key):
"""Verifies that an agency is accessing something it owns or the user is admin
@ -61,7 +44,7 @@ async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body,
response_model=List[str],
description="Returns the agency_ids but not the details.",
status_code=status.HTTP_200_OK)
async def get_agency_ids(admin_api_key: str = Depends(verify_api_key)) -> [str]:
async def get_agency_ids(admin_api_key: str = Depends(get_current_agency)) -> [str]:
return container['agencyconf'].get_agency_ids()
@ -69,7 +52,7 @@ async def get_agency_ids(admin_api_key: str = Depends(verify_api_key)) -> [str]:
include_in_schema=include_in_schema,
operation_id="postNewAgencyConf",
summary="Post a new AgencyConf")
async def post_agency_conf(agency_conf: AgencyConf, admin_api_key: str = Depends(verify_admin_api_key)):
async def post_agency_conf(agency_conf: AgencyConf, admin_api_key: str = Depends(verify_admin)):
agency_conf_service: AgencyConfService = container['agencyconf']
agency_conf_service.add(agency_conf)
@ -81,7 +64,7 @@ async def post_agency_conf(agency_conf: AgencyConf, admin_api_key: str = Depends
summary="Delete configuration of an agency. Returns true if the token for the agency existed, "
"false if it didn't exist."
)
async def delete_agency_conf(agency_id: str, requesting_agency_id: str = Depends(verify_api_key)):
async def delete_agency_conf(agency_id: str, requesting_agency_id: str = Depends(get_current_agency)):
agency_may_delete_own = requesting_agency_id == agency_id
admin_may_delete_everything = requesting_agency_id == "admin"
is_permitted = agency_may_delete_own or admin_may_delete_everything

View file

@ -5,11 +5,12 @@ import os.path
import re
from glob import glob
from fastapi import APIRouter, Body, Header, HTTPException, status, Depends
from fastapi import APIRouter, Body, HTTPException, status, Depends
from datetime import datetime
from amarillo.models.Carpool import Carpool
from amarillo.routers.agencyconf import verify_api_key, verify_permission_for_same_agency_or_admin
from amarillo.routers.agencyconf import verify_permission_for_same_agency_or_admin
from amarillo.services.oauth2 import get_current_agency
from amarillo.tests.sampledata import examples
@ -32,7 +33,7 @@ router = APIRouter(
})
async def post_carpool(carpool: Carpool = Body(..., examples=examples),
requesting_agency_id: str = Depends(verify_api_key)) -> Carpool:
requesting_agency_id: str = Depends(get_current_agency)) -> Carpool:
await verify_permission_for_same_agency_or_admin(carpool.agency, requesting_agency_id)
logger.info(f"POST trip {carpool.agency}:{carpool.id}.")
@ -53,7 +54,7 @@ async def post_carpool(carpool: Carpool = Body(..., examples=examples),
status.HTTP_404_NOT_FOUND: {"description": "Carpool not found"},
},
)
async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(verify_api_key)) -> Carpool:
async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(get_current_agency)) -> Carpool:
logger.info(f"Get trip {agency_id}:{carpool_id}.")
await assert_agency_exists(agency_id)
await assert_carpool_exists(agency_id, carpool_id)
@ -72,7 +73,7 @@ async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(ve
"description": "Carpool or agency not found"},
},
)
async def delete_carpool(agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(verify_api_key)):
async def delete_carpool(agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(get_current_agency)):
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id)
logger.info(f"Delete trip {agency_id}:{carpool_id}.")

View file

@ -5,10 +5,8 @@ from typing import List
from fastapi import APIRouter, HTTPException, status, Depends
from amarillo.models.Carpool import Region
from amarillo.routers.agencyconf import verify_admin_api_key
from amarillo.services.regions import RegionService
from amarillo.utils.container import container
from fastapi.responses import FileResponse
logger = logging.getLogger(__name__)

View file

@ -8,7 +8,6 @@ from fastapi import Depends, HTTPException, Header, status, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from pydantic import BaseModel
from amarillo.routers.agencyconf import verify_api_key
from amarillo.services.passwords import verify_password
from amarillo.utils.container import container
from amarillo.services.agencies import AgencyService
@ -90,6 +89,24 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
)
raise credentials_exception
async def verify_admin(agency: str = Depends(get_current_agency)):
#TODO: maybe separate error for when admin credentials are invalid vs valid but not admin?
if(agency != "admin"):
message="This operation requires admin privileges"
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
return "admin"
# noinspection PyPep8Naming
# X_API_Key is upper case for OpenAPI
async def verify_api_key(X_API_Key: str = Header(...)):
agency_conf_service: AgencyConfService = container['agencyconf']
return agency_conf_service.check_api_key(X_API_Key)
@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]