User authentication with FastAPI

Retrieve the current user with their token

Want more?

This lesson for enrolled students only. Join the course to unlock it!

You can see the code changes implemented in this lecture below.

If you have purchased the course in a different platform, you still have access to the code changes per lecture here on Teclado. The lecture video and lecture notes remain locked.
Join course for $30

Modified files

storeapi/security.py
--- 
+++ 
@@ -2,7 +2,7 @@
 import logging

 from fastapi import HTTPException, status
-from jose import jwt
+from jose import ExpiredSignatureError, JWTError, jwt
 from passlib.context import CryptContext
 from storeapi.database import database, user_table

@@ -58,3 +58,23 @@
     if not verify_password(password, user.password):
         raise credentials_exception
     return user
+
+
+async def get_current_user(token: str):
+    try:
+        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+        email = payload.get("sub")
+        if email is None:
+            raise credentials_exception
+    except ExpiredSignatureError as e:
+        raise HTTPException(
+            status_code=status.HTTP_401_UNAUTHORIZED,
+            detail="Token has expired",
+            headers={"WWW-Authenticate": "Bearer"},
+        ) from e
+    except JWTError as e:
+        raise credentials_exception from e
+    user = await get_user(email=email)
+    if user is None:
+        raise credentials_exception
+    return user
storeapi/tests/test_security.py
--- 
+++ 
@@ -49,3 +49,16 @@
 async def test_authenticate_user_wrong_password(registered_user: dict):
     with pytest.raises(security.HTTPException):
         await security.authenticate_user(registered_user["email"], "wrong password")
+
+
+@pytest.mark.anyio
+async def test_get_current_user(registered_user: dict):
+    token = security.create_access_token(registered_user["email"])
+    user = await security.get_current_user(token)
+    assert user.email == registered_user["email"]
+
+
+@pytest.mark.anyio
+async def test_get_current_user_invalid_token():
+    with pytest.raises(security.HTTPException):
+        await security.get_current_user("invalid token")