Lightning.WorkflowVersions (Lightning v2.14.5-pre1)
View SourceProvenance + comparison helpers for workflow heads.
- Persists append-only rows in
workflow_versions
and maintains a materializedworkflows.version_history
array (12-char lowercase hex). record_version/3
andrecord_versions/3
are idempotent (ON CONFLICT DO NOTHING
) and concurrency-safe (row lock, append without dupes).history_for/1
andlatest_hash/1
read the array first; when empty they fall back to the table with deterministic ordering by(inserted_at, id)
.reconcile_history!/1
rebuilds the array from provenance rows.classify/2
andclassify_with_delta/2
compare two histories (same/ahead/diverged).
Validation & invariants:
hash
must match^[a-f0-9]{12}$
;source
must be"app"
or"cli"
;(workflow_id, hash)
is unique.
Designed for fast diffs and consistent “latest head” lookups.
Summary
Functions
Compares two histories and returns only the relation (no counts).
Compares two histories and returns the relation with a delta.
Generates a deterministic hash for a workflow based on its structure.
Returns the ordered history of heads for a workflow.
Returns the latest head for a workflow (or nil
if none).
Rebuilds and persists workflow.version_history
from provenance rows.
Records a single workflow head hash
with provenance and keeps
workflows.version_history
in sync.
Types
@type hash() :: String.t()
Functions
Compares two histories and returns only the relation (no counts).
Wrapper around classify_with_delta/2
.
Examples
iex> WorkflowVersions.classify(~w(a b), ~w(a b))
:same
iex> WorkflowVersions.classify(~w(a b), ~w(a b c))
{:ahead, :right}
iex> WorkflowVersions.classify(~w(a x), ~w(a y))
:diverged
@spec classify_with_delta([hash()], [hash()]) :: {:same, 0} | {:ahead, :left, non_neg_integer()} | {:ahead, :right, non_neg_integer()} | {:diverged, non_neg_integer()}
Compares two histories and returns the relation with a delta.
Possible results:
{:same, 0}
— sequences are identical{:ahead, :right, n}
—right
strictly extendsleft
byn
items{:ahead, :left, n}
—left
strictly extendsright
byn
items{:diverged, k}
— sequences share a common prefix of lengthk
, then diverge
Examples
iex> WorkflowVersions.classify_with_delta(~w(a b), ~w(a b c d))
{:ahead, :right, 2}
iex> WorkflowVersions.classify_with_delta(~w(a b x), ~w(a b y))
{:diverged, 2}
@spec generate_hash(Lightning.Workflows.Workflow.t() | map()) :: binary()
Generates a deterministic hash for a workflow based on its structure.
Algorithm:
- Create a list
- Add the workflow name to the start of the list
- For each node (trigger, job and edge) in a consistent order
- Get a sorted list of keys
- Filter out ignored keys
- Add each key and value to the list
- Join the list into a string, no separator
- Hash the list with SHA 256
- Truncate the resulting string to 12 characters
Parameters
workflow
— the workflow struct to hash
Returns
- A 12-character lowercase hex string
Examples
iex> WorkflowVersions.generate_hash(workflow)
"a1b2c3d4e5f6"
Returns the ordered history of heads for a workflow.
If workflow.version_history
is present and non-empty, that array is returned.
Otherwise, the function falls back to workflow_versions
ordered by
inserted_at ASC, id ASC
to provide deterministic ordering for equal timestamps.
Examples
iex> WorkflowVersions.history_for(%Workflow{version_history: ["a", "b"]})
["a", "b"]
iex> WorkflowVersions.history_for(wf) # when array is empty/nil
["a", "b", "c"]
@spec latest_hash(Lightning.Workflows.Workflow.t()) :: hash() | nil
Returns the latest head for a workflow (or nil
if none).
Uses workflow.version_history
when populated (taking the last element).
If empty/nil, reads from workflow_versions
with
ORDER BY inserted_at DESC, id DESC LIMIT 1
for deterministic results.
Examples
iex> WorkflowVersions.latest_hash(%Workflow{version_history: ["a", "b"]})
"b"
iex> WorkflowVersions.latest_hash(wf_without_versions)
nil
@spec reconcile_history!(Lightning.Workflows.Workflow.t()) :: Lightning.Workflows.Workflow.t()
Rebuilds and persists workflow.version_history
from provenance rows.
This is useful for maintenance/migrations when the array drifts from the
workflow_versions
table. Ordering is inserted_at ASC, id ASC
.
Returns
%Workflow{}
— updated workflow with a rebuiltversion_history
Examples
iex> wf = WorkflowVersions.reconcile_history!(wf)
%Workflow{version_history: [...]}
@spec record_version(Lightning.Workflows.Workflow.t(), hash(), String.t()) :: {:ok, Lightning.Workflows.Workflow.t()} | {:error, term()}
Records a single workflow head hash
with provenance and keeps
workflows.version_history
in sync.
This operation is idempotent and concurrency-safe:
it inserts into workflow_versions
with ON CONFLICT DO NOTHING
, then
locks the workflow row (FOR UPDATE
) and appends hash
to the array only
if it is not already present.
Parameters
workflow
— the workflow owning the historyhash
— 12-char lowercase hex (e.g.,"deadbeefcafe"
)source
—"app"
or"cli"
(defaults to"app"
)
Returns
{:ok, %Workflow{}}
— workflow (possibly unchanged) with an updatedversion_history
if a newhash
was appended{:error, reason}
— database error
Examples
iex> WorkflowVersions.record_version(wf, "deadbeefcafe", "app")
{:ok, %Workflow{version_history: [..., "deadbeefcafe"]}}
iex> WorkflowVersions.record_version(wf, "NOT_HEX", "app")
{:error, :invalid_input}