Lightning.Runs.Query (Lightning v2.13.3)
Query functions for working with Runs
Functions
Query to return available runs that respect concurrency limits.
This function combines run state filtering with concurrency enforcement by:
- Joining runs with their windowed concurrency data from in_progress_window/0
- Filtering for runs in :available state only
- Ensuring runs don't exceed their concurrency limits (workflow or project level)
- Ordering by priority and insertion time for fair processing
Concurrency Logic
A run is included if:
- It's in :available state (not claimed, started, or finished), and
- Either no concurrency limit is set (concurrency is nil), or the run's position within its concurrency group (row_number) is within the limit
Returns run IDs that can be safely claimed without violating concurrency constraints.
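The inclusion rule above can be sketched in plain Elixir over in-memory rows. This is only an illustration of the filter, not the Ecto query itself: the module name ClaimFilterSketch, the function names, and the row shape are hypothetical, and the sample data is made up.

```elixir
defmodule ClaimFilterSketch do
  # Hypothetical row shape: %{id, state, concurrency, row_number}.

  # Available and no limit set: always claimable.
  def claimable?(%{state: :available, concurrency: nil}), do: true

  # Available with a limit: claimable only if the position is within the limit.
  def claimable?(%{state: :available, concurrency: limit, row_number: n}),
    do: n <= limit

  # Any other state (claimed, started, finished, ...) is never claimable.
  def claimable?(_row), do: false

  def claimable_ids(rows) do
    rows |> Enum.filter(&claimable?/1) |> Enum.map(& &1.id)
  end
end

rows = [
  %{id: 1, state: :started, concurrency: 2, row_number: 1},
  %{id: 2, state: :available, concurrency: 2, row_number: 2},
  %{id: 3, state: :available, concurrency: 2, row_number: 3},
  %{id: 4, state: :available, concurrency: nil, row_number: 7}
]

IO.inspect(ClaimFilterSketch.claimable_ids(rows))
# prints [2, 4]
```

Run 3 is excluded because its position (3) exceeds the limit (2), while run 4 passes because no limit applies to it.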
@spec eligible_for_claim() :: Ecto.Queryable.t()
Query to return runs that are eligible for claiming.
This is the main function used by other parts of the system to get runs ready for execution. It implements a performance-optimized approach to run claiming that ensures fairness across workflows while respecting concurrency limits.
Algorithm
- Workflow Limiting: Limits the number of runs per workflow to prevent any single workflow from dominating the processing queue
- Window Processing: Applies row numbering within concurrency partitions (workflow or project level) on the workflow-limited dataset
- Concurrency Enforcement: Uses the row numbers to enforce concurrency limits during run claiming
- Availability Filtering: Filters for available runs within concurrency limits
- Priority Ordering: Orders results by priority and insertion time
Implementation
The function uses a multi-step approach for performance optimization:
- Pre-filtering: Uses workflow_limited_runs/1 to limit runs per workflow (configurable via :per_workflow_claim_limit), preventing any single workflow from dominating the claim queue while ensuring fairness
- Window Functions: Applies in_progress_window/0 to calculate row numbers within concurrency partitions on the smaller, pre-filtered dataset
- Final Filtering: Selects only available runs that fall within their concurrency limits (where row_number <= concurrency)
Concurrency Logic
Workflow concurrency takes precedence over project concurrency when both are set. This ensures that workflow-level limits are respected first, with project-level limits serving as a fallback.
Performance Notes
The dataset is first limited per workflow (default: 50 runs) to manage the size of data processed by expensive window functions, significantly improving query performance on large datasets.
Note
The default :per_workflow_claim_limit is 50.
This can be configured via the PER_WORKFLOW_CLAIM_LIMIT environment variable.
The value must be larger than the max concurrency of any individual workflow.
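As a rough illustration of the note above, the following sketch reads the limit from the environment and checks it against a list of workflow concurrency values. How Lightning itself parses and validates this setting is not shown in this page, so the module ClaimLimitSketch and both functions are hypothetical; only the variable name and the default of 50 come from the note.

```elixir
defmodule ClaimLimitSketch do
  # Default taken from the note above.
  @default_limit 50

  # Reads PER_WORKFLOW_CLAIM_LIMIT from a map of environment variables,
  # falling back to the default when it is missing or not a positive integer.
  def per_workflow_claim_limit(env \\ System.get_env()) do
    case Integer.parse(Map.get(env, "PER_WORKFLOW_CLAIM_LIMIT", "")) do
      {limit, ""} when limit > 0 -> limit
      _ -> @default_limit
    end
  end

  # Hypothetical sanity check: the limit must be strictly larger than every
  # workflow concurrency value that is set (nil means "no limit set").
  def valid_for?(limit, workflow_concurrencies) do
    Enum.all?(workflow_concurrencies, fn c -> is_nil(c) or limit > c end)
  end
end

limit = ClaimLimitSketch.per_workflow_claim_limit(%{"PER_WORKFLOW_CLAIM_LIMIT" => "100"})
IO.inspect(ClaimLimitSketch.valid_for?(limit, [nil, 5, 20]))
# prints true
```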
Returns runs ordered by priority and insertion time that can be safely claimed without violating concurrency limits.
@spec in_progress_window() :: Ecto.Queryable.t()
Applies concurrency window functions to workflow-limited run data.
This function takes the output from workflow_limited_runs/1 and applies SQL window functions to calculate row numbers within concurrency partitions.
Partitioning Logic
Runs are partitioned (grouped) by workflow or project for row numbering. Workflow concurrency takes precedence over project concurrency when both are set. Within each partition, runs are ordered by priority and insertion time.
Return Fields
Returns a query that selects:
- id - the run ID
- state - the current state of the run
- row_number - sequential number within the concurrency partition
- project_id - the project ID
- concurrency - the maximum concurrent runs allowed (workflow or project level)
- inserted_at - when the run was created
- priority - the run's priority level
The row_number field is used downstream to enforce concurrency limits by comparing it against the concurrency value.
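To make the partitioning and row numbering concrete, here is an in-memory sketch of what the window step computes. It is not the actual Ecto query, and it rests on several assumptions: runs are partitioned by workflow when a workflow limit is set and by project otherwise, a lower priority value sorts first, and inserted_at is an integer timestamp. The module name and the workflow_concurrency and project_concurrency input fields are hypothetical.

```elixir
defmodule ConcurrencyWindowSketch do
  # Hypothetical input rows: %{id, state, workflow_id, project_id,
  # workflow_concurrency, project_concurrency, priority, inserted_at}.
  def with_row_numbers(runs) do
    runs
    |> Enum.group_by(&partition_key/1)
    |> Enum.flat_map(fn {_key, group} ->
      group
      # Assumption: lower priority value first, then older runs first.
      |> Enum.sort_by(&{&1.priority, &1.inserted_at})
      |> Enum.with_index(1)
      |> Enum.map(fn {run, n} ->
        run
        |> Map.take([:id, :state, :project_id, :inserted_at, :priority])
        |> Map.merge(%{
          row_number: n,
          # The workflow limit wins; the project limit is the fallback.
          concurrency: run.workflow_concurrency || run.project_concurrency
        })
      end)
    end)
  end

  # Assumption: partition by project only when no workflow limit is set.
  defp partition_key(%{workflow_concurrency: nil, project_id: pid}), do: {:project, pid}
  defp partition_key(%{workflow_id: wid}), do: {:workflow, wid}
end

runs = [
  %{id: "a", state: :available, workflow_id: 1, project_id: 10,
    workflow_concurrency: 1, project_concurrency: 5, priority: 1, inserted_at: 100},
  %{id: "b", state: :available, workflow_id: 1, project_id: 10,
    workflow_concurrency: 1, project_concurrency: 5, priority: 0, inserted_at: 200},
  %{id: "c", state: :available, workflow_id: 2, project_id: 10,
    workflow_concurrency: nil, project_concurrency: 5, priority: 1, inserted_at: 50}
]

runs
|> ConcurrencyWindowSketch.with_row_numbers()
|> Enum.map(&{&1.id, &1.row_number, &1.concurrency})
|> Enum.sort()
|> IO.inspect()
# prints [{"a", 2, 1}, {"b", 1, 1}, {"c", 1, 5}]
```

Run "b" outranks "a" within workflow 1 because of its priority value, so with a workflow limit of 1 only "b" would pass a later row_number <= concurrency check.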
@spec lost() :: Ecto.Queryable.t()
Return all runs that have been claimed by a worker before the earliest acceptable start time (determined by the run options and grace period) but are still incomplete.
This indicates that we may have lost contact with the worker that was responsible for executing the run.
@spec lost_steps() :: Ecto.Queryable.t()
@spec workflow_limited_runs(pos_integer()) :: Ecto.Queryable.t()
Query to return workflow-limited runs with priority-based ranking.
This function creates a dataset where each workflow contributes at most per_workflow_limit runs, ranked by priority and insertion time. This prevents any single workflow from dominating the claim queue while ensuring fairness.
Returns runs with workflow ranking information needed for further processing.
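The per-workflow cap can be pictured with a small in-memory sketch. It mirrors the idea only, not the real query: the module name, the workflow_rank field, the integer inserted_at values, and the assumption that a lower priority value ranks first are all illustrative.

```elixir
defmodule WorkflowLimitSketch do
  # Keeps at most `per_workflow_limit` runs per workflow, ranked by
  # priority and then insertion time within each workflow.
  def workflow_limited_runs(runs, per_workflow_limit) do
    runs
    |> Enum.group_by(& &1.workflow_id)
    |> Enum.flat_map(fn {_workflow_id, group} ->
      group
      |> Enum.sort_by(&{&1.priority, &1.inserted_at})
      |> Enum.with_index(1)
      |> Enum.take(per_workflow_limit)
      # workflow_rank is a hypothetical name for the ranking information.
      |> Enum.map(fn {run, rank} -> Map.put(run, :workflow_rank, rank) end)
    end)
  end
end

runs = [
  %{id: "w1-1", workflow_id: 1, priority: 1, inserted_at: 1},
  %{id: "w1-2", workflow_id: 1, priority: 1, inserted_at: 2},
  %{id: "w1-3", workflow_id: 1, priority: 0, inserted_at: 3},
  %{id: "w2-1", workflow_id: 2, priority: 1, inserted_at: 1}
]

runs
|> WorkflowLimitSketch.workflow_limited_runs(2)
|> Enum.map(&{&1.id, &1.workflow_rank})
|> Enum.sort()
|> IO.inspect()
# prints [{"w1-1", 2}, {"w1-3", 1}, {"w2-1", 1}]
```

With a limit of 2, workflow 1 contributes only its two best-ranked runs, so "w1-2" is dropped while workflow 2 keeps its single run.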