# Source code for gyoza.deployment.deployer

"""Full deployment pipeline for gyoza operations.

Orchestrates: build image → push image → load OpDefinition → upsert on server.
"""

from __future__ import annotations

import os
import re
import subprocess
from collections.abc import Callable
from pathlib import Path
from typing import Any

from gyoza.client._client import GyozaClient
from gyoza.deployment.builder import build as docker_build
from gyoza.deployment.config import load_config
from gyoza.deployment.io_extractor import extract_io_specs
from gyoza.models.op_definition import OpDefinition
from gyoza.models.resources import Constraints, EventDelivery, RetryPolicy

# Operation source file used when the config has no ``gyoza_op_file`` entry.
_DEFAULT_OP_FILE = "main.py"
# Top-level config keys that must be present before a deployment can start.
_REQUIRED_KEYS = frozenset({"name", "version", "image"})
# Matches ``${NAME}`` and ``${NAME:-default}``. ``name`` must look like an
# identifier (letter or underscore first); ``default`` is any run of
# characters other than ``}`` and may be empty.
_ENV_VAR_RE = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}"
)

# Signature of a progress callback: it receives one human-readable message.
StepCallback = Callable[[str], None]


def _resolve_env_vars(value: str) -> str:
    """Substitute ``${VAR}`` and ``${VAR:-default}`` placeholders.

    ``${VAR:-default}`` follows the shell / Docker Compose meaning of ``:-``:
    the default is used when ``VAR`` is unset **or** set to an empty string.
    A bare ``${VAR}`` expands to whatever the environment holds, including an
    empty string, and is an error only when the variable is not set at all.

    Parameters
    ----------
    value : str
        Raw string that may contain env-var placeholders.

    Returns
    -------
    str
        String with environment values substituted. Text that does not match
        the placeholder syntax is returned unchanged.

    Raises
    ------
    ValueError
        If a required env var (no default) is not set.
    """

    def _replace(match: re.Match[str]) -> str:
        name = match.group("name")
        default = match.group("default")
        env = os.environ.get(name)
        if default is not None:
            # ``:-`` semantics: an empty value counts as "not provided", so a
            # blank variable (e.g. an unset CI secret) still gets the default.
            return env if env else default
        if env is not None:
            return env
        msg = f"Environment variable '{name}' is not set and has no default"
        raise ValueError(msg)

    return _ENV_VAR_RE.sub(_replace, value)


def _load_deploy_config(project_path: Path) -> dict[str, Any]:
    """Load and validate the gyoza config for deployment.

    Beyond the required top-level keys, deployment needs a ``deploy`` mapping
    that carries a non-null ``gyoza_server_base_url``.

    Parameters
    ----------
    project_path : Path
        Root directory containing the gyoza config.

    Returns
    -------
    dict[str, Any]
        Parsed YAML configuration.

    Raises
    ------
    FileNotFoundError
        If the config file is not found.
    ValueError
        If required keys are missing, ``deploy`` is not a mapping, or
        ``deploy.gyoza_server_base_url`` is missing or null.
    """
    from collections.abc import Mapping

    data = load_config(project_path)

    missing = _REQUIRED_KEYS - data.keys()
    if missing:
        msg = f"Missing required keys in gyoza config: {', '.join(sorted(missing))}"
        raise ValueError(msg)

    if "deploy" not in data:
        msg = "'deploy' key is required in gyoza config for deployment"
        raise ValueError(msg)

    deploy_cfg = data["deploy"]
    # An empty ``deploy:`` entry parses to None (and a scalar or list is just
    # as unusable); report it as a config error rather than letting the
    # membership test below raise TypeError.
    if not isinstance(deploy_cfg, Mapping):
        msg = "'deploy' must be a mapping in gyoza config"
        raise ValueError(msg)

    # A present-but-null URL would otherwise be stringified to "None" by the
    # caller, so treat it the same as a missing key.
    if deploy_cfg.get("gyoza_server_base_url") is None:
        msg = "'deploy.gyoza_server_base_url' is required in gyoza config"
        raise ValueError(msg)

    return data


def _build_op_definition(project_path: Path, data: dict[str, Any]) -> OpDefinition:
    """Assemble the OpDefinition described by the config and the op source file.

    Input and output specs are extracted from the operation file (the
    ``gyoza_op_file`` entry, falling back to the module default). The
    optional ``constraints``, ``retry_policy`` and ``event_delivery``
    sections are read from the config, with absent or empty sections
    treated as empty mappings.

    Parameters
    ----------
    project_path : Path
        Root directory of the project.
    data : dict[str, Any]
        Parsed gyoza.yml configuration.

    Returns
    -------
    OpDefinition
        Fully populated operation definition.
    """
    op_source = project_path / data.get("gyoza_op_file", _DEFAULT_OP_FILE)
    inputs, outputs = extract_io_specs(op_source)

    # ``or {}`` covers both a missing section and one that is present but empty.
    constraints_cfg = data.get("constraints") or {}
    retry_cfg = data.get("retry_policy") or {}
    events_cfg = data.get("event_delivery") or {}

    op_id = data["name"]
    op_version = data["version"]
    op_image = data["image"]

    constraints = Constraints(
        cpu=constraints_cfg.get("cpu"),
        ram_mb=constraints_cfg.get("ram_mb"),
        vram_mb=constraints_cfg.get("vram_mb"),
    )
    retry_policy = RetryPolicy(
        max_attempts=retry_cfg.get("max_attempts", 1),
        backoff_ms=retry_cfg.get("backoff_ms", 0),
    )
    event_delivery = EventDelivery(
        topic=events_cfg.get("topic", ""),
        attributes=events_cfg.get("attributes", {}),
    )

    return OpDefinition.create(
        id=op_id,
        version=op_version,
        image=op_image,
        input_specs=inputs,
        output_specs=outputs,
        constraints=constraints,
        retry_policy=retry_policy,
        event_delivery=event_delivery,
    )


def deploy(
    project_path: Path | str,
    *,
    on_step: StepCallback | None = None,
) -> dict[str, Any]:
    """Execute the full deployment pipeline for a gyoza operation.

    Steps executed in order:

    1. **Build** the Docker image via ``docker compose build``.
    2. **Push** the image to the container registry.
    3. **Load** the OpDefinition from the YAML config and Python IO file.
    4. **Upsert** the OpDefinition on the gyoza server.

    The config is loaded and validated before any step runs, so an invalid
    config fails before Docker is invoked.

    Parameters
    ----------
    project_path : Path | str
        Directory containing ``gyoza.yml`` and the operation source files.
    on_step : StepCallback | None
        Optional callback invoked with a human-readable message at each
        pipeline step.

    Returns
    -------
    dict[str, Any]
        Server response from the upsert operation.

    Raises
    ------
    FileNotFoundError
        If required files are missing.
    ValueError
        If configuration is invalid or env vars are unset.
    subprocess.CalledProcessError
        If Docker build or push fails.
    httpx.HTTPStatusError
        If the server upsert call fails.
    """
    # Fall back to a no-op so the pipeline can report progress unconditionally.
    log: StepCallback = on_step or (lambda _: None)
    project_path = Path(project_path).resolve()

    data = _load_deploy_config(project_path)
    image: str = data["image"]
    name: str = data["name"]
    version: str = data["version"]

    log(f"🚀 Starting deploy for {name} v{version} ({image})")

    # 1. Build
    log("🔨 [1/4] Building Docker image...")
    docker_build(project_path)
    log("✅ [1/4] Docker image built")

    # 2. Push
    log(f"📤 [2/4] Pushing {image}...")
    subprocess.run(["docker", "push", image], check=True, text=True)  # noqa: S603
    log("✅ [2/4] Image pushed")

    # 3. Load OpDefinition
    log("📋 [3/4] Loading OpDefinition...")
    op_def = _build_op_definition(project_path, data)
    log("✅ [3/4] OpDefinition loaded")

    # 4. Upsert on server
    deploy_cfg: dict[str, Any] = data["deploy"]
    base_url = _resolve_env_vars(str(deploy_cfg["gyoza_server_base_url"]))
    # The auth token is optional; placeholders are only resolved when one is
    # configured.
    api_key_raw = deploy_cfg.get("gyoza_server_auth_token")
    api_key = _resolve_env_vars(str(api_key_raw)) if api_key_raw else None

    log(f"🌐 [4/4] Upserting OpDefinition on {base_url}...")
    with GyozaClient(base_url=base_url, api_key=api_key) as client:
        result = client.upsert_definition(op_def.to_dict())
    log("✅ [4/4] OpDefinition upserted")

    log("🎉 Deploy completed!")
    return result