Compare commits

..

5 commits

Author SHA1 Message Date
Csaba 797eff5b2a Change bearer token expiry to 1 week
All checks were successful
Amarillo/amarillo-gitea/amarillo-core/pipeline/head This commit looks good
2024-05-22 14:53:10 +02:00
Csaba b9b47dfc2a Updated Jenkinfile to use grfs-exporter
All checks were successful
Amarillo/amarillo-gitea/amarillo-core/pipeline/head This commit looks good
2024-04-22 15:06:02 +02:00
Csaba a04397f59d Use verify_permission in routes
Some checks failed
Amarillo/amarillo-gitea/amarillo-core/pipeline/head There was a failure building this commit
2024-04-22 14:49:42 +02:00
Csaba c8acc46382 verify_permission function 2024-04-22 12:49:13 +02:00
Csaba 11d5849290 Get current user function
Some checks failed
Amarillo/amarillo-gitea/amarillo-core/pipeline/head There was a failure building this commit
2024-04-18 16:55:35 +02:00
9 changed files with 143 additions and 100 deletions

2
Jenkinsfile vendored
View file

@ -9,7 +9,7 @@ pipeline {
IMAGE_NAME = 'amarillo' IMAGE_NAME = 'amarillo'
AMARILLO_DISTRIBUTION = '0.2' AMARILLO_DISTRIBUTION = '0.2'
TAG = "${AMARILLO_DISTRIBUTION}.${BUILD_NUMBER}" TAG = "${AMARILLO_DISTRIBUTION}.${BUILD_NUMBER}"
PLUGINS = 'amarillo-metrics amarillo-enhancer amarillo-grfs-export' PLUGINS = 'amarillo-metrics amarillo-enhancer amarillo-grfs-exporter'
DEPLOY_WEBHOOK_URL = 'http://amarillo.mfdz.de:8888/mitanand' DEPLOY_WEBHOOK_URL = 'http://amarillo.mfdz.de:8888/mitanand'
DEPLOY_SECRET = credentials('AMARILLO-JENKINS-DEPLOY-SECRET') DEPLOY_SECRET = credentials('AMARILLO-JENKINS-DEPLOY-SECRET')
} }

View file

@ -1,6 +1,5 @@
from typing import Optional from typing import Annotated, Optional, List
from pydantic import ConfigDict, BaseModel, Field from pydantic import ConfigDict, BaseModel, Field
class User(BaseModel): class User(BaseModel):
#TODO: add attributes admin, permissions, fullname, email #TODO: add attributes admin, permissions, fullname, email
@ -22,6 +21,11 @@ class User(BaseModel):
min_length=8, min_length=8,
max_length=256, max_length=256,
examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"]) examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"])
permissions: Optional[List[Annotated[str, Field(pattern=r'^[a-z0-9-]+(:[a-z]+)?$')]]] = Field([],
description="The permissions of this user, a list of strings in the format <agency:operation> or <operation>",
max_length=256,
# pattern=r'^[a-zA-Z0-9]+(:[a-zA-Z]+)?$', #TODO
examples=["ride2go:read", "all:read", "admin", "geojson"])
model_config = ConfigDict(json_schema_extra={ model_config = ConfigDict(json_schema_extra={
"title": "Agency Configuration", "title": "Agency Configuration",
"description": "Configuration for an agency.", "description": "Configuration for an agency.",

View file

@ -5,8 +5,8 @@ from typing import List
from fastapi import APIRouter, HTTPException, status, Depends from fastapi import APIRouter, HTTPException, status, Depends
from amarillo.models.Carpool import Carpool, Agency from amarillo.models.Carpool import Carpool, Agency
from amarillo.routers.users import verify_permission_for_same_agency_or_admin from amarillo.models.User import User
from amarillo.services.oauth2 import get_current_agency from amarillo.services.oauth2 import get_current_user, verify_permission
# TODO should move this to service # TODO should move this to service
from amarillo.routers.carpool import store_carpool, delete_agency_carpools_older_than from amarillo.routers.carpool import store_carpool, delete_agency_carpools_older_than
from amarillo.services.agencies import AgencyService from amarillo.services.agencies import AgencyService
@ -33,7 +33,7 @@ router = APIRouter(
status.HTTP_404_NOT_FOUND: {"description": "Agency not found"}, status.HTTP_404_NOT_FOUND: {"description": "Agency not found"},
}, },
) )
async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_agency)) -> Agency: async def get_agency(agency_id: str, requesting_user: User = Depends(get_current_user)) -> Agency:
agencies: AgencyService = container['agencies'] agencies: AgencyService = container['agencies']
agency = agencies.get_agency(agency_id) agency = agencies.get_agency(agency_id)
agency_exists = agency is not None agency_exists = agency is not None
@ -62,8 +62,8 @@ async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_ag
status.HTTP_500_INTERNAL_SERVER_ERROR: { status.HTTP_500_INTERNAL_SERVER_ERROR: {
"description": "Import error"} "description": "Import error"}
}) })
async def sync(agency_id: str, requesting_agency_id: str = Depends(get_current_agency)) -> List[Carpool]: async def sync(agency_id: str, requesting_user: User = Depends(get_current_user)) -> List[Carpool]:
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id) verify_permission(f"{agency_id}:sync")
if agency_id == "ride2go": if agency_id == "ride2go":
import_function = import_ride2go import_function = import_ride2go

View file

@ -5,14 +5,14 @@ import os.path
import re import re
from glob import glob from glob import glob
from fastapi import APIRouter, Body, HTTPException, status, Depends, BackgroundTasks from fastapi import APIRouter, Body, HTTPException, status, Depends
from datetime import datetime from datetime import datetime
from amarillo.models.Carpool import Carpool from amarillo.models.Carpool import Carpool
from amarillo.routers.users import verify_permission_for_same_agency_or_admin from amarillo.models.User import User
from amarillo.services.oauth2 import get_current_agency from amarillo.services.oauth2 import get_current_user, verify_permission
from amarillo.tests.sampledata import examples from amarillo.tests.sampledata import examples
from amarillo.services.hooks import run_on_create, run_on_delete
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -32,11 +32,9 @@ router = APIRouter(
"description": "Agency does not exist"}, "description": "Agency does not exist"},
}) })
async def post_carpool(background_tasks: BackgroundTasks, carpool: Carpool = Body(..., examples=examples), async def post_carpool(carpool: Carpool = Body(..., examples=examples),
requesting_agency_id: str = Depends(get_current_agency)) -> Carpool: requesting_user: User = Depends(get_current_user)) -> Carpool:
await verify_permission_for_same_agency_or_admin(carpool.agency, requesting_agency_id) verify_permission(f"{carpool.agency}:write", requesting_user)
background_tasks.add_task(run_on_create, carpool)
logger.info(f"POST trip {carpool.agency}:{carpool.id}.") logger.info(f"POST trip {carpool.agency}:{carpool.id}.")
await assert_agency_exists(carpool.agency) await assert_agency_exists(carpool.agency)
@ -56,7 +54,9 @@ async def post_carpool(background_tasks: BackgroundTasks, carpool: Carpool = Bod
status.HTTP_404_NOT_FOUND: {"description": "Carpool not found"}, status.HTTP_404_NOT_FOUND: {"description": "Carpool not found"},
}, },
) )
async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(get_current_agency)) -> Carpool: async def get_carpool(agency_id: str, carpool_id: str, requesting_user: User = Depends(get_current_user)) -> Carpool:
verify_permission(f"{agency_id}:read", requesting_user)
logger.info(f"Get trip {agency_id}:{carpool_id}.") logger.info(f"Get trip {agency_id}:{carpool_id}.")
await assert_agency_exists(agency_id) await assert_agency_exists(agency_id)
await assert_carpool_exists(agency_id, carpool_id) await assert_carpool_exists(agency_id, carpool_id)
@ -75,14 +75,12 @@ async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(ge
"description": "Carpool or agency not found"}, "description": "Carpool or agency not found"},
}, },
) )
async def delete_carpool(background_tasks: BackgroundTasks, agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(get_current_agency)): async def delete_carpool(agency_id: str, carpool_id: str, requesting_user: User = Depends(get_current_user)):
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id) verify_permission(f"{agency_id}:write", requesting_user)
logger.info(f"Delete trip {agency_id}:{carpool_id}.") logger.info(f"Delete trip {agency_id}:{carpool_id}.")
await assert_agency_exists(agency_id) await assert_agency_exists(agency_id)
await assert_carpool_exists(agency_id, carpool_id) await assert_carpool_exists(agency_id, carpool_id)
cp = await load_carpool(agency_id, carpool_id)
background_tasks.add_task(run_on_delete, cp)
return await _delete_carpool(agency_id, carpool_id) return await _delete_carpool(agency_id, carpool_id)
@ -95,6 +93,12 @@ async def _delete_carpool(agency_id: str, carpool_id: str):
logger.info(f"Saved carpool {agency_id}:{carpool_id} in trash.") logger.info(f"Saved carpool {agency_id}:{carpool_id} in trash.")
os.remove(f"data/carpool/{agency_id}/{carpool_id}.json") os.remove(f"data/carpool/{agency_id}/{carpool_id}.json")
try:
from amarillo.plugins.metrics import trips_deleted_counter
trips_deleted_counter.inc()
except ImportError:
pass
async def store_carpool(carpool: Carpool) -> Carpool: async def store_carpool(carpool: Carpool) -> Carpool:
carpool_exists = os.path.exists(f"data/carpool/{carpool.agency}/{carpool.id}.json") carpool_exists = os.path.exists(f"data/carpool/{carpool.agency}/{carpool.id}.json")
@ -102,6 +106,17 @@ async def store_carpool(carpool: Carpool) -> Carpool:
await set_lastUpdated_if_unset(carpool) await set_lastUpdated_if_unset(carpool)
await save_carpool(carpool) await save_carpool(carpool)
try:
from amarillo.plugins.metrics import trips_created_counter, trips_updated_counter
if(carpool_exists):
# logger.info("Incrementing trips updated")
trips_updated_counter.inc()
else:
# logger.info("Incrementing trips created")
trips_created_counter.inc()
except ImportError:
pass
return carpool return carpool
async def set_lastUpdated_if_unset(carpool): async def set_lastUpdated_if_unset(carpool):
@ -142,6 +157,4 @@ async def delete_agency_carpools_older_than(agency_id, timestamp):
if os.path.getmtime(carpool_file_name) < timestamp: if os.path.getmtime(carpool_file_name) < timestamp:
m = re.search(r'([a-zA-Z0-9_-]+)\.json$', carpool_file_name) m = re.search(r'([a-zA-Z0-9_-]+)\.json$', carpool_file_name)
# TODO log deletion # TODO log deletion
cp = await load_carpool(agency_id, m[1])
run_on_delete(cp)
await _delete_carpool(agency_id, m[1]) await _delete_carpool(agency_id, m[1])

View file

@ -5,7 +5,7 @@ from fastapi import APIRouter, HTTPException, status, Header, Depends
from amarillo.models.User import User from amarillo.models.User import User
from amarillo.services.users import UserService from amarillo.services.users import UserService
from amarillo.services.oauth2 import get_current_agency, verify_admin from amarillo.services.oauth2 import get_current_user, verify_permission
from amarillo.services.config import config from amarillo.services.config import config
from amarillo.utils.container import container from amarillo.utils.container import container
@ -21,22 +21,6 @@ router = APIRouter(
include_in_schema = config.env != 'PROD' include_in_schema = config.env != 'PROD'
# TODO Return code 403 Unauthoized (in response_status_codes as well...)
async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body, agency_id_from_api_key):
"""Verifies that an agency is accessing something it owns or the user is admin
The agency_id is part of some paths, or when not in the path it is in the body, e.g. in PUT /carpool.
This function encapsulates the formula 'working with own stuff, or admin'.
"""
is_permitted = agency_id_in_path_or_body == agency_id_from_api_key or agency_id_from_api_key == "admin"
if not is_permitted:
message = f"Working with {agency_id_in_path_or_body} resources is not permitted for {agency_id_from_api_key}."
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
@router.get("/", @router.get("/",
include_in_schema=include_in_schema, include_in_schema=include_in_schema,
operation_id="getUserIdsWhichHaveAConfiguration", operation_id="getUserIdsWhichHaveAConfiguration",
@ -44,7 +28,7 @@ async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body,
response_model=List[str], response_model=List[str],
description="Returns the user_ids but not the details.", description="Returns the user_ids but not the details.",
status_code=status.HTTP_200_OK) status_code=status.HTTP_200_OK)
async def get_user_ids(admin_api_key: str = Depends(get_current_agency)) -> [str]: async def get_user_ids(requesting_user: User = Depends(get_current_user)) -> [str]:
return container['users'].get_user_ids() return container['users'].get_user_ids()
@ -52,7 +36,8 @@ async def get_user_ids(admin_api_key: str = Depends(get_current_agency)) -> [str
include_in_schema=include_in_schema, include_in_schema=include_in_schema,
operation_id="postNewUserConf", operation_id="postNewUserConf",
summary="Post a new User") summary="Post a new User")
async def post_user_conf(user_conf: User, admin_api_key: str = Depends(verify_admin)): async def post_user_conf(user_conf: User, requesting_user: User = Depends(get_current_user)):
verify_permission("admin", requesting_user)
user_service: UserService = container['users'] user_service: UserService = container['users']
user_service.add(user_conf) user_service.add(user_conf)
@ -64,13 +49,13 @@ async def post_user_conf(user_conf: User, admin_api_key: str = Depends(verify_ad
summary="Delete configuration of a user. Returns true if the token for the user existed, " summary="Delete configuration of a user. Returns true if the token for the user existed, "
"false if it didn't exist." "false if it didn't exist."
) )
async def delete_user(user_id: str, requesting_user_id: str = Depends(get_current_agency)): async def delete_user(user_id: str, requesting_user: User = Depends(get_current_user)):
agency_may_delete_own = requesting_user_id == user_id user_may_delete_own = requesting_user.user_id == user_id
admin_may_delete_everything = requesting_user_id == "admin" admin_may_delete_everything = "admin" in requesting_user.permissions
is_permitted = agency_may_delete_own or admin_may_delete_everything is_permitted = user_may_delete_own or admin_may_delete_everything
if not is_permitted: if not is_permitted:
message = f"The API key for {requesting_user_id} can not delete the configuration for {user_id}" message = f"User '{requesting_user.user_id} can not delete the configuration for {user_id}"
logger.error(message) logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)

View file

@ -37,6 +37,11 @@ class CarpoolService():
if cp and self.is_outdated(cp): if cp and self.is_outdated(cp):
logger.info("Purge outdated offer %s", key) logger.info("Purge outdated offer %s", key)
self.delete(cp.agency, cp.id) self.delete(cp.agency, cp.id)
try:
from amarillo.plugins.metrics import trips_deleted_counter
trips_deleted_counter.inc()
except ImportError:
pass
def get(self, agency_id: str, carpool_id: str): def get(self, agency_id: str, carpool_id: str):
return self.carpools.get(f"{agency_id}:{carpool_id}") return self.carpools.get(f"{agency_id}:{carpool_id}")

View file

@ -1,27 +0,0 @@
from typing import List
from amarillo.models.Carpool import Carpool
class CarpoolEvents:
def on_create(cp : Carpool):
pass
def on_update(cp : Carpool):
pass
def on_delete(cp : Carpool):
pass
carpool_event_listeners : List[CarpoolEvents] = []
def register_carpool_event_listener(cpe : CarpoolEvents):
carpool_event_listeners.append(cpe)
def run_on_create(cp: Carpool):
for cpe in carpool_event_listeners:
cpe.on_create(cp)
def run_on_update(cp: Carpool):
for cpe in carpool_event_listeners:
cpe.on_update(cp)
def run_on_delete(cp: Carpool):
for cpe in carpool_event_listeners:
cpe.on_delete(cp)

View file

@ -3,11 +3,13 @@
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Annotated, Optional, Union from typing import Annotated, Optional, Union
import logging import logging
import logging.config
from fastapi import Depends, HTTPException, Header, status, APIRouter from fastapi import Depends, HTTPException, Header, status, APIRouter
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt from jose import JWTError, jwt
from pydantic import BaseModel from pydantic import BaseModel
from amarillo.models.User import User
from amarillo.services.passwords import verify_password from amarillo.services.passwords import verify_password
from amarillo.utils.container import container from amarillo.utils.container import container
from amarillo.services.agencies import AgencyService from amarillo.services.agencies import AgencyService
@ -18,7 +20,7 @@ from amarillo.services.secrets import secrets
SECRET_KEY = secrets.secret_key SECRET_KEY = secrets.secret_key
ALGORITHM = "HS256" ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 ACCESS_TOKEN_EXPIRE_MINUTES = 7*24*60
logging.config.fileConfig('logging.conf', disable_existing_loggers=False) logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
logger = logging.getLogger("main") logger = logging.getLogger("main")
@ -30,22 +32,22 @@ class Token(BaseModel):
token_type: str token_type: str
class TokenData(BaseModel): class TokenData(BaseModel):
agency_id: Union[str, None] = None user_id: Union[str, None] = None
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)): async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)):
if X_API_Key == None: return None if X_API_Key == None: return None
return await verify_api_key(X_API_Key) return await verify_api_key(X_API_Key)
def authenticate_agency(agency_id: str, password: str): def authenticate_user(user_id: str, password: str):
user_service : UserService = container['users'] user_service : UserService = container['users']
user_conf = user_service.user_id_to_user_conf.get(agency_id, None) user_conf = user_service.user_id_to_user_conf.get(user_id, None)
if not user_conf: if not user_conf:
return False return False
if not verify_password(password, user_conf.password): if not verify_password(password, user_conf.password):
return False return False
return agency_id return user_id
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
@ -58,7 +60,8 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt return encoded_jwt
async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)):
async def get_current_user(token: str = Depends(oauth2_scheme), user_from_api_key: str = Depends(verify_optional_api_key)) -> User:
if token: if token:
credentials_exception = HTTPException( credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
@ -67,19 +70,22 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
) )
try: try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
agency_id: str = payload.get("sub") user_id: str = payload.get("sub")
if agency_id is None: if user_id is None:
raise credentials_exception raise credentials_exception
token_data = TokenData(agency_id=agency_id) token_data = TokenData(user_id=user_id)
except JWTError: except JWTError:
raise credentials_exception raise credentials_exception
user = token_data.agency_id user_id = token_data.user_id
if user is None: if user_id is None:
raise credentials_exception raise credentials_exception
return user
elif agency_from_api_key: user_service : UserService = container['users']
logger.info(f"API Key provided: {agency_from_api_key}") return user_service.get_user(user_id)
return agency_from_api_key elif user_from_api_key:
logger.info(f"API Key provided: {user_from_api_key}")
user_service : UserService = container['users']
return user_service.get_user(user_from_api_key)
else: else:
credentials_exception = HTTPException( credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
@ -88,15 +94,35 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
) )
raise credentials_exception raise credentials_exception
def verify_permission(permission: str, user: User):
async def verify_admin(agency: str = Depends(get_current_agency)): def permissions_exception():
#TODO: maybe separate error for when admin credentials are invalid vs valid but not admin? return HTTPException(
if(agency != "admin"): status_code=status.HTTP_401_UNAUTHORIZED,
message="This operation requires admin privileges" detail=f"User '{user.user_id}' does not have the permission '{permission}'",
logger.error(message) headers={"WWW-Authenticate": "Bearer"},
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) )
return "admin" #user is admin
if "admin" in user.permissions: return
#permission is an operation
if ":" not in permission:
if permission not in user.permissions:
raise permissions_exception()
return
#permission is in agency:operation format
def permission_matches(permission, user_permission):
prescribed_agency, prescribed_operation = permission.split(":")
given_agency, given_operation = user_permission.split(":")
return (prescribed_agency == given_agency or given_agency == "all") and (prescribed_operation == given_operation or given_operation == "all")
if any(permission_matches(permission, p) for p in user.permissions if ":" in p): return
raise permissions_exception()
# noinspection PyPep8Naming # noinspection PyPep8Naming
@ -110,7 +136,7 @@ async def verify_api_key(X_API_Key: str = Header(...)):
async def login_for_access_token( async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()] form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token: ) -> Token:
agency = authenticate_agency(form_data.username, form_data.password) agency = authenticate_user(form_data.username, form_data.password)
if not agency: if not agency:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
@ -126,7 +152,7 @@ async def login_for_access_token(
# TODO: eventually remove this # TODO: eventually remove this
@router.get("/users/me/", response_model=Agency) @router.get("/users/me/", response_model=Agency)
async def read_users_me( async def read_users_me(
current_agency: Annotated[Agency, Depends(get_current_agency)] current_agency: Annotated[Agency, Depends(get_current_user)]
): ):
agency_service : AgencyService = container['agencies'] agency_service : AgencyService = container['agencies']
return agency_service.get_agency(agency_id=current_agency) return agency_service.get_agency(agency_id=current_agency)
@ -134,6 +160,6 @@ async def read_users_me(
# TODO: eventually remove this # TODO: eventually remove this
@router.get("/users/me/items/") @router.get("/users/me/items/")
async def read_own_items( async def read_own_items(
current_agency: Annotated[str, Depends(get_current_agency)] current_agency: Annotated[str, Depends(get_current_user)]
): ):
return [{"item_id": "Foo", "owner": current_agency}] return [{"item_id": "Foo", "owner": current_agency}]

View file

@ -0,0 +1,37 @@
from fastapi import HTTPException
import pytest
from amarillo.services.oauth2 import verify_permission
from amarillo.models.User import User
test_user = User(user_id="test", password="testpassword", permissions=["all:read", "mfdz:write", "ride2go:all", "gtfs"])
admin_user = User(user_id="admin", password="testpassword", permissions=["admin"])
def test_operation():
verify_permission("gtfs", test_user)
with pytest.raises(HTTPException):
verify_permission("geojson", test_user)
def test_agency_permission():
verify_permission("mvv:read", test_user)
verify_permission("mfdz:read", test_user)
verify_permission("mfdz:write", test_user)
verify_permission("ride2go:write", test_user)
with pytest.raises(HTTPException):
verify_permission("mvv:write", test_user)
verify_permission("mvv:all", test_user)
def test_admin():
verify_permission("admin", admin_user)
verify_permission("gtfs", admin_user)
verify_permission("all:all", admin_user)
verify_permission("mvv:all", admin_user)
verify_permission("mfdz:read", admin_user)
verify_permission("mfdz:write", admin_user)
verify_permission("ride2go:write", admin_user)
with pytest.raises(HTTPException):
verify_permission("admin", test_user)
verify_permission("all:all", test_user)