Source code for gyoza.server.scheduler.scheduler
"""Scheduler for allocating OpRuns to workers."""
from gyoza.server.op_run.repository import OpRunRepository
from gyoza.server.scheduler.strategies.base import SchedulingStrategy
from gyoza.server.worker import WorkerOpRun
from gyoza.server.worker.pool import WorkerPool
[docs]
class Scheduler:
    """Coordinate OpRun allocation through an injected scheduling strategy.

    The scheduler makes no allocation decisions of its own. It hands the
    OpRun repository and the currently active workers to the strategy, then
    records the strategy's choice by assigning every selected run to the
    requesting worker and saving it.

    Parameters
    ----------
    strategy : SchedulingStrategy
        Algorithm that picks which runs a worker receives.
    op_run_repo : OpRunRepository
        Repository used to look up and persist OpRuns.
    worker_pool : WorkerPool
        Source of the active-worker list.
    worker_timeout_seconds : int
        Timeout, in seconds, passed to the pool when listing active workers.
    """

    def __init__(
        self,
        strategy: SchedulingStrategy,
        op_run_repo: OpRunRepository,
        worker_pool: WorkerPool,
        worker_timeout_seconds: int = 60,
    ) -> None:
        # Collaborators are stored as-is; nothing is queried at construction.
        self._strategy = strategy
        self._op_run_repo = op_run_repo
        self._worker_pool = worker_pool
        self._worker_timeout_seconds = worker_timeout_seconds

    def allocate_for_worker(self, worker_id: str) -> list[WorkerOpRun]:
        """Pick OpRuns for a worker and mark them as assigned to it.

        The active workers are fetched from the pool and passed, together
        with the repository, to the strategy. Every run the strategy selects
        is then loaded from the repository, assigned to ``worker_id`` and
        saved back (the PENDING to ASSIGNED transition).

        Parameters
        ----------
        worker_id : str
            ID of the worker requesting work.

        Returns
        -------
        list[WorkerOpRun]
            The strategy's selection, returned unchanged. Empty if none
            available or worker not found.
        """
        active_workers = self._worker_pool.list_active(
            self._worker_timeout_seconds
        )
        selection = self._strategy.allocate(
            worker_id, self._op_run_repo, active_workers
        )

        # Persist the assignment run by run, in the order the strategy chose.
        repo = self._op_run_repo
        for selected in selection:
            op_run = repo.get(selected.id)
            op_run.assign(worker_id)
            repo.save(op_run)

        return selection