Source code for gyoza.server.scheduler.scheduler

"""Scheduler for allocating OpRuns to workers."""

from gyoza.server.op_run.repository import OpRunRepository
from gyoza.server.scheduler.strategies.base import SchedulingStrategy
from gyoza.server.worker import WorkerOpRun
from gyoza.server.worker.pool import WorkerPool


class Scheduler:
    """Orchestrates run allocation using a pluggable strategy.

    The scheduler provides the repository and active workers to the
    strategy, which decides how to allocate runs.

    Parameters
    ----------
    strategy : SchedulingStrategy
        The scheduling algorithm to use.
    op_run_repo : OpRunRepository
        Repository for querying OpRuns.
    worker_pool : WorkerPool
        Pool for fetching active workers.
    worker_timeout_seconds : int
        Timeout in seconds for considering workers active.
    """

    def __init__(
        self,
        strategy: SchedulingStrategy,
        op_run_repo: OpRunRepository,
        worker_pool: WorkerPool,
        worker_timeout_seconds: int = 60,
    ) -> None:
        self._strategy = strategy
        self._op_run_repo = op_run_repo
        self._worker_pool = worker_pool
        self._worker_timeout_seconds = worker_timeout_seconds

    def allocate_for_worker(self, worker_id: str) -> list[WorkerOpRun]:
        """Get OpRuns that should be allocated to a worker.

        Provides the repository and active workers to the strategy, which
        determines which runs to allocate. Then transitions the selected
        runs from PENDING to ASSIGNED state.

        Parameters
        ----------
        worker_id : str
            ID of the worker requesting work.

        Returns
        -------
        list[WorkerOpRun]
            OpRuns allocated to the worker. Empty if none available or
            worker not found.
        """
        # The active-worker snapshot is taken once per request and handed to
        # the strategy together with the repository.
        workers = self._worker_pool.list_active(self._worker_timeout_seconds)
        allocated_runs = self._strategy.allocate(
            worker_id, self._op_run_repo, workers
        )

        # Assign each allocated run to the worker. The run is looked up again
        # by id so the assignment is applied to, and saved through, the
        # repository's own object rather than the value the strategy returned.
        # NOTE(review): assumes ``get`` always yields a run for an id the
        # strategy just returned -- confirm against OpRunRepository.
        for worker_op_run in allocated_runs:
            run = self._op_run_repo.get(worker_op_run.id)
            run.assign(worker_id)
            self._op_run_repo.save(run)

        return allocated_runs