Lightning.WorkflowVersions (Lightning v2.14.5-pre1)

Provenance and comparison helpers for workflow heads.

  • Persists append-only rows in workflow_versions and maintains a materialized workflows.version_history array of head hashes (each a 12-char lowercase hex string).
  • record_version/3 and record_versions/3 are idempotent (ON CONFLICT DO NOTHING) and concurrency-safe (row lock, append without dupes).
  • history_for/1 and latest_hash/1 read the array first; when empty they fall back to the table with deterministic ordering by (inserted_at, id).
  • reconcile_history!/1 rebuilds the array from provenance rows.
  • classify/2 and classify_with_delta/2 compare two histories (same/ahead/diverged).

Validation & invariants:

  • hash must match ^[a-f0-9]{12}$; source must be "app" or "cli"; (workflow_id, hash) is unique.
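
As a rough illustration of the invariants above, the input check could look like the following. This is a minimal sketch, not the module's actual code: the module name VersionInputSketch and the function validate/2 are hypothetical, and the {:error, :invalid_input} shape is borrowed from the record_version/3 example on this page.

```elixir
defmodule VersionInputSketch do
  # Hypothetical helper, not part of Lightning.WorkflowVersions.
  @sources ["app", "cli"]

  # Returns :ok when both values satisfy the documented invariants.
  def validate(hash, source) when is_binary(hash) and is_binary(source) do
    if Regex.match?(~r/^[a-f0-9]{12}$/, hash) and source in @sources do
      :ok
    else
      {:error, :invalid_input}
    end
  end

  # Anything that is not a pair of strings is rejected outright.
  def validate(_hash, _source), do: {:error, :invalid_input}
end
```

Note that in PCRE-style patterns `$` also matches just before a trailing newline, so a stricter check might anchor with `\z` instead. The (workflow_id, hash) uniqueness rule is a database constraint and is not modeled here.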

Designed for fast diffs and consistent “latest head” lookups.

Summary

Functions

classify(left, right)

Compares two histories and returns only the relation (no counts).

classify_with_delta(left, right)

Compares two histories and returns the relation with a delta.

generate_hash(workflow)

Generates a deterministic hash for a workflow based on its structure.

history_for(workflow)

Returns the ordered history of heads for a workflow.

latest_hash(wf)

Returns the latest head for a workflow (or nil if none).

reconcile_history!(wf)

Rebuilds and persists workflow.version_history from provenance rows.

record_version(workflow, hash, source \\ "app")

Records a single workflow head hash with provenance and keeps workflows.version_history in sync.

Types

hash()

@type hash() :: String.t()

Functions

classify(left, right)

@spec classify([hash()], [hash()]) ::
  :same | {:ahead, :left} | {:ahead, :right} | :diverged

Compares two histories and returns only the relation (no counts).

Wrapper around classify_with_delta/2.

Examples

iex> WorkflowVersions.classify(~w(a b), ~w(a b))
:same

iex> WorkflowVersions.classify(~w(a b), ~w(a b c))
{:ahead, :right}

iex> WorkflowVersions.classify(~w(a x), ~w(a y))
:diverged
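
Since classify/2 is described as a wrapper, one plausible shape for it is to take the result of classify_with_delta/2 and drop the count. The sketch below shows only that mapping. ClassifySketch and drop_delta/1 are hypothetical names, and the real implementation may be organized differently.

```elixir
defmodule ClassifySketch do
  # Hypothetical helper: strips the count from a classify_with_delta/2-style result.
  def drop_delta({:same, 0}), do: :same
  def drop_delta({:ahead, side, _n}) when side in [:left, :right], do: {:ahead, side}
  def drop_delta({:diverged, _k}), do: :diverged
end
```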

classify_with_delta(left, right)

@spec classify_with_delta([hash()], [hash()]) ::
  {:same, 0}
  | {:ahead, :left, non_neg_integer()}
  | {:ahead, :right, non_neg_integer()}
  | {:diverged, non_neg_integer()}

Compares two histories and returns the relation with a delta.

Possible results:

  • {:same, 0} — sequences are identical
  • {:ahead, :right, n} — right strictly extends left by n items
  • {:ahead, :left, n} — left strictly extends right by n items
  • {:diverged, k} — sequences share a common prefix of length k, then diverge

Examples

iex> WorkflowVersions.classify_with_delta(~w(a b), ~w(a b c d))
{:ahead, :right, 2}

iex> WorkflowVersions.classify_with_delta(~w(a b x), ~w(a b y))
{:diverged, 2}
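
The relations listed above can be derived by walking both lists in lockstep and counting the shared prefix. The following is an illustrative re-implementation under that assumption, not the module's source. DeltaSketch is a hypothetical name, and how the real function treats edge cases such as two empty histories is not stated on this page.

```elixir
defmodule DeltaSketch do
  # Illustrative re-implementation; the real function may differ in detail.
  def classify_with_delta(left, right), do: walk(left, right, 0)

  # Both lists exhausted together: the sequences are identical.
  defp walk([], [], _k), do: {:same, 0}
  # Left ran out first: right extends left by the remaining items.
  defp walk([], rest, _k), do: {:ahead, :right, length(rest)}
  # Right ran out first: left extends right by the remaining items.
  defp walk(rest, [], _k), do: {:ahead, :left, length(rest)}
  # Heads match: the common prefix grows by one.
  defp walk([h | l], [h | r], k), do: walk(l, r, k + 1)
  # Heads differ: the histories diverge after k shared items.
  defp walk(_l, _r, k), do: {:diverged, k}
end
```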

generate_hash(workflow)

@spec generate_hash(Lightning.Workflows.Workflow.t() | map()) :: binary()

Generates a deterministic hash for a workflow based on its structure.

Algorithm:

  • Create a list
  • Add the workflow name to the start of the list
  • For each node (trigger, job and edge) in a consistent order
    • Get a sorted list of keys
    • Filter out ignored keys
    • Add each key and value to the list
  • Join the list into a single string, with no separator
  • Hash the string with SHA-256
  • Truncate the lowercase hex digest to 12 characters
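
The steps above can be sketched roughly as follows. Several details are assumptions made for illustration only: the workflow is taken to be a plain map with :name, :triggers, :jobs and :edges, the nodes are assumed to arrive already in a consistent order, the ignored keys (:id, :inserted_at, :updated_at) are hypothetical, and non-string values are rendered with inspect/1. Because of these choices, the hashes this sketch produces should not be expected to match Lightning's.

```elixir
defmodule HashSketch do
  # Hypothetical ignore list; the keys Lightning actually ignores are not stated here.
  @ignored_keys [:id, :inserted_at, :updated_at]

  def generate_hash(%{name: name} = workflow) do
    # Assumption: each list is already in a consistent order.
    nodes =
      Map.get(workflow, :triggers, []) ++
        Map.get(workflow, :jobs, []) ++
        Map.get(workflow, :edges, [])

    parts =
      Enum.flat_map(nodes, fn item ->
        item
        |> Map.keys()
        |> Enum.sort()
        |> Enum.reject(&(&1 in @ignored_keys))
        |> Enum.flat_map(fn key -> [to_string(key), stringify(Map.get(item, key))] end)
      end)

    # Workflow name first, then every key and value, joined with no separator.
    joined = Enum.join([name | parts])

    # SHA-256, lowercase hex, first 12 characters.
    :crypto.hash(:sha256, joined)
    |> Base.encode16(case: :lower)
    |> binary_part(0, 12)
  end

  defp stringify(value) when is_binary(value), do: value
  defp stringify(value), do: inspect(value)
end
```

Sorting the keys and excluding volatile fields is what keeps the result stable: two workflows that differ only in an ignored key produce the same hash in this sketch.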

Parameters

  • workflow — the workflow struct (or an equivalent map) to hash

Returns

  • A 12-character lowercase hex string

Examples

iex> WorkflowVersions.generate_hash(workflow)
"a1b2c3d4e5f6"

history_for(workflow)

Returns the ordered history of heads for a workflow.

If workflow.version_history is present and non-empty, that array is returned. Otherwise, the function falls back to workflow_versions ordered by inserted_at ASC, id ASC to provide deterministic ordering for equal timestamps.

Examples

iex> WorkflowVersions.history_for(%Workflow{version_history: ["a", "b"]})
["a", "b"]

iex> WorkflowVersions.history_for(wf) # when array is empty/nil
["a", "b", "c"]
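
The array-first lookup with a table fallback can be modeled in memory as shown below. This is a sketch under stated assumptions: HistorySketch is a hypothetical module, `rows` stands in for workflow_versions records, and inserted_at is an integer so that plain tuple comparison gives the (inserted_at, id) order. With real DateTime values a chronological comparison would be needed instead.

```elixir
defmodule HistorySketch do
  # A non-empty array wins; the provenance rows are not consulted.
  def history_for(%{version_history: [_ | _] = history}, _rows), do: history

  # Empty or nil array: fall back to rows ordered by (inserted_at, id).
  def history_for(_workflow, rows) do
    rows
    |> Enum.sort_by(&{&1.inserted_at, &1.id})
    |> Enum.map(& &1.hash)
  end
end
```

The id tiebreaker is what makes the order deterministic when two rows share the same timestamp.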

latest_hash(wf)

@spec latest_hash(Lightning.Workflows.Workflow.t()) :: hash() | nil

Returns the latest head for a workflow (or nil if none).

Uses workflow.version_history when populated (taking the last element). If empty/nil, reads from workflow_versions with ORDER BY inserted_at DESC, id DESC LIMIT 1 for deterministic results.

Examples

iex> WorkflowVersions.latest_hash(%Workflow{version_history: ["a", "b"]})
"b"

iex> WorkflowVersions.latest_hash(wf_without_versions)
nil
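
In the same spirit, the lookup described above amounts to taking the last array element, or else the maximum row by (inserted_at, id). The sketch below is illustrative only: LatestSketch is a hypothetical module, `rows` is an in-memory stand-in for the table, and timestamps are integers for simplicity.

```elixir
defmodule LatestSketch do
  # Populated array: the latest head is its last element.
  def latest_hash(%{version_history: [_ | _] = history}, _rows), do: List.last(history)

  # No array and no provenance rows: there is no head yet.
  def latest_hash(_workflow, []), do: nil

  def latest_hash(_workflow, rows) do
    # In-memory equivalent of ORDER BY inserted_at DESC, id DESC LIMIT 1.
    Enum.max_by(rows, &{&1.inserted_at, &1.id}).hash
  end
end
```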

reconcile_history!(wf)

Rebuilds and persists workflow.version_history from provenance rows.

This is useful for maintenance/migrations when the array drifts from the workflow_versions table. Ordering is inserted_at ASC, id ASC.

Returns

  • %Workflow{} — updated workflow with a rebuilt version_history

Examples

iex> wf = WorkflowVersions.reconcile_history!(wf)
%Workflow{version_history: [...]}

record_version(workflow, hash, source \\ "app")

@spec record_version(Lightning.Workflows.Workflow.t(), hash(), String.t()) ::
  {:ok, Lightning.Workflows.Workflow.t()} | {:error, term()}

Records a single workflow head hash with provenance and keeps workflows.version_history in sync.

This operation is idempotent and concurrency-safe: it inserts into workflow_versions with ON CONFLICT DO NOTHING, then locks the workflow row (FOR UPDATE) and appends hash to the array only if it is not already present.
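
The append-if-absent step is what makes repeated calls harmless. A minimal in-memory sketch of that step is shown here. AppendSketch and append_once/2 are hypothetical names, and the database insert and the row lock that guard the real read-modify-write are not modeled.

```elixir
defmodule AppendSketch do
  # In-memory stand-in for the guarded update of workflows.version_history.
  def append_once(history, hash) when is_list(history) do
    if hash in history, do: history, else: history ++ [hash]
  end
end
```

Applying append_once/2 twice with the same hash yields the same list as applying it once, which mirrors the idempotency described above.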

Parameters

  • workflow — the workflow owning the history
  • hash — 12-char lowercase hex (e.g., "deadbeefcafe")
  • source — "app" or "cli" (defaults to "app")

Returns

  • {:ok, %Workflow{}} — the workflow, with hash appended to version_history if it was not already present; otherwise unchanged
  • {:error, reason} — invalid input (e.g., {:error, :invalid_input} for a malformed hash) or a database error

Examples

iex> WorkflowVersions.record_version(wf, "deadbeefcafe", "app")
{:ok, %Workflow{version_history: [..., "deadbeefcafe"]}}

iex> WorkflowVersions.record_version(wf, "NOT_HEX", "app")
{:error, :invalid_input}