Compare commits
5 commits
carpool-ev
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
797eff5b2a | ||
|
|
b9b47dfc2a | ||
|
|
a04397f59d | ||
|
|
c8acc46382 | ||
|
|
11d5849290 |
2
Jenkinsfile
vendored
2
Jenkinsfile
vendored
|
|
@ -9,7 +9,7 @@ pipeline {
|
|||
IMAGE_NAME = 'amarillo'
|
||||
AMARILLO_DISTRIBUTION = '0.2'
|
||||
TAG = "${AMARILLO_DISTRIBUTION}.${BUILD_NUMBER}"
|
||||
PLUGINS = 'amarillo-metrics amarillo-enhancer amarillo-grfs-export'
|
||||
PLUGINS = 'amarillo-metrics amarillo-enhancer amarillo-grfs-exporter'
|
||||
DEPLOY_WEBHOOK_URL = 'http://amarillo.mfdz.de:8888/mitanand'
|
||||
DEPLOY_SECRET = credentials('AMARILLO-JENKINS-DEPLOY-SECRET')
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
from typing import Optional
|
||||
from typing import Annotated, Optional, List
|
||||
from pydantic import ConfigDict, BaseModel, Field
|
||||
|
||||
class User(BaseModel):
|
||||
#TODO: add attributes admin, permissions, fullname, email
|
||||
|
||||
|
|
@ -22,6 +21,11 @@ class User(BaseModel):
|
|||
min_length=8,
|
||||
max_length=256,
|
||||
examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"])
|
||||
permissions: Optional[List[Annotated[str, Field(pattern=r'^[a-z0-9-]+(:[a-z]+)?$')]]] = Field([],
|
||||
description="The permissions of this user, a list of strings in the format <agency:operation> or <operation>",
|
||||
max_length=256,
|
||||
# pattern=r'^[a-zA-Z0-9]+(:[a-zA-Z]+)?$', #TODO
|
||||
examples=["ride2go:read", "all:read", "admin", "geojson"])
|
||||
model_config = ConfigDict(json_schema_extra={
|
||||
"title": "Agency Configuration",
|
||||
"description": "Configuration for an agency.",
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@ from typing import List
|
|||
from fastapi import APIRouter, HTTPException, status, Depends
|
||||
|
||||
from amarillo.models.Carpool import Carpool, Agency
|
||||
from amarillo.routers.users import verify_permission_for_same_agency_or_admin
|
||||
from amarillo.services.oauth2 import get_current_agency
|
||||
from amarillo.models.User import User
|
||||
from amarillo.services.oauth2 import get_current_user, verify_permission
|
||||
# TODO should move this to service
|
||||
from amarillo.routers.carpool import store_carpool, delete_agency_carpools_older_than
|
||||
from amarillo.services.agencies import AgencyService
|
||||
|
|
@ -33,7 +33,7 @@ router = APIRouter(
|
|||
status.HTTP_404_NOT_FOUND: {"description": "Agency not found"},
|
||||
},
|
||||
)
|
||||
async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_agency)) -> Agency:
|
||||
async def get_agency(agency_id: str, requesting_user: User = Depends(get_current_user)) -> Agency:
|
||||
agencies: AgencyService = container['agencies']
|
||||
agency = agencies.get_agency(agency_id)
|
||||
agency_exists = agency is not None
|
||||
|
|
@ -62,8 +62,8 @@ async def get_agency(agency_id: str, admin_api_key: str = Depends(get_current_ag
|
|||
status.HTTP_500_INTERNAL_SERVER_ERROR: {
|
||||
"description": "Import error"}
|
||||
})
|
||||
async def sync(agency_id: str, requesting_agency_id: str = Depends(get_current_agency)) -> List[Carpool]:
|
||||
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id)
|
||||
async def sync(agency_id: str, requesting_user: User = Depends(get_current_user)) -> List[Carpool]:
|
||||
verify_permission(f"{agency_id}:sync")
|
||||
|
||||
if agency_id == "ride2go":
|
||||
import_function = import_ride2go
|
||||
|
|
|
|||
|
|
@ -5,14 +5,14 @@ import os.path
|
|||
import re
|
||||
from glob import glob
|
||||
|
||||
from fastapi import APIRouter, Body, HTTPException, status, Depends, BackgroundTasks
|
||||
from fastapi import APIRouter, Body, HTTPException, status, Depends
|
||||
from datetime import datetime
|
||||
|
||||
from amarillo.models.Carpool import Carpool
|
||||
from amarillo.routers.users import verify_permission_for_same_agency_or_admin
|
||||
from amarillo.services.oauth2 import get_current_agency
|
||||
from amarillo.models.User import User
|
||||
from amarillo.services.oauth2 import get_current_user, verify_permission
|
||||
from amarillo.tests.sampledata import examples
|
||||
from amarillo.services.hooks import run_on_create, run_on_delete
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -32,11 +32,9 @@ router = APIRouter(
|
|||
"description": "Agency does not exist"},
|
||||
|
||||
})
|
||||
async def post_carpool(background_tasks: BackgroundTasks, carpool: Carpool = Body(..., examples=examples),
|
||||
requesting_agency_id: str = Depends(get_current_agency)) -> Carpool:
|
||||
await verify_permission_for_same_agency_or_admin(carpool.agency, requesting_agency_id)
|
||||
|
||||
background_tasks.add_task(run_on_create, carpool)
|
||||
async def post_carpool(carpool: Carpool = Body(..., examples=examples),
|
||||
requesting_user: User = Depends(get_current_user)) -> Carpool:
|
||||
verify_permission(f"{carpool.agency}:write", requesting_user)
|
||||
|
||||
logger.info(f"POST trip {carpool.agency}:{carpool.id}.")
|
||||
await assert_agency_exists(carpool.agency)
|
||||
|
|
@ -56,7 +54,9 @@ async def post_carpool(background_tasks: BackgroundTasks, carpool: Carpool = Bod
|
|||
status.HTTP_404_NOT_FOUND: {"description": "Carpool not found"},
|
||||
},
|
||||
)
|
||||
async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(get_current_agency)) -> Carpool:
|
||||
async def get_carpool(agency_id: str, carpool_id: str, requesting_user: User = Depends(get_current_user)) -> Carpool:
|
||||
verify_permission(f"{agency_id}:read", requesting_user)
|
||||
|
||||
logger.info(f"Get trip {agency_id}:{carpool_id}.")
|
||||
await assert_agency_exists(agency_id)
|
||||
await assert_carpool_exists(agency_id, carpool_id)
|
||||
|
|
@ -75,14 +75,12 @@ async def get_carpool(agency_id: str, carpool_id: str, api_key: str = Depends(ge
|
|||
"description": "Carpool or agency not found"},
|
||||
},
|
||||
)
|
||||
async def delete_carpool(background_tasks: BackgroundTasks, agency_id: str, carpool_id: str, requesting_agency_id: str = Depends(get_current_agency)):
|
||||
await verify_permission_for_same_agency_or_admin(agency_id, requesting_agency_id)
|
||||
async def delete_carpool(agency_id: str, carpool_id: str, requesting_user: User = Depends(get_current_user)):
|
||||
verify_permission(f"{agency_id}:write", requesting_user)
|
||||
|
||||
logger.info(f"Delete trip {agency_id}:{carpool_id}.")
|
||||
await assert_agency_exists(agency_id)
|
||||
await assert_carpool_exists(agency_id, carpool_id)
|
||||
cp = await load_carpool(agency_id, carpool_id)
|
||||
background_tasks.add_task(run_on_delete, cp)
|
||||
|
||||
return await _delete_carpool(agency_id, carpool_id)
|
||||
|
||||
|
|
@ -95,6 +93,12 @@ async def _delete_carpool(agency_id: str, carpool_id: str):
|
|||
logger.info(f"Saved carpool {agency_id}:{carpool_id} in trash.")
|
||||
os.remove(f"data/carpool/{agency_id}/{carpool_id}.json")
|
||||
|
||||
try:
|
||||
from amarillo.plugins.metrics import trips_deleted_counter
|
||||
trips_deleted_counter.inc()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
async def store_carpool(carpool: Carpool) -> Carpool:
|
||||
carpool_exists = os.path.exists(f"data/carpool/{carpool.agency}/{carpool.id}.json")
|
||||
|
|
@ -102,6 +106,17 @@ async def store_carpool(carpool: Carpool) -> Carpool:
|
|||
await set_lastUpdated_if_unset(carpool)
|
||||
await save_carpool(carpool)
|
||||
|
||||
try:
|
||||
from amarillo.plugins.metrics import trips_created_counter, trips_updated_counter
|
||||
if(carpool_exists):
|
||||
# logger.info("Incrementing trips updated")
|
||||
trips_updated_counter.inc()
|
||||
else:
|
||||
# logger.info("Incrementing trips created")
|
||||
trips_created_counter.inc()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
return carpool
|
||||
|
||||
async def set_lastUpdated_if_unset(carpool):
|
||||
|
|
@ -142,6 +157,4 @@ async def delete_agency_carpools_older_than(agency_id, timestamp):
|
|||
if os.path.getmtime(carpool_file_name) < timestamp:
|
||||
m = re.search(r'([a-zA-Z0-9_-]+)\.json$', carpool_file_name)
|
||||
# TODO log deletion
|
||||
cp = await load_carpool(agency_id, m[1])
|
||||
run_on_delete(cp)
|
||||
await _delete_carpool(agency_id, m[1])
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from fastapi import APIRouter, HTTPException, status, Header, Depends
|
|||
|
||||
from amarillo.models.User import User
|
||||
from amarillo.services.users import UserService
|
||||
from amarillo.services.oauth2 import get_current_agency, verify_admin
|
||||
from amarillo.services.oauth2 import get_current_user, verify_permission
|
||||
from amarillo.services.config import config
|
||||
from amarillo.utils.container import container
|
||||
|
||||
|
|
@ -21,22 +21,6 @@ router = APIRouter(
|
|||
include_in_schema = config.env != 'PROD'
|
||||
|
||||
|
||||
# TODO Return code 403 Unauthoized (in response_status_codes as well...)
|
||||
async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body, agency_id_from_api_key):
|
||||
"""Verifies that an agency is accessing something it owns or the user is admin
|
||||
|
||||
The agency_id is part of some paths, or when not in the path it is in the body, e.g. in PUT /carpool.
|
||||
|
||||
This function encapsulates the formula 'working with own stuff, or admin'.
|
||||
"""
|
||||
is_permitted = agency_id_in_path_or_body == agency_id_from_api_key or agency_id_from_api_key == "admin"
|
||||
|
||||
if not is_permitted:
|
||||
message = f"Working with {agency_id_in_path_or_body} resources is not permitted for {agency_id_from_api_key}."
|
||||
logger.error(message)
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
|
||||
|
||||
|
||||
@router.get("/",
|
||||
include_in_schema=include_in_schema,
|
||||
operation_id="getUserIdsWhichHaveAConfiguration",
|
||||
|
|
@ -44,7 +28,7 @@ async def verify_permission_for_same_agency_or_admin(agency_id_in_path_or_body,
|
|||
response_model=List[str],
|
||||
description="Returns the user_ids but not the details.",
|
||||
status_code=status.HTTP_200_OK)
|
||||
async def get_user_ids(admin_api_key: str = Depends(get_current_agency)) -> [str]:
|
||||
async def get_user_ids(requesting_user: User = Depends(get_current_user)) -> [str]:
|
||||
return container['users'].get_user_ids()
|
||||
|
||||
|
||||
|
|
@ -52,7 +36,8 @@ async def get_user_ids(admin_api_key: str = Depends(get_current_agency)) -> [str
|
|||
include_in_schema=include_in_schema,
|
||||
operation_id="postNewUserConf",
|
||||
summary="Post a new User")
|
||||
async def post_user_conf(user_conf: User, admin_api_key: str = Depends(verify_admin)):
|
||||
async def post_user_conf(user_conf: User, requesting_user: User = Depends(get_current_user)):
|
||||
verify_permission("admin", requesting_user)
|
||||
user_service: UserService = container['users']
|
||||
user_service.add(user_conf)
|
||||
|
||||
|
|
@ -64,13 +49,13 @@ async def post_user_conf(user_conf: User, admin_api_key: str = Depends(verify_ad
|
|||
summary="Delete configuration of a user. Returns true if the token for the user existed, "
|
||||
"false if it didn't exist."
|
||||
)
|
||||
async def delete_user(user_id: str, requesting_user_id: str = Depends(get_current_agency)):
|
||||
agency_may_delete_own = requesting_user_id == user_id
|
||||
admin_may_delete_everything = requesting_user_id == "admin"
|
||||
is_permitted = agency_may_delete_own or admin_may_delete_everything
|
||||
async def delete_user(user_id: str, requesting_user: User = Depends(get_current_user)):
|
||||
user_may_delete_own = requesting_user.user_id == user_id
|
||||
admin_may_delete_everything = "admin" in requesting_user.permissions
|
||||
is_permitted = user_may_delete_own or admin_may_delete_everything
|
||||
|
||||
if not is_permitted:
|
||||
message = f"The API key for {requesting_user_id} can not delete the configuration for {user_id}"
|
||||
message = f"User '{requesting_user.user_id} can not delete the configuration for {user_id}"
|
||||
logger.error(message)
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,11 @@ class CarpoolService():
|
|||
if cp and self.is_outdated(cp):
|
||||
logger.info("Purge outdated offer %s", key)
|
||||
self.delete(cp.agency, cp.id)
|
||||
try:
|
||||
from amarillo.plugins.metrics import trips_deleted_counter
|
||||
trips_deleted_counter.inc()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def get(self, agency_id: str, carpool_id: str):
|
||||
return self.carpools.get(f"{agency_id}:{carpool_id}")
|
||||
|
|
|
|||
|
|
@ -1,27 +0,0 @@
|
|||
from typing import List
|
||||
from amarillo.models.Carpool import Carpool
|
||||
|
||||
class CarpoolEvents:
|
||||
def on_create(cp : Carpool):
|
||||
pass
|
||||
def on_update(cp : Carpool):
|
||||
pass
|
||||
def on_delete(cp : Carpool):
|
||||
pass
|
||||
|
||||
carpool_event_listeners : List[CarpoolEvents] = []
|
||||
|
||||
def register_carpool_event_listener(cpe : CarpoolEvents):
|
||||
carpool_event_listeners.append(cpe)
|
||||
|
||||
def run_on_create(cp: Carpool):
|
||||
for cpe in carpool_event_listeners:
|
||||
cpe.on_create(cp)
|
||||
|
||||
def run_on_update(cp: Carpool):
|
||||
for cpe in carpool_event_listeners:
|
||||
cpe.on_update(cp)
|
||||
|
||||
def run_on_delete(cp: Carpool):
|
||||
for cpe in carpool_event_listeners:
|
||||
cpe.on_delete(cp)
|
||||
|
|
@ -3,11 +3,13 @@
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Annotated, Optional, Union
|
||||
import logging
|
||||
import logging.config
|
||||
|
||||
from fastapi import Depends, HTTPException, Header, status, APIRouter
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
from jose import JWTError, jwt
|
||||
from pydantic import BaseModel
|
||||
from amarillo.models.User import User
|
||||
from amarillo.services.passwords import verify_password
|
||||
from amarillo.utils.container import container
|
||||
from amarillo.services.agencies import AgencyService
|
||||
|
|
@ -18,7 +20,7 @@ from amarillo.services.secrets import secrets
|
|||
|
||||
SECRET_KEY = secrets.secret_key
|
||||
ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 7*24*60
|
||||
|
||||
logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
|
||||
logger = logging.getLogger("main")
|
||||
|
|
@ -30,22 +32,22 @@ class Token(BaseModel):
|
|||
token_type: str
|
||||
|
||||
class TokenData(BaseModel):
|
||||
agency_id: Union[str, None] = None
|
||||
user_id: Union[str, None] = None
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
|
||||
async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)):
|
||||
if X_API_Key == None: return None
|
||||
return await verify_api_key(X_API_Key)
|
||||
|
||||
def authenticate_agency(agency_id: str, password: str):
|
||||
def authenticate_user(user_id: str, password: str):
|
||||
user_service : UserService = container['users']
|
||||
user_conf = user_service.user_id_to_user_conf.get(agency_id, None)
|
||||
user_conf = user_service.user_id_to_user_conf.get(user_id, None)
|
||||
if not user_conf:
|
||||
return False
|
||||
|
||||
if not verify_password(password, user_conf.password):
|
||||
return False
|
||||
return agency_id
|
||||
return user_id
|
||||
|
||||
|
||||
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
|
||||
|
|
@ -58,7 +60,8 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
|
|||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
return encoded_jwt
|
||||
|
||||
async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)):
|
||||
|
||||
async def get_current_user(token: str = Depends(oauth2_scheme), user_from_api_key: str = Depends(verify_optional_api_key)) -> User:
|
||||
if token:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
|
|
@ -67,19 +70,22 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
|
|||
)
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
agency_id: str = payload.get("sub")
|
||||
if agency_id is None:
|
||||
user_id: str = payload.get("sub")
|
||||
if user_id is None:
|
||||
raise credentials_exception
|
||||
token_data = TokenData(agency_id=agency_id)
|
||||
token_data = TokenData(user_id=user_id)
|
||||
except JWTError:
|
||||
raise credentials_exception
|
||||
user = token_data.agency_id
|
||||
if user is None:
|
||||
user_id = token_data.user_id
|
||||
if user_id is None:
|
||||
raise credentials_exception
|
||||
return user
|
||||
elif agency_from_api_key:
|
||||
logger.info(f"API Key provided: {agency_from_api_key}")
|
||||
return agency_from_api_key
|
||||
|
||||
user_service : UserService = container['users']
|
||||
return user_service.get_user(user_id)
|
||||
elif user_from_api_key:
|
||||
logger.info(f"API Key provided: {user_from_api_key}")
|
||||
user_service : UserService = container['users']
|
||||
return user_service.get_user(user_from_api_key)
|
||||
else:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
|
|
@ -88,15 +94,35 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
|
|||
)
|
||||
raise credentials_exception
|
||||
|
||||
def verify_permission(permission: str, user: User):
|
||||
|
||||
async def verify_admin(agency: str = Depends(get_current_agency)):
|
||||
#TODO: maybe separate error for when admin credentials are invalid vs valid but not admin?
|
||||
if(agency != "admin"):
|
||||
message="This operation requires admin privileges"
|
||||
logger.error(message)
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
|
||||
def permissions_exception():
|
||||
return HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=f"User '{user.user_id}' does not have the permission '{permission}'",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
return "admin"
|
||||
#user is admin
|
||||
if "admin" in user.permissions: return
|
||||
|
||||
#permission is an operation
|
||||
if ":" not in permission:
|
||||
if permission not in user.permissions:
|
||||
raise permissions_exception()
|
||||
|
||||
return
|
||||
|
||||
#permission is in agency:operation format
|
||||
def permission_matches(permission, user_permission):
|
||||
prescribed_agency, prescribed_operation = permission.split(":")
|
||||
given_agency, given_operation = user_permission.split(":")
|
||||
|
||||
return (prescribed_agency == given_agency or given_agency == "all") and (prescribed_operation == given_operation or given_operation == "all")
|
||||
|
||||
if any(permission_matches(permission, p) for p in user.permissions if ":" in p): return
|
||||
|
||||
raise permissions_exception()
|
||||
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
|
|
@ -110,7 +136,7 @@ async def verify_api_key(X_API_Key: str = Header(...)):
|
|||
async def login_for_access_token(
|
||||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
|
||||
) -> Token:
|
||||
agency = authenticate_agency(form_data.username, form_data.password)
|
||||
agency = authenticate_user(form_data.username, form_data.password)
|
||||
if not agency:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
|
|
@ -126,7 +152,7 @@ async def login_for_access_token(
|
|||
# TODO: eventually remove this
|
||||
@router.get("/users/me/", response_model=Agency)
|
||||
async def read_users_me(
|
||||
current_agency: Annotated[Agency, Depends(get_current_agency)]
|
||||
current_agency: Annotated[Agency, Depends(get_current_user)]
|
||||
):
|
||||
agency_service : AgencyService = container['agencies']
|
||||
return agency_service.get_agency(agency_id=current_agency)
|
||||
|
|
@ -134,6 +160,6 @@ async def read_users_me(
|
|||
# TODO: eventually remove this
|
||||
@router.get("/users/me/items/")
|
||||
async def read_own_items(
|
||||
current_agency: Annotated[str, Depends(get_current_agency)]
|
||||
current_agency: Annotated[str, Depends(get_current_user)]
|
||||
):
|
||||
return [{"item_id": "Foo", "owner": current_agency}]
|
||||
|
|
|
|||
37
amarillo/tests/test_permissions.py
Normal file
37
amarillo/tests/test_permissions.py
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
from fastapi import HTTPException
|
||||
import pytest
|
||||
from amarillo.services.oauth2 import verify_permission
|
||||
from amarillo.models.User import User
|
||||
|
||||
test_user = User(user_id="test", password="testpassword", permissions=["all:read", "mfdz:write", "ride2go:all", "gtfs"])
|
||||
admin_user = User(user_id="admin", password="testpassword", permissions=["admin"])
|
||||
|
||||
def test_operation():
|
||||
verify_permission("gtfs", test_user)
|
||||
|
||||
with pytest.raises(HTTPException):
|
||||
verify_permission("geojson", test_user)
|
||||
|
||||
def test_agency_permission():
|
||||
verify_permission("mvv:read", test_user)
|
||||
verify_permission("mfdz:read", test_user)
|
||||
verify_permission("mfdz:write", test_user)
|
||||
verify_permission("ride2go:write", test_user)
|
||||
|
||||
with pytest.raises(HTTPException):
|
||||
verify_permission("mvv:write", test_user)
|
||||
verify_permission("mvv:all", test_user)
|
||||
|
||||
|
||||
def test_admin():
|
||||
verify_permission("admin", admin_user)
|
||||
verify_permission("gtfs", admin_user)
|
||||
verify_permission("all:all", admin_user)
|
||||
verify_permission("mvv:all", admin_user)
|
||||
verify_permission("mfdz:read", admin_user)
|
||||
verify_permission("mfdz:write", admin_user)
|
||||
verify_permission("ride2go:write", admin_user)
|
||||
|
||||
with pytest.raises(HTTPException):
|
||||
verify_permission("admin", test_user)
|
||||
verify_permission("all:all", test_user)
|
||||
Loading…
Reference in a new issue