diff --git a/.github/workflows/secret-scan.yml b/.github/workflows/secret-scan.yml index 9f9dbf1..3d0d70c 100644 --- a/.github/workflows/secret-scan.yml +++ b/.github/workflows/secret-scan.yml @@ -15,7 +15,11 @@ jobs: with: fetch-depth: 0 - - name: Run gitleaks - uses: gitleaks/gitleaks-action@v2 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + # Use the gitleaks CLI directly (MIT, free) instead of + # gitleaks/gitleaks-action@v2, which requires a paid GITLEAKS_LICENSE + # for GitHub organizations and otherwise fails the job. + - name: Run gitleaks (CLI) + run: | + VERSION=8.18.4 + curl -sSL "https://github.com/gitleaks/gitleaks/releases/download/v${VERSION}/gitleaks_${VERSION}_linux_x64.tar.gz" | tar -xz gitleaks + ./gitleaks detect --source . --no-git --no-banner --redact --exit-code 1 diff --git a/.gitleaks.toml b/.gitleaks.toml index e974c04..43a4f9e 100644 --- a/.gitleaks.toml +++ b/.gitleaks.toml @@ -1,27 +1,18 @@ +# Extends gitleaks' default rules; allowlists fixtures that intentionally +# contain fake credentials (test files, examples, docs). [extend] useDefault = true -[[rules]] -id = "awsys-api-key" -description = "AWSYS.CO API Key" -regex = '''awsys_[0-9a-f]{64}''' -tags = ["key", "awsys"] - -[[rules]] -id = "awsys-api-key-short" -description = "AWSYS.CO API Key (short form)" -regex = '''awsys_[0-9a-zA-Z]{20,}''' -tags = ["key", "awsys"] - [allowlist] -description = "Global allowlist" +description = "Test fixtures, examples and docs use placeholder/fake tokens" paths = [ - '''.gitleaks.toml''', - '''\.env\.example''', - '''README\.md''', + '''(^|/)tests?/''', + '''.*\.test\.(ts|js)$''', + '''.*_test\.go$''', + '''(^|/)examples?/''', + '''(^|/)README\.md$''', + '''(^|/)CHANGELOG\.md$''', ] regexes = [ - '''awsys_''', - '''awsys_YOUR_API_KEY''', - '''awsys_\.\.\.', + '''awsys_(test|example|dummy|fake)[A-Za-z0-9_]*''', ] diff --git a/README.md b/README.md index 6003a0f..c249e10 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,8 @@ client.links.delete("my-link") | Method | Description | |---|---| -| `client.analytics.get_stats(short_path)` | Get click stats for a link | +| `client.analytics.get_stats(short_path)` | Get raw per-click stats for a link | +| `client.analytics.get_aggregate_stats(short_path, *, period=None)` | Get rolled-up (aggregated) stats for a link | ```python stats = client.analytics.get_stats("abc123") @@ -96,6 +97,25 @@ for click in stats.clicks: print(click.country, click.device, click.timestamp) ``` +`get_aggregate_stats` returns server-side aggregations (clicks-by-day, +country/device/UTM breakdowns, unique visitors). The breakdowns present are +**tier-gated** — free-tier responses include an `upgrade_for_more` hint and +omit the richer breakdowns; higher tiers populate `device_breakdown`, +`utm_breakdown`, `hour_breakdown`, etc. + +```python +agg = client.analytics.get_aggregate_stats("abc123", period="30d") +print(agg.total_clicks, agg.unique_visitors, agg.tier) +for day in agg.clicks_by_day: + print(day.date, day.clicks) +print(agg.country_breakdown) # {"US": 100, ...} + +if agg.device_breakdown: # populated on Pro+ + print(agg.device_breakdown.mobile, agg.device_breakdown.desktop) +if agg.upgrade_for_more: # present on free tier + print(agg.upgrade_for_more.message, agg.upgrade_for_more.available) +``` + ### QR Codes | Method | Description | @@ -193,6 +213,40 @@ session = client.web2app.consume_session("0123456789abcdef0123456789abcdef") print(session.link_id, session.utm_params, session.country) ``` +### Imports + +| Method | Description | +|---|---| +| `client.imports.start(*, provider, access_token, target_namespace=None, scan_only=None)` | Start a provider link-import job | +| `client.imports.get_status(job_id)` | Get the current state of an import job | +| `client.imports.cancel(job_id)` | Cancel an in-progress import job | +| `client.imports.list(*, limit=None)` | List import jobs for the account | +| `client.imports.wait_for_completion(job_id, *, poll_interval=2.0, timeout=120.0)` | Poll until the job reaches a terminal state | + +Import jobs run asynchronously server-side. `start` returns a `pending` +`ImportJob`; poll `get_status` (or use `wait_for_completion`) until the status +is one of `completed`, `partial`, `failed`, or `cancelled`. Pass +`scan_only=True` to preview what would be imported without writing links. + +```python +job = client.imports.start(provider="bitly", access_token="") +print(job.id, job.status) + +# Block until the import finishes (raises TimeoutError if it overruns). +done = client.imports.wait_for_completion(job.id, poll_interval=5.0, timeout=300.0) +print(done.status, done.counts.written, done.counts.errored) +for err in done.errors: + print(err) + +# Or drive it manually +for j in client.imports.list(limit=10): + print(j.id, j.provider, j.status) +client.imports.cancel(job.id) +``` + +All `imports` methods are available identically on `AsyncClient` +(`await client.imports.start(...)`, `await client.imports.wait_for_completion(...)`). + ## Error Handling All errors inherit from `AwsysError`. @@ -288,6 +342,13 @@ All responses are parsed into Pydantic v2 models: | `UsageLimits` | `links_per_month`, `monthly_links`, `daily_links`, `monthly_tracked_clicks`, `qr_codes`, `folders` (each `int` or `"unlimited"`), `api_calls_per_month`, `custom_slugs` | | `UsageOverage` | `active`, `started_at`, `expires_at`, `hours_until_drop`, `clicks_this_cycle`, `spending_limit_cents`, `estimated_charge_cents` | | `Web2AppSession` | `success`, `link_id`, `utm_params`, `routing_rule`, `country`, `clicked_at` | +| `ImportJob` | `id`, `user_id`, `provider`, `status`, `scan_only`, `target_namespace`, `scope_filter`, `counts`, `errors`, `created_at`, `updated_at` | +| `ImportCounts` | `fetched`, `transformed`, `written`, `errored` | +| `AggregateAnalytics` | `short_code`, `full_path`, `period`, `total_clicks`, `unique_visitors`, `clicks_by_day`, `country_breakdown`, `tier_limit`, `tier`, `device_breakdown`, `referrer_breakdown`, `browser_breakdown`, `os_breakdown`, `source_breakdown`, `hour_breakdown`, `utm_breakdown`, `upgrade_for_more` | +| `DayClicks` / `HourClicks` | `date`/`hour`, `clicks` | +| `DeviceBreakdown` | `mobile`, `desktop`, `tablet` | +| `UTMBreakdown` | `sources`, `mediums`, `campaigns` | +| `UpgradeForMore` | `available`, `message` | ## Development Setup diff --git a/awsysco/__init__.py b/awsysco/__init__.py index 658b5e7..64adf8a 100644 --- a/awsysco/__init__.py +++ b/awsysco/__init__.py @@ -12,13 +12,19 @@ ) from .models import ( AffiliateProgram, + AggregateAnalytics, BulkLinkResult, BulkResult, ClickEvent, CustomDomain, + DayClicks, + DeviceBreakdown, Folder, FolderList, GeoRestriction, + HourClicks, + ImportCounts, + ImportJob, Link, LinkList, LinkStats, @@ -31,15 +37,17 @@ SavedView, SavedViewFilters, TrustScoreResult, + UpgradeForMore, UsageLimits, UsageOverage, UsageStats, + UTMBreakdown, UtmTemplate, Web2AppSession, Webhook, ) -__version__ = "1.2.0" +__version__ = "1.3.0" __all__ = [ # Clients "Client", @@ -90,4 +98,14 @@ "UsageOverage", # Web2App "Web2AppSession", + # Imports + "ImportCounts", + "ImportJob", + # Aggregate analytics + "AggregateAnalytics", + "DayClicks", + "HourClicks", + "DeviceBreakdown", + "UTMBreakdown", + "UpgradeForMore", ] diff --git a/awsysco/async_resources/analytics.py b/awsysco/async_resources/analytics.py index 592efd7..e379d5c 100644 --- a/awsysco/async_resources/analytics.py +++ b/awsysco/async_resources/analytics.py @@ -5,7 +5,7 @@ from typing import List, Optional from .._async_http import AsyncHttpClient -from ..models import ClickEvent, LinkStats +from ..models import AggregateAnalytics, ClickEvent, LinkStats class AsyncAnalyticsResource: @@ -22,6 +22,18 @@ async def get_stats(self, short_path: str, *, period: Optional[str] = None) -> L ) return LinkStats.model_validate(data) + async def get_aggregate_stats( + self, short_path: str, *, period: Optional[str] = None + ) -> AggregateAnalytics: + params = {} + if period is not None: + params["period"] = period + data = await self._http.get( + f"/api/v1/links/{short_path}/stats/aggregate", + params=params if params else None, + ) + return AggregateAnalytics.model_validate(data) + async def get_recent_clicks(self, *, limit: Optional[int] = None) -> List[ClickEvent]: params = {} if limit is not None: diff --git a/awsysco/async_resources/imports.py b/awsysco/async_resources/imports.py new file mode 100644 index 0000000..a4e7c2c --- /dev/null +++ b/awsysco/async_resources/imports.py @@ -0,0 +1,92 @@ +"""Async Imports resource.""" + +from __future__ import annotations + +import asyncio +import time +from typing import List, Optional +from urllib.parse import quote + +from .._async_http import AsyncHttpClient +from ..models import ImportJob + +# Statuses that indicate the import job has stopped progressing. +_TERMINAL_STATES = {"completed", "partial", "failed", "cancelled"} + + +class AsyncImportsResource: + def __init__(self, http: AsyncHttpClient) -> None: + self._http = http + + async def start( + self, + *, + provider: str, + access_token: str, + target_namespace: Optional[str] = None, + scan_only: Optional[bool] = None, + ) -> ImportJob: + """Start a new provider link-import job. + + The request body uses snake_case keys (``provider``, ``access_token``, + optional ``target_namespace`` / ``scan_only``). + """ + body = {"provider": provider, "access_token": access_token} + if target_namespace is not None: + body["target_namespace"] = target_namespace + if scan_only is not None: + body["scan_only"] = scan_only + data = await self._http.post("/api/v1/imports", json=body) + return ImportJob.model_validate(data) + + async def get_status(self, job_id: str) -> ImportJob: + """Get the current state of an import job.""" + encoded = quote(job_id, safe="") + data = await self._http.get(f"/api/v1/imports/{encoded}") + return ImportJob.model_validate(data) + + async def cancel(self, job_id: str) -> ImportJob: + """Cancel an in-progress import job.""" + encoded = quote(job_id, safe="") + data = await self._http.delete(f"/api/v1/imports/{encoded}") + return ImportJob.model_validate(data) + + async def list(self, *, limit: Optional[int] = None) -> List[ImportJob]: + """List import jobs for the authenticated user.""" + params = {} + if limit is not None: + params["limit"] = limit + data = await self._http.get( + "/api/v1/imports", + params=params if params else None, + ) + items = data.get("jobs", []) if isinstance(data, dict) else (data or []) + return [ImportJob.model_validate(item) for item in items] + + async def wait_for_completion( + self, + job_id: str, + *, + poll_interval: float = 2.0, + timeout: float = 120.0, + ) -> ImportJob: + """Poll an import job until it reaches a terminal state. + + Polls ``get_status`` every ``poll_interval`` seconds until the job + status is one of ``completed``, ``partial``, ``failed``, or + ``cancelled``. + + Raises: + TimeoutError: If the job does not finish within ``timeout``. + """ + deadline = time.monotonic() + timeout + while True: + job = await self.get_status(job_id) + if job.status in _TERMINAL_STATES: + return job + if time.monotonic() >= deadline: + raise TimeoutError( + f"Import job {job_id} did not complete within {timeout}s " + f"(last status: {job.status})" + ) + await asyncio.sleep(poll_interval) diff --git a/awsysco/client.py b/awsysco/client.py index c48daf6..548dc3a 100644 --- a/awsysco/client.py +++ b/awsysco/client.py @@ -13,6 +13,7 @@ from .async_resources.custom_domains import AsyncCustomDomainsResource from .async_resources.data_export import AsyncDataExportResource from .async_resources.folders import AsyncFoldersResource +from .async_resources.imports import AsyncImportsResource from .async_resources.links import AsyncLinksResource from .async_resources.me import AsyncMeResource from .async_resources.namespace import AsyncNamespaceResource @@ -31,6 +32,7 @@ from .resources.custom_domains import CustomDomainsResource from .resources.data_export import DataExportResource from .resources.folders import FoldersResource +from .resources.imports import ImportsResource from .resources.links import LinksResource from .resources.me import MeResource from .resources.namespace import NamespaceResource @@ -99,6 +101,7 @@ def __init__( # Parity resources self.usage = UsageResource(self._http) self.web2app = Web2AppResource(self._http) + self.imports = ImportsResource(self._http) def close(self) -> None: """Close the underlying HTTP connection pool.""" @@ -162,6 +165,7 @@ def __init__( # Parity resources self.usage = AsyncUsageResource(self._http) self.web2app = AsyncWeb2AppResource(self._http) + self.imports = AsyncImportsResource(self._http) async def aclose(self) -> None: """Close the underlying async HTTP connection pool.""" diff --git a/awsysco/models.py b/awsysco/models.py index c14ef0b..7b9b7db 100644 --- a/awsysco/models.py +++ b/awsysco/models.py @@ -34,6 +34,14 @@ "UsageOverage", "UsageStats", "Web2AppSession", + "ImportCounts", + "ImportJob", + "DayClicks", + "HourClicks", + "DeviceBreakdown", + "UTMBreakdown", + "UpgradeForMore", + "AggregateAnalytics", ] @@ -411,3 +419,107 @@ class Web2AppSession(_CamelModel): routing_rule: Optional[Dict[str, Any]] = None country: Optional[str] = None clicked_at: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Imports models — provider link-import jobs +# --------------------------------------------------------------------------- + + +class ImportCounts(_CamelModel): + """Progress counters for an import job.""" + + fetched: int = 0 + transformed: int = 0 + written: int = 0 + errored: int = 0 + + +class ImportJob(_CamelModel): + """A provider link-import job from ``/api/v1/imports``. + + ``status`` progresses through states such as ``pending``, ``running``, + ``completed``, ``partial``, ``failed``, and ``cancelled``. + """ + + id: Optional[str] = None + user_id: Optional[str] = None + provider: Optional[str] = None + status: Optional[str] = None + scan_only: Optional[bool] = None + target_namespace: Optional[str] = None + scope_filter: Optional[str] = None + counts: Optional[ImportCounts] = None + errors: List[str] = Field(default_factory=list) + created_at: Optional[str] = None + updated_at: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Aggregate analytics models — tier-gated rollups +# --------------------------------------------------------------------------- + + +class DayClicks(_CamelModel): + """Clicks for a single calendar day.""" + + date: Optional[str] = None + clicks: Optional[int] = None + + +class HourClicks(_CamelModel): + """Clicks for a single hour-of-day bucket (0–23).""" + + hour: Optional[int] = None + clicks: Optional[int] = None + + +class DeviceBreakdown(_CamelModel): + """Click counts split by device class.""" + + mobile: Optional[int] = None + desktop: Optional[int] = None + tablet: Optional[int] = None + + +class UTMBreakdown(_CamelModel): + """UTM rollups keyed by source / medium / campaign.""" + + sources: Dict[str, int] = Field(default_factory=dict) + mediums: Dict[str, int] = Field(default_factory=dict) + campaigns: Dict[str, int] = Field(default_factory=dict) + + +class UpgradeForMore(_CamelModel): + """Tier-gating hint listing breakdowns available on higher tiers.""" + + available: List[str] = Field(default_factory=list) + message: Optional[str] = None + + +class AggregateAnalytics(_CamelModel): + """Aggregated click analytics from + ``/api/v1/links/{short_path}/stats/aggregate``. + + The set of populated breakdowns depends on the account tier — free-tier + responses include ``upgrade_for_more`` and omit the richer breakdowns, + while higher tiers populate ``device_breakdown``, ``utm_breakdown``, etc. + """ + + short_code: Optional[str] = None + full_path: Optional[str] = None + period: Optional[str] = None + total_clicks: Optional[int] = None + unique_visitors: Optional[int] = None + clicks_by_day: List[DayClicks] = Field(default_factory=list) + country_breakdown: Dict[str, int] = Field(default_factory=dict) + tier_limit: Optional[int] = None + tier: Optional[str] = None + device_breakdown: Optional[DeviceBreakdown] = None + referrer_breakdown: Optional[Dict[str, int]] = None + browser_breakdown: Optional[Dict[str, int]] = None + os_breakdown: Optional[Dict[str, int]] = None + source_breakdown: Optional[Dict[str, int]] = None + hour_breakdown: Optional[List[HourClicks]] = None + utm_breakdown: Optional[UTMBreakdown] = None + upgrade_for_more: Optional[UpgradeForMore] = None diff --git a/awsysco/resources/analytics.py b/awsysco/resources/analytics.py index 91968a5..f0c3c8a 100644 --- a/awsysco/resources/analytics.py +++ b/awsysco/resources/analytics.py @@ -5,7 +5,7 @@ from typing import List, Optional from .._http import HttpClient -from ..models import ClickEvent, LinkStats +from ..models import AggregateAnalytics, ClickEvent, LinkStats class AnalyticsResource: @@ -34,6 +34,34 @@ def get_stats(self, short_path: str, *, period: Optional[str] = None) -> LinkSta ) return LinkStats.model_validate(data) + def get_aggregate_stats( + self, short_path: str, *, period: Optional[str] = None + ) -> AggregateAnalytics: + """Get aggregated (rolled-up) click analytics for a link. + + Unlike :meth:`get_stats`, which returns a raw per-click list, this + returns server-side aggregations (clicks by day, country/device/UTM + breakdowns, unique visitors). The breakdowns present in the response + are tier-gated — free-tier responses include an ``upgrade_for_more`` + hint and omit the richer breakdowns. + + Args: + short_path: The short code or slug identifying the link. + period: Optional time period filter (e.g. ``'7d'``, ``'30d'``, + ``'all'``). + + Returns: + An AggregateAnalytics object. + """ + params = {} + if period is not None: + params["period"] = period + data = self._http.get( + f"/api/v1/links/{short_path}/stats/aggregate", + params=params if params else None, + ) + return AggregateAnalytics.model_validate(data) + def get_recent_clicks(self, *, limit: Optional[int] = None) -> List[ClickEvent]: """Get recent click events across all links for the authenticated user. diff --git a/awsysco/resources/imports.py b/awsysco/resources/imports.py new file mode 100644 index 0000000..d2a05ff --- /dev/null +++ b/awsysco/resources/imports.py @@ -0,0 +1,128 @@ +"""Imports resource — provider link-import jobs.""" + +from __future__ import annotations + +import time +from typing import List, Optional +from urllib.parse import quote + +from .._http import HttpClient +from ..models import ImportJob + +# Statuses that indicate the import job has stopped progressing. +_TERMINAL_STATES = {"completed", "partial", "failed", "cancelled"} + + +class ImportsResource: + """Interact with /api/v1/imports.""" + + def __init__(self, http: HttpClient) -> None: + self._http = http + + def start( + self, + *, + provider: str, + access_token: str, + target_namespace: Optional[str] = None, + scan_only: Optional[bool] = None, + ) -> ImportJob: + """Start a new provider link-import job. + + Args: + provider: The source provider (e.g. ``'bitly'``, ``'rebrandly'``). + access_token: An OAuth/API token for the source provider account. + target_namespace: Optional namespace to import links into. + scan_only: If ``True``, fetch and report without writing links. + + Returns: + The created ImportJob (initially in a ``pending`` state). + """ + body = {"provider": provider, "access_token": access_token} + if target_namespace is not None: + body["target_namespace"] = target_namespace + if scan_only is not None: + body["scan_only"] = scan_only + data = self._http.post("/api/v1/imports", json=body) + return ImportJob.model_validate(data) + + def get_status(self, job_id: str) -> ImportJob: + """Get the current state of an import job. + + Args: + job_id: The import job id. + + Returns: + The ImportJob with up-to-date status and counts. + """ + encoded = quote(job_id, safe="") + data = self._http.get(f"/api/v1/imports/{encoded}") + return ImportJob.model_validate(data) + + def cancel(self, job_id: str) -> ImportJob: + """Cancel an in-progress import job. + + Args: + job_id: The import job id. + + Returns: + The ImportJob reflecting the cancelled state. + """ + encoded = quote(job_id, safe="") + data = self._http.delete(f"/api/v1/imports/{encoded}") + return ImportJob.model_validate(data) + + def list(self, *, limit: Optional[int] = None) -> List[ImportJob]: + """List import jobs for the authenticated user. + + Args: + limit: Maximum number of jobs to return. + + Returns: + A list of ImportJob objects. + """ + params = {} + if limit is not None: + params["limit"] = limit + data = self._http.get( + "/api/v1/imports", + params=params if params else None, + ) + items = data.get("jobs", []) if isinstance(data, dict) else (data or []) + return [ImportJob.model_validate(item) for item in items] + + def wait_for_completion( + self, + job_id: str, + *, + poll_interval: float = 2.0, + timeout: float = 120.0, + ) -> ImportJob: + """Poll an import job until it reaches a terminal state. + + Polls :meth:`get_status` every ``poll_interval`` seconds until the + job status is one of ``completed``, ``partial``, ``failed``, or + ``cancelled``. + + Args: + job_id: The import job id. + poll_interval: Seconds to wait between status checks. + timeout: Maximum seconds to wait before giving up. + + Returns: + The terminal-state ImportJob. + + Raises: + TimeoutError: If the job does not finish within ``timeout``. + """ + deadline = time.monotonic() + timeout + while True: + job = self.get_status(job_id) + if job.status in _TERMINAL_STATES: + return job + if time.monotonic() >= deadline: + raise TimeoutError( + f"Import job {job_id} did not complete within {timeout}s " + f"(last status: {job.status})" + ) + time.sleep(poll_interval) diff --git a/pyproject.toml b/pyproject.toml index 140c3e9..06dfd8f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "awsysco" -version = "1.2.0" +version = "1.3.0" description = "Official Python SDK for the AWSYS.CO URL Shortener API" readme = "README.md" requires-python = ">=3.9" diff --git a/tests/test_analytics.py b/tests/test_analytics.py index 0afde12..91c333f 100644 --- a/tests/test_analytics.py +++ b/tests/test_analytics.py @@ -2,11 +2,13 @@ from __future__ import annotations +import asyncio import time -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock from awsysco import Client -from awsysco.models import ClickEvent, LinkStats +from awsysco.async_resources.analytics import AsyncAnalyticsResource +from awsysco.models import AggregateAnalytics, ClickEvent, LinkStats from awsysco.resources.analytics import AnalyticsResource @@ -92,6 +94,131 @@ def test_get_recent_clicks_handles_wrapped_response(self): assert isinstance(result[0], ClickEvent) +# --------------------------------------------------------------------------- +# Aggregate stats — mocked, tier-gated (free + pro) +# --------------------------------------------------------------------------- + +_FREE_AGGREGATE = { + "shortCode": "abc", + "fullPath": "abc", + "period": "30d", + "totalClicks": 120, + "uniqueVisitors": 80, + "clicksByDay": [ + {"date": "2026-01-01", "clicks": 60}, + {"date": "2026-01-02", "clicks": 60}, + ], + "countryBreakdown": {"US": 100, "CA": 20}, + "tierLimit": 30, + "tier": "free", + "upgradeForMore": { + "available": ["deviceBreakdown", "utmBreakdown", "hourBreakdown"], + "message": "Upgrade to Pro for device, UTM, and hourly breakdowns.", + }, +} + +_PRO_AGGREGATE = { + "shortCode": "abc", + "fullPath": "acme/abc", + "period": "30d", + "totalClicks": 5000, + "uniqueVisitors": 3200, + "clicksByDay": [{"date": "2026-01-01", "clicks": 5000}], + "countryBreakdown": {"US": 4000, "GB": 1000}, + "tierLimit": 365, + "tier": "pro", + "deviceBreakdown": {"mobile": 3000, "desktop": 1800, "tablet": 200}, + "referrerBreakdown": {"twitter.com": 1200}, + "browserBreakdown": {"Chrome": 3500}, + "osBreakdown": {"iOS": 2800}, + "sourceBreakdown": {"newsletter": 900}, + "hourBreakdown": [{"hour": 9, "clicks": 400}, {"hour": 10, "clicks": 600}], + "utmBreakdown": { + "sources": {"newsletter": 900}, + "mediums": {"email": 900}, + "campaigns": {"launch": 700}, + }, +} + + +def _make_aggregate_resource(payload): + http = MagicMock() + http.get.return_value = payload + return AnalyticsResource(http) + + +class TestAggregateAnalyticsSync: + def test_calls_aggregate_endpoint(self): + resource = _make_aggregate_resource(_FREE_AGGREGATE) + resource.get_aggregate_stats("abc") + resource._http.get.assert_called_once_with( + "/api/v1/links/abc/stats/aggregate", params=None + ) + + def test_passes_period_param(self): + resource = _make_aggregate_resource(_FREE_AGGREGATE) + resource.get_aggregate_stats("abc", period="30d") + resource._http.get.assert_called_once_with( + "/api/v1/links/abc/stats/aggregate", params={"period": "30d"} + ) + + def test_free_tier_upgrade_for_more(self): + result = _make_aggregate_resource(_FREE_AGGREGATE).get_aggregate_stats("abc") + assert isinstance(result, AggregateAnalytics) + assert result.tier == "free" + assert result.total_clicks == 120 + assert result.unique_visitors == 80 + assert result.country_breakdown == {"US": 100, "CA": 20} + assert len(result.clicks_by_day) == 2 + assert result.clicks_by_day[0].date == "2026-01-01" + # free tier omits the richer breakdowns + assert result.device_breakdown is None + assert result.utm_breakdown is None + # but surfaces the upgrade hint + assert result.upgrade_for_more is not None + assert "deviceBreakdown" in result.upgrade_for_more.available + + def test_pro_tier_device_breakdown(self): + result = _make_aggregate_resource(_PRO_AGGREGATE).get_aggregate_stats("abc") + assert result.tier == "pro" + assert result.full_path == "acme/abc" + assert result.device_breakdown is not None + assert result.device_breakdown.mobile == 3000 + assert result.device_breakdown.tablet == 200 + assert result.utm_breakdown.sources == {"newsletter": 900} + assert result.hour_breakdown[1].hour == 10 + assert result.upgrade_for_more is None + + +def _make_async_aggregate_resource(payload): + http = MagicMock() + http.get = AsyncMock(return_value=payload) + return AsyncAnalyticsResource(http) + + +class TestAggregateAnalyticsAsync: + def test_calls_aggregate_endpoint(self): + resource = _make_async_aggregate_resource(_FREE_AGGREGATE) + asyncio.run(resource.get_aggregate_stats("abc", period="7d")) + resource._http.get.assert_awaited_once_with( + "/api/v1/links/abc/stats/aggregate", params={"period": "7d"} + ) + + def test_free_tier_upgrade_for_more(self): + resource = _make_async_aggregate_resource(_FREE_AGGREGATE) + result = asyncio.run(resource.get_aggregate_stats("abc")) + assert isinstance(result, AggregateAnalytics) + assert result.tier == "free" + assert result.device_breakdown is None + assert result.upgrade_for_more.message.startswith("Upgrade to Pro") + + def test_pro_tier_device_breakdown(self): + resource = _make_async_aggregate_resource(_PRO_AGGREGATE) + result = asyncio.run(resource.get_aggregate_stats("abc")) + assert result.device_breakdown.desktop == 1800 + assert result.utm_breakdown.campaigns == {"launch": 700} + + # --------------------------------------------------------------------------- # Integration tests — require AWSYS_API_KEY # --------------------------------------------------------------------------- diff --git a/tests/test_imports.py b/tests/test_imports.py new file mode 100644 index 0000000..5e5eebb --- /dev/null +++ b/tests/test_imports.py @@ -0,0 +1,205 @@ +"""Unit tests for the Imports resource (sync + async). Fully mocked.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from awsysco.async_resources.imports import AsyncImportsResource +from awsysco.models import ImportJob +from awsysco.resources.imports import ImportsResource + +_JOB_ID = "imp_abc123" + + +def _job_payload(status="pending"): + return { + "id": _JOB_ID, + "userId": "user_42", + "provider": "bitly", + "status": status, + "scanOnly": False, + "targetNamespace": "acme", + "scopeFilter": None, + "counts": { + "fetched": 100, + "transformed": 100, + "written": 95, + "errored": 5, + }, + "errors": ["link 12 failed: invalid slug"], + "createdAt": "2026-01-01T00:00:00Z", + "updatedAt": "2026-01-01T00:05:00Z", + } + + +def _make_resource(): + http = MagicMock() + http.post.return_value = _job_payload() + http.get.return_value = _job_payload() + http.delete.return_value = _job_payload(status="cancelled") + return ImportsResource(http) + + +class TestImportsSync: + def test_start_posts_snake_case_body(self): + resource = _make_resource() + resource.start( + provider="bitly", + access_token="tok_123", + target_namespace="acme", + scan_only=True, + ) + resource._http.post.assert_called_once_with( + "/api/v1/imports", + json={ + "provider": "bitly", + "access_token": "tok_123", + "target_namespace": "acme", + "scan_only": True, + }, + ) + + def test_start_omits_none_optionals(self): + resource = _make_resource() + resource.start(provider="bitly", access_token="tok_123") + resource._http.post.assert_called_once_with( + "/api/v1/imports", + json={"provider": "bitly", "access_token": "tok_123"}, + ) + + def test_start_returns_import_job(self): + result = _make_resource().start(provider="bitly", access_token="tok") + assert isinstance(result, ImportJob) + assert result.id == _JOB_ID + assert result.provider == "bitly" + assert result.counts.written == 95 + assert result.counts.errored == 5 + assert result.errors == ["link 12 failed: invalid slug"] + + def test_get_status_calls_endpoint(self): + resource = _make_resource() + resource.get_status(_JOB_ID) + resource._http.get.assert_called_once_with(f"/api/v1/imports/{_JOB_ID}") + + def test_get_status_encodes_job_id(self): + resource = _make_resource() + resource.get_status("ns/slug") + resource._http.get.assert_called_once_with("/api/v1/imports/ns%2Fslug") + + def test_cancel_calls_delete(self): + resource = _make_resource() + result = resource.cancel(_JOB_ID) + resource._http.delete.assert_called_once_with(f"/api/v1/imports/{_JOB_ID}") + assert isinstance(result, ImportJob) + assert result.status == "cancelled" + + def test_list_unwraps_jobs_key(self): + resource = _make_resource() + resource._http.get.return_value = {"jobs": [_job_payload(), _job_payload()]} + result = resource.list() + resource._http.get.assert_called_once_with("/api/v1/imports", params=None) + assert len(result) == 2 + assert all(isinstance(j, ImportJob) for j in result) + + def test_list_with_limit(self): + resource = _make_resource() + resource._http.get.return_value = {"jobs": []} + resource.list(limit=5) + resource._http.get.assert_called_once_with( + "/api/v1/imports", params={"limit": 5} + ) + + def test_wait_for_completion_resolves_on_terminal(self): + resource = _make_resource() + # pending -> running -> completed + resource._http.get.side_effect = [ + _job_payload(status="pending"), + _job_payload(status="running"), + _job_payload(status="completed"), + ] + result = resource.wait_for_completion(_JOB_ID, poll_interval=0.0, timeout=5.0) + assert result.status == "completed" + assert resource._http.get.call_count == 3 + + def test_wait_for_completion_times_out(self): + resource = _make_resource() + resource._http.get.return_value = _job_payload(status="running") + with pytest.raises(TimeoutError): + resource.wait_for_completion(_JOB_ID, poll_interval=0.0, timeout=0.0) + + +def _make_async_resource(): + http = MagicMock() + http.post = AsyncMock(return_value=_job_payload()) + http.get = AsyncMock(return_value=_job_payload()) + http.delete = AsyncMock(return_value=_job_payload(status="cancelled")) + return AsyncImportsResource(http) + + +class TestImportsAsync: + def test_start_posts_snake_case_body(self): + resource = _make_async_resource() + asyncio.run( + resource.start( + provider="bitly", access_token="tok_123", scan_only=True + ) + ) + resource._http.post.assert_awaited_once_with( + "/api/v1/imports", + json={ + "provider": "bitly", + "access_token": "tok_123", + "scan_only": True, + }, + ) + + def test_start_returns_import_job(self): + resource = _make_async_resource() + result = asyncio.run(resource.start(provider="bitly", access_token="tok")) + assert isinstance(result, ImportJob) + assert result.counts.fetched == 100 + + def test_get_status_calls_endpoint(self): + resource = _make_async_resource() + asyncio.run(resource.get_status(_JOB_ID)) + resource._http.get.assert_awaited_once_with(f"/api/v1/imports/{_JOB_ID}") + + def test_cancel_calls_delete(self): + resource = _make_async_resource() + result = asyncio.run(resource.cancel(_JOB_ID)) + resource._http.delete.assert_awaited_once_with(f"/api/v1/imports/{_JOB_ID}") + assert result.status == "cancelled" + + def test_list_unwraps_jobs_key(self): + resource = _make_async_resource() + resource._http.get = AsyncMock(return_value={"jobs": [_job_payload()]}) + result = asyncio.run(resource.list()) + assert len(result) == 1 + assert isinstance(result[0], ImportJob) + + def test_wait_for_completion_resolves_on_terminal(self): + resource = _make_async_resource() + resource._http.get = AsyncMock( + side_effect=[ + _job_payload(status="pending"), + _job_payload(status="completed"), + ] + ) + result = asyncio.run( + resource.wait_for_completion(_JOB_ID, poll_interval=0.0, timeout=5.0) + ) + assert result.status == "completed" + assert resource._http.get.await_count == 2 + + def test_wait_for_completion_times_out(self): + resource = _make_async_resource() + resource._http.get = AsyncMock(return_value=_job_payload(status="running")) + with pytest.raises(TimeoutError): + asyncio.run( + resource.wait_for_completion( + _JOB_ID, poll_interval=0.0, timeout=0.0 + ) + )