Lightning.Runs.Query (Lightning v2.13.3)

Query functions for working with Runs

Summary

Functions

available_within_concurrency_limits(in_progress_window_query \\ in_progress_window())

Query to return available runs that respect concurrency limits.

eligible_for_claim()

Query to return runs that are eligible for claiming.

in_progress_window()

Applies concurrency window functions to workflow-limited run data.

lost()

Return all runs that have been claimed by a worker before the earliest acceptable start time (determined by the run options and grace period) but are still incomplete.

lost_steps()

workflow_limited_runs(per_workflow_limit \\ 50)

Query to return workflow-limited runs with priority-based ranking.

Functions

available_within_concurrency_limits(in_progress_window_query \\ in_progress_window())

Query to return available runs that respect concurrency limits.

This function combines run state filtering with concurrency enforcement by:

  1. Joining runs with their windowed concurrency data from in_progress_window/0
  2. Filtering for runs in :available state only
  3. Ensuring runs don't exceed their concurrency limits (workflow or project level)
  4. Ordering by priority and insertion time for fair processing

Concurrency Logic

A run is included if both of the following hold:

  • It's in :available state (not claimed, started, or finished)
  • Either no concurrency limit is set (concurrency is nil), or the run's position within its concurrency group (row_number) is within the limit

Returns run IDs that can be safely claimed without violating concurrency constraints.
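
The inclusion rule above can be sketched as a plain predicate over in-memory maps. This is only an illustration, not the query itself: the module name ClaimableSketch and the row shape are hypothetical, and the real function expresses the same condition in an Ecto query.

```elixir
defmodule ClaimableSketch do
  # Hypothetical row shape (not Lightning's schema):
  #   %{state: atom, concurrency: nil | pos_integer, row_number: pos_integer}

  # No limit configured: any available run qualifies.
  def claimable?(%{state: :available, concurrency: nil}), do: true

  # Limit configured: the run's position in its group must fit within it.
  def claimable?(%{state: :available, concurrency: limit, row_number: n})
      when is_integer(limit),
      do: n <= limit

  # Claimed, started, or finished runs are never claimable.
  def claimable?(_row), do: false
end
```

With a limit of 2, a run at row_number 2 passes and a run at row_number 3 does not.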

eligible_for_claim()

@spec eligible_for_claim() :: Ecto.Queryable.t()

Query to return runs that are eligible for claiming.

This is the main function used by other parts of the system to get runs ready for execution. It implements a performance-optimized approach to run claiming that ensures fairness across workflows while respecting concurrency limits.

Algorithm

  1. Workflow Limiting: Limits the number of runs per workflow to prevent any single workflow from dominating the processing queue
  2. Window Processing: Applies row numbering within concurrency partitions (workflow or project level) on the workflow-limited dataset
  3. Concurrency Enforcement: Uses the row numbers to enforce concurrency limits during run claiming
  4. Availability Filtering: Filters for available runs within concurrency limits
  5. Priority Ordering: Orders results by priority and insertion time

Implementation

The function uses a multi-step approach for performance optimization:

  1. Pre-filtering: Uses workflow_limited_runs/1 to cap the number of runs considered per workflow (configurable via :per_workflow_claim_limit), so that no single workflow can dominate the claim queue
  2. Window Functions: Applies in_progress_window/0 to calculate row numbers within concurrency partitions on the smaller, pre-filtered dataset
  3. Final Filtering: Selects only available runs that fall within their concurrency limits (where concurrency is nil or row_number <= concurrency)
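
To make the three steps concrete, here is a minimal in-memory sketch of the pipeline using Enum instead of SQL. Everything in it is an assumption made for illustration: the module ClaimPipelineSketch, the run shape, integer timestamps, the rule that a lower priority integer sorts first, and the way partitions are keyed. It also assumes the dataset contains in-progress runs as well as available ones, so that in-progress runs occupy row numbers.

```elixir
defmodule ClaimPipelineSketch do
  # Hypothetical run shape (not Lightning's schema):
  #   %{id:, workflow_id:, project_id:, state:, priority:, inserted_at:,
  #     workflow_concurrency:, project_concurrency:}
  # Assumptions: a lower `priority` integer sorts first, and `inserted_at`
  # is a plain integer standing in for a timestamp.

  def eligible_ids(runs, per_workflow_limit \\ 50) do
    runs
    |> limit_per_workflow(per_workflow_limit)
    |> number_within_partitions()
    |> Enum.filter(&within_limit?/1)
    |> Enum.sort_by(&order_key/1)
    |> Enum.map(& &1.id)
  end

  # Step 1: keep at most `limit` runs per workflow, best-ranked first.
  defp limit_per_workflow(runs, limit) do
    runs
    |> Enum.group_by(& &1.workflow_id)
    |> Enum.flat_map(fn {_workflow_id, group} ->
      group |> Enum.sort_by(&order_key/1) |> Enum.take(limit)
    end)
  end

  # Step 2: number the runs inside each concurrency partition.
  defp number_within_partitions(runs) do
    runs
    |> Enum.group_by(&partition_key/1)
    |> Enum.flat_map(fn {_key, group} ->
      group
      |> Enum.sort_by(&order_key/1)
      |> Enum.with_index(1)
      |> Enum.map(fn {run, n} ->
        limit = run.workflow_concurrency || run.project_concurrency
        Map.merge(run, %{row_number: n, concurrency: limit})
      end)
    end)
  end

  # Step 3: available runs whose row number fits the limit (nil = no limit).
  defp within_limit?(%{state: :available, concurrency: nil}), do: true
  defp within_limit?(%{state: :available, concurrency: c, row_number: n}), do: n <= c
  defp within_limit?(_run), do: false

  # A workflow-level limit takes precedence, so such runs are grouped by workflow.
  defp partition_key(%{workflow_concurrency: nil} = run), do: {:project, run.project_id}
  defp partition_key(run), do: {:workflow, run.workflow_id}

  defp order_key(run), do: {run.priority, run.inserted_at}
end
```

In this sketch, a workflow with a concurrency of 2, one started run, and two available runs yields exactly one claimable run, because the started run already occupies row 1.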

Concurrency Logic

Workflow concurrency takes precedence over project concurrency when both are set. This ensures that workflow-level limits are respected first, with project-level limits serving as a fallback.

Performance Notes

The dataset is first limited per workflow (default: 50 runs) to manage the size of data processed by expensive window functions, significantly improving query performance on large datasets.

Note

The default :per_workflow_claim_limit is 50. This can be configured via the PER_WORKFLOW_CLAIM_LIMIT environment variable. The value must be larger than the max concurrency of any individual workflow.
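
As a rough illustration of the note above, the sketch below reads PER_WORKFLOW_CLAIM_LIMIT with the documented default of 50 and checks the stated constraint against a list of workflow concurrency values. The module ClaimLimitConfigSketch, both function names, and the fallback to the default on invalid input are assumptions; Lightning's own configuration loading may behave differently.

```elixir
defmodule ClaimLimitConfigSketch do
  @default_limit 50

  # Reads the limit from an environment map (defaults to the real environment).
  # Assumption: missing or non-numeric values fall back to the default.
  def per_workflow_claim_limit(env \\ System.get_env()) do
    case Integer.parse(Map.get(env, "PER_WORKFLOW_CLAIM_LIMIT", "")) do
      {limit, ""} when limit > 0 -> limit
      _ -> @default_limit
    end
  end

  # Hypothetical sanity check: the limit must be strictly larger than the
  # concurrency of every workflow that has one configured.
  def valid_for?(limit, workflow_concurrencies) do
    workflow_concurrencies
    |> Enum.reject(&is_nil/1)
    |> Enum.all?(&(&1 < limit))
  end
end
```

For example, valid_for?(50, [10, nil, 49]) holds, while a workflow with a concurrency of 50 would require raising the limit.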

Returns runs ordered by priority and insertion time that can be safely claimed without violating concurrency limits.

in_progress_window()

@spec in_progress_window() :: Ecto.Queryable.t()

Applies concurrency window functions to workflow-limited run data.

This function takes the output from workflow_limited_runs/1 and applies SQL window functions to calculate row numbers within concurrency partitions.

Partitioning Logic

Runs are partitioned (grouped) by workflow or project for row numbering. Workflow concurrency takes precedence over project concurrency when both are set. Within each partition, runs are ordered by priority and insertion time.

Return Fields

Returns a query that selects:

  • id - the run ID
  • state - the current state of the run
  • row_number - sequential number within the concurrency partition
  • project_id - the project ID
  • concurrency - the maximum concurrent runs allowed (workflow or project level)
  • inserted_at - when the run was created
  • priority - the run's priority level

The row_number field is used downstream to enforce concurrency limits by comparing it against the concurrency value.
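
The partitioning and the returned fields can be mimicked in memory as follows. This is a sketch under assumptions, not the actual window query: WindowSketch, the input map shape, the integer timestamps, and the partition keys are all hypothetical, and a lower priority integer is assumed to sort first.

```elixir
defmodule WindowSketch do
  # Input rows are hypothetical maps carrying both limits
  # (`workflow_concurrency` and `project_concurrency`).
  def number_rows(runs) do
    runs
    |> Enum.group_by(&partition_key/1)
    |> Enum.flat_map(fn {_key, group} ->
      group
      |> Enum.sort_by(&{&1.priority, &1.inserted_at})
      |> Enum.with_index(1)
      |> Enum.map(&to_row/1)
    end)
  end

  # Runs with a workflow limit are numbered per workflow, others per project.
  defp partition_key(%{workflow_concurrency: nil} = run), do: {:project, run.project_id}
  defp partition_key(run), do: {:workflow, run.workflow_id}

  # Mirrors the documented return fields.
  defp to_row({run, n}) do
    %{
      id: run.id,
      state: run.state,
      row_number: n,
      project_id: run.project_id,
      # The workflow limit wins; the project limit is only a fallback.
      concurrency: run.workflow_concurrency || run.project_concurrency,
      inserted_at: run.inserted_at,
      priority: run.priority
    }
  end
end
```

A downstream filter then only needs to compare row_number with concurrency on each returned row.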

lost()

@spec lost() :: Ecto.Queryable.t()

Return all runs that have been claimed by a worker before the earliest acceptable start time (determined by the run options and grace period) but are still incomplete.

This indicates that we may have lost contact with the worker that was responsible for executing the run.
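
One way to picture the check is the predicate below. It is a sketch only: the module LostRunSketch, the fields claimed_at and timeout_ms, the grace_ms argument, and the set of incomplete states are assumptions, and the real query derives the earliest acceptable start time from the run options in its own way.

```elixir
defmodule LostRunSketch do
  # Assumption: these are the states that count as "still incomplete".
  @incomplete_states [:claimed, :started]

  # `run` is a hypothetical map: %{state:, claimed_at: DateTime, timeout_ms:}.
  def lost?(run, now, grace_ms) do
    # Earliest acceptable start time: anything claimed before this instant
    # should have finished by `now`, even allowing for the grace period.
    cutoff = DateTime.add(now, -(run.timeout_ms + grace_ms), :millisecond)

    run.state in @incomplete_states and
      DateTime.compare(run.claimed_at, cutoff) == :lt
  end
end
```

With a five-minute timeout and a one-minute grace period, a run claimed an hour ago and still in :started would be reported as lost, while one claimed two minutes ago would not.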

lost_steps()

@spec lost_steps() :: Ecto.Queryable.t()

workflow_limited_runs(per_workflow_limit \\ 50)

@spec workflow_limited_runs(pos_integer()) :: Ecto.Queryable.t()

Query to return workflow-limited runs with priority-based ranking.

This function creates a dataset where each workflow contributes at most per_workflow_limit runs, ranked by priority and insertion time. This prevents any single workflow from dominating the claim queue while ensuring fairness.

Returns runs with workflow ranking information needed for further processing.
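
The per-workflow cap can be illustrated with a small in-memory equivalent. The module WorkflowLimitSketch, the workflow_rank field name, the integer timestamps, and the rule that a lower priority integer ranks first are assumptions made for this sketch; the real function builds an Ecto query.

```elixir
defmodule WorkflowLimitSketch do
  # `runs` are hypothetical maps: %{id:, workflow_id:, priority:, inserted_at:}.
  def limited(runs, per_workflow_limit \\ 50) do
    runs
    |> Enum.group_by(& &1.workflow_id)
    |> Enum.flat_map(fn {_workflow_id, group} ->
      group
      # Rank by priority, then by insertion time.
      |> Enum.sort_by(&{&1.priority, &1.inserted_at})
      |> Enum.with_index(1)
      # Each workflow contributes at most `per_workflow_limit` runs.
      |> Enum.take(per_workflow_limit)
      |> Enum.map(fn {run, rank} -> Map.put(run, :workflow_rank, rank) end)
    end)
  end
end
```

With a limit of 2, a workflow with three queued runs contributes only its two best-ranked runs, while a workflow with a single run is unaffected.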