import { chunk } from 'lodash';
import { executeQueryLambda } from './rockset';
import moment from 'moment';
import WDC from '../../api/wdc';

// Module-level handle to the `tableau` object read from `window` at load time.
// Presumably this is the Tableau Web Data Connector API injected by the host
// page — TODO confirm; it will be `undefined` if that script has not run yet.
const tableau = window.tableau;

/**
 * Sends one message to both `tableau.log` and `tableau.reportProgress`,
 * in that order.
 *
 * @param {*} message - Value forwarded unchanged to both methods.
 * @returns {void}
 */
const log = message => {
  const sinks = ['log', 'reportProgress'];

  // Invoke each method on `tableau` itself so its `this` binding is preserved.
  sinks.forEach(sink => {
    tableau[sink](message);
  });
};

/**
 * Posts a `Log` action to the WDC endpoint and, when a message is supplied,
 * also echoes it through `log`.
 *
 * The returned promise never rejects on a failed request: the response is
 * written with `console.log` and any rejection is written with
 * `console.error`, so the promise always settles with `undefined`.
 *
 * @param {*} entityRef - Passed as the first argument to `WDC`.
 * @param {*} dataSet - Sent as the `DataSet` property of the payload.
 * @param {*} Params - Sent as the `Params` property of the payload.
 * @param {string} [Message=''] - Text to log; an empty string skips `log`.
 * @returns {Promise<void>}
 */
function apiLog(entityRef, dataSet, Params, Message = '') {
  const hasMessage = Message !== '';

  if (hasMessage) {
    log(Message);
  }

  const payload = {
    Action: 'Log',
    DataSet: dataSet,
    Params,
    Message,
  };

  const request = WDC(entityRef, payload);

  return request
    .then(response => {
      console.log(response);
    })
    .catch(error => {
      console.error(error);
    });
}

/**
 * Pages through a query lambda and appends every returned row to `table`.
 *
 * Each iteration writes the running row count into the `Offset` field, runs
 * `executeQueryLambda(dataSet, Fields, privateKey)`, and appends the returned
 * `Items` to `table` in chunks. Paging stops when a page is empty or shorter
 * than the `Limit` field's value.
 *
 * A failed query is reported through `apiLog` and treated as an empty page,
 * which ends paging for this call; the error is not rethrown.
 *
 * @param {Array<{name: string, value: *}>} Fields - Query parameters. Must
 *   contain entries named `Limit` and `Offset`; the `Offset` entry's `value`
 *   is mutated on every page.
 * @param {*} entityRef - Forwarded to `apiLog`.
 * @param {*} dataSet - Forwarded to `apiLog` and `executeQueryLambda`.
 * @param {*} privateKey - Forwarded to `executeQueryLambda`.
 * @param {Function} doneCallback - Kept for interface compatibility and
 *   intentionally never invoked here: `fetchIncremental` calls it once after
 *   all days are processed, so calling it on a query error as well would
 *   signal completion twice.
 * @param {{appendRows: Function}} table - Receives the rows via `appendRows`.
 * @returns {Promise<number>} Total number of rows received.
 * @throws {TypeError} If `Fields` has no `Limit` or no `Offset` entry.
 */
async function extractItems(Fields, entityRef, dataSet, privateKey, doneCallback, table) {
  const appendChunkSize = 100000;

  const limitField = Fields.find(p => p.name === 'Limit');
  const offsetField = Fields.find(p => p.name === 'Offset');

  if (!limitField || !offsetField) {
    throw new TypeError('Fields must contain both "Limit" and "Offset" entries');
  }

  const pageSize = Number.parseInt(limitField.value, 10);

  let rowsReceived = 0;
  let loops = 0;
  let done = false;

  while (!done) {
    loops += 1;
    offsetField.value = rowsReceived;

    await apiLog(entityRef, dataSet, Fields, 'Execute Query Lambda');

    const startTm = Date.now();

    let Items;

    try {
      ({ Items = [] } = await executeQueryLambda(dataSet, Fields, privateKey));
    } catch (e) {
      await apiLog(
        entityRef,
        dataSet,
        Fields,
        `QL::ERROR:: ${(e && e.message) || 'Error executing query lambda'}`
      );
      // An empty page makes the termination check below end the loop.
      Items = [];
    }

    rowsReceived += Items.length;

    const Duration = Date.now() - startTm;

    await apiLog(
      entityRef,
      dataSet,
      Fields,
      `[${loops}] Received ${Items.length} Items :: Total ${rowsReceived} :: Duration ${Duration}ms`
    );

    // The explicit empty-page check guarantees termination even when Limit is
    // 0, negative, or unparsable (NaN), where the `<` comparison alone would
    // never become true.
    if (Items.length === 0 || Items.length < pageSize) {
      done = true;
    }

    for (const rows of chunk(Items, appendChunkSize)) {
      table.appendRows(rows);
    }
  }

  return rowsReceived;
}

/**
 * Extracts a data set one calendar day at a time and appends the rows to
 * `table`.
 *
 * Reads the `StartDt` and `EndDt` entries from `settings.params`, keeps only
 * their first 10 characters, and calls `extractItems` once per day from the
 * start date through the end date inclusive, using a
 * `T00:00:00Z`..`T23:59:59Z` window for each day. The start day is always
 * queried, even if it is later than the end day. `doneCallback` is invoked
 * once after the final day.
 *
 * @param {Object} table - Forwarded to `extractItems`, which appends rows to it.
 * @param {{entityRef: *, dataSet: *, params?: Array<{name: string, value: *}>}} settings
 *   Extraction settings; `params` must contain string-valued `StartDt` and
 *   `EndDt` entries.
 * @param {Function} doneCallback - Called with no arguments when extraction
 *   has finished.
 * @param {*} privateKey - Forwarded to `extractItems`.
 * @returns {Promise<void>}
 * @throws {TypeError} (as a rejection) If `StartDt` or `EndDt` is missing or
 *   its value is not a string.
 */
const fetchIncremental = async (table, settings, doneCallback, privateKey) => {
  const { entityRef, dataSet, params = [] } = settings;

  // Returns the leading 10 characters of a required string parameter.
  // NOTE(review): assumes the value starts with a `YYYY-MM-DD` date — confirm.
  const readDatePart = name => {
    const param = params.find(p => p.name === name);

    if (!param || typeof param.value !== 'string') {
      throw new TypeError(`Missing or non-string "${name}" parameter`);
    }

    return param.value.slice(0, 10);
  };

  const StartDt = readDatePart('StartDt');
  const EndDt = readDatePart('EndDt');

  await apiLog(entityRef, dataSet, { StartDt, EndDt }, 'Search Range');

  let SearchDt = StartDt;
  let totalItems = 0;

  do {
    await apiLog(entityRef, dataSet, { SearchDt }, `Search Date ${SearchDt}`);

    // A fresh Fields array per day: extractItems mutates the Offset entry.
    const Fields = [
      {
        name: 'EID',
        type: 'string',
        value: entityRef,
      },
      {
        name: 'Offset',
        type: 'int',
        value: 0,
      },
      {
        name: 'Limit',
        type: 'int',
        value: 100000,
      },
      {
        name: 'StartDt',
        type: 'string',
        value: `${SearchDt}T00:00:00Z`,
      },
      {
        name: 'EndDt',
        type: 'string',
        value: `${SearchDt}T23:59:59Z`,
      },
    ];

    const dailyItems = await extractItems(
      Fields,
      entityRef,
      dataSet,
      privateKey,
      doneCallback,
      table
    );

    totalItems += dailyItems;

    SearchDt = moment(SearchDt).add(1, 'day').format('YYYY-MM-DD');
    // Plain string comparison; chronological only for zero-padded YYYY-MM-DD.
  } while (SearchDt <= EndDt);

  await apiLog(entityRef, dataSet, params, `Extract Completed. ${totalItems} Items Extracted.`);

  doneCallback();
};

export default fetchIncremental;
