User email confirmation

Creating the confirmation token

Want more?

This lesson for enrolled students only. Join the course to unlock it!

You can see the code changes implemented in this lecture below.

If you have purchased the course in a different platform, you still have access to the code changes per lecture here on Teclado. The lecture video and lecture notes remain locked.
Join course for $30

Modified files

storeapi/security.py
--- 
+++ 
@@ -27,12 +27,26 @@
     return 30


+def confirm_token_expire_minutes() -> int:
+    return 1440
+
+
 def create_access_token(email: str):
     logger.debug("Creating access token", extra={"email": email})
     expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
         minutes=access_token_expire_minutes()
     )
-    jwt_data = {"sub": email, "exp": expire}
+    jwt_data = {"sub": email, "exp": expire, "type": "access"}
+    encoded_jwt = jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM)
+    return encoded_jwt
+
+
+def create_confirmation_token(email: str):
+    logger.debug("Creating confirmation token", extra={"email": email})
+    expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
+        minutes=confirm_token_expire_minutes()
+    )
+    jwt_data = {"sub": email, "exp": expire, "type": "confirmation"}
     encoded_jwt = jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM)
     return encoded_jwt

@@ -69,6 +83,10 @@
         email = payload.get("sub")
         if email is None:
             raise credentials_exception
+        
+        type = payload.get("type")
+        if type is None or type != "access":
+            raise credentials_exception  
     except ExpiredSignatureError as e:
         raise HTTPException(
             status_code=status.HTTP_401_UNAUTHORIZED,
storeapi/tests/test_security.py
--- 
+++ 
@@ -8,9 +8,20 @@
     assert security.access_token_expire_minutes() == 30


+def test_confirm_token_expire_minutes():
+    assert security.confirm_token_expire_minutes() == 1440
+
+
 def test_create_access_token():
     token = security.create_access_token("123")
-    assert {"sub": "123"}.items() <= jwt.decode(
+    assert {"sub": "123", "type": "access"}.items() <= jwt.decode(
+        token, key=security.SECRET_KEY, algorithms=[security.ALGORITHM]
+    ).items()
+
+
+def test_create_confirmation_token():
+    token = security.create_confirmation_token("123")
+    assert {"sub": "123", "type": "confirmation"}.items() <= jwt.decode(
         token, key=security.SECRET_KEY, algorithms=[security.ALGORITHM]
     ).items()

@@ -64,3 +75,10 @@
 async def test_get_current_user_invalid_token():
     with pytest.raises(security.HTTPException):
         await security.get_current_user("invalid token")
+
+
+@pytest.mark.anyio
+async def test_get_current_user_wrong_type_token(registered_user: dict):
+    token = security.create_confirmation_token(registered_user["email"])
+    with pytest.raises(security.HTTPException):
+        await security.get_current_user(token)