flip_api.utils.cognito_helpers ============================== .. py:module:: flip_api.utils.cognito_helpers Attributes ---------- .. autoapisummary:: flip_api.utils.cognito_helpers._EMAIL_VALIDATOR flip_api.utils.cognito_helpers._DEFAULT_PORTS flip_api.utils.cognito_helpers._MFA_STATE_TTL_SECONDS flip_api.utils.cognito_helpers._mfa_state_cache flip_api.utils.cognito_helpers._mfa_state_cache_lock Functions --------- .. autoapisummary:: flip_api.utils.cognito_helpers._cognito_client flip_api.utils.cognito_helpers._origin_from_url flip_api.utils.cognito_helpers.get_cors_allowed_origins flip_api.utils.cognito_helpers.get_pool_id flip_api.utils.cognito_helpers.get_user_pool_id flip_api.utils.cognito_helpers.get_cognito_users flip_api.utils.cognito_helpers._safe_email_for_cognito_filter flip_api.utils.cognito_helpers._safe_uuid_for_cognito_filter flip_api.utils.cognito_helpers.get_user_by_email_or_id flip_api.utils.cognito_helpers.get_username flip_api.utils.cognito_helpers.update_user flip_api.utils.cognito_helpers.delete_cognito_user flip_api.utils.cognito_helpers.reset_user_mfa flip_api.utils.cognito_helpers._invalidate_mfa_cache flip_api.utils.cognito_helpers.is_mfa_enabled flip_api.utils.cognito_helpers.revoke_token flip_api.utils.cognito_helpers.get_user_role_data flip_api.utils.cognito_helpers.get_all_roles flip_api.utils.cognito_helpers.validate_roles flip_api.utils.cognito_helpers.create_cognito_user flip_api.utils.cognito_helpers.filter_enabled_users Module Contents --------------- .. py:data:: _EMAIL_VALIDATOR :type: pydantic.TypeAdapter[str] .. py:function:: _cognito_client() -> Any Module-level cached cognito-idp client. boto3 clients are thread-safe and expensive to construct; sharing one avoids paying endpoint-resolution cost on every authenticated request. .. py:data:: _DEFAULT_PORTS .. py:function:: _origin_from_url(url: str) -> str | None Return ``scheme://host[:port]`` for ``url``, omitting ports that match the scheme default. Browsers strip default ports from the ``Origin`` header (RFC 6454), so an allowlist entry like ``https://localhost:443`` would never match an actual request — normalize before use. Returns ``None`` for URLs without a usable scheme/host. .. py:function:: get_cors_allowed_origins() -> list[str] Derive the CORS allowlist from the Cognito user pool client's CallbackURLs. The same Cognito app client that authenticates UI logins already enumerates the trusted UI origins per environment (see ``deploy/providers/AWS/services.tf``). Reusing it as the CORS allowlist keeps "where users can sign in" and "where the UI may call this API" in lockstep, without a separate env var. :returns: Unique normalized origins (``scheme://host[:port]``) suitable for ``CORSMiddleware(allow_origins=...)``. :rtype: list[str] .. py:function:: get_pool_id(request: fastapi.Request) -> str Extract the user pool ID from the request context. :param request: FastAPI request object :type request: Request :returns: The user pool ID extracted from the request context :rtype: str :raises HTTPException: If the user pool ID is not found .. py:function:: get_user_pool_id(request: fastapi.Request) -> str .. py:function:: get_cognito_users(params: dict[str, Any] | None = None) -> list[flip_api.domain.schemas.users.CognitoUser] Get users from Cognito user pool. :param params: Additional parameters to pass to the ListUsers API call. :type params: dict[str, Any] | None :returns: List of CognitoUser objects. :rtype: list[CognitoUser] :raises HTTPException: If there is an error fetching users from Cognito or if the user pool ID is not found. .. py:function:: _safe_email_for_cognito_filter(email: str) -> str Validate that ``email`` is safe to interpolate into a Cognito ListUsers Filter expression. Cognito's filter syntax delimits values with double quotes; an unescaped ``"`` (or backslash) in the value can break out of the quoted context and inject additional clauses. EmailStr already forbids the characters that would let an attacker do this, but rejecting them explicitly here keeps this helper safe to call from any future caller that forgets the upstream validation. .. py:function:: _safe_uuid_for_cognito_filter(user_id: str | uuid.UUID) -> str Coerce ``user_id`` to its canonical string UUID form, raising 400 if it isn't a valid UUID. A valid UUID's string form is hex+hyphen only, so once normalised it can be interpolated into Cognito's filter syntax without escaping. .. py:function:: get_user_by_email_or_id(user_pool_id: str, email: str | None = None, user_id: uuid.UUID | None = None) -> flip_api.domain.schemas.users.CognitoUser Get a user from Cognito by email or ID. :param user_pool_id: Cognito user pool ID :type user_pool_id: str :param email: User email (optional) :type email: str | None :param user_id: User ID (optional) :type user_id: UUID | None :returns: The user matching the email or ID. :rtype: CognitoUser :raises HTTPException: If neither email nor user_id is provided, or if the supplied identifier fails format validation. .. py:function:: get_username(user_id: str, user_pool_id: str) -> str Get a username from Cognito by user ID. :param user_id: User ID (sub in Cognito) :type user_id: str :param user_pool_id: Cognito user pool ID :type user_pool_id: str :returns: The username (email) associated with the user ID. :rtype: str :raises HTTPException: 400 if ``user_id`` isn't a valid UUID, 404 if no matching user, 500 on Cognito errors. .. py:function:: update_user(username: str, user_pool_id: str, disabled: bool) -> flip_api.domain.schemas.users.Disabled Enable or disable a user in Cognito. :param username: Username (email) :type username: str :param user_pool_id: Cognito user pool ID :type user_pool_id: str :param disabled: Whether to disable the user :type disabled: bool :returns: An object indicating the disabled status of the user after the update. :rtype: Disabled :raises HTTPException: If the request cannot be processed. .. py:function:: delete_cognito_user(username: str, user_pool_id: str) -> None Delete a user from Cognito. :param username: Username (email) :type username: str :param user_pool_id: Cognito user pool ID :type user_pool_id: str :returns: None :raises HTTPException: If there is an error deleting the user from Cognito. .. py:function:: reset_user_mfa(username: str, user_pool_id: str) -> None Disable a user's TOTP MFA preference and invalidate their sessions. Cognito has no admin API to delete a verified TOTP secret, so clearing the preference is the only server-side handle; the app-layer MFA gate (``verify_token`` + router guard) then funnels the user through post-auth enrolment, which mints a fresh secret. A global sign-out revokes any active refresh tokens so a pre-reset session cannot keep operating. :param username: Username (email) :type username: str :param user_pool_id: Cognito user pool ID :type user_pool_id: str :returns: None :raises HTTPException: If resetting MFA or signing the user out fails .. py:data:: _MFA_STATE_TTL_SECONDS :value: 60.0 .. py:data:: _mfa_state_cache :type: dict[tuple[str, str], tuple[bool, float]] .. py:data:: _mfa_state_cache_lock .. py:function:: _invalidate_mfa_cache(username: str, user_pool_id: str) -> None Drop the cached MFA-enabled state for a user (used after admin reset). .. py:function:: is_mfa_enabled(username: str, user_pool_id: str) -> bool Check whether a user has TOTP MFA active in Cognito. A user is considered MFA-active if SOFTWARE_TOKEN_MFA is present in their UserMFASettingList — Cognito only adds that entry after the user has both verified a software token and had their preference set with Enabled=True. Results are cached for a short TTL because ``verify_token`` calls this on every authenticated request; see ``_MFA_STATE_TTL_SECONDS``. :param username: Username (email) :type username: str :param user_pool_id: Cognito user pool ID :type user_pool_id: str :returns: True if TOTP MFA is enabled for the user, False otherwise. :rtype: bool :raises HTTPException: If the Cognito lookup fails. .. py:function:: revoke_token(refresh_token: str, client_id: str) -> None Revoke a refresh token in Cognito. :param refresh_token: Refresh token to revoke :type refresh_token: str :param client_id: Cognito app client ID :type client_id: str :returns: None :raises HTTPException: If token revocation fails .. py:function:: get_user_role_data(paging_info: flip_api.utils.paging_utils.PagingInfo, users: list[flip_api.domain.schemas.users.CognitoUser], session: sqlmodel.Session) -> list[flip_api.domain.schemas.users.IUser] Get user role data with pagination and filtering. :param paging_info: Pagination and filtering information. :type paging_info: PagingInfo :param users: List of Cognito users. :type users: list[CognitoUser] :param session: Database session. :type session: Session :returns: List of IUser objects with roles. :rtype: list[IUser] .. py:function:: get_all_roles(db: sqlmodel.Session) -> list[uuid.UUID] Get all role IDs from the database. :param db: Database session :type db: Session :returns: List of role IDs :rtype: list[UUID] .. py:function:: validate_roles(user_roles: list[uuid.UUID], roles_from_db: list[uuid.UUID]) -> None Validate that all user roles exist in the database. :param user_roles: List of role IDs to validate :type user_roles: list[UUID] :param roles_from_db: List of valid role IDs from the database :type roles_from_db: list[UUID] :returns: None :raises HTTPException: If any role is invalid .. py:function:: create_cognito_user(email: str, user_pool_id: str) -> uuid.UUID Create a new user in Cognito. :param email: User email :type email: str :param user_pool_id: Cognito user pool ID :type user_pool_id: str :returns: The ID of the created user :rtype: UUID :raises HTTPException: If user creation fails .. py:function:: filter_enabled_users(user_pool_id: str, users: list[uuid.UUID]) -> list[uuid.UUID] Filter out disabled users from a list of user IDs. :param user_pool_id: Cognito user pool ID :type user_pool_id: str :param users: List of user IDs to filter :type users: list[UUID] :returns: List of enabled user IDs :rtype: list[UUID]