Lightning.AiAssistant.MessageProcessor (Lightning v2.14.5-pre1)
Asynchronous message processor for AI Assistant using Oban.
This module handles the background processing of AI chat messages. It processes messages outside of the web request lifecycle, so a slow or failed AI call does not block the user's request, which improves both user experience and system resilience.
Summary
Functions
Handles exceptions for ai_assistant Oban jobs.
Handles :stop events for ai_assistant Oban jobs.
Processes an AI assistant message asynchronously.
Defines the job timeout based on Apollo configuration.
Updates a message's status and broadcasts the change.
Functions
Handles exceptions for ai_assistant Oban jobs.
Parameters
measure - A map containing job execution metrics (duration, memory, reductions).
meta - A map containing job metadata (job, error, stacktrace, etc.).
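As a rough illustration of how a handler with these two parameters might turn them into a log line, here is a minimal sketch. The module name `ExceptionHandlerSketch` and the functions `handle_exception/2` and `summarize/2` are hypothetical and are not Lightning's implementation; the map shapes follow the parameter descriptions above, and the sketch writes to stderr where real code would more likely use a logger.

```elixir
defmodule ExceptionHandlerSketch do
  # Hypothetical stand-in for the documented exception handler.
  # `measure` and `meta` follow the shapes described in the parameter list.

  def handle_exception(measure, meta) do
    # Assumption: the real handler logs; here we write to stderr instead.
    IO.puts(:stderr, summarize(measure, meta))
    :ok
  end

  # Builds a one-line summary; missing keys fall back to "unknown" or nil.
  def summarize(measure, meta) do
    duration = Map.get(measure, :duration, "unknown")
    error = meta |> Map.get(:error) |> inspect()
    "ai_assistant job failed: error=#{error} duration=#{duration}"
  end
end
```

Keeping the formatting in a separate pure function makes the handler easy to test without capturing log output.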
Handles :stop events for ai_assistant Oban jobs.
This function is invoked when a job in the ai_assistant queue stops with a non-success state (e.g., :discard, :cancelled, or other custom stop reasons).
Jobs with the :success state are ignored.
Parameters
measure - A map containing job execution metrics (duration, memory, reductions).
meta - A map containing job metadata (job, state, etc.).
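The "ignore :success, act on everything else" rule maps naturally onto function-clause pattern matching. The sketch below is hypothetical: `StopHandlerSketch`, `handle_stop/2`, and its return values are illustrative names chosen here, not the module's actual API or return contract.

```elixir
defmodule StopHandlerSketch do
  # Hypothetical sketch of the documented filtering behaviour.

  # Jobs that stopped with :success are ignored.
  def handle_stop(_measure, %{state: :success}), do: :ignored

  # Any other state (:discard, :cancelled, ...) is reported.
  # The returned tuple is illustrative only.
  def handle_stop(measure, %{state: state}) do
    {:reported, state, Map.get(measure, :duration)}
  end
end
```

Putting the `:success` clause first means the reporting clause never has to check the state itself.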
@spec perform(Oban.Job.t()) :: :ok
Processes an AI assistant message asynchronously.
This is the main entry point called by Oban. It handles the complete lifecycle of message processing including status updates and broadcasting.
Arguments
job - Oban job containing message_id and session_id in args
Returns
Always returns :ok to prevent Oban retries, even on errors.
Errors are handled by updating message status and logging.
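The "always return :ok" contract can be sketched as follows. This is a simplified, hypothetical version: `PerformSketch` takes the job's args map directly rather than an `Oban.Job` struct, and `process_fun` is an injected placeholder for the real processing step, which this page does not describe.

```elixir
defmodule PerformSketch do
  # Hypothetical sketch; not Lightning's worker.
  # Oban stores args as JSON, so keys arrive as strings.
  def perform(%{"message_id" => message_id, "session_id" => session_id}, process_fun) do
    try do
      process_fun.(message_id, session_id)
    rescue
      error ->
        # The documented worker updates the message status and logs here;
        # this sketch only writes to stderr.
        IO.puts(:stderr, "processing failed: " <> Exception.message(error))
    end

    # Returning :ok regardless of outcome tells Oban the job is complete,
    # so it is never retried.
    :ok
  end
end
```

The trade-off is that failures are invisible to Oban's retry machinery, so the error must be surfaced some other way, which is why the documented function records it on the message status.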
@spec timeout(Oban.Job.t()) :: pos_integer()
Defines the job timeout based on Apollo configuration.
Adds a 10-second buffer to the Apollo timeout to account for network overhead and processing time.
Returns
Timeout in milliseconds
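The arithmetic is simple enough to show directly. In this sketch the Apollo timeout is passed in as an argument and assumed to already be in milliseconds; how the real function reads it from configuration is not shown on this page, and `TimeoutSketch` is a hypothetical name.

```elixir
defmodule TimeoutSketch do
  # 10-second buffer, expressed in milliseconds.
  @buffer_ms 10_000

  # Assumption: `apollo_timeout_ms` is the configured Apollo timeout in ms.
  def timeout(apollo_timeout_ms)
      when is_integer(apollo_timeout_ms) and apollo_timeout_ms > 0 do
    apollo_timeout_ms + @buffer_ms
  end
end
```

For example, under these assumptions a 30-second Apollo timeout gives a 40,000 ms job timeout, so the job outlives the HTTP call it wraps rather than being killed mid-response.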
@spec update_message_status(Lightning.AiAssistant.ChatMessage.t(), atom()) :: {:ok, Lightning.AiAssistant.ChatSession.t(), Lightning.AiAssistant.ChatMessage.t()}
Updates a message's status and broadcasts the change.
This function updates the message status in the database, fetches the updated session with all associations, and broadcasts the status change to connected clients via Phoenix PubSub.
Parameters
message - The ChatMessage struct to update
status - The new status atom (:processing, :success, or :error)
Returns
{:ok, updated_session, updated_message} - Tuple with the updated session and message structs
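The update, reload, and broadcast sequence described above can be sketched with plain maps. Everything here is an assumption made for illustration: `StatusUpdateSketch` takes the session as an explicit argument instead of fetching it from the database, `broadcast_fun` stands in for the Phoenix PubSub call, and the `{:status_changed, session_id, status}` event shape is invented.

```elixir
defmodule StatusUpdateSketch do
  # Hypothetical sketch using plain maps in place of Ecto structs.
  @statuses [:processing, :success, :error]

  def update_message_status(session, message, status, broadcast_fun)
      when status in @statuses do
    # 1. Update the message status.
    updated_message = %{message | status: status}

    # 2. Rebuild the session so it contains the updated message.
    messages =
      Enum.map(session.messages, fn m ->
        if m.id == updated_message.id, do: updated_message, else: m
      end)

    updated_session = %{session | messages: messages}

    # 3. Notify subscribers (event shape is illustrative).
    broadcast_fun.({:status_changed, updated_session.id, status})

    {:ok, updated_session, updated_message}
  end
end
```

Returning both the session and the message lets a caller re-render the whole conversation or only the affected message without another lookup.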