Lightning.KafkaTriggers.MessageRecovery (Lightning v2.10.4)

This module contains functionality to recover Kafka messages that have been persisted to the file system. It should only be used to process files that originate from a trusted source (i.e. Lightning's Kafka pipeline).

It expects to be pointed at a directory structure that looks as follows:

base_path
|_ <workflow_id>
   |_ <trigger_id>_<topic>_<partition>_<offset>.json
   |_ <trigger_id>_<topic>_<partition>_<offset>.json
|_ <workflow_id>

This is the structure that the Kafka pipeline uses when writing messages to the file system. After each message is successfully processed, the file extension is changed from .json to .json.recovered. Recovered files are not processed again. They are retained so that the recovery process can be double-checked, and they can be deleted once that has been done.
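To make the naming scheme concrete, the following is a minimal sketch of how a file name could be split into its parts. It is not Lightning's implementation: the module `FileNameSketch` and its `parse/1` function are hypothetical, and the sketch assumes that the segments are joined by underscores, that the trigger id contains no underscore, and that partition and offset are integers. The topic is allowed to contain underscores, so it is taken as everything between the first segment and the last two.

```elixir
defmodule FileNameSketch do
  # Hypothetical helper for illustration only; not part of Lightning.
  # Assumed file name: <trigger_id>_<topic>_<partition>_<offset>.json
  # (optionally followed by ".recovered").
  def parse(file_path) do
    parts =
      file_path
      |> Path.basename()
      |> String.replace_suffix(".recovered", "")
      |> Path.rootname(".json")
      |> String.split("_")

    # At least trigger id, one topic segment, partition and offset are needed.
    with [trigger_id | rest] when length(rest) >= 3 <- parts,
         # The last two segments are partition and offset; the rest is the topic.
         {topic_parts, [partition_str, offset_str]} <- Enum.split(rest, -2),
         {partition, ""} <- Integer.parse(partition_str),
         {offset, ""} <- Integer.parse(offset_str) do
      {:ok,
       %{
         trigger_id: trigger_id,
         topic: Enum.join(topic_parts, "_"),
         partition: partition,
         offset: offset
       }}
    else
      _ -> :error
    end
  end
end
```

Calling `FileNameSketch.parse/1` on a path whose base name does not follow the assumed scheme returns `:error` rather than raising, which keeps a directory scan from aborting on a stray file.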

If a file's extension has not been changed, an error occurred while that file was being reprocessed. Such files can be reprocessed if you think the error was transient.
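Before and after a recovery run it can help to see which files are still pending and which have already been recovered. The sketch below uses only the Elixir standard library. The module `RecoveryInspection` and its functions are hypothetical and not part of Lightning, and the sketch assumes that recovered files end in `.json.recovered` and sit exactly one directory level below the base path.

```elixir
defmodule RecoveryInspection do
  # Hypothetical helper for illustration only; not part of Lightning.

  # Files that have not been recovered yet: <base_path>/<workflow_id>/*.json
  def pending_files(base_path), do: list(base_path, "*.json")

  # Files already marked as recovered (assumed suffix: .json.recovered).
  def recovered_files(base_path), do: list(base_path, "*.json.recovered")

  defp list(base_path, pattern) do
    # Note: glob characters in base_path itself would also be expanded.
    [base_path, "*", pattern]
    |> Path.join()
    |> Path.wildcard()
    |> Enum.sort()
  end
end
```

A non-empty result from `RecoveryInspection.pending_files/1` after a run points at the files that failed and are candidates for another attempt.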

Usage:

    alias Lightning.KafkaTriggers.MessageRecovery

    case MessageRecovery.recover_messages(Lightning.Config.kafka_alternate_storage_file_path()) do
      :ok ->
        # Success code
        :ok

      {:error, error_count} ->
        # Failure code
        {:error, error_count}
    end

Summary

Functions

build_broadway_message(data, metadata)

extract_trigger_id(file_path)

recover_messages(base_dir_path)
