⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/fastapi_cloud_cli/commands/whoami.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
def whoami() -> Any:
identity = Identity()

if identity.auth_mode == "token":
print("⚡ [bold]Using API token from environment variable[/bold]")
return

if not identity.is_logged_in():
print("No credentials found. Use [blue]`fastapi login`[/] to login.")
return
Expand Down
20 changes: 16 additions & 4 deletions src/fastapi_cloud_cli/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import binascii
import json
import logging
import os
import time
from typing import Optional
from typing import Literal, Optional

from pydantic import BaseModel

Expand Down Expand Up @@ -59,7 +60,7 @@ def _get_auth_token() -> Optional[str]:
return auth_data.access_token


def _is_token_expired(token: str) -> bool:
def _is_jwt_expired(token: str) -> bool:
try:
parts = token.split(".")

Expand Down Expand Up @@ -108,21 +109,32 @@ def _is_token_expired(token: str) -> bool:


class Identity:
auth_mode: Literal["token", "user"]

def __init__(self) -> None:
self.token = _get_auth_token()
self.auth_mode = "user"

# users using `FASTAPI_CLOUD_TOKEN`
if env_token := self._get_token_from_env():
self.token = env_token
self.auth_mode = "token"

def _get_token_from_env(self) -> Optional[str]:
return os.environ.get("FASTAPI_CLOUD_TOKEN")

def is_expired(self) -> bool:
if not self.token:
return True

return _is_token_expired(self.token)
return _is_jwt_expired(self.token)

def is_logged_in(self) -> bool:
if self.token is None:
logger.debug("Login status: False (no token)")
return False

if self.is_expired():
if self.auth_mode == "user" and self.is_expired():
logger.debug("Login status: False (token expired)")
return False

Expand Down
11 changes: 10 additions & 1 deletion src/fastapi_cloud_cli/utils/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from rich_toolkit.progress import Progress
from rich_toolkit.styles import MinimalStyle, TaggedStyle

from .auth import Identity

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -89,7 +91,14 @@ def handle_http_errors(
logger.debug(e.response.json()) # pragma: no cover

if isinstance(e, HTTPStatusError) and e.response.status_code in (401, 403):
message = "The specified token is not valid. Use `fastapi login` to generate a new token."
message = "The specified token is not valid. "

identity = Identity()

if identity.auth_mode == "user":
message += "Use `fastapi login` to generate a new token."
else:
message += "Make sure to use a valid token."

else:
message = (
Expand Down
34 changes: 17 additions & 17 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,33 @@
from fastapi_cloud_cli.utils.auth import (
AuthConfig,
Identity,
_is_token_expired,
_is_jwt_expired,
write_auth_config,
)

from .utils import create_jwt_token


def test_is_token_expired_with_valid_token() -> None:
def test_is_jwt_expired_with_valid_token() -> None:
future_exp = int(time.time()) + 3600

token = create_jwt_token({"exp": future_exp, "sub": "test_user"})

assert not _is_token_expired(token)
assert not _is_jwt_expired(token)


def test_is_token_expired_with_expired_token() -> None:
def test_is_jwt_expired_with_expired_token() -> None:
past_exp = int(time.time()) - 3600
token = create_jwt_token({"exp": past_exp, "sub": "test_user"})

assert _is_token_expired(token)
assert _is_jwt_expired(token)


def test_is_token_expired_with_no_exp_claim() -> None:
def test_is_jwt_expired_with_no_exp_claim() -> None:
token = create_jwt_token({"sub": "test_user"})

# Tokens without exp claim should be considered valid
assert not _is_token_expired(token)
assert not _is_jwt_expired(token)


@pytest.mark.parametrize(
Expand All @@ -46,36 +46,36 @@ def test_is_token_expired_with_no_exp_claim() -> None:
"...",
],
)
def test_is_token_expired_with_malformed_token(token: str) -> None:
assert _is_token_expired(token)
def test_is_jwt_expired_with_malformed_token(token: str) -> None:
assert _is_jwt_expired(token)


def test_is_token_expired_with_invalid_base64() -> None:
def test_is_jwt_expired_with_invalid_base64() -> None:
token = "header.!!!invalid_signature!!!.signature"
assert _is_token_expired(token)
assert _is_jwt_expired(token)


def test_is_token_expired_with_invalid_json() -> None:
def test_is_jwt_expired_with_invalid_json() -> None:
header_encoded = base64.urlsafe_b64encode(b'{"alg":"HS256"}').decode().rstrip("=")
payload_encoded = base64.urlsafe_b64encode(b"{invalid json}").decode().rstrip("=")
signature = base64.urlsafe_b64encode(b"signature").decode().rstrip("=")
token = f"{header_encoded}.{payload_encoded}.{signature}"

assert _is_token_expired(token)
assert _is_jwt_expired(token)


def test_is_token_expired_edge_case_exact_expiration() -> None:
def test_is_jwt_expired_edge_case_exact_expiration() -> None:
current_time = int(time.time())
token = create_jwt_token({"exp": current_time, "sub": "test_user"})

assert _is_token_expired(token)
assert _is_jwt_expired(token)


def test_is_token_expired_edge_case_one_second_before() -> None:
def test_is_jwt_expired_edge_case_one_second_before() -> None:
current_time = int(time.time())
token = create_jwt_token({"exp": current_time + 1, "sub": "test_user"})

assert not _is_token_expired(token)
assert not _is_jwt_expired(token)


def test_is_logged_in_with_no_token(temp_auth_config: Path) -> None:
Expand Down
101 changes: 101 additions & 0 deletions tests/test_cli_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,3 +1115,104 @@ def test_cancel_upload_swallows_exceptions(

assert upload_cancelled_route.called
assert "HTTPStatusError" not in result.output


@pytest.mark.respx(base_url=settings.base_api_url)
def test_deploy_successfully_with_token(
logged_out_cli: None, tmp_path: Path, respx_mock: respx.MockRouter
) -> None:
app_data = _get_random_app()
team_data = _get_random_team()
app_id = app_data["id"]
team_id = team_data["id"]
deployment_data = _get_random_deployment(app_id=app_id)

config_path = tmp_path / ".fastapicloud" / "cloud.json"

config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}')

respx_mock.get(f"/apps/{app_id}", headers={"Authorization": "Bearer hello"}).mock(
return_value=Response(200, json=app_data)
)

respx_mock.post(
f"/apps/{app_id}/deployments/", headers={"Authorization": "Bearer hello"}
).mock(return_value=Response(201, json=deployment_data))

respx_mock.post(
f"/deployments/{deployment_data['id']}/upload",
headers={"Authorization": "Bearer hello"},
).mock(
return_value=Response(
200,
json={"url": "http://test.com", "fields": {"key": "value"}},
)
)

respx_mock.post("http://test.com", data={"key": "value"}).mock(
return_value=Response(200)
)

respx_mock.get(
f"/deployments/{deployment_data['id']}/build-logs",
headers={"Authorization": "Bearer hello"},
).mock(
return_value=Response(
200,
content=build_logs_response(
{"type": "message", "message": "Building...", "id": "1"},
{"type": "message", "message": "All good!", "id": "2"},
{"type": "complete"},
),
)
)

respx_mock.post(
f"/deployments/{deployment_data['id']}/upload-complete",
headers={"Authorization": "Bearer hello"},
).mock(return_value=Response(200))

with changing_dir(tmp_path):
result = runner.invoke(app, ["deploy"], env={"FASTAPI_CLOUD_TOKEN": "hello"})

assert result.exit_code == 0

# check that logs are shown
assert "All good!" in result.output

# check that the dashboard URL is shown
assert "You can also check the app logs at" in result.output
assert deployment_data["dashboard_url"] in result.output

# check that the app URL is shown
assert deployment_data["url"] in result.output


@pytest.mark.respx(base_url=settings.base_api_url)
def test_deploy_with_token_fails(
logged_out_cli: None, tmp_path: Path, respx_mock: respx.MockRouter
) -> None:
app_data = _get_random_app()
team_data = _get_random_team()
app_id = app_data["id"]
team_id = team_data["id"]

config_path = tmp_path / ".fastapicloud" / "cloud.json"

config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(f'{{"app_id": "{app_id}", "team_id": "{team_id}"}}')

respx_mock.get(f"/apps/{app_id}", headers={"Authorization": "Bearer hello"}).mock(
return_value=Response(401, json=app_data)
)

with changing_dir(tmp_path):
result = runner.invoke(app, ["deploy"], env={"FASTAPI_CLOUD_TOKEN": "hello"})

assert result.exit_code == 1

assert (
"The specified token is not valid. Make sure to use a valid token."
in result.output
)
7 changes: 7 additions & 0 deletions tests/test_cli_whoami.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ def test_prints_not_logged_in(logged_out_cli: None) -> None:

assert result.exit_code == 0
assert "No credentials found. Use `fastapi login` to login." in result.output


def test_shows_logged_in_via_token(logged_out_cli: None) -> None:
result = runner.invoke(app, ["whoami"], env={"FASTAPI_CLOUD_TOKEN": "ABC"})

assert result.exit_code == 0
assert "Using API token from environment variable" in result.output