From bbba8de7aca0f12d37ea40eaf95d174980c5112f Mon Sep 17 00:00:00 2001 From: Francia Csaba Date: Wed, 28 Feb 2024 10:53:05 +0100 Subject: [PATCH] [#4] Verify password from agencyconf --- amarillo/services/oauth2.py | 66 +++++++++++-------------------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/amarillo/services/oauth2.py b/amarillo/services/oauth2.py index 6134db0..679722f 100644 --- a/amarillo/services/oauth2.py +++ b/amarillo/services/oauth2.py @@ -10,6 +10,10 @@ from jose import JWTError, jwt from pydantic import BaseModel from amarillo.routers.agencyconf import verify_api_key from amarillo.services.passwords import verify_password +from amarillo.utils.container import container +from amarillo.services.agencies import AgencyService +from amarillo.services.agencyconf import AgencyConfService +from amarillo.models.Carpool import Agency from amarillo.services.config import config @@ -20,63 +24,30 @@ ACCESS_TOKEN_EXPIRE_MINUTES = 30 logging.config.fileConfig('logging.conf', disable_existing_loggers=False) logger = logging.getLogger("main") -# TODO: use agencyconf for saving the hashed password -fake_users_db = { - "johndoe": { - "username": "johndoe", - "full_name": "John Doe", - "email": "johndoe@example.com", - "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", - "disabled": False, - }, - "admin": { - "username": "admin", - "full_name": "Administrator", - "email": "admin@example.com", - "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", - "disabled": False, - } -} - router = APIRouter() - class Token(BaseModel): access_token: str token_type: str - class TokenData(BaseModel): agency_id: Union[str, None] = None - -class User(BaseModel): - username: str - email: Union[str, None] = None - full_name: Union[str, None] = None - disabled: Union[bool, None] = None - - -class UserInDB(User): - hashed_password: str - oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)): if X_API_Key == None: return None return await verify_api_key(X_API_Key) -def get_agency(db, agency_id: str): - if agency_id in db: - user_dict = db[agency_id] - return UserInDB(**user_dict) +def authenticate_agency(agency_id: str, password: str): + agency_conf_service : AgencyConfService = container['agencyconf'] + agency_conf = agency_conf_service.agency_id_to_agency_conf[agency_id] + if not agency_conf: + return False -def authenticate_agency(fake_db, username: str, password: str): - agency = get_agency(fake_db, username) - if not agency: + agency_password = agency_conf.password + if not verify_password(password, agency_password): return False - if not verify_password(password, agency.hashed_password): - return False - return agency + return agency_id def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): @@ -123,7 +94,7 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap async def login_for_access_token( form_data: Annotated[OAuth2PasswordRequestForm, Depends()] ) -> Token: - agency = authenticate_agency(fake_users_db, form_data.username, form_data.password) + agency = authenticate_agency(form_data.username, form_data.password) if not agency: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, @@ -132,20 +103,21 @@ async def login_for_access_token( ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( - data={"sub": agency.username}, expires_delta=access_token_expires + data={"sub": agency}, expires_delta=access_token_expires ) return Token(access_token=access_token, token_type="bearer") # TODO: eventually remove this -@router.get("/users/me/", response_model=User) +@router.get("/users/me/", response_model=Agency) async def read_users_me( - current_agency: Annotated[User, Depends(get_current_agency)] + current_agency: Annotated[Agency, Depends(get_current_agency)] ): - return get_agency(fake_users_db, agency_id=current_agency) + agency_service : AgencyService = container['agencies'] + return agency_service.get_agency(agency_id=current_agency) # TODO: eventually remove this @router.get("/users/me/items/") async def read_own_items( - current_agency: Annotated[User, Depends(get_current_agency)] + current_agency: Annotated[str, Depends(get_current_agency)] ): return [{"item_id": "Foo", "owner": current_agency}]