Lightning.KafkaTriggers.MessageRecovery (Lightning v2.10.4)
This module contains functionality to recover Kafka messages that have been persisted to the file system. It should only be used to process files that originate from a trusted source (i.e. Lightning's Kafka pipeline).
It expects to be pointed at a directory structure that looks as follows:
basepath
|_ <workflow_id>
   |_ <trigger_id>_<topic>_<partition>_<offset>.json
   |_ <trigger_id>_<topic>_<partition>_<offset>.json
|_ <workflow_id>
This is the structure that the Kafka pipeline uses when writing messages to the file system. After each message is successfully processed, its file extension is changed from .json to .json.recovered. Recovered files will not be processed again. They are retained so that the recovery process can be double-checked, and can be deleted once this has been done.
If a file still has the .json extension after a recovery run, it encountered an error during reprocessing. Such files can be reprocessed by running the recovery again if you think the error was transient.
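To check which files are still outstanding after a run, the directory layout and extension convention described above can be inspected directly. The following is a minimal sketch, not part of Lightning: the module name RecoveryAudit and its summarize/1 function are hypothetical, and it assumes only the two-level layout (one directory per workflow) and the .json / .json.recovered extensions described in this documentation.

```elixir
defmodule RecoveryAudit do
  # Hypothetical helper, not part of Lightning.
  # Walks base_path/<workflow_id>/ and splits the files found there into
  # those still awaiting recovery (.json) and those already recovered
  # (.json.recovered).
  def summarize(base_path) do
    files =
      for workflow_dir <- File.ls!(base_path),
          dir = Path.join(base_path, workflow_dir),
          File.dir?(dir),
          file <- File.ls!(dir),
          do: Path.join(dir, file)

    %{
      pending:
        files
        |> Enum.filter(&String.ends_with?(&1, ".json"))
        |> Enum.sort(),
      recovered:
        files
        |> Enum.filter(&String.ends_with?(&1, ".json.recovered"))
        |> Enum.sort()
    }
  end
end
```

An empty pending list after a run suggests that every file was recovered. Any paths that remain in it are candidates for another recovery attempt.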
Usage:
alias Lightning.KafkaTriggers.MessageRecovery

case MessageRecovery.recover_messages(Lightning.Config.kafka_alternate_storage_file_path()) do
  :ok ->
    # Success code
    :ok

  {:error, error_count} ->
    # Failure code
    {:error, error_count}
end
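Because files that failed with a transient error can simply be reprocessed, and recovered files are skipped on later runs, one option is to wrap the call in a bounded retry. The sketch below is an illustration only: RecoveryRetry and run/2 are hypothetical names, and recover_fun is assumed to be a zero-arity function that returns :ok or {:error, error_count}, as recover_messages does in the usage example above.

```elixir
defmodule RecoveryRetry do
  # Hypothetical wrapper, not part of Lightning.
  # Calls recover_fun up to `attempts` times and stops at the first :ok.
  # If every attempt fails, the last {:error, count} is returned.
  def run(recover_fun, attempts) when is_integer(attempts) and attempts > 0 do
    case recover_fun.() do
      :ok ->
        :ok

      {:error, _count} when attempts > 1 ->
        run(recover_fun, attempts - 1)

      {:error, count} ->
        {:error, count}
    end
  end
end
```

Under these assumptions it could be invoked as RecoveryRetry.run(fn -> MessageRecovery.recover_messages(path) end, 3), where path is the storage directory. Retrying only makes sense for transient failures, since a file that fails for a permanent reason will fail on every attempt.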