import {Action} from 'redux';
import {Channel, channel} from 'redux-saga';
import {
  actionChannel,
  ActionPattern,
  all,
  call,
  delay,
  fork,
  put,
  race,
  select,
  take,
} from 'redux-saga/effects';
import {createAction, getType} from 'typesafe-actions';

import * as PollingActions from '../polling/actions';
import * as PollingHooks from '../polling/hooks';
import * as PollingReducer from '../polling/reducer';
import {ApolloClient} from '../types';
import * as Actions from './actions';
import * as Api from './api';
import * as Lib from './lib';
import * as Reducer from './reducer';
import * as Selectors from './selectors';
import * as ServerQuery from './serverQuery';
import * as Types from './types';

// The actions below are saga-internal signals. Within this file they are only
// ever put on private saga channels (see runPollLoop / runNewLoop /
// runRequestLoop); they are "fake" in the sense that they exist purely to
// coordinate these loops.

// Signal from the poll loop: run one round of poll queries.
const triggerPoll = createAction('@runs/sagaTriggerPoll');
// Signal from the new-query loop: run any newly registered/updated queries.
const triggerNew = createAction('@runs/sagaTriggerNew');
// Signal reporting that new queries finished, carrying their query generations.
// NOTE(review): part of LoopAction, but nothing visible in this file sends it
// and the request loop's switch has no case for it -- confirm whether it is
// still needed.
const resolveNew = createAction(
  '@runs/sagaResolveNew',
  action => (ids: Types.QueryGeneration[]) => action({ids})
);
// Signal from the request loop back to the poll loop: the poll round is done,
// so the poll loop may start timing the next one.
const pollFinished = createAction('@runs/sagaPollFinished');

// Actions that may travel over the trigger channel into runRequestLoop.
type LoopAction =
  | ReturnType<typeof triggerPoll>
  | ReturnType<typeof triggerNew>
  | ReturnType<typeof resolveNew>;
// Actions that may travel over the finish channel back to runPollLoop.
type FinishedAction = ReturnType<typeof pollFinished>;

/**
 * Settled outcome of a single server query.
 *
 * Exactly one side is populated: `result` on success, `error` on failure.
 * Consumers in this file tell the two apart with `error == null`, so a failed
 * outcome must always carry a non-nullish `error`.
 */
type QueryResult<SR> =
  | {
      result: SR;
      error?: undefined;
    }
  | {
      result?: undefined;
      error: any;
    };

/**
 * Converts promises that may reject into promises that always resolve to a
 * `QueryResult`, so a group of queries can be awaited together without one
 * rejection discarding the others.
 *
 * @param promises One in-flight promise per server query.
 * @returns Promises in the same order, each resolving to `{result}` on
 *   fulfilment or `{error}` on rejection. They never reject.
 */
function wrapQueryPromises<SR>(
  promises: Array<Promise<SR>>
): Array<Promise<QueryResult<SR>>> {
  return promises.map(p =>
    p.then(
      (result): QueryResult<SR> => ({result}),
      (reason): QueryResult<SR> => ({
        // A promise can reject with null/undefined. Passing that through would
        // produce `{error: undefined}`, which the `error == null` checks used
        // by consumers treat as a success with an undefined result. Substitute
        // a real Error so the failure stays visible.
        error:
          reason != null
            ? reason
            : new Error('Query rejected without an error value'),
      })
    )
  );
}

/**
 * Queries and their outcomes split by success/failure.
 * `okQueries[i]` pairs with `okResults[i]`, and `errQueries[i]` pairs with
 * `errResults[i]`, when the inputs had equal length.
 */
interface PartitionedQueries<SQ, SR> {
  okQueries: Array<Types.MergedServerQuery<SQ>>;
  okResults: SR[];
  errQueries: Array<Types.MergedServerQuery<SQ>>;
  errResults: any[];
}

/**
 * Splits queries and their settled outcomes into successful and failed groups,
 * keeping relative order within each group.
 *
 * @param serverQueries Queries that were executed.
 * @param serverResponse Outcome for each query, matched by index.
 * @returns The four parallel lists described by `PartitionedQueries`.
 */
function partitionQueryResults<SQ, SR>(
  serverQueries: Array<Types.MergedServerQuery<SQ>>,
  serverResponse: Array<QueryResult<SR>>
): PartitionedQueries<SQ, SR> {
  const okQueries: Array<Types.MergedServerQuery<SQ>> = [];
  const errQueries: Array<Types.MergedServerQuery<SQ>> = [];
  const okResults: SR[] = [];
  const errResults: any[] = [];

  // Classify each query by the outcome sitting at the same index.
  serverQueries.forEach((query, index) => {
    if (serverResponse[index].error == null) {
      okQueries.push(query);
    } else {
      errQueries.push(query);
    }
  });

  // Classify the outcomes themselves.
  for (const outcome of serverResponse) {
    if (outcome.error == null) {
      okResults.push(outcome.result!);
    } else {
      errResults.push(outcome.error);
    }
  }

  return {okQueries, okResults, errQueries, errResults};
}

// Back-off factor for slow polls: runPollLoop raises the wait between polls to
// at least pollMultiplier times the duration of the previous poll round.
const pollMultiplier = 5;

// Historical note: we used to use saga's fork() to fork off
// new processes whenever we needed to make new queries in this loop.
// So that, for example, if a new query came in while we were in the
// act of executing some other queries, we would execute the new
// query immediately.
// The new behavior is that any queries block
// the loop. This is much safer in terms of server load. With the
// previous behavior, clicking eyeballs rapidly would cause many
// queries to be made in parallel, even though the user only
// wants results for the most recent query.
// The new behavior ensures that any extraneous queries that are
// requested and then removed while we're already in the act of querying
// don't ever execute.
// It would be even better to cancel live queries that are no longer
// needed! But that is not yet implemented.
// To see the previous behavior, look at this PR:
// https://github.com/wandb/core/pull/6781

/**
 * Central request loop: the only place in this file that executes server
 * queries.
 *
 * It creates two private channels, forks runPollLoop and runNewLoop as
 * producers, and then handles one trigger at a time from the trigger channel.
 * Each trigger is processed to completion (the query promises are awaited via
 * blocking effects) before the next one is taken, so queries never overlap.
 *
 * @param client Apollo client handed to `strategy.doQuery` and the server-info
 *   query.
 * @param strategy Supplies query preparation (through Lib.prepareQueries) and
 *   execution (`doQuery`).
 */
function* runRequestLoop<SQ, SR>(
  client: ApolloClient,
  strategy: ServerQuery.StrategyGraphql<SQ, SR>
) {
  // triggerChan: producers -> this loop. finishChan: this loop -> poll loop.
  const triggerChan: Channel<LoopAction> = yield channel<LoopAction>();
  const finishChan: Channel<FinishedAction> = yield channel<FinishedAction>();

  yield fork(runPollLoop, triggerChan, finishChan);
  yield fork(runNewLoop, triggerChan);

  while (true) {
    const action: LoopAction = yield take(triggerChan);

    // Read the runs state fresh for every trigger so queries that were added
    // or removed while the previous trigger was being handled are reflected.
    const runsState: Reducer.RunsReducerState = yield select(
      Selectors.getRunsState
    );

    // Only triggerPoll and triggerNew are handled; any other action type
    // taken from the channel falls through the switch and is ignored.
    switch (action.type) {
      case getType(triggerPoll): {
        // NOTE(review): the `true` argument presumably selects poll-eligible
        // queries -- confirm against Lib.prepareQueries.
        const serverQueries = Lib.prepareQueries(strategy, runsState, true);

        // If there are no queries to poll, don't bother making the serverinfo query
        if (serverQueries.length === 0) {
          // Still signal completion, otherwise the poll loop would wait forever.
          yield put(finishChan, pollFinished());
          break;
        }

        const queryPromises = wrapQueryPromises(
          serverQueries.map(sq => strategy.doQuery(client, sq.serverQuery))
        );
        // A failed server-info query is treated as "polling is OK" so that it
        // cannot disable polling by itself.
        const infoPromise = Api.doServerInfoQuery(client).catch(() => ({
          pollingOK: true,
        }));

        // Wait for the server-info query and every poll query together. None
        // of these promises can reject: queries are wrapped, info has a catch.
        const [serverInfo, serverResponse]: [
          Types.ServerInfo,
          Array<QueryResult<SR>>
        ] = yield call(() =>
          Promise.all([infoPromise, Promise.all(queryPromises)])
        );

        if (!serverInfo.pollingOK) {
          yield put(PollingActions.setServerPollingOK(false));
        }

        const {okQueries, okResults, errQueries, errResults} =
          partitionQueryResults(serverQueries, serverResponse);

        // Dispatch successes and failures as two batched actions, skipping
        // whichever group is empty.
        if (okResults.length > 0) {
          yield put(Actions.queryResults(okQueries, okResults));
        }
        if (errResults.length > 0) {
          yield put(
            Actions.queryErrors(
              errQueries.map(q => q.sourceIDs),
              errResults
            )
          );
        }

        // Tell the poll loop this round is complete.
        yield put(finishChan, pollFinished());
        break;
      }
      case getType(triggerNew): {
        // NOTE(review): `false` presumably selects non-poll (new) queries and
        // the meaning of the trailing `[]` is not visible here -- confirm
        // against Lib.prepareQueries.
        const serverQueries = Lib.prepareQueries(
          strategy,
          runsState,
          false,
          []
        );

        const promises = wrapQueryPromises(
          serverQueries.map(sq => strategy.doQuery(client, sq.serverQuery))
        );
        // Unlike the poll case, each query's outcome is dispatched on its own
        // (see handleQuery) rather than batched; the loop still blocks until
        // all of them have been handled.
        yield all(
          promises.map((p, i) => call(handleQuery, serverQueries[i], p))
        );

        break;
      }
    }
  }
}

/**
 * Waits for a single wrapped query promise and dispatches its outcome:
 * `Actions.queryResults` on success, `Actions.queryErrors` on failure.
 *
 * @param query The merged server query the promise belongs to.
 * @param promise Wrapped promise for that query (see wrapQueryPromises).
 */
function* handleQuery<SQ, SR>(
  query: Types.MergedServerQuery<SQ>,
  promise: Promise<QueryResult<SR>>
) {
  const outcome: QueryResult<SR> = yield call(() => promise);

  // Success path first; everything else is reported as an error.
  if (outcome.error == null) {
    yield put(Actions.queryResults([query], [outcome.result!]));
    return;
  }

  yield put(Actions.queryErrors([query.sourceIDs], [outcome.error]));
}

/**
 * Poll scheduler. Repeatedly waits out the current poll interval, asks the
 * request loop to run a poll round, and waits for that round to finish.
 *
 * The interval is re-read from `state.polling` once per tick while waiting, so
 * changes take effect without restarting the wait. An interval of 0 means
 * polling is off; the loop then idles, re-checking once per tick. After a poll
 * round, the next wait is stretched to at least `pollMultiplier` times the
 * duration of that round.
 *
 * @param trigger Channel on which `triggerPoll` is sent to the request loop.
 * @param finish Channel on which the request loop reports `pollFinished`.
 */
function* runPollLoop(
  trigger: Channel<LoopAction>,
  finish: Channel<FinishedAction>
) {
  // Granularity (ms) at which the polling state is re-checked.
  const tickMs = 1000;
  // Duration (ms) of the most recent poll round; 0 until one has completed.
  let lastPollQueryDuration = 0;

  // Stretches a non-zero configured interval so a slow server gets polled
  // less often.
  const stretchForSlowPolls = (configured: number): number => {
    const minimum = lastPollQueryDuration * pollMultiplier;
    return minimum > configured ? minimum : configured;
  };

  while (true) {
    // First, idle until polling is switched on (interval is non-zero).
    let pollingState: PollingReducer.StateType = yield select(
      state => state.polling
    );
    let pollInterval = PollingHooks.getPollInterval(pollingState);
    if (pollInterval === 0) {
      yield delay(tickMs);
      continue;
    }
    pollInterval = stretchForSlowPolls(pollInterval);

    // The wait is measured from the moment polling was seen to be enabled.
    const pollWaitStart = Date.now();

    // Wait out the interval one tick at a time, picking up interval changes.
    while (Date.now() - pollWaitStart < pollInterval) {
      yield delay(tickMs);
      pollingState = yield select(state => state.polling);
      pollInterval = PollingHooks.getPollInterval(pollingState);
      if (pollInterval === 0) {
        // Polling was switched off mid-wait; abandon this wait.
        break;
      }
      pollInterval = stretchForSlowPolls(pollInterval);
    }
    if (pollInterval === 0) {
      // Back to the top to idle until polling is re-enabled.
      continue;
    }

    // Start the poll. Once it is running, interval updates are ignored until
    // the next go-around.
    const pollQueryStart = Date.now();
    yield put(trigger, triggerPoll());
    yield take(finish);
    lastPollQueryDuration = Date.now() - pollQueryStart;
  }
}

/**
 * Batches query registrations/updates and asks the request loop to run them.
 *
 * After the first `registerQueryWithID` / `updateQuery` action arrives, further
 * such actions are absorbed for a fixed window; a single `triggerNew` is then
 * sent on `trigger`.
 *
 * @param trigger Channel on which `triggerNew` is sent to the request loop.
 */
function* runNewLoop(trigger: Channel<LoopAction>) {
  // Length (ms) of the batching window opened by the first update action.
  const batchWindowMs = 100;

  // Buffer matching actions so none are missed while this saga is busy.
  // NOTE(review): actionChannel yields a channel, so this annotation looks
  // like it should be Channel<Action<string>> -- left as-is to avoid touching
  // the file's imports; confirm and tighten.
  const updateChan: ActionPattern<Action<string>> = yield actionChannel([
    getType(Actions.registerQueryWithID),
    getType(Actions.updateQuery),
  ]);
  while (true) {
    // wait until we get an action that would cause us to make a new query
    yield take(updateChan);

    // Hold the batching window open for batchWindowMs from now.
    const end = Date.now() + batchWindowMs;
    while (Date.now() < end) {
      const off = end - Date.now();
      // consume all updates that come in during the batching period
      yield race([take(updateChan), delay(off)]);
    }

    yield put(trigger, triggerNew());
  }
}

/**
 * Root saga for this module: runs the request loop, built with the given
 * client and `Reducer.STRATEGY`, inside an `all` effect.
 *
 * @param client Apollo client passed through to the request loop.
 */
export default function* allSagas(client: ApolloClient) {
  const requestLoop = runRequestLoop(client, Reducer.STRATEGY);
  yield all([requestLoop]);
}
