import Observable from 'zen-observable';

import {GlobalCGEventTracker} from '../analytics/tracker';
import * as GraphTypes from '../model/graph/types';
import * as Model from '../model/types';
import {OpStore} from '../opStore/types';
import {Server} from '../server/types';
import {ID} from '../util/id';
import {Client} from './types';

interface ObservableNode<T extends Model.Type = Model.Type> {
  id: string;
  observable: Observable<T>;
  node: GraphTypes.Node<T>;
  observers: Set<ZenObservable.SubscriptionObserver<T>>;
  hasResult: boolean;
  lastResult: any;
}

type ResetRequestType = {
  promise: Promise<void>;
  resolve: () => void;
};

const POLL_INTERVAL = 30000;

export class BasicClient implements Client {
  readonly opStore: OpStore;
  private observables = new Map<string, ObservableNode>();
  private resolveRefreshRequest?: ResetRequestType;
  private loadingListeners = new Map<
    string,
    ZenObservable.SubscriptionObserver<boolean>
  >();
  private loading = new Observable<boolean>(observer => {
    const id = ID();
    this.loadingListeners.set(id, observer);

    return () => this.loadingListeners.delete(id);
  });

  public constructor(private readonly server: Server) {
    setTimeout(this.loopIteration.bind(this), 50);
    // Removing this for now because it's may be causing an invariant violation
    // https://weightsandbiases.slack.com/archives/C01T8BLDHKP/p1649955797252749
    // setTimeout(this.pollIteration.bind(this), POLL_INTERVAL);
    this.opStore = server.opStore;
  }

  public subscribe<T extends Model.Type>(
    node: GraphTypes.Node<T>
  ): Observable<any> {
    GlobalCGEventTracker.basicClientSubscriptions++;
    const observableId = ID();
    const observable = new Observable<Model.Type>(observer => {
      const obs = this.observables.get(observableId);
      if (obs == null) {
        return;
      }
      obs.observers.add(observer);
      if (obs.hasResult) {
        observer.next(obs.lastResult);
      } else {
        this.setIsLoading(true);
      }
      return () => {
        obs.observers.delete(observer);
      };
    });
    this.observables.set(observableId, {
      id: observableId,
      observable,
      observers: new Set(),
      node,
      hasResult: false,
      lastResult: undefined,
    });
    return observable;
  }

  // I'm adding this for now to make it easier to switch
  // refineNode to use client. But we should really make
  // refineNode() also subscribable, meaning we want to
  // wire observers through that code path.
  // TODO: really, fix this. It would mean we can poll correctly
  // and efficiently for both useNodeValue() and useNodeWithServerType()
  public query<T extends Model.Type>(node: GraphTypes.Node<T>): Promise<any> {
    const obs = this.subscribe(node);
    return new Promise((resolve, reject) => {
      const sub = obs.subscribe(
        nodeRes => {
          resolve(nodeRes);
          sub.unsubscribe();
        },
        caughtError => {
          reject(caughtError);
          sub.unsubscribe();
        }
      );
    });
  }

  public loadingObservable() {
    return this.loading;
  }

  public refreshAll(): Promise<void> {
    if (this.resolveRefreshRequest == null) {
      let res: ResetRequestType['resolve'] = () => {};
      const prom = new Promise<void>((resolve, reject) => {
        res = resolve;
      });
      this.resolveRefreshRequest = {
        promise: prom,
        resolve: res,
      };
    }
    return this.resolveRefreshRequest.promise;
  }

  private pollIteration() {
    this.refreshAll();
    setTimeout(this.pollIteration.bind(this), POLL_INTERVAL);
  }

  private setIsLoading(loading: boolean) {
    this.loadingListeners.forEach(observer => observer.next(loading));
  }

  private async loopIteration() {
    const notDoneObservables = Array.from(this.observables.values()).filter(
      l => !l.hasResult || this.resolveRefreshRequest != null
    );
    if (notDoneObservables.length > 0) {
      let error: any;
      let results: any[] = [];
      // console.time('graph execute');
      try {
        results = await this.executeForwardListeners(
          notDoneObservables.map(o => o.node),
          this.resolveRefreshRequest != null
        );
      } catch (e) {
        error = e;
        // TODO(np): Do we need to do anything to recover here?
      }
      // console.timeEnd('graph execute');

      // notify listeners
      // console.time('graph notify');
      if (error != null) {
        for (const observable of notDoneObservables) {
          observable.hasResult = true;
          for (const observer of observable.observers) {
            observer.error(error);
          }
        }
      } else {
        for (let i = 0; i < notDoneObservables.length; i++) {
          const observable = notDoneObservables[i];
          observable.hasResult = true;
          observable.lastResult = results[i];
          for (const observer of observable.observers) {
            observer.next(results[i]);
          }
        }
      }
      // console.timeEnd('graph notify');
    }

    if (Array.from(this.observables.values()).every(obs => obs.hasResult)) {
      this.setIsLoading(false);
    }

    if (this.resolveRefreshRequest != null) {
      const res = this.resolveRefreshRequest.resolve;
      this.resolveRefreshRequest = undefined;
      res();
    }
    // TODO: Remove from PR
    setTimeout(this.loopIteration.bind(this), 10);
  }

  private async executeForwardListeners(
    targetNodes: GraphTypes.Node[],
    resetBackendExecutionCache?: boolean
  ) {
    return this.server.query(targetNodes, resetBackendExecutionCache);
  }
}
