Metadata-Version: 2.4
Name: ingress-recipient
Version: 1.0.0
Summary: Python SDK for receiving data from vikdata ingress cores
Project-URL: Homepage, https://github.com/vikmar/vikdata
Project-URL: Documentation, https://docs.vikmar.io/ingress/recipient/python
Project-URL: Repository, https://github.com/vikmar/vikdata
Project-URL: Issues, https://github.com/vikmar/vikdata/issues
Author-email: Vikmar <hello@vikmar.io>
License: MIT
Keywords: fastapi,ingress,pydantic,recipient,sdk,vikdata
Classifier: Development Status :: 4 - Beta
Classifier: Framework :: FastAPI
Classifier: Framework :: Pydantic
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Requires-Python: >=3.11
Requires-Dist: pydantic>=2.10.0
Provides-Extra: dev
Requires-Dist: fastapi>=0.115.0; extra == 'dev'
Requires-Dist: httpx>=0.27.0; extra == 'dev'
Requires-Dist: mypy>=1.13.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.24.0; extra == 'dev'
Requires-Dist: pytest-cov>=6.0.0; extra == 'dev'
Requires-Dist: pytest>=8.3.0; extra == 'dev'
Requires-Dist: ruff>=0.8.0; extra == 'dev'
Provides-Extra: fastapi
Requires-Dist: fastapi>=0.115.0; extra == 'fastapi'
Description-Content-Type: text/markdown

# ingress-recipient

Python SDK for receiving data from vikdata ingress cores.

This SDK provides server-side dependencies and types for services that receive processed data FROM ingress cores after the 5-stage pipeline (auth, validate, transform, route, dispatch).

## Installation

```bash
pip install ingress-recipient
```

Or with FastAPI support:

```bash
pip install "ingress-recipient[fastapi]"
```

## Quick Start

```python
from ingress_recipient import RecipientData, validate_recipient_data
from fastapi import APIRouter, Depends

router = APIRouter()

@router.post("/destinations/telemetry")
async def receive_telemetry(
    data: RecipientData = Depends(validate_recipient_data),
):
    # data is fully typed and validated
    print(f"Received data from channel {data.channel_id}")
    print(f"Tenant: {data.tenant_id}")
    print(f"Transformations: {data.metadata.transformations_applied}")

    # Process data...
    await save_telemetry(data)

    return {"status": "accepted"}
```

## Contract

### RecipientData

The standardized contract that ALL recipients must accept:

```python
from datetime import datetime
from typing import Any
from uuid import UUID

from pydantic import BaseModel


class RecipientMetadata(BaseModel):
    source_ip: str | None                      # Optional source IP
    ingress_timestamp: datetime                # Ingress receipt timestamp
    transformations_applied: list[str] | None  # Transformations applied


class RecipientData(BaseModel):
    channel_id: str           # Channel identifier
    tenant_id: UUID           # Tenant UUID
    timestamp: datetime       # Original data timestamp
    data: dict[str, Any]      # Arbitrary JSON payload
    metadata: RecipientMetadata

**IMPORTANT**: All field names use `snake_case` (not camelCase) to match backend conventions.
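
As an illustration, a payload matching this contract could be built as shown below. This is a minimal sketch using only the standard library. `build_sample_payload` and every value in it (channel, vessel fields, transformation name) are made up for the example and are not part of the SDK.

```python
import json
from datetime import datetime, timezone
from uuid import uuid4


def build_sample_payload() -> dict:
    """Build a dict shaped like RecipientData, filled with made-up values."""
    now = datetime.now(timezone.utc).isoformat()
    return {
        "channel_id": "ch_test",
        "tenant_id": str(uuid4()),
        "timestamp": now,
        # Arbitrary JSON payload; these keys are illustrative only.
        "data": {"vessel_id": "v-1", "latitude": 60.39, "longitude": 5.32},
        "metadata": {
            "source_ip": None,
            "ingress_timestamp": now,
            "transformations_applied": ["normalize_units"],
        },
    }


payload = build_sample_payload()
body = json.dumps(payload)  # JSON text as it would appear in a request body
```

Note that `ingress_timestamp`, `source_ip`, and `transformations_applied` live under `metadata`, not at the top level.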

## API Reference

### FastAPI Dependency

#### `validate_recipient_data(data: RecipientData)`

FastAPI dependency that validates incoming `RecipientData`.

**Behavior:**
- **On success**: Returns validated `RecipientData`
- **On failure**: FastAPI responds with 422 Unprocessable Entity (raised automatically when Pydantic validation fails)

**Example:**

```python
from ingress_recipient import RecipientData, validate_recipient_data
from fastapi import Depends

@router.post("/destinations/telemetry")
async def receive_telemetry(
    data: RecipientData = Depends(validate_recipient_data),
):
    # data is fully typed and validated
    await process_telemetry(data)
    return {"status": "accepted"}
```

#### `ValidatedRecipientData` (Type Alias)

Type alias for cleaner endpoint signatures:

```python
from ingress_recipient import ValidatedRecipientData

@router.post("/destinations/telemetry")
async def receive_telemetry(data: ValidatedRecipientData):
    # data is automatically validated
    await process_telemetry(data)
    return {"status": "accepted"}
```

### Validation Utilities

#### `validate_data(data: dict)`

Validates data against the `RecipientData` model. Raises `ValidationError` if validation fails.

**Example:**

```python
from ingress_recipient import validate_data
from pydantic import ValidationError

try:
    validated = validate_data(request_body)
    # Use validated data...
except ValidationError as e:
    print(f"Validation failed: {e}")
```

#### `safe_validate_recipient_data(data: dict)`

Safe validation that returns a `(validated, errors)` tuple instead of raising `ValidationError`.

**Example:**

```python
from ingress_recipient import safe_validate_recipient_data

validated, errors = safe_validate_recipient_data(request_body)

if validated:
    print(f"Valid data: {validated.channel_id}")
else:
    print(f"Validation errors: {errors}")
```

## Validation Rules

1. **channel_id**: Non-empty string (min length 1, no whitespace-only)
2. **tenant_id**: Valid UUID format
3. **timestamp**: Valid datetime (ISO 8601 or datetime object)
4. **data**: Any valid dict
5. **metadata.ingress_timestamp**: Valid datetime (required)
6. **metadata.source_ip**: Optional string
7. **metadata.transformations_applied**: Optional list of strings
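
The rules above can be sketched as a standalone pre-check that uses only the standard library, for example to sanity-check a payload where the SDK is not installed. `check_payload` and its error messages are hypothetical and not part of the SDK; the sketch only approximates what the Pydantic models enforce.

```python
from datetime import datetime
from uuid import UUID


def _is_datetime(value: object) -> bool:
    """Accept datetime objects or ISO 8601 strings."""
    if isinstance(value, datetime):
        return True
    if not isinstance(value, str):
        return False
    try:
        # Normalize a trailing "Z" so older fromisoformat() versions accept it.
        datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return False
    return True


def _is_uuid(value: object) -> bool:
    """Accept UUID objects or strings that parse as a UUID."""
    if isinstance(value, UUID):
        return True
    if not isinstance(value, str):
        return False
    try:
        UUID(value)
    except ValueError:
        return False
    return True


def check_payload(payload: dict) -> list[str]:
    """Return a list of rule violations; an empty list means the payload passed."""
    errors: list[str] = []

    channel_id = payload.get("channel_id")
    if not isinstance(channel_id, str) or not channel_id.strip():
        errors.append("channel_id: must be a non-empty string")
    if not _is_uuid(payload.get("tenant_id")):
        errors.append("tenant_id: must be a valid UUID")
    if not _is_datetime(payload.get("timestamp")):
        errors.append("timestamp: must be a valid datetime")
    if not isinstance(payload.get("data"), dict):
        errors.append("data: must be a dict")

    metadata = payload.get("metadata")
    if not isinstance(metadata, dict):
        errors.append("metadata: must be a dict")
        return errors

    if not _is_datetime(metadata.get("ingress_timestamp")):
        errors.append("metadata.ingress_timestamp: must be a valid datetime")
    source_ip = metadata.get("source_ip")
    if source_ip is not None and not isinstance(source_ip, str):
        errors.append("metadata.source_ip: must be a string or None")
    applied = metadata.get("transformations_applied")
    if applied is not None and not (
        isinstance(applied, list) and all(isinstance(t, str) for t in applied)
    ):
        errors.append("metadata.transformations_applied: must be a list of strings or None")
    return errors


sample = {
    "channel_id": "ch_test",
    "tenant_id": "123e4567-e89b-12d3-a456-426614174000",
    "timestamp": "2025-12-15T12:00:00Z",
    "data": {},
    "metadata": {"ingress_timestamp": "2025-12-15T12:00:00Z"},
}
sample_errors = check_payload(sample)
```

Collecting all violations in a list, rather than stopping at the first one, mirrors how the 422 response reports every failing field at once.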

## Error Responses

When validation fails, FastAPI automatically returns 422 Unprocessable Entity:

```json
{
  "detail": [
    {
      "loc": ["body", "channel_id"],
      "msg": "String should have at least 1 character",
      "type": "string_too_short"
    }
  ]
}
```
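
A caller that receives this response may want to flatten it into readable messages. The sketch below assumes the `detail` list has the shape shown above; `summarize_validation_errors` is a hypothetical helper, not part of the SDK.

```python
import json


def summarize_validation_errors(body: str) -> list[str]:
    """Turn a 422 response body into one 'field: message (type)' line per error."""
    detail = json.loads(body).get("detail", [])
    if not isinstance(detail, list):
        # Some error responses carry a plain string in "detail".
        return [str(detail)]

    lines = []
    for item in detail:
        loc = list(item.get("loc", []))
        # Drop the leading "body" segment so only the field path remains.
        if loc and loc[0] == "body":
            loc = loc[1:]
        field = ".".join(str(part) for part in loc)
        lines.append(f"{field}: {item.get('msg', '')} ({item.get('type', '')})")
    return lines


example_body = """
{
  "detail": [
    {
      "loc": ["body", "channel_id"],
      "msg": "String should have at least 1 character",
      "type": "string_too_short"
    }
  ]
}
"""
summary = summarize_validation_errors(example_body)
```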

## Field Naming Convention

**IMPORTANT**: All field names use `snake_case` (not camelCase) to match backend conventions and eliminate case transformation bugs.

```python
# ✅ Correct - snake_case (other required fields omitted for brevity)
data = {
    "channel_id": "ch_test",
    "tenant_id": "123e4567-e89b-12d3-a456-426614174000",
    "metadata": {"ingress_timestamp": "2025-12-15T12:00:00Z"},
}

# ❌ Incorrect - camelCase (will fail validation)
data = {
    "channelId": "ch_test",
    "tenantId": "123e4567-e89b-12d3-a456-426614174000",
    "metadata": {"ingressTimestamp": "2025-12-15T12:00:00Z"},
}
```

## Complete Example

```python
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from ingress_recipient import RecipientData, RecipientResponse, validate_recipient_data

from .database import get_db
from .models import Telemetry
from .events import event_bus

router = APIRouter()


@router.post("/destinations/telemetry", response_model=RecipientResponse)
async def receive_telemetry(
    data: RecipientData = Depends(validate_recipient_data),
    db: AsyncSession = Depends(get_db),
):
    """Receive telemetry data from ingress core.

    This endpoint receives processed telemetry data after it has gone
    through the ingress pipeline (auth, validate, transform, route, dispatch).
    """

    print("Received telemetry data:")
    print(f"  Channel: {data.channel_id}")
    print(f"  Tenant: {data.tenant_id}")
    print(f"  Timestamp: {data.timestamp}")
    print(f"  Source IP: {data.metadata.source_ip}")
    print(f"  Ingress timestamp: {data.metadata.ingress_timestamp}")
    print(f"  Transformations: {data.metadata.transformations_applied}")
    print(f"  Data: {data.data}")

    try:
        # Extract vessel telemetry from data
        vessel_id = data.data.get("vessel_id")
        latitude = data.data.get("latitude")
        longitude = data.data.get("longitude")
        speed = data.data.get("speed")

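        # Compare against None so valid zero values (e.g. latitude 0.0) are not rejected
        if any(value is None for value in (vessel_id, latitude, longitude)):
            raise ValueError("Missing required telemetry fields")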

        # Save to database
        telemetry = Telemetry(
            tenant_id=data.tenant_id,
            channel_id=data.channel_id,
            vessel_id=vessel_id,
            latitude=latitude,
            longitude=longitude,
            speed=speed,
            timestamp=data.timestamp,
            ingress_timestamp=data.metadata.ingress_timestamp,
        )

        db.add(telemetry)
        await db.commit()

        # Publish event for reactive rules
        await event_bus.publish(
            "telemetry.received",
            {
                "tenant_id": str(data.tenant_id),
                "vessel_id": vessel_id,
                "position": {"latitude": latitude, "longitude": longitude},
                "speed": speed,
            },
        )

        return RecipientResponse(
            status="accepted", message="Telemetry processed successfully"
        )

    except Exception as e:
        # Roll back so a failed write does not leave the session in a broken state
        await db.rollback()
        print(f"Failed to process telemetry: {e}")

        return RecipientResponse(
            status="rejected", message=f"Failed to process telemetry: {e}"
        )
```

## Testing

```bash
# Install with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=ingress_recipient --cov-report=html

# Run mypy type checking
mypy src/ingress_recipient

# Run ruff linting
ruff check src tests
```

## Type Safety

The SDK is fully typed with strict mypy checking:

```python
from datetime import datetime
from typing import Any
from uuid import UUID

from ingress_recipient import RecipientData, RecipientMetadata

def handle_telemetry(data: RecipientData) -> None:
    # All fields are fully typed
    channel_id: str = data.channel_id
    tenant_id: UUID = data.tenant_id
    timestamp: datetime = data.timestamp
    payload: dict[str, Any] = data.data
    metadata: RecipientMetadata = data.metadata

    # mypy will catch any type errors
```

## Related Packages

- **[ingress-recipient (TypeScript)](../../typescript/ingress-recipient)** - TypeScript version of this SDK
- **[@vikdata/ingress-edge](../../typescript/ingress-edge)** - Edge SDK for sending data TO ingress cores
- **[fishinfo-sdk](../fishinfo)** - fishinfo Python SDK

## Architecture

This SDK is part of the vikdata ingress platform:

```
IoT Devices/Clients
    ↓
[ingress-edge SDK] (auto-batching, retry, offline buffer)
    ↓
Ingress Core (Go)
    ├─ Auth Stage
    ├─ Validate Stage
    ├─ Transform Stage
    ├─ Route Stage
    └─ Dispatch Stage
    ↓
[ingress-recipient SDK] ← YOU ARE HERE
    ↓
Recipient Services (fishinfo, TimescaleDB, webhooks, etc.)
```
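
To exercise a recipient endpoint locally without a running ingress core, the delivery step can be imitated with a hand-built request. This sketch assumes the payload arrives as a JSON body in an HTTP POST, which is consistent with the `@router.post` handlers in this README but is not a description of the Go core's internals. The URL and `build_dispatch_request` are hypothetical. The request is only constructed here, not sent.

```python
import json
from urllib.request import Request

# Hypothetical local URL; adjust to wherever your recipient service listens.
RECIPIENT_URL = "http://localhost:8000/destinations/telemetry"


def build_dispatch_request(payload: dict, url: str = RECIPIENT_URL) -> Request:
    """Build (but do not send) a JSON POST request carrying the payload."""
    return Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_dispatch_request(
    {
        "channel_id": "ch_test",
        "tenant_id": "123e4567-e89b-12d3-a456-426614174000",
        "timestamp": "2025-12-15T12:00:00Z",
        "data": {"vessel_id": "v-1"},
        "metadata": {"ingress_timestamp": "2025-12-15T12:00:05Z"},
    }
)
```

Passing `request` to `urllib.request.urlopen` would send it to the running service.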

## Development

```bash
# Clone the repository
git clone https://github.com/vikmar/vikdata.git
cd vikdata/sdk/python/ingress-recipient

# Create virtual environment
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Run type checking
mypy src/ingress_recipient

# Run linting
ruff check src tests

# Format code
ruff format src tests
```

## License

MIT

## Support

- **Documentation**: https://docs.vikmar.io/ingress/recipient/python
- **Issues**: https://github.com/vikmar/vikdata/issues
- **Discussions**: https://github.com/vikmar/vikdata/discussions
