Lightning.AiAssistant.MessageProcessor (Lightning v2.14.5-pre1)

View Source

Asynchronous message processor for AI Assistant using Oban.

This module handles the background processing of AI chat messages, ensuring reliable and scalable AI interactions. It processes messages outside of the web request lifecycle, providing better user experience and system resilience.

Summary

Functions

Handles exceptions for ai_assistant Oban jobs.

Handles :stop events for ai_assistant Oban jobs.

Processes an AI assistant message asynchronously.

Defines the job timeout based on Apollo configuration.

Updates a message's status and broadcasts the change.

Functions

handle_ai_assistant_exception(measure, meta)

Handles exceptions for ai_assistant Oban jobs.

Parameters

  • measure — A map containing job execution metrics (duration, memory, reductions).
  • meta — A map containing job metadata (job, error, stacktrace, etc.).

handle_ai_assistant_stop(measure, meta)

Handles :stop events for ai_assistant Oban jobs.

This function is invoked when a job in the ai_assistant queue stops with a non-success state (e.g., :discard, :cancelled, or other custom stop reasons).

Jobs with the :success state are ignored.

Parameters

  • measure — A map containing job execution metrics (duration, memory, reductions).
  • meta — A map containing job metadata (job, state, etc.).

perform(job)

@spec perform(Oban.Job.t()) :: :ok

Processes an AI assistant message asynchronously.

This is the main entry point called by Oban. It handles the complete lifecycle of message processing including status updates and broadcasting.

Arguments

  • job - Oban job containing message_id and session_id in args

Returns

Always returns :ok to prevent Oban retries, even on errors. Errors are handled by updating message status and logging.

timeout(job)

@spec timeout(Oban.Job.t()) :: pos_integer()

Defines the job timeout based on Apollo configuration.

Adds a 10-second buffer to the Apollo timeout to account for network overhead and processing time.

Returns

Timeout in milliseconds

update_message_status(message, status)

Updates a message's status and broadcasts the change.

This function updates the message status in the database, fetches the updated session with all associations, and broadcasts the status change to connected clients via Phoenix PubSub.

Parameters

  • message - The ChatMessage struct to update
  • status - The new status atom (:processing, :success, or :error)

Returns

{:ok, updated_session, updated_message} - Tuple with the updated session and message structs