From c8acc46382e77239c1f44213827d2bc621d6a492 Mon Sep 17 00:00:00 2001 From: Francia Csaba Date: Mon, 22 Apr 2024 12:49:13 +0200 Subject: [PATCH] verify_permission function --- amarillo/models/User.py | 9 +++----- amarillo/services/oauth2.py | 31 ++++++++++++++++++++----- amarillo/tests/test_permissions.py | 36 ++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 11 deletions(-) create mode 100644 amarillo/tests/test_permissions.py diff --git a/amarillo/models/User.py b/amarillo/models/User.py index fa9a358..d94eacf 100644 --- a/amarillo/models/User.py +++ b/amarillo/models/User.py @@ -1,8 +1,5 @@ -from typing import Annotated, Optional, List, Union -from pydantic import ConfigDict, BaseModel, Field, constr - -MyUrlsType = constr(regex="^[a-z]$") - +from typing import Annotated, Optional, List +from pydantic import ConfigDict, BaseModel, Field class User(BaseModel): #TODO: add attributes admin, permissions, fullname, email @@ -24,7 +21,7 @@ class User(BaseModel): min_length=8, max_length=256, examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"]) - permissions: Optional[List[str]] = Field([], + permissions: Optional[List[Annotated[str, Field(pattern=r'^[a-z0-9]+(:[a-z]+)?$')]]] = Field([], description="The permissions of this user, a list of strings in the format or ", max_length=256, # pattern=r'^[a-zA-Z0-9]+(:[a-zA-Z]+)?$', #TODO diff --git a/amarillo/services/oauth2.py b/amarillo/services/oauth2.py index b71ffdc..c63b458 100644 --- a/amarillo/services/oauth2.py +++ b/amarillo/services/oauth2.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta, timezone from typing import Annotated, Optional, Union import logging +import logging.config from fastapi import Depends, HTTPException, Header, status, APIRouter from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm @@ -60,7 +61,6 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt -#TODO: function verify_permission(user, permission) #TODO: rename to get_current_user, agency_from_api_key -> user_from_api_key async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)): @@ -101,15 +101,36 @@ async def get_current_user(token: str = Depends(oauth2_scheme), agency_from_api_ # TODO: use verify_permission("admin", user) -def verify_permission(permission: str, user: User): - # permission_exception = - if user.permissions is None or permission not in user.permissions: raise HTTPException( +def verify_permission(permission: str, user: User): + + def permissions_exception(): + return HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail=f"User '{user}' does not have the permission '{permission}'", + detail=f"User '{user.user_id}' does not have the permission '{permission}'", headers={"WWW-Authenticate": "Bearer"}, ) + #user is admin + if "admin" in user.permissions: return + + #permission is an operation + if ":" not in permission: + if permission not in user.permissions: + raise permissions_exception() + + return + + def permission_matches(permission, user_permission): + prescribed_agency, prescribed_operation = permission.split(":") + given_agency, given_operation = user_permission.split(":") + + return (prescribed_agency == given_agency or given_agency == "all") and (prescribed_operation == given_operation or given_operation == "all") + + if any(permission_matches(permission, p) for p in user.permissions if ":" in p): return + + raise permissions_exception() + async def verify_admin(agency: str = Depends(get_current_agency)): diff --git a/amarillo/tests/test_permissions.py b/amarillo/tests/test_permissions.py new file mode 100644 index 0000000..7ae4fed --- /dev/null +++ b/amarillo/tests/test_permissions.py @@ -0,0 +1,36 @@ +from fastapi import HTTPException +import pytest +from amarillo.services.oauth2 import verify_permission +from amarillo.models.User import User + +test_user = User(user_id="test", password="testpassword", permissions=["all:read", "mfdz:write", "ride2go:all", "metrics"]) +admin_user = User(user_id="admin", password="testpassword", permissions=["admin"]) + +def test_operation(): + verify_permission("metrics", test_user) + + with pytest.raises(HTTPException): + verify_permission("geojson", test_user) + +def test_agency_permission(): + verify_permission("mvv:read", test_user) + verify_permission("mfdz:read", test_user) + verify_permission("mfdz:write", test_user) + verify_permission("ride2go:write", test_user) + + with pytest.raises(HTTPException): + verify_permission("mvv:write", test_user) + verify_permission("mvv:all", test_user) + + +def test_admin(): + verify_permission("admin", admin_user) + verify_permission("all:all", admin_user) + verify_permission("mvv:all", admin_user) + verify_permission("mfdz:read", admin_user) + verify_permission("mfdz:write", admin_user) + verify_permission("ride2go:write", admin_user) + + with pytest.raises(HTTPException): + verify_permission("admin", test_user) + verify_permission("all:all", test_user) \ No newline at end of file