User authentication with FastAPI

Generate the access token

Want more?

This lesson for enrolled students only. Join the course to unlock it!

You can see the code changes implemented in this lecture below.

If you have purchased the course in a different platform, you still have access to the code changes per lecture here on Teclado. The lecture video and lecture notes remain locked.
Join course for $30

Modified files

storeapi/security.py
--- 
+++ 
@@ -1,11 +1,37 @@
+import datetime
 import logging

+from fastapi import HTTPException, status
+from jose import jwt
 from passlib.context import CryptContext
 from storeapi.database import database, user_table

 logger = logging.getLogger(__name__)

+SECRET_KEY = "9b73f2a1bdd7ae163444473d29a6885ffa22ab26117068f72a5a56a74d12d1fc"
+ALGORITHM = "HS256"
+
 pwd_context = CryptContext(schemes=["bcrypt"])
+
+credentials_exception = HTTPException(
+    status_code=status.HTTP_401_UNAUTHORIZED,
+    detail="Could not validate credentials",
+    headers={"WWW-Authenticate": "Bearer"},  # Tells client to authenticate with JWT
+)
+
+
+def access_token_expire_minutes() -> int:
+    return 30
+
+
+def create_access_token(email: str):
+    logger.debug("Creating access token", extra={"email": email})
+    expire = datetime.datetime.utcnow() + datetime.timedelta(
+        minutes=access_token_expire_minutes()
+    )
+    jwt_data = {"sub": email, "exp": expire}
+    encoded_jwt = jwt.encode(jwt_data, SECRET_KEY, algorithm=ALGORITHM)
+    return encoded_jwt


 def get_password_hash(password: str) -> str:
@@ -22,3 +48,13 @@
     result = await database.fetch_one(query)
     if result:
         return result
+
+
+async def authenticate_user(email: str, password: str):
+    logger.debug("Authenticating user", extra={"email": email})
+    user = await get_user(email)
+    if not user:
+        raise credentials_exception
+    if not verify_password(password, user.password):
+        raise credentials_exception
+    return user
storeapi/routers/user.py
--- 
+++ 
@@ -3,7 +3,12 @@
 from fastapi import APIRouter, HTTPException, status
 from storeapi.database import database, user_table
 from storeapi.models.user import UserIn
-from storeapi.security import get_password_hash, get_user
+from storeapi.security import (
+    authenticate_user,
+    create_access_token,
+    get_password_hash,
+    get_user,
+)

 logger = logging.getLogger(__name__)
 router = APIRouter()
@@ -23,3 +28,10 @@

     await database.execute(query)
     return {"detail": "User created."}
+
+
+@router.post("/token")
+async def login(user: UserIn):
+    user = await authenticate_user(user.email, user.password)
+    access_token = create_access_token(user.email)
+    return {"access_token": access_token, "token_type": "bearer"}
storeapi/tests/test_security.py
--- 
+++ 
@@ -1,5 +1,17 @@
 import pytest
+from jose import jwt
 from storeapi import security
+
+
+def test_access_token_expire_minutes():
+    assert security.access_token_expire_minutes() == 30
+
+
+def test_create_access_token():
+    token = security.create_access_token("123")
+    assert {"sub": "123"}.items() <= jwt.decode(
+        token, security.SECRET_KEY, algorithms=[security.ALGORITHM]
+    ).items()


 def test_password_hashes():
@@ -17,3 +29,23 @@
 async def test_get_user_not_found():
     user = await security.get_user("test@example.com")
     assert user is None
+
+
+@pytest.mark.anyio
+async def test_authenticate_user(registered_user: dict):
+    user = await security.authenticate_user(
+        registered_user["email"], registered_user["password"]
+    )
+    assert user.email == registered_user["email"]
+
+
+@pytest.mark.anyio
+async def test_authenticate_user_not_found():
+    with pytest.raises(security.HTTPException):
+        await security.authenticate_user("test@example.com", "1234")
+
+
+@pytest.mark.anyio
+async def test_authenticate_user_wrong_password(registered_user: dict):
+    with pytest.raises(security.HTTPException):
+        await security.authenticate_user(registered_user["email"], "wrong password")
storeapi/tests/routers/test_user.py
--- 
+++ 
@@ -24,3 +24,23 @@
     )
     assert response.status_code == 400
     assert "already exists" in response.json()["detail"]
+
+
+@pytest.mark.anyio
+async def test_login_user_not_exists(async_client: AsyncClient):
+    response = await async_client.post(
+        "/token", json={"email": "test@example.net", "password": "1234"}
+    )
+    assert response.status_code == 401
+
+
+@pytest.mark.anyio
+async def test_login_user(async_client: AsyncClient, registered_user: dict):
+    response = await async_client.post(
+        "/token",
+        json={
+            "email": registered_user["email"],
+            "password": registered_user["password"],
+        },
+    )
+    assert response.status_code == 200