Modified files
storeapi/security.py
--- storeapi/security.py
+++ storeapi/security.py
@@ -27,12 +27,26 @@
return 30
+def confirm_token_expire_minutes() -> int:
+    return 24 * 60  # one day, expressed in minutes
+
+
def create_access_token(email: str):
logger.debug("Creating access token", extra={"email": email})
expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
minutes=access_token_expire_minutes()
)
- jwt_data = {"sub": email, "exp": expire}
+ jwt_data = {"sub": email, "exp": expire, "type": "access"}  # "type" claim: the decode path rejects any value other than "access"
+ encoded_jwt = jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM)
+ return encoded_jwt
+
+
+def create_confirmation_token(email: str):
+    """Return a signed JWT for *email* whose "type" claim is "confirmation"."""
+    logger.debug("Creating confirmation token", extra={"email": email})
+    now = datetime.datetime.now(datetime.timezone.utc)
+    expires_at = now + datetime.timedelta(minutes=confirm_token_expire_minutes())
+    jwt_data = {"sub": email, "exp": expires_at, "type": "confirmation"}
encoded_jwt = jwt.encode(jwt_data, key=SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@@ -69,6 +83,10 @@
email = payload.get("sub")
if email is None:
raise credentials_exception
+
+        token_type = payload.get("type")  # avoid shadowing builtin type()
+        if token_type != "access":  # None (claim absent) fails too
+            raise credentials_exception
except ExpiredSignatureError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
storeapi/tests/test_security.py
--- storeapi/tests/test_security.py
+++ storeapi/tests/test_security.py
@@ -8,9 +8,20 @@
assert security.access_token_expire_minutes() == 30
+def test_confirm_token_expire_minutes():
+    assert security.confirm_token_expire_minutes() == 24 * 60  # one day
+
+
def test_create_access_token():
token = security.create_access_token("123")
- assert {"sub": "123"}.items() <= jwt.decode(
+ assert {"sub": "123", "type": "access"}.items() <= jwt.decode(  # subset check: the "exp" claim is present but not pinned
+ token, key=security.SECRET_KEY, algorithms=[security.ALGORITHM]
+ ).items()
+
+
+def test_create_confirmation_token():
+ token = security.create_confirmation_token("123")
+ assert {"sub": "123", "type": "confirmation"}.items() <= jwt.decode(  # subset check: the "exp" claim is present but not pinned
token, key=security.SECRET_KEY, algorithms=[security.ALGORITHM]
).items()
@@ -64,3 +75,10 @@
async def test_get_current_user_invalid_token():
with pytest.raises(security.HTTPException):
await security.get_current_user("invalid token")
+
+
+@pytest.mark.anyio
+async def test_get_current_user_wrong_type_token(registered_user: dict):
+    confirmation_token = security.create_confirmation_token(registered_user["email"])
+    with pytest.raises(security.HTTPException):  # not an access token
+        await security.get_current_user(confirmation_token)