diff --git a/src/fastapi_cloud_cli/commands/whoami.py b/src/fastapi_cloud_cli/commands/whoami.py index fdc4a5f..5f520f6 100644 --- a/src/fastapi_cloud_cli/commands/whoami.py +++ b/src/fastapi_cloud_cli/commands/whoami.py @@ -14,6 +14,10 @@ def whoami() -> Any: identity = Identity() + if identity.auth_mode == "token": + print("⚡ [bold]Using API token from environment variable[/bold]") + return + if not identity.is_logged_in(): print("No credentials found. Use [blue]`fastapi login`[/] to login.") return diff --git a/src/fastapi_cloud_cli/utils/auth.py b/src/fastapi_cloud_cli/utils/auth.py index 27258f8..8ccdd69 100644 --- a/src/fastapi_cloud_cli/utils/auth.py +++ b/src/fastapi_cloud_cli/utils/auth.py @@ -2,8 +2,9 @@ import binascii import json import logging +import os import time -from typing import Optional +from typing import Literal, Optional from pydantic import BaseModel @@ -59,7 +60,7 @@ def _get_auth_token() -> Optional[str]: return auth_data.access_token -def _is_token_expired(token: str) -> bool: +def _is_jwt_expired(token: str) -> bool: try: parts = token.split(".") @@ -108,21 +109,32 @@ def _is_token_expired(token: str) -> bool: class Identity: + auth_mode: Literal["token", "user"] + def __init__(self) -> None: self.token = _get_auth_token() + self.auth_mode = "user" + + # users using `FASTAPI_CLOUD_TOKEN` + if env_token := self._get_token_from_env(): + self.token = env_token + self.auth_mode = "token" + + def _get_token_from_env(self) -> Optional[str]: + return os.environ.get("FASTAPI_CLOUD_TOKEN") def is_expired(self) -> bool: if not self.token: return True - return _is_token_expired(self.token) + return _is_jwt_expired(self.token) def is_logged_in(self) -> bool: if self.token is None: logger.debug("Login status: False (no token)") return False - if self.is_expired(): + if self.auth_mode == "user" and self.is_expired(): logger.debug("Login status: False (token expired)") return False diff --git a/src/fastapi_cloud_cli/utils/cli.py b/src/fastapi_cloud_cli/utils/cli.py index 5d815e9..e0995f0 100644 --- a/src/fastapi_cloud_cli/utils/cli.py +++ b/src/fastapi_cloud_cli/utils/cli.py @@ -10,6 +10,8 @@ from rich_toolkit.progress import Progress from rich_toolkit.styles import MinimalStyle, TaggedStyle +from .auth import Identity + logger = logging.getLogger(__name__) @@ -89,7 +91,14 @@ def handle_http_errors( logger.debug(e.response.json()) # pragma: no cover if isinstance(e, HTTPStatusError) and e.response.status_code in (401, 403): - message = "The specified token is not valid. Use `fastapi login` to generate a new token." + message = "The specified token is not valid. " + + identity = Identity() + + if identity.auth_mode == "user": + message += "Use `fastapi login` to generate a new token." + else: + message += "Make sure to use a valid token." else: message = ( diff --git a/tests/test_auth.py b/tests/test_auth.py index dd39433..a7e980b 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -7,33 +7,33 @@ from fastapi_cloud_cli.utils.auth import ( AuthConfig, Identity, - _is_token_expired, + _is_jwt_expired, write_auth_config, ) from .utils import create_jwt_token -def test_is_token_expired_with_valid_token() -> None: +def test_is_jwt_expired_with_valid_token() -> None: future_exp = int(time.time()) + 3600 token = create_jwt_token({"exp": future_exp, "sub": "test_user"}) - assert not _is_token_expired(token) + assert not _is_jwt_expired(token) -def test_is_token_expired_with_expired_token() -> None: +def test_is_jwt_expired_with_expired_token() -> None: past_exp = int(time.time()) - 3600 token = create_jwt_token({"exp": past_exp, "sub": "test_user"}) - assert _is_token_expired(token) + assert _is_jwt_expired(token) -def test_is_token_expired_with_no_exp_claim() -> None: +def test_is_jwt_expired_with_no_exp_claim() -> None: token = create_jwt_token({"sub": "test_user"}) # Tokens without exp claim should be considered valid - assert not _is_token_expired(token) + assert not _is_jwt_expired(token) @pytest.mark.parametrize( @@ -46,36 +46,36 @@ def test_is_token_expired_with_no_exp_claim() -> None: "...", ], ) -def test_is_token_expired_with_malformed_token(token: str) -> None: - assert _is_token_expired(token) +def test_is_jwt_expired_with_malformed_token(token: str) -> None: + assert _is_jwt_expired(token) -def test_is_token_expired_with_invalid_base64() -> None: +def test_is_jwt_expired_with_invalid_base64() -> None: token = "header.!!!invalid_signature!!!.signature" - assert _is_token_expired(token) + assert _is_jwt_expired(token) -def test_is_token_expired_with_invalid_json() -> None: +def test_is_jwt_expired_with_invalid_json() -> None: header_encoded = base64.urlsafe_b64encode(b'{"alg":"HS256"}').decode().rstrip("=") payload_encoded = base64.urlsafe_b64encode(b"{invalid json}").decode().rstrip("=") signature = base64.urlsafe_b64encode(b"signature").decode().rstrip("=") token = f"{header_encoded}.{payload_encoded}.{signature}" - assert _is_token_expired(token) + assert _is_jwt_expired(token) -def test_is_token_expired_edge_case_exact_expiration() -> None: +def test_is_jwt_expired_edge_case_exact_expiration() -> None: current_time = int(time.time()) token = create_jwt_token({"exp": current_time, "sub": "test_user"}) - assert _is_token_expired(token) + assert _is_jwt_expired(token) -def test_is_token_expired_edge_case_one_second_before() -> None: +def test_is_jwt_expired_edge_case_one_second_before() -> None: current_time = int(time.time()) token = create_jwt_token({"exp": current_time + 1, "sub": "test_user"}) - assert not _is_token_expired(token) + assert not _is_jwt_expired(token) def test_is_logged_in_with_no_token(temp_auth_config: Path) -> None: diff --git a/tests/test_cli_deploy.py b/tests/test_cli_deploy.py index 2b094b6..8f8382b 100644 --- a/tests/test_cli_deploy.py +++ b/tests/test_cli_deploy.py @@ -1115,3 +1115,104 @@ def test_cancel_upload_swallows_exceptions( assert upload_cancelled_route.called assert "HTTPStatusError" not in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_deploy_successfully_with_token( + logged_out_cli: None, tmp_path: Path, respx_mock: respx.MockRouter +) -> None: + app_data = _get_random_app() + team_data = _get_random_team() + app_id = app_data["id"] + team_id = team_data["id"] + deployment_data = _get_random_deployment(app_id=app_id) + + config_path = tmp_path / ".fastapicloud" / "cloud.json" + + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}') + + respx_mock.get(f"/apps/{app_id}", headers={"Authorization": "Bearer hello"}).mock( + return_value=Response(200, json=app_data) + ) + + respx_mock.post( + f"/apps/{app_id}/deployments/", headers={"Authorization": "Bearer hello"} + ).mock(return_value=Response(201, json=deployment_data)) + + respx_mock.post( + f"/deployments/{deployment_data['id']}/upload", + headers={"Authorization": "Bearer hello"}, + ).mock( + return_value=Response( + 200, + json={"url": "http://test.com", "fields": {"key": "value"}}, + ) + ) + + respx_mock.post("http://test.com", data={"key": "value"}).mock( + return_value=Response(200) + ) + + respx_mock.get( + f"/deployments/{deployment_data['id']}/build-logs", + headers={"Authorization": "Bearer hello"}, + ).mock( + return_value=Response( + 200, + content=build_logs_response( + {"type": "message", "message": "Building...", "id": "1"}, + {"type": "message", "message": "All good!", "id": "2"}, + {"type": "complete"}, + ), + ) + ) + + respx_mock.post( + f"/deployments/{deployment_data['id']}/upload-complete", + headers={"Authorization": "Bearer hello"}, + ).mock(return_value=Response(200)) + + with changing_dir(tmp_path): + result = runner.invoke(app, ["deploy"], env={"FASTAPI_CLOUD_TOKEN": "hello"}) + + assert result.exit_code == 0 + + # check that logs are shown + assert "All good!" in result.output + + # check that the dashboard URL is shown + assert "You can also check the app logs at" in result.output + assert deployment_data["dashboard_url"] in result.output + + # check that the app URL is shown + assert deployment_data["url"] in result.output + + +@pytest.mark.respx(base_url=settings.base_api_url) +def test_deploy_with_token_fails( + logged_out_cli: None, tmp_path: Path, respx_mock: respx.MockRouter +) -> None: + app_data = _get_random_app() + team_data = _get_random_team() + app_id = app_data["id"] + team_id = team_data["id"] + + config_path = tmp_path / ".fastapicloud" / "cloud.json" + + config_path.parent.mkdir(parents=True, exist_ok=True) + config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}') + + respx_mock.get(f"/apps/{app_id}", headers={"Authorization": "Bearer hello"}).mock( + return_value=Response(401, json=app_data) + ) + + with changing_dir(tmp_path): + result = runner.invoke(app, ["deploy"], env={"FASTAPI_CLOUD_TOKEN": "hello"}) + + assert result.exit_code == 1 + + assert ( + "The specified token is not valid. Make sure to use a valid token." + in result.output + ) diff --git a/tests/test_cli_whoami.py b/tests/test_cli_whoami.py index 4e13b6b..fb1d720 100644 --- a/tests/test_cli_whoami.py +++ b/tests/test_cli_whoami.py @@ -70,3 +70,10 @@ def test_prints_not_logged_in(logged_out_cli: None) -> None: assert result.exit_code == 0 assert "No credentials found. Use `fastapi login` to login." in result.output + + +def test_shows_logged_in_via_token(logged_out_cli: None) -> None: + result = runner.invoke(app, ["whoami"], env={"FASTAPI_CLOUD_TOKEN": "ABC"}) + + assert result.exit_code == 0 + assert "Using API token from environment variable" in result.output