A Job performs a specific task like fetching data from Salesforce, converting JSON to FHIR standard, or uploading data to a database.
Each job uses exactly ONE adaptor (connector) that provides helper functions (Operations) for communicating with data sources.
A job is a single step in a workflow - a series of steps which perform some high level business task, like synchronising patient data or aggregating form submissions or automating business processes.
Jobs are written in a JavaScript-like Domain Specific Language (DSL). While it looks and feels like regular JavaScript, there are some important differences:
console.log()). Note:
some features are not supported(e.g., eval). See full list here
(docs-link)//TODOget(), post(), each()) are special functions that
manage state and async behavior$ symbol is a special operator for accessing state (not a jQuery-like
library)For example, this looks like regular JavaScript but works differently:
// This looks like regular JavaScript Promise chaining
// but is actually using OpenFn's special Operation chaining
get('/data')
.then(state => {
console.log(state.data);
return state;
})
.catch(error => {
console.log('Failed:', error);
});
// This looks like array iteration but is a special Operation
each('$.data[*]', state => {
// This callback transforms state but doesn't control the iteration
return state;
});
// Error handling with .catch()
get('patients').catch((error, state) => {
state.error = error;
console.log('Error occurred:', error);
return state; // Continue execution
// OR
throw error; // Stop execution
});
// Repeated operations using each()
each(
$.items,
post(`patient/${$.data.id}`, $.data).then(state => {
state.completed ??= [];
state.completed.push(state.data);
return state;
})
);
Operations ONLY work at the top level of job code. Never nest operations inside callbacks.
✅ CORRECT:
get('/patients');
each('$.data.patients[*]', state => {
item.id = `item-${index}`;
return state;
});
post('/patients', dataValue('patients'));
❌ WRONG:
get('/patients', {}, state => {
// This will fail - nested operation!
each('$.data.patients[*]', (item, index) => {
item.id = `item-${index}`;
});
});
Callbacks must ALWAYS return the state object.
✅ CORRECT:
fn(state => {
state.transformed = state.data.map(item => ({ ...item }));
return state; // Critical!
});
❌ WRONG:
fn(state => {
state.transformed = state.data.map(item => ({ ...item }));
// Missing return!
});
Use the Lazy State Operator $ or arrow functions to read state values at the
correct time.
✅ CORRECT:
get('/some-data');
post('/upload', $.data); // Using $ operator
// OR
post('/upload', state => state.data); // Using arrow function
❌ WRONG:
get('/some-data');
post('/upload', state.data); // Will be undefined!
The $ operator is syntactic sugar for (state) => state. It ensures values
are resolved at runtime, not load-time.
// Basic usage
upsert('patient', $.data.patients[0]);
// Inside objects
create('agent', {
name: $.patient.name,
country: $.patient.country,
});
// String templates
get(`/patients/${$.patient.id}`);
// Expressions
create({
profit: $.report.revenue - $.report.expenses,
});
// With mapping
each($.data.patients, post(`patients/${$.data.id}`, $.data));
$❌ These are ERRORS:
const url = $.data.url; // Wrong
$.data.x = 10; // Wrong
fn(state => {
$.data.x = 10; // Wrong
});
fn(state => {
state.results = [];
state.lookup = {};
state.keyMap = { AccountName: 'C__Acc_Name' };
state.maxPageSize = 200;
state.convertToSF = item => {
/* transform logic */
};
return state;
});
// Rest of job code...
// Fetch data
get('https://system-a.com/api/patients/123');
// Transform inline
post('https://system-b.com/api/records/123', state => ({
id: state.data.id,
name: `${state.data.first_name} ${state.data.last_name}`,
metadata: state.data.user_data,
}));
// Transform each item
each(
'$.data.patients[*]',
upsert('Person__c', 'Participant_PID__c', state => ({
Participant_PID__c: state.data.pid,
First_Name__c: state.data.participant_first_name,
Surname__c: state.data.participant_surname,
}))
);
// Set cursor
cursor('2024-04-08T12:00:00.0000');
// OR
cursor(state => state.cursor, { defaultValue: 'today' });
// Use cursor in queries
get(state => `/registrations?since=${state.cursor}`);
// Update cursor
cursor('now');
OpenFn Operations provide .then() and .catch() methods to handle successful
results and errors in your job execution. These special Operation methods works
at the top level only.
Use .then() to handle successful Operation results:
get('/api/data')
.then(state => {
// Transform or process the response
state.processedData = processData(state.data);
return state;
})
.then(state => {
// Chain multiple transformations
console.log('Processed:', state.processedData);
return state;
});
Use .catch() to handle errors:
get('/api/data')
.then(state => {
// Transform or process the response
state.processedData = processData(state.data);
return state;
})
.catch(error => {
// Handle errors
console.log('Error:', error);
return state; // Continue execution
// OR
throw error; // Stop execution
});
// Return only needed keys
fn(state => {
return {
data: state.data,
};
});
// Or remove sensitive data
fn(state => {
const { username, password, secrets, ...rest } = state;
return rest;
});
An open-source module providing a set of functions that help you perform actions in a particular system or technology.
Each job uses an adaptor to perform actions in a particular system or technology.
For example, the HTTP adaptor provides functions for making HTTP requests:
get('/endpoint');
post('/endpoint', $.data);
You can find a list of available adaptors here: https://docs.openfn.org/adaptors. Each adaptor has a set of functions with examples. Also you can use CLI to see documentation for an adaptor:
// Show all http adaptor functions
openfn docs http
For more details on a specfic functions, use:
// Show documentation for a get() function
openfn docs http get
.catch() for specific error handling$) for cleaner codeconsole.log() liberally during developmentopenfn compile if needed$.configurationconfiguration and functions from logs// Initialize
fn(state => {
state.errors = [];
state.successful = [];
return state;
});
// Set cursor for incremental sync
cursor(state => state.cursor, { defaultValue: 'yesterday' });
// Fetch new records
get(state => `/patients?modified_since=${state.cursor}`);
// Transform and upload each patient
each(
'$.data.patients[*]',
create('Patient__c', state => ({
External_ID__c: state.data.id,
FirstName: state.data.first_name,
LastName: state.data.last_name,
Email: state.data.email,
Phone: state.data.phone,
}))
.then(state => {
state.successful.push(state.data.id);
return state;
})
.catch((error, state) => {
state.errors.push({
id: state.data.id,
error: error.message,
});
return state; // Continue processing other items
})
);
// Update cursor
cursor('now');
// Clean final state
fn(state => {
return {
successful: state.successful,
errors: state.errors,
total: state.successful.length + state.errors.length,
};
});