Get current user function
Some checks failed
Amarillo/amarillo-gitea/amarillo-core/pipeline/head There was a failure building this commit
Some checks failed
Amarillo/amarillo-gitea/amarillo-core/pipeline/head There was a failure building this commit
This commit is contained in:
parent
66cc746937
commit
11d5849290
|
|
@ -1,5 +1,7 @@
|
||||||
from typing import Optional
|
from typing import Annotated, Optional, List, Union
|
||||||
from pydantic import ConfigDict, BaseModel, Field
|
from pydantic import ConfigDict, BaseModel, Field, constr
|
||||||
|
|
||||||
|
MyUrlsType = constr(regex="^[a-z]$")
|
||||||
|
|
||||||
class User(BaseModel):
|
class User(BaseModel):
|
||||||
#TODO: add attributes admin, permissions, fullname, email
|
#TODO: add attributes admin, permissions, fullname, email
|
||||||
|
|
@ -22,6 +24,11 @@ class User(BaseModel):
|
||||||
min_length=8,
|
min_length=8,
|
||||||
max_length=256,
|
max_length=256,
|
||||||
examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"])
|
examples=["$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"])
|
||||||
|
permissions: Optional[List[str]] = Field([],
|
||||||
|
description="The permissions of this user, a list of strings in the format <agency:operation> or <operation>",
|
||||||
|
max_length=256,
|
||||||
|
# pattern=r'^[a-zA-Z0-9]+(:[a-zA-Z]+)?$', #TODO
|
||||||
|
examples=["ride2go:read", "all:read", "admin", "geojson"])
|
||||||
model_config = ConfigDict(json_schema_extra={
|
model_config = ConfigDict(json_schema_extra={
|
||||||
"title": "Agency Configuration",
|
"title": "Agency Configuration",
|
||||||
"description": "Configuration for an agency.",
|
"description": "Configuration for an agency.",
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ from fastapi import Depends, HTTPException, Header, status, APIRouter
|
||||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||||
from jose import JWTError, jwt
|
from jose import JWTError, jwt
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
from amarillo.models.User import User
|
||||||
from amarillo.services.passwords import verify_password
|
from amarillo.services.passwords import verify_password
|
||||||
from amarillo.utils.container import container
|
from amarillo.utils.container import container
|
||||||
from amarillo.services.agencies import AgencyService
|
from amarillo.services.agencies import AgencyService
|
||||||
|
|
@ -30,6 +31,7 @@ class Token(BaseModel):
|
||||||
token_type: str
|
token_type: str
|
||||||
|
|
||||||
class TokenData(BaseModel):
|
class TokenData(BaseModel):
|
||||||
|
#TODO: rename to user_id
|
||||||
agency_id: Union[str, None] = None
|
agency_id: Union[str, None] = None
|
||||||
|
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
|
||||||
|
|
@ -58,7 +60,13 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
|
||||||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||||
return encoded_jwt
|
return encoded_jwt
|
||||||
|
|
||||||
|
#TODO: function verify_permission(user, permission)
|
||||||
|
|
||||||
|
#TODO: rename to get_current_user, agency_from_api_key -> user_from_api_key
|
||||||
async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)):
|
async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)):
|
||||||
|
return (await get_current_user(token, agency_from_api_key)).user_id
|
||||||
|
|
||||||
|
async def get_current_user(token: str = Depends(oauth2_scheme), agency_from_api_key: str = Depends(verify_optional_api_key)) -> User:
|
||||||
if token:
|
if token:
|
||||||
credentials_exception = HTTPException(
|
credentials_exception = HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
|
@ -73,13 +81,16 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
|
||||||
token_data = TokenData(agency_id=agency_id)
|
token_data = TokenData(agency_id=agency_id)
|
||||||
except JWTError:
|
except JWTError:
|
||||||
raise credentials_exception
|
raise credentials_exception
|
||||||
user = token_data.agency_id
|
user_id = token_data.agency_id
|
||||||
if user is None:
|
if user_id is None:
|
||||||
raise credentials_exception
|
raise credentials_exception
|
||||||
return user
|
|
||||||
|
user_service : UserService = container['users']
|
||||||
|
return user_service.get_user(user_id)
|
||||||
elif agency_from_api_key:
|
elif agency_from_api_key:
|
||||||
logger.info(f"API Key provided: {agency_from_api_key}")
|
logger.info(f"API Key provided: {agency_from_api_key}")
|
||||||
return agency_from_api_key
|
user_service : UserService = container['users']
|
||||||
|
return user_service.get_user(agency_from_api_key)
|
||||||
else:
|
else:
|
||||||
credentials_exception = HTTPException(
|
credentials_exception = HTTPException(
|
||||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
|
@ -88,6 +99,18 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
|
||||||
)
|
)
|
||||||
raise credentials_exception
|
raise credentials_exception
|
||||||
|
|
||||||
|
# TODO: use verify_permission("admin", user)
|
||||||
|
|
||||||
|
def verify_permission(permission: str, user: User):
|
||||||
|
# permission_exception =
|
||||||
|
|
||||||
|
if user.permissions is None or permission not in user.permissions: raise HTTPException(
|
||||||
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
detail=f"User '{user}' does not have the permission '{permission}'",
|
||||||
|
headers={"WWW-Authenticate": "Bearer"},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def verify_admin(agency: str = Depends(get_current_agency)):
|
async def verify_admin(agency: str = Depends(get_current_agency)):
|
||||||
#TODO: maybe separate error for when admin credentials are invalid vs valid but not admin?
|
#TODO: maybe separate error for when admin credentials are invalid vs valid but not admin?
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue