Source: Adaptor.js

"use strict";

Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.execute = execute;
exports.sql = sql;
exports.findValue = findValue;
exports.insert = insert;
exports.insertMany = insertMany;
exports.upsert = upsert;
exports.upsertIf = upsertIf;
exports.upsertMany = upsertMany;
exports.describeTable = describeTable;
exports.insertTable = insertTable;
exports.modifyTable = modifyTable;
Object.defineProperty(exports, "fn", {
  enumerable: true,
  get: function get() {
    return _languageCommon.fn;
  }
});
Object.defineProperty(exports, "alterState", {
  enumerable: true,
  get: function get() {
    return _languageCommon.alterState;
  }
});
Object.defineProperty(exports, "arrayToString", {
  enumerable: true,
  get: function get() {
    return _languageCommon.arrayToString;
  }
});
Object.defineProperty(exports, "combine", {
  enumerable: true,
  get: function get() {
    return _languageCommon.combine;
  }
});
Object.defineProperty(exports, "dataPath", {
  enumerable: true,
  get: function get() {
    return _languageCommon.dataPath;
  }
});
Object.defineProperty(exports, "dataValue", {
  enumerable: true,
  get: function get() {
    return _languageCommon.dataValue;
  }
});
Object.defineProperty(exports, "each", {
  enumerable: true,
  get: function get() {
    return _languageCommon.each;
  }
});
Object.defineProperty(exports, "field", {
  enumerable: true,
  get: function get() {
    return _languageCommon.field;
  }
});
Object.defineProperty(exports, "http", {
  enumerable: true,
  get: function get() {
    return _languageCommon.http;
  }
});
Object.defineProperty(exports, "fields", {
  enumerable: true,
  get: function get() {
    return _languageCommon.fields;
  }
});
Object.defineProperty(exports, "lastReferenceValue", {
  enumerable: true,
  get: function get() {
    return _languageCommon.lastReferenceValue;
  }
});
Object.defineProperty(exports, "merge", {
  enumerable: true,
  get: function get() {
    return _languageCommon.merge;
  }
});
Object.defineProperty(exports, "sourceValue", {
  enumerable: true,
  get: function get() {
    return _languageCommon.sourceValue;
  }
});

var _languageCommon = require("@openfn/language-common");

var _pg = _interopRequireDefault(require("pg"));

var _pgFormat = _interopRequireDefault(require("pg-format"));

function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { "default": obj }; }

function _typeof(obj) { "@babel/helpers - typeof"; if (typeof Symbol === "function" && typeof Symbol.iterator === "symbol") { _typeof = function _typeof(obj) { return typeof obj; }; } else { _typeof = function _typeof(obj) { return obj && typeof Symbol === "function" && obj.constructor === Symbol && obj !== Symbol.prototype ? "symbol" : typeof obj; }; } return _typeof(obj); }

function ownKeys(object, enumerableOnly) { var keys = Object.keys(object); if (Object.getOwnPropertySymbols) { var symbols = Object.getOwnPropertySymbols(object); if (enumerableOnly) { symbols = symbols.filter(function (sym) { return Object.getOwnPropertyDescriptor(object, sym).enumerable; }); } keys.push.apply(keys, symbols); } return keys; }

function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; if (i % 2) { ownKeys(Object(source), true).forEach(function (key) { _defineProperty(target, key, source[key]); }); } else if (Object.getOwnPropertyDescriptors) { Object.defineProperties(target, Object.getOwnPropertyDescriptors(source)); } else { ownKeys(Object(source)).forEach(function (key) { Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key)); }); } } return target; }

function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; }

/** @module Adaptor */

/**
 * Execute a sequence of operations.
 * Wraps `language-common/execute`, and prepends initial state for postgresql.
 * @example
 * execute(
 *   create('foo'),
 *   delete('bar')
 * )(state)
 * @constructor
 * @param {Operations} operations - Operations to be performed.
 * @returns {Operation}
 */
function execute() {
  for (var _len = arguments.length, operations = new Array(_len), _key = 0; _key < _len; _key++) {
    operations[_key] = arguments[_key];
  }

  var initialState = {
    references: [],
    data: null,
    queries: []
  };
  return function (state) {
    return _languageCommon.execute.apply(void 0, [createClient, connect].concat(operations, [disconnect, cleanupState]))(_objectSpread(_objectSpread({}, initialState), state))["catch"](function (e) {
      console.error(e);
      console.error('Unhandled error in the operations. Exiting process.');
      process.exit(1);
    });
  };
}

function createClient(state) {
  var _state$configuration = state.configuration,
      host = _state$configuration.host,
      port = _state$configuration.port,
      database = _state$configuration.database,
      password = _state$configuration.password,
      user = _state$configuration.user,
      ssl = _state$configuration.ssl,
      allowSelfSignedCert = _state$configuration.allowSelfSignedCert,
      ca = _state$configuration.ca,
      key = _state$configuration.key,
      cert = _state$configuration.cert; // Allowing or blocking self signed certificate
  // https://node-postgres.com/features/ssl

  var sslOptions = ssl ? {
    rejectUnauthorized: !allowSelfSignedCert,
    cert: cert,
    ca: ca,
    key: key
  } : false; // setup client config

  var config = {
    host: host,
    port: port,
    database: database,
    user: user,
    password: password,
    ssl: sslOptions
  }; // instantiate a new client

  var client = new _pg["default"].Client(config);
  return _objectSpread(_objectSpread({}, state), {}, {
    client: client
  });
}

function connect(state) {
  var client = state.client;
  client.connect();
  return _objectSpread(_objectSpread({}, state), {}, {
    client: client
  });
}

function disconnect(state) {
  var client = state.client;
  client.end();
  return state;
}

function cleanupState(state) {
  delete state.client;
  return state;
}

function escapeRegExp(string) {
  return string.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // $& means the whole matched string
}

function handleValues(sqlString, nullString) {
  var sql = sqlString;

  if (nullString == false) {
    return sqlString;
  } else if (Array.isArray(nullString)) {
    nullString.forEach(function (ns) {
      var re = new RegExp(escapeRegExp(ns), 'g');
      sql = sql.replace(re, 'NULL');
    });
    return sql;
  } else if (_typeof(nullString) === 'object') {
    throw 'setNull must be a string or an array of strings.';
  }

  var re = new RegExp(escapeRegExp(nullString), 'g');
  return sqlString.replace(re, 'NULL');
}

function handleOptions(options) {
  if (options && options.setNull === false) {
    return false;
  }

  return options && options.setNull || "'undefined'";
}

function queryHandler(state, query, options) {
  var client = state.client;
  return new Promise(function (resolve, reject) {
    if (options) {
      if (options.writeSql) {
        console.log('Adding prepared SQL to state.queries array.');
        state.queries.push(query);
      }

      if (options.execute === false) {
        console.log('Not executing query; options.execute === false');
        resolve('Query not executed.');
        return state;
      }
    }

    client.query(query, function (err, result) {
      if (err) {
        reject(err);
        client.end();
      } else {
        console.log("".concat(result.command, " succeeded, rowCount: ").concat(result.rowCount));
        resolve(result);
      }
    });
  }).then(function (data) {
    return _objectSpread(_objectSpread({}, state), {}, {
      response: {
        body: data
      }
    });
  });
}
/**
 * Execute an SQL statement
 * @public
 * @example
 * sql(state => `select(*) from ${state.data.tableName};`, { writeSql: true })
 * @constructor
 * @param {function} sqlQuery - a function which takes state and returns a
 * string of SQL.
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function sql(sqlQuery, options) {
  return function (state) {
    var client = state.client;

    try {
      var body = sqlQuery(state);
      console.log('Preparing to execute sql statement');
      return queryHandler(state, body, options);
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Fetch a uuid key given a condition
 * @public
 * @example
 * findValue({
 *    uuid: 'id',
 *    relation: 'users',
 *    where: { first_name: 'Mamadou' },
 *    operator: { first_name: 'like' }
 *  })
 * @constructor
 * @param {object} filter - A filter object with the lookup table, a uuid and the condition
 * @returns {Operation}
 */


function findValue(filter) {
  return function (state) {
    var client = state.client;
    var uuid = filter.uuid,
        relation = filter.relation,
        where = filter.where,
        operator = filter.operator;
    var whereData = (0, _languageCommon.expandReferences)(where)(state);
    var operatorData = (0, _languageCommon.expandReferences)(operator)(state);
    var conditionsArray = [];

    for (var key in where) {
      conditionsArray.push("".concat(key, " ").concat(operatorData ? operatorData[key] : '=', " '").concat(whereData[key], "'"));
    }

    var condition = conditionsArray.length > 0 ? "where ".concat(conditionsArray.join(' and ')) : ''; // In a near future the 'and' can live in the filter.

    try {
      var body = "select ".concat(uuid, " from ").concat(relation, " ").concat(condition);
      console.log('Preparing to execute sql statement');
      var returnValue = null;
      return new Promise(function (resolve, reject) {
        client.query(body, function (err, result) {
          if (err) {
            console.log(err);
            reject(err);
            client.end();
          } else {
            if (result.rows.length > 0) {
              returnValue = result.rows[0][uuid];
            }

            resolve(returnValue);
          }
        });
      });
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Insert a record
 * @public
 * @example
 * insert('users', { name: 'Elodie', id: 7 }, { setNull: "'NaN'", logValues: true });
 * @constructor
 * @param {string} table - The target table
 * @param {object} record - Payload data for the record as a JS object or function
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function insert(table, record, options) {
  return function (state) {
    var client = state.client;

    try {
      var data = (0, _languageCommon.expandReferences)(record)(state);
      var columns = Object.keys(data).sort();
      var columnsList = columns.join(', ');
      var values = columns.map(function (key) {
        return data[key];
      });
      var query = handleValues((0, _pgFormat["default"])("INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES (%L);"), values), handleOptions(options));
      var safeQuery = "INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES [--REDACTED--]];");
      var queryToLog = options && options.logValues ? query : safeQuery;
      console.log('Preparing to insert via:', queryToLog);
      return queryHandler(state, query, options);
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Insert many records, using the keys of the first as the column template
 * @public
 * @example
 * insertMany('users', state => state.data.recordArray, { setNull: "'undefined'", logValues: true });
 * @constructor
 * @param {string} table - The target table
 * @param {array} records - An array or a function that takes state and returns an array
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function insertMany(table, records, options) {
  return function (state) {
    var client = state.client;

    try {
      var data = (0, _languageCommon.expandReferences)(records)(state);
      return new Promise(function (resolve, reject) {
        if (!data || data.length === 0) {
          console.log('No records provided; skipping insert.');
          resolve(state);
        } // Note: we select the keys of the FIRST object as the canonical template.


        var columns = Object.keys(data[0]);
        var columnsList = columns.join(', ');
        var valueSets = data.map(function (x) {
          return Object.values(x);
        });
        var query = handleValues((0, _pgFormat["default"])("INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES %L;"), valueSets), handleOptions(options));
        var safeQuery = "INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES [--REDACTED--]];");
        var queryToLog = options && options.logValues ? query : safeQuery;
        console.log('Preparing to insertMany via:', queryToLog);
        resolve(queryHandler(state, query, options));
      });
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Insert or update a record using ON CONFLICT UPDATE
 * @public
 * @example
 * upsert(
 *   'users', // the DB table
 *   'ON CONSTRAINT users_pkey', // a DB column with a unique constraint OR a CONSTRAINT NAME
 *   { name: 'Elodie', id: 7 },
 *   { setNull: ["''", "'undefined'"], writeSql:true, execute: true, logValues: true }
 * )
 * @constructor
 * @param {string} table - The target table
 * @param {string} uuid - The uuid column to determine a matching/existing record
 * @param {object} record - Payload data for the record as a JS object or function
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function upsert(table, uuid, record, options) {
  return function (state) {
    var client = state.client;

    try {
      var data = (0, _languageCommon.expandReferences)(record)(state);
      var columns = Object.keys(data).sort();
      var columnsList = columns.join(', ');
      var values = columns.map(function (key) {
        return data[key];
      });
      var conflict = uuid.split(' ').length > 1 ? uuid : "(".concat(uuid, ")");
      var updateValues = columns.map(function (key) {
        return "".concat(key, "=excluded.").concat(key);
      }).join(', ');
      var insertValues = (0, _pgFormat["default"])("INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES (%L)"), values);
      var query = handleValues("".concat(insertValues, "\n        ON CONFLICT ").concat(conflict, "\n        DO UPDATE SET ").concat(updateValues, ";"), handleOptions(options));
      var safeQuery = "INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES [--REDACTED--]\n        ON CONFLICT ").concat(conflict, "\n        DO UPDATE SET ").concat(updateValues, ";");
      var queryToLog = options && options.logValues ? query : safeQuery;
      console.log('Preparing to upsert via:', queryToLog);
      return queryHandler(state, query, options);
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Insert or update a record based on a logical condition using ON CONFLICT UPDATE
 * @public
 * @example
 * upsertIf(
 *   dataValue('name'),
 *   'users', // the DB table
 *   'ON CONSTRAINT users_pkey', // a DB column with a unique constraint OR a CONSTRAINT NAME
 *   { name: 'Elodie', id: 7 },
 *   { writeSql:true, execute: true }
 * )
 * @constructor
 * @param {string} logical - a data to check existing value for.
 * @param {string} table - The target table
 * @param {string} uuid - The uuid column to determine a matching/existing record
 * @param {object} record - Payload data for the record as a JS object or function
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function upsertIf(logical, table, uuid, record, options) {
  return function (state) {
    var client = state.client;

    try {
      var data = (0, _languageCommon.expandReferences)(record)(state);
      var logicalData = (0, _languageCommon.expandReferences)(logical)(state);
      return new Promise(function (resolve, reject) {
        if (!logicalData) {
          console.log("Skipping upsert for ".concat(uuid, "."));
          resolve(state);
          return state;
        }

        var columns = Object.keys(data).sort();
        var columnsList = columns.join(', ');
        var values = columns.map(function (key) {
          return data[key];
        });
        var conflict = uuid.split(' ').length > 1 ? uuid : "(".concat(uuid, ")");
        var updateValues = columns.map(function (key) {
          return "".concat(key, "=excluded.").concat(key);
        }).join(', ');
        var insertValues = (0, _pgFormat["default"])("INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES (%L)"), values);
        var query = handleValues("".concat(insertValues, "\n        ON CONFLICT ").concat(conflict, "\n        DO UPDATE SET ").concat(updateValues, ";"), handleOptions(options));
        var safeQuery = "INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES [--REDACTED--]\n        ON CONFLICT ").concat(conflict, "\n        DO UPDATE SET ").concat(updateValues, ";");
        var queryToLog = options && options.logValues ? query : safeQuery;
        console.log('Preparing to upsert via:', queryToLog);
        resolve(queryHandler(state, query, options));
      });
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Insert or update multiple records using ON CONFLICT UPDATE and excluded
 * @public
 * @example
 * upsertMany(
 *   'users', // the DB table
 *   'email', // a DB column with a unique constraint OR a CONSTRAINT NAME
 *   [
 *     { name: 'one', email: 'one@openfn.org },
 *     { name: 'two', email: 'two@openfn.org },
 *   ]
 *  { logValues: true }
 * )
 * @constructor
 * @param {string} table - The target table
 * @param {string} uuid - The uuid column to determine a matching/existing record
 * @param {array} data - An array of objects or a function that returns an array
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function upsertMany(table, uuid, data, options) {
  return function (state) {
    var client = state.client;

    try {
      var records = (0, _languageCommon.expandReferences)(data)(state);
      return new Promise(function (resolve, reject) {
        if (!records || records.length === 0) {
          console.log('No records provided; skipping upsert.');
          resolve(state);
        }

        var columns = Object.keys(records[0]);
        var columnsList = columns.join(', ');
        var values = records.map(function (x) {
          return Object.values(x);
        });
        var conflict = uuid.split(' ').length > 1 ? uuid : "(".concat(uuid, ")");
        var updateValues = columns.map(function (key) {
          return "".concat(key, "=excluded.").concat(key);
        }).join(', ');
        var insertValues = (0, _pgFormat["default"])("INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES %L"), values);
        var query = handleValues("".concat(insertValues, "\n        ON CONFLICT ").concat(conflict, "\n        DO UPDATE SET ").concat(updateValues, ";"), handleOptions(options));
        var safeQuery = "INSERT INTO ".concat(table, " (").concat(columnsList, ") VALUES [--REDACTED--]\n        ON CONFLICT ").concat(conflict, "\n        DO UPDATE SET ").concat(updateValues, ";");
        var queryToLog = options && options.logValues ? query : safeQuery;
        console.log('Preparing to upsert via:', queryToLog);
        resolve(queryHandler(state, query, options));
      });
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * List the columns of a table in a database.
 * @public
 * @example
 * describeTable('clinic_visits')
 * @constructor
 * @param {string} tableName - The name of the table to describe
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function describeTable(tableName, options) {
  return function (state) {
    var client = state.client;
    var name = (0, _languageCommon.expandReferences)(tableName)(state);

    try {
      var query = "SELECT column_name, udt_name, is_nullable\n        FROM information_schema.columns\n        WHERE table_name='".concat(name, "';");
      console.log('Preparing to describe table via:', query);
      return queryHandler(state, query, options);
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Create a table in database when given an array of columns and a table_name.
 * @public
 * @example
 * insertTable('table_name', state => state.data.map(
 *   column => ({
 *     name: column.name,
 *     type: column.type,
 *     required: true, // optional
 *     unique: false, // optional - to be set to true for unique constraint
 *   })
 * ));
 * @constructor
 * @param {string} tableName - The name of the table to create
 * @param {array} columns - An array of form columns
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function insertTable(tableName, columns, options) {
  return function (state) {
    var client = state.client;

    try {
      var data = (0, _languageCommon.expandReferences)(columns)(state);
      return new Promise(function (resolve, reject) {
        if (!data || data.length === 0) {
          console.log('No columns provided; skipping table creation.');
          resolve(state);
        }

        var structureData = data.map(function (x) {
          return "".concat(x.name, " ").concat(x.type, " ").concat(x.hasOwnProperty('default') ? x.type.includes('varchar') || x.type.includes('text') || x.type.includes('BIT') ? "DEFAULT '".concat(x["default"], "'") : "DEFAULT ".concat(x["default"]) : '', " ").concat(x.unique ? 'UNIQUE' : '', " ").concat(x.identity ? 'GENERATED BY DEFAULT AS IDENTITY' : '', " ").concat(x.required ? 'NOT NULL' : '');
        }).join(', ');
        var query = "CREATE TABLE ".concat(tableName, " (\n        ").concat(structureData, "\n      );");
        console.log('Preparing to create table via:', query);
        resolve(queryHandler(state, query, options));
      });
    } catch (e) {
      client.end();
      throw e;
    }
  };
}
/**
 * Alter an existing table in the database.
 * @public
 * @example
 * modifyTable('table_name', state => state.data.map(
 *   newColumn => ({
 *     name: newColumn.name,
 *     type: newColumn.type,
 *     required: true, // optional
 *     unique: false, // optional - to be set to true for unique constraint
 *   })
 * ));
 * @constructor
 * @param {string} tableName - The name of the table to alter
 * @param {array} columns - An array of form columns
 * @param {object} options - Optional options argument
 * @returns {Operation}
 */


function modifyTable(tableName, columns, options) {
  return function (state) {
    var client = state.client;

    try {
      var data = (0, _languageCommon.expandReferences)(columns)(state);
      return new Promise(function (resolve, reject) {
        if (!data || data.length === 0) {
          console.log('No columns provided; skipping table modification.');
          resolve(state);
        }

        var structureData = data.map(function (x) {
          return "ADD COLUMN ".concat(x.name, " ").concat(x.type, " ").concat(x.hasOwnProperty('default') ? x.type.includes('varchar') || x.type.includes('text') || x.type.includes('BIT') ? "DEFAULT '".concat(x["default"], "'") : "DEFAULT ".concat(x["default"]) : '', " ").concat(x.identity ? 'GENERATED BY DEFAULT AS IDENTITY' : '', " ").concat(x.required ? 'NOT NULL' : '');
        }).join(', ');
        var query = "ALTER TABLE ".concat(tableName, " ").concat(structureData, ";");
        console.log('Preparing to modify table via:', query);
        resolve(queryHandler(state, query, options));
      });
    } catch (e) {
      client.end();
      throw e;
    }
  };
}