Source: Adaptor.js

"use strict";

// Flag this CommonJS module as transpiled ES-module output. Helpers such as
// `_interopRequireDefault` look for `__esModule` to decide whether a required
// module needs wrapping in `{ default: ... }`.
Object.defineProperty(exports, "__esModule", {
  value: true
});
// Operations implemented in this file. These assignments run before the
// function declarations appear textually; that works because function
// declarations are hoisted.
exports.execute = execute;
exports.insert = insert;
exports.upsert = upsert;
exports.query = query;
exports.sqlString = sqlString;
// Helpers re-exported from "@openfn/language-common". Each one is exposed via
// an enumerable getter that reads from `_languageCommon` at access time, so the
// `require` further down only has to complete before a consumer first reads
// the property.
Object.defineProperty(exports, "field", {
  enumerable: true,
  get: function get() {
    return _languageCommon.field;
  }
});
Object.defineProperty(exports, "fields", {
  enumerable: true,
  get: function get() {
    return _languageCommon.fields;
  }
});
Object.defineProperty(exports, "sourceValue", {
  enumerable: true,
  get: function get() {
    return _languageCommon.sourceValue;
  }
});
Object.defineProperty(exports, "alterState", {
  enumerable: true,
  get: function get() {
    return _languageCommon.alterState;
  }
});
Object.defineProperty(exports, "arrayToString", {
  enumerable: true,
  get: function get() {
    return _languageCommon.arrayToString;
  }
});
Object.defineProperty(exports, "each", {
  enumerable: true,
  get: function get() {
    return _languageCommon.each;
  }
});
Object.defineProperty(exports, "combine", {
  enumerable: true,
  get: function get() {
    return _languageCommon.combine;
  }
});
Object.defineProperty(exports, "merge", {
  enumerable: true,
  get: function get() {
    return _languageCommon.merge;
  }
});
Object.defineProperty(exports, "dataPath", {
  enumerable: true,
  get: function get() {
    return _languageCommon.dataPath;
  }
});
Object.defineProperty(exports, "dataValue", {
  enumerable: true,
  get: function get() {
    return _languageCommon.dataValue;
  }
});
Object.defineProperty(exports, "lastReferenceValue", {
  enumerable: true,
  get: function get() {
    return _languageCommon.lastReferenceValue;
  }
});
Object.defineProperty(exports, "http", {
  enumerable: true,
  get: function get() {
    return _languageCommon.http;
  }
});

var _languageCommon = require("@openfn/language-common");

var _url = require("url");

var _mysql = _interopRequireDefault(require("mysql"));

var _squel = _interopRequireDefault(require("squel"));

/**
 * Babel interop helper: hand back transpiled ES modules untouched and wrap
 * anything else so the value is reachable as `.default`.
 * @param {*} obj - The value returned by `require`.
 * @returns {object} `obj` itself when it carries a truthy `__esModule` flag,
 *   otherwise `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  return {
    "default": obj
  };
}

/**
 * Babel helper: collect an object's own enumerable string keys followed by its
 * own symbol keys.
 * @param {object} object - Object to inspect.
 * @param {boolean} [enumerableOnly] - When truthy, non-enumerable symbols are
 *   left out.
 * @returns {Array<string|symbol>} String keys first, then symbols.
 */
function ownKeys(object, enumerableOnly) {
  var names = Object.keys(object);

  // Without symbol support there is nothing more to add.
  if (!Object.getOwnPropertySymbols) {
    return names;
  }

  var symbols = Object.getOwnPropertySymbols(object);

  if (enumerableOnly) {
    symbols = symbols.filter(function (sym) {
      return Object.getOwnPropertyDescriptor(object, sym).enumerable;
    });
  }

  return names.concat(symbols);
}

/**
 * Babel helper implementing object spread: copy every source argument onto
 * `target`, left to right, and return `target`.
 *
 * Sources at odd argument positions are copied value-by-value through
 * `_defineProperty`; sources at even positions are copied as full property
 * descriptors. `null`/`undefined` sources are treated as empty objects.
 * @param {object} target - Object that receives the properties.
 * @returns {object} The same `target` instance.
 */
function _objectSpread(target) {
  var sources = Array.prototype.slice.call(arguments, 1);

  sources.forEach(function (candidate, offset) {
    var source = candidate == null ? {} : candidate;
    // `offset` is zero-based within `sources`; the helper's odd/even rule is
    // defined on the original argument position, which is one higher.
    var position = offset + 1;

    if (position % 2 === 1) {
      ownKeys(Object(source), true).forEach(function (key) {
        _defineProperty(target, key, source[key]);
      });
      return;
    }

    if (Object.getOwnPropertyDescriptors) {
      Object.defineProperties(target, Object.getOwnPropertyDescriptors(source));
      return;
    }

    ownKeys(Object(source)).forEach(function (key) {
      Object.defineProperty(target, key, Object.getOwnPropertyDescriptor(source, key));
    });
  });

  return target;
}

/**
 * Babel helper: set `obj[key]` to `value`.
 *
 * When `key` is already reachable on `obj` (own or inherited), the property is
 * (re)defined as an own, enumerable, configurable, writable data property
 * instead of being assigned; otherwise a plain assignment is used.
 * @param {object} obj - Object to modify.
 * @param {string|symbol} key - Property key.
 * @param {*} value - Value to store.
 * @returns {object} The same `obj` instance.
 */
function _defineProperty(obj, key, value) {
  if (!(key in obj)) {
    obj[key] = value;
    return obj;
  }

  Object.defineProperty(obj, key, {
    value: value,
    enumerable: true,
    configurable: true,
    writable: true
  });
  return obj;
}

/** @module Adaptor */

/**
 * Run a sequence of operations against a MySQL connection.
 * Wraps `language-common/execute`: the supplied operations are preceded by a
 * connect step and followed by disconnect and state clean-up steps, and the
 * incoming state is layered over a default initial state.
 * @example
 * execute(
 *   insert('table', fields(field('name', dataValue('name')))),
 *   query({ sql: 'select * from users;' })
 * )(state)
 * @constructor
 * @param {Operations} operations - Operations to be performed.
 * @returns {Operation}
 */
function execute() {
  var operations = Array.prototype.slice.call(arguments);
  var initialState = {
    references: [],
    data: null
  };

  return function (state) {
    // connect -> user operations -> disconnect -> cleanupState
    var pipeline = [connect].concat(operations, [disconnect, cleanupState]);

    var run = _languageCommon.execute.apply(void 0, pipeline);

    // Keys present on the caller's state win over the defaults.
    var startingState = _objectSpread(_objectSpread({}, initialState), state);

    return run(startingState);
  };
}

/**
 * Open a MySQL connection from `state.configuration` (host, port, database,
 * user, password) and return a copy of the state carrying it as `connection`.
 * `connection.connect()` is invoked without a callback, so this function does
 * not wait for, or report, the outcome of the handshake itself.
 * @param {object} state - Runtime state; must contain `configuration`.
 * @returns {object} A new state object with the `connection` property added.
 */
function connect(state) {
  var config = state.configuration;
  var database = config.database;

  var connection = _mysql["default"].createConnection({
    host: config.host,
    user: config.user,
    password: config.password,
    database: database,
    port: config.port
  });

  connection.connect();
  console.log("Preparing to query \"" + database + "\"...");

  var withConnection = {
    connection: connection
  };
  return _objectSpread(_objectSpread({}, state), {}, withConnection);
}

/**
 * Ask the connection stored on the state to end, then pass the state along.
 * @param {object} state - Runtime state holding `connection`.
 * @returns {object} The same state object, unchanged.
 */
function disconnect(state) {
  var connection = state.connection;
  connection.end();
  return state;
}

/**
 * Remove the `connection` property from the state (mutating it in place) so
 * the connection object is not part of the state handed back by `execute`.
 * @param {object} state - Runtime state, possibly holding `connection`.
 * @returns {object} The same state object without `connection`.
 */
function cleanupState(state) {
  delete state["connection"];
  return state;
}
/**
 * Insert a record
 *
 * Builds an `INSERT` statement for `table` from the expanded `fields`, formats
 * it with the mysql driver's `format` (which substitutes the collected values
 * into the statement text), and runs it on `state.connection`.
 * @example
 * execute(
 *   insert('table', fields(
 *      field('name', dataValue('name'))
 *   ))
 * )(state)
 * @constructor
 * @param {string} table - The target table
 * @param {object} fields - A fields object
 * @returns {Operation} An operation whose promise resolves to a copy of the
 *   state with `response.body` set to the query results, and rejects with the
 *   driver error if the query fails.
 */
function insert(table, fields) {
  return function (state) {
    var connection = state.connection;
    var valuesObj = (0, _languageCommon.expandReferences)(fields)(state);

    var squelMysql = _squel["default"].useFlavour('mysql');

    var sqlParams = squelMysql.insert({
      autoQuoteFieldNames: true
    }).into(table).setFields(valuesObj).toParam();

    // Keep the formatted statement in a local variable. Assigning it to the
    // module-level `sqlString` binding would replace the exported `sqlString`
    // operation with a plain string after the first insert.
    var insertString = _mysql["default"].format(sqlParams.text, sqlParams.values);

    console.log("Executing MySQL query: ".concat(insertString));
    return new Promise(function (resolve, reject) {
      // `resultFields` is the driver's third callback argument; it is named
      // differently from the outer `fields` parameter to avoid shadowing it.
      connection.query(insertString, function (err, results, resultFields) {
        if (err) {
          reject(err); // Disconnect if there's an error.

          console.log('There is an error. Disconnecting from database.');
          connection.end();
        } else {
          console.log('Success...');
          console.log(results);
          console.log(resultFields);
          resolve(results);
        }
      });
    }).then(function (data) {
      return _objectSpread(_objectSpread({}, state), {}, {
        response: {
          body: data
        }
      });
    });
  };
}
/**
 * Insert a record, or update it when the insert hits a duplicate key.
 *
 * The statement is assembled as
 * `<INSERT ...> ON DUPLICATE KEY UPDATE <assignments>`, where the assignments
 * are taken from a table-less `UPDATE` statement built from the same fields.
 * @example
 * execute(
 *   upsert('table', fields(
 *      field('name', dataValue('name'))
 *   ))
 * )(state)
 * @constructor
 * @param {string} table - The target table
 * @param {object} fields - A fields object
 * @returns {Operation}
 */
function upsert(table, fields) {
  return function (state) {
    var connection = state.connection;
    var expand = _languageCommon.expandReferences;
    var valuesObj = expand(fields)(state);

    var squelMysql = _squel["default"].useFlavour('mysql');

    var insertParams = squelMysql.insert({
      autoQuoteFieldNames: true
    }).into(table).setFields(valuesObj).toParam();

    var insertString = _mysql["default"].format(insertParams.text, insertParams.values);

    var updateParams = squelMysql.update({
      autoQuoteFieldNames: true
    }).table('').setFields(valuesObj).toParam();

    var updateString = _mysql["default"].format(updateParams.text, updateParams.values);

    // Drop the first 10 characters of the UPDATE statement so only its
    // trailing part is appended (presumably the "UPDATE SET" prefix — verify
    // against squel's output for an empty table name).
    var assignments = updateString.slice(10);
    var upsertString = insertString + " ON DUPLICATE KEY UPDATE " + assignments;
    console.log('Executing MySQL query: ' + upsertString);

    var execution = new Promise(function (resolve, reject) {
      connection.query(upsertString, function (err, results, resultFields) {
        if (!err) {
          console.log('Success...');
          console.log(results);
          console.log(resultFields);
          resolve(results);
          return;
        }

        reject(err); // Disconnect if there's an error.

        console.log("That's an error. Disconnecting from database.");
        connection.end();
      });
    });

    return execution.then(function (data) {
      return _objectSpread(_objectSpread({}, state), {}, {
        response: {
          body: data
        }
      });
    });
  };
}
/**
 * Execute a SQL statement
 *
 * Expands `options` against the state and passes the result straight to
 * `connection.query`, so it may carry anything that call accepts (the example
 * uses `{ sql: '...' }`).
 * @example
 * execute(
 *   query({ sql: 'select * from users;' })
 * )(state)
 * @constructor
 * @param {object} options - Query options handed to the connection's `query`
 *   method after reference expansion, e.g. `{ sql: 'select 1;' }`
 * @returns {Operation} An operation whose promise resolves to a copy of the
 *   state with `response.body` set to the query results, and rejects with the
 *   driver error if the query fails.
 */
function query(options) {
  return function (state) {
    var connection = state.connection;
    var opts = (0, _languageCommon.expandReferences)(options)(state);

    // JSON.stringify takes (value, replacer, space): no replacer, 2-space
    // indentation. The arguments were previously swapped, which silently
    // disabled the pretty-printing.
    console.log('Executing MySQL statement with options: ' + JSON.stringify(opts, null, 2));
    return new Promise(function (resolve, reject) {
      // execute a query on our database
      connection.query(opts, function (err, results) {
        if (err) {
          reject(err); // Disconnect if there's an error.

          console.log("That's an error. Disconnecting from database.");
          connection.end();
        } else {
          console.log('Success...');
          // Round-trip through JSON so the resolved value is a plain,
          // JSON-compatible copy of whatever the driver returned.
          resolve(JSON.parse(JSON.stringify(results)));
        }
      });
    }).then(function (data) {
      console.log(data);

      return _objectSpread(_objectSpread({}, state), {}, {
        response: {
          body: data
        }
      });
    });
  };
}
/**
 * Execute a SQL statement given as a string.
 * Convenience wrapper that forwards to `query` with `{ sql: queryString }`.
 * @example
 * execute(
 *   sqlString(state => "select * from items;")
 * )(state)
 * @constructor
 * @param {String} queryString - A query string (or function which takes state and returns a string)
 * @returns {Operation}
 */
function sqlString(queryString) {
  return function (state) {
    var runQuery = query({
      sql: queryString
    });
    return runQuery(state);
  };
}