Modified files
storeapi/security.py
---
+++
@@ -1,11 +1,37 @@
+import datetime
import logging
+from fastapi import HTTPException, status
+from jose import jwt
from passlib.context import CryptContext
from storeapi.database import database, user_table
logger = logging.getLogger(__name__)
+# Key used to sign access tokens in create_access_token.
+# NOTE(review): the signing secret is hard-coded in source; anyone who can read
+# this file can forge tokens. Consider loading it from configuration instead.
+SECRET_KEY = "9b73f2a1bdd7ae163444473d29a6885ffa22ab26117068f72a5a56a74d12d1fc"
+# Algorithm name handed to jwt.encode when signing access tokens.
+ALGORITHM = "HS256"
+
pwd_context = CryptContext(schemes=["bcrypt"])
+
+# Shared 401 error raised by authenticate_user when a login is rejected.
+credentials_exception = HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Could not validate credentials",
+ headers={"WWW-Authenticate": "Bearer"}, # Advertises the Bearer authentication scheme to the client
+)
+
+
+def access_token_expire_minutes() -> int:
+    """Return the lifetime of an access token, in minutes."""
+    lifetime_minutes = 30
+    return lifetime_minutes
+
+
+def create_access_token(email: str) -> str:
+    """Build a signed JWT access token for the given email.
+
+    The token stores ``email`` in the ``sub`` claim and an ``exp`` claim set
+    ``access_token_expire_minutes()`` minutes from now. It is signed with
+    ``SECRET_KEY`` using ``ALGORITHM``.
+
+    Args:
+        email: Value stored in the token's ``sub`` claim.
+
+    Returns:
+        The encoded token produced by ``jwt.encode``.
+    """
+    logger.debug("Creating access token", extra={"email": email})
+    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
+    # Python 3.12 and yields a naive datetime that is easy to misinterpret.
+    expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
+        minutes=access_token_expire_minutes()
+    )
+    jwt_data = {"sub": email, "exp": expire}
+    return jwt.encode(jwt_data, SECRET_KEY, algorithm=ALGORITHM)
def get_password_hash(password: str) -> str:
@@ -22,3 +48,13 @@
result = await database.fetch_one(query)
if result:
return result
+
+
+async def authenticate_user(email: str, password: str):
+    """Return the stored user when the email/password pair is valid.
+
+    Args:
+        email: Email used to look the user up via ``get_user``.
+        password: Plain-text password checked against the stored hash.
+
+    Returns:
+        The user record returned by ``get_user``.
+
+    Raises:
+        HTTPException: ``credentials_exception`` when no user is found or the
+            password does not verify.
+    """
+    logger.debug("Authenticating user", extra={"email": email})
+    record = await get_user(email)
+    # Unknown user and wrong password fail identically; the password is only
+    # verified once a user record exists (short-circuit).
+    if not record or not verify_password(password, record.password):
+        raise credentials_exception
+    return record
storeapi/routers/user.py
---
+++
@@ -3,7 +3,12 @@
from fastapi import APIRouter, HTTPException, status
from storeapi.database import database, user_table
from storeapi.models.user import UserIn
-from storeapi.security import get_password_hash, get_user
+from storeapi.security import (
+ authenticate_user,
+ create_access_token,
+ get_password_hash,
+ get_user,
+)
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -23,3 +28,10 @@
await database.execute(query)
return {"detail": "User created."}
+
+
+@router.post("/token")
+async def login(user: UserIn):
+    """Authenticate the posted credentials and return a bearer access token.
+
+    ``authenticate_user`` raises its 401 error when the credentials are
+    rejected, so this handler only builds the success payload.
+    """
+    authenticated = await authenticate_user(user.email, user.password)
+    return {
+        "access_token": create_access_token(authenticated.email),
+        "token_type": "bearer",
+    }
storeapi/tests/test_security.py
---
+++
@@ -1,5 +1,17 @@
import pytest
+from jose import jwt
from storeapi import security
+
+
+def test_access_token_expire_minutes():
+    """The access token lifetime is 30 minutes."""
+    expected_minutes = 30
+    assert security.access_token_expire_minutes() == expected_minutes
+
+
+def test_create_access_token():
+    """A created token decodes to claims whose ``sub`` is the given value."""
+    token = security.create_access_token("123")
+    claims = jwt.decode(token, security.SECRET_KEY, algorithms=[security.ALGORITHM])
+    assert {"sub": "123"}.items() <= claims.items()
def test_password_hashes():
@@ -17,3 +29,23 @@
async def test_get_user_not_found():
user = await security.get_user("test@example.com")
assert user is None
+
+
+@pytest.mark.anyio
+async def test_authenticate_user(registered_user: dict):
+    """Valid credentials return the user record with the matching email."""
+    email = registered_user["email"]
+    password = registered_user["password"]
+    authenticated = await security.authenticate_user(email, password)
+    assert authenticated.email == registered_user["email"]
+
+
+@pytest.mark.anyio
+async def test_authenticate_user_not_found():
+    """Authenticating an email with no stored user raises a 401 error."""
+    with pytest.raises(security.HTTPException) as exc_info:
+        await security.authenticate_user("test@example.com", "1234")
+    # Pin the status code so an unrelated HTTPException cannot pass the test.
+    assert exc_info.value.status_code == 401
+
+
+@pytest.mark.anyio
+async def test_authenticate_user_wrong_password(registered_user: dict):
+    """A registered email with the wrong password raises a 401 error."""
+    with pytest.raises(security.HTTPException) as exc_info:
+        await security.authenticate_user(registered_user["email"], "wrong password")
+    # Pin the status code so an unrelated HTTPException cannot pass the test.
+    assert exc_info.value.status_code == 401
storeapi/tests/routers/test_user.py
---
+++
@@ -24,3 +24,23 @@
)
assert response.status_code == 400
assert "already exists" in response.json()["detail"]
+
+
+@pytest.mark.anyio
+async def test_login_user_not_exists(async_client: AsyncClient):
+    """Requesting a token with credentials not registered here returns 401."""
+    credentials = {"email": "test@example.net", "password": "1234"}
+    response = await async_client.post("/token", json=credentials)
+    assert response.status_code == 401
+
+
+@pytest.mark.anyio
+async def test_login_user(async_client: AsyncClient, registered_user: dict):
+    """A registered user can log in and receives a bearer access token."""
+    response = await async_client.post(
+        "/token",
+        json={
+            "email": registered_user["email"],
+            "password": registered_user["password"],
+        },
+    )
+    assert response.status_code == 200
+    # Also check the payload, not only the status: the endpoint must hand
+    # back a non-empty token labelled as a bearer token.
+    body = response.json()
+    assert body["token_type"] == "bearer"
+    assert body["access_token"]