Lightning.WorkflowVersions (Lightning v2.15.0-pre5)
View SourceProvenance + comparison helpers for workflow heads.
- Persists append-only rows in
workflow_versionsand maintains a materializedworkflows.version_historyarray (12-char lowercase hex). record_version/3andrecord_versions/3are idempotent (ON CONFLICT DO NOTHING) and concurrency-safe (row lock, append without dupes).history_for/1andlatest_hash/1read the array first; when empty they fall back to the table with deterministic ordering by(inserted_at, id).reconcile_history!/1rebuilds the array from provenance rows.classify/2andclassify_with_delta/2compare two histories (same/ahead/diverged).
Validation & invariants:
hashmust match^[a-f0-9]{12}$;sourcemust be"app"or"cli";(workflow_id, hash)is unique.
Designed for fast diffs and consistent “latest head” lookups.
Summary
Functions
Compares two histories and returns only the relation (no counts).
Compares two histories and returns the relation with a delta.
Generates a deterministic hash for a workflow based on its structure.
Returns the ordered history of heads for a workflow.
Returns the latest head for a workflow (or nil if none).
Rebuilds and persists workflow.version_history from provenance rows.
Records a single workflow head hash with provenance and keeps
workflows.version_history in sync.
Types
@type hash() :: String.t()
Functions
Compares two histories and returns only the relation (no counts).
Wrapper around classify_with_delta/2.
Examples
iex> WorkflowVersions.classify(~w(a b), ~w(a b))
:same
iex> WorkflowVersions.classify(~w(a b), ~w(a b c))
{:ahead, :right}
iex> WorkflowVersions.classify(~w(a x), ~w(a y))
:diverged
@spec classify_with_delta([hash()], [hash()]) :: {:same, 0} | {:ahead, :left, non_neg_integer()} | {:ahead, :right, non_neg_integer()} | {:diverged, non_neg_integer()}
Compares two histories and returns the relation with a delta.
Possible results:
{:same, 0}— sequences are identical{:ahead, :right, n}—rightstrictly extendsleftbynitems{:ahead, :left, n}—leftstrictly extendsrightbynitems{:diverged, k}— sequences share a common prefix of lengthk, then diverge
Examples
iex> WorkflowVersions.classify_with_delta(~w(a b), ~w(a b c d))
{:ahead, :right, 2}
iex> WorkflowVersions.classify_with_delta(~w(a b x), ~w(a b y))
{:diverged, 2}
@spec generate_hash(Lightning.Workflows.Workflow.t() | map()) :: binary()
Generates a deterministic hash for a workflow based on its structure.
Algorithm:
- Create a list
- Add the workflow name to the start of the list
- For each node (trigger, job and edge) in a consistent order
- Get a sorted list of keys
- Filter out ignored keys
- Add each key and value to the list
- Join the list into a string, no separator
- Hash the list with SHA 256
- Truncate the resulting string to 12 characters
Parameters
workflow— the workflow struct to hash
Returns
- A 12-character lowercase hex string
Examples
iex> WorkflowVersions.generate_hash(workflow)
"a1b2c3d4e5f6"
Returns the ordered history of heads for a workflow.
If workflow.version_history is present and non-empty, that array is returned.
Otherwise, the function falls back to workflow_versions ordered by
inserted_at ASC, id ASC to provide deterministic ordering for equal timestamps.
Examples
iex> WorkflowVersions.history_for(%Workflow{version_history: ["a", "b"]})
["a", "b"]
iex> WorkflowVersions.history_for(wf) # when array is empty/nil
["a", "b", "c"]
@spec latest_hash(Lightning.Workflows.Workflow.t()) :: hash() | nil
Returns the latest head for a workflow (or nil if none).
Uses workflow.version_history when populated (taking the last element).
If empty/nil, reads from workflow_versions with
ORDER BY inserted_at DESC, id DESC LIMIT 1 for deterministic results.
Examples
iex> WorkflowVersions.latest_hash(%Workflow{version_history: ["a", "b"]})
"b"
iex> WorkflowVersions.latest_hash(wf_without_versions)
nil
@spec reconcile_history!(Lightning.Workflows.Workflow.t()) :: Lightning.Workflows.Workflow.t()
Rebuilds and persists workflow.version_history from provenance rows.
This is useful for maintenance/migrations when the array drifts from the
workflow_versions table. Ordering is inserted_at ASC, id ASC.
Returns
%Workflow{}— updated workflow with a rebuiltversion_history
Examples
iex> wf = WorkflowVersions.reconcile_history!(wf)
%Workflow{version_history: [...]}
@spec record_version(Lightning.Workflows.Workflow.t(), hash(), String.t()) :: {:ok, Lightning.Workflows.Workflow.t()} | {:error, term()}
Records a single workflow head hash with provenance and keeps
workflows.version_history in sync.
This operation is idempotent and concurrency-safe:
it inserts into workflow_versions with ON CONFLICT DO NOTHING, then
locks the workflow row (FOR UPDATE) and appends hash to the array only
if it is not already present.
Parameters
workflow— the workflow owning the historyhash— 12-char lowercase hex (e.g.,"deadbeefcafe")source—"app"or"cli"(defaults to"app")
Returns
{:ok, %Workflow{}}— workflow (possibly unchanged) with an updatedversion_historyif a newhashwas appended{:error, reason}— database error
Examples
iex> WorkflowVersions.record_version(wf, "deadbeefcafe", "app")
{:ok, %Workflow{version_history: [..., "deadbeefcafe"]}}
iex> WorkflowVersions.record_version(wf, "NOT_HEX", "app")
{:error, :invalid_input}