Modified files
storeapi/security.py
---
+++
@@ -2,7 +2,7 @@
import logging
from fastapi import HTTPException, status
-from jose import jwt
+from jose import ExpiredSignatureError, JWTError, jwt
from passlib.context import CryptContext
from storeapi.database import database, user_table
@@ -58,3 +58,23 @@
if not verify_password(password, user.password):
raise credentials_exception
return user
+
+
+async def get_current_user(token: str):
+    """Resolve a JWT access token to the user it identifies.
+
+    The token is decoded with the module's ``SECRET_KEY`` and ``ALGORITHM``;
+    its ``sub`` claim is then used as the email passed to ``get_user``.
+
+    Args:
+        token: Encoded JWT string.
+
+    Returns:
+        Whatever ``get_user`` returns for the email stored in ``sub``.
+
+    Raises:
+        HTTPException: 401 with detail "Token has expired" when decoding
+            raises ``ExpiredSignatureError``.
+        credentials_exception: The shared module-level exception (defined
+            outside this function) when decoding raises any other
+            ``JWTError``, when the token has no ``sub`` claim, or when
+            ``get_user`` returns ``None``.
+    """
+    try:
+        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+    except ExpiredSignatureError as expired:
+        # Listed ahead of JWTError (its base class in python-jose) so that an
+        # expired token keeps its own message.
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Token has expired",
+            headers={"WWW-Authenticate": "Bearer"},
+        ) from expired
+    except JWTError as invalid:
+        raise credentials_exception from invalid
+
+    subject = claims.get("sub")
+    if subject is None:
+        raise credentials_exception
+
+    account = await get_user(email=subject)
+    if account is None:
+        raise credentials_exception
+    return account
storeapi/tests/test_security.py
---
+++
@@ -49,3 +49,16 @@
async def test_authenticate_user_wrong_password(registered_user: dict):
with pytest.raises(security.HTTPException):
await security.authenticate_user(registered_user["email"], "wrong password")
+
+
+@pytest.mark.anyio
+async def test_get_current_user(registered_user: dict):
+    """A token created for the registered email resolves to a user with that email."""
+    expected_email = registered_user["email"]
+    access_token = security.create_access_token(expected_email)
+    resolved_user = await security.get_current_user(access_token)
+    assert resolved_user.email == expected_email
+
+
+@pytest.mark.anyio
+async def test_get_current_user_invalid_token():
+    """get_current_user raises HTTPException when given the string "invalid token"."""
+    bogus_token = "invalid token"
+    with pytest.raises(security.HTTPException):
+        await security.get_current_user(bogus_token)