[#4] Verify password from agencyconf

This commit is contained in:
Csaba 2024-02-28 10:53:05 +01:00
parent c03d5b1232
commit bbba8de7ac

View file

@ -10,6 +10,10 @@ from jose import JWTError, jwt
from pydantic import BaseModel from pydantic import BaseModel
from amarillo.routers.agencyconf import verify_api_key from amarillo.routers.agencyconf import verify_api_key
from amarillo.services.passwords import verify_password from amarillo.services.passwords import verify_password
from amarillo.utils.container import container
from amarillo.services.agencies import AgencyService
from amarillo.services.agencyconf import AgencyConfService
from amarillo.models.Carpool import Agency
from amarillo.services.config import config from amarillo.services.config import config
@ -20,63 +24,30 @@ ACCESS_TOKEN_EXPIRE_MINUTES = 30
logging.config.fileConfig('logging.conf', disable_existing_loggers=False) logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
logger = logging.getLogger("main") logger = logging.getLogger("main")
# TODO: use agencyconf for saving the hashed password
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"admin": {
"username": "admin",
"full_name": "Administrator",
"email": "admin@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
router = APIRouter() router = APIRouter()
class Token(BaseModel): class Token(BaseModel):
access_token: str access_token: str
token_type: str token_type: str
class TokenData(BaseModel): class TokenData(BaseModel):
agency_id: Union[str, None] = None agency_id: Union[str, None] = None
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)): async def verify_optional_api_key(X_API_Key: Optional[str] = Header(None)):
if X_API_Key == None: return None if X_API_Key == None: return None
return await verify_api_key(X_API_Key) return await verify_api_key(X_API_Key)
def get_agency(db, agency_id: str): def authenticate_agency(agency_id: str, password: str):
if agency_id in db: agency_conf_service : AgencyConfService = container['agencyconf']
user_dict = db[agency_id] agency_conf = agency_conf_service.agency_id_to_agency_conf[agency_id]
return UserInDB(**user_dict) if not agency_conf:
return False
def authenticate_agency(fake_db, username: str, password: str): agency_password = agency_conf.password
agency = get_agency(fake_db, username) if not verify_password(password, agency_password):
if not agency:
return False return False
if not verify_password(password, agency.hashed_password): return agency_id
return False
return agency
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
@ -123,7 +94,7 @@ async def get_current_agency(token: str = Depends(oauth2_scheme), agency_from_ap
async def login_for_access_token( async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()] form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token: ) -> Token:
agency = authenticate_agency(fake_users_db, form_data.username, form_data.password) agency = authenticate_agency(form_data.username, form_data.password)
if not agency: if not agency:
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
@ -132,20 +103,21 @@ async def login_for_access_token(
) )
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token( access_token = create_access_token(
data={"sub": agency.username}, expires_delta=access_token_expires data={"sub": agency}, expires_delta=access_token_expires
) )
return Token(access_token=access_token, token_type="bearer") return Token(access_token=access_token, token_type="bearer")
# TODO: eventually remove this # TODO: eventually remove this
@router.get("/users/me/", response_model=User) @router.get("/users/me/", response_model=Agency)
async def read_users_me( async def read_users_me(
current_agency: Annotated[User, Depends(get_current_agency)] current_agency: Annotated[Agency, Depends(get_current_agency)]
): ):
return get_agency(fake_users_db, agency_id=current_agency) agency_service : AgencyService = container['agencies']
return agency_service.get_agency(agency_id=current_agency)
# TODO: eventually remove this # TODO: eventually remove this
@router.get("/users/me/items/") @router.get("/users/me/items/")
async def read_own_items( async def read_own_items(
current_agency: Annotated[User, Depends(get_current_agency)] current_agency: Annotated[str, Depends(get_current_agency)]
): ):
return [{"item_id": "Foo", "owner": current_agency}] return [{"item_id": "Foo", "owner": current_agency}]