/**
 * @flow
 */

import type {Action, TradingSettings} from "../actions/types";
import * as types from "../actions/types";
import saga from "./lib";
import ByBitWebSocket, * as bybitws from "../lib/ByBitWebSocket";
import ByBitAPI, * as bybitapi from "../lib/ByBitAPI";
import * as bybitutils from "../lib/ByBitUtils";
import actions from "../actions";

// Bar-update listeners. The outer key is "<resolution>.<symbol>" (built in
// onByBitStartConnectionRequest); the inner key is the subscription id and
// the value is the callback that receives each bar.
type BarSubscriptionMap = Map<string, Map<string, (types.tvutils.Bar) => void>>;
// Websocket topic whose snapshot/delta messages feed the order book actions.
const ORDER_BOOK_TOPIC = "orderBook_200.100ms.BTCUSD";
// const ORDER_BOOK_TOPIC = 'orderBookL2_25.BTCUSD';
// Websocket topic whose snapshot/delta messages feed the instrument info actions.
const INSTRUMENT_INFO_TOPIC = "instrument_info.100ms.BTCUSD";
// Topics sent in the "subscribe" message when the websocket connection opens
// (see handleWSEvent). klineV2 topics are added to and removed from this list
// at runtime as bar subscriptions come and go.
const currentTopicSubscriptions: Array<string> = [
  ORDER_BOOK_TOPIC,
  INSTRUMENT_INFO_TOPIC,
  "position",
  "execution",
  "order",
  "stop_order",
  "wallet",
];
// Most recent bar stored per symbol, together with the callbacks and the
// resolution that were in effect when it was stored. Written by the klineV2
// branch of handleWSInboundMessage and read by its instrument-info delta
// branch to push intra-bar price updates to listeners.
let lastBarBySymbol: Map<
  string,
  {
    bar: bybitws.BarUpdate,
    callbacks: Array<(types.tvutils.Bar) => void>,
    // markPricecallback: (types.tvutils.Bar) => void,
    resolution: types.tvutils.Resolution,
  },
> = new Map();
// Reference only (not executed): a zeroed bar literal. The field names match
// the bar object built for mark prices in handleWSInboundMessage.
// {
//   start: 0,
//   end: 0,
//   open: 0,
//   close: 0,
//   high: 0,
//   low: 0,
//   volume: 0,
//   turnover: 0,
//   confirm: false,
//   cross_seq: 0,
//   timestamp: 0,
// };

const TVResolutions: {[types.tvutils.Resolution]: number} = {
  "1": 1 * 60 * 1000,
  "3": 3 * 60 * 1000,
  "5": 5 * 60 * 1000,
  "15": 15 * 60 * 1000,
  "30": 30 * 60 * 1000,
  "60": 60 * 60 * 1000,
  "120": 120 * 60 * 1000,
  "240": 240 * 60 * 1000,
  "360": 360 * 60 * 1000,
  "720": 720 * 60 * 1000,
  "1D": 24 * 3600 * 1000,
};
function normalizeBarTimestamp(
  resolution: types.tvutils.Resolution,
  time: number,
): number {
  // Esto hay que cambiarlo por un mecanismo para convertir entre
  // resolucion de TradingView por las de klineV2 de bybit.
  if (["D", "W", "Y"].indexOf(resolution) !== -1) {
    // $FlowFixMe
    resolution = resolution
      .replace("D", "1D")
      .replace("W", "1W")
      .replace("Y", "1Y");
  }
  const rounder = TVResolutions[resolution];
  return Math.floor(time / rounder) * rounder;
}

function* updatePositionStopLossIfNeeded(position: bybitws.Position) {
  const stopLossPercentage = yield* saga.select(
    (store) => store.user.tradingSettings.autoStopLossPercentage,
  );
  if (stopLossPercentage != null && position.size > 0) {
    const stopLoss = bybitutils.calculateStopLoss(
      position,
      stopLossPercentage,
      bybitutils.TAKER_FEE,
    );
    if (parseFloat(position.stop_loss) !== stopLoss) {
      const action = actions.bybit.setTradingStop({
        symbol: position.symbol,
        stop_loss: stopLoss,
        sl_trigger_by: "LastPrice",
      });
      yield saga.put(action);
    }
  }
}

function* handleUpdateTradingSettings(
  api: ByBitAPI,
  update: $Shape<TradingSettings>,
): * {
  if (update.autoStopLossPercentage != null) {
    const position = yield* saga.select((store) =>
      store.bybit.account.positions.get(store.bybit.account.symbol),
    );
    if (position) {
      yield saga.orig.call(updatePositionStopLossIfNeeded, position);
    }
  }
}

function* handleApiAction(api: ByBitAPI, action: Action): * {
  if (action.type === "BYBIT_REPLACE_ORDER_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: saga.orig.call(api.replaceOrder, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_REPLACE_ORDER_FAILED",
          request: action.request,
          order: action.order,
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_REPLACE_ORDER_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_REPLACE_ORDER_FAILED",
        request: action.request,
        order: action.order,
        error: e,
      });
    }
  } else if (action.type === "BYBIT_REPLACE_STOP_ORDER_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: saga.orig.call(api.replaceStopOrder, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_REPLACE_STOP_ORDER_FAILED",
          request: action.request,
          order: action.order,
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_REPLACE_STOP_ORDER_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_REPLACE_STOP_ORDER_FAILED",
        request: action.request,
        order: action.order,
        error: e,
      });
    }
  } else if (action.type === "BYBIT_PLACE_ORDER_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: saga.orig.call(api.placeOrder, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_PLACE_ORDER_FAILED",
          request: action.request,
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_PLACE_ORDER_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_PLACE_ORDER_FAILED",
        request: action.request,
        error: e,
      });
    }
  } else if (action.type === "BYBIT_PLACE_STOP_ORDER_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: saga.orig.call(api.placeStopOrder, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_PLACE_STOP_ORDER_FAILED",
          request: action.request,
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_PLACE_STOP_ORDER_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_PLACE_STOP_ORDER_FAILED",
        request: action.request,
        error: e,
      });
    }
  } else if (action.type === "BYBIT_CANCEL_ORDER_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: yield saga.orig.call(api.cancelOrder, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_CANCEL_ORDER_FAILED",
          request: action.request,
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_CANCEL_ORDER_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_CANCEL_ORDER_FAILED",
        request: action.request,
        error: e,
      });
    }
  } else if (action.type === "BYBIT_CANCEL_STOP_ORDER_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: yield saga.orig.call(api.cancelStopOrder, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_CANCEL_STOP_ORDER_FAILED",
          request: action.request,
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_CANCEL_STOP_ORDER_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_CANCEL_STOP_ORDER_FAILED",
        request: action.request,
        error: e,
      });
    }
  } else if (action.type === "BYBIT_CANCEL_ALL_ORDERS_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: yield saga.orig.call(api.cancelAllOrders, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_CANCEL_ALL_ORDERS_FAILED",
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_CANCEL_ALL_ORDERS_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_CANCEL_ALL_ORDERS_FAILED",
        error: e,
      });
    }
  } else if (action.type === "BYBIT_SET_TRADING_STOP_REQUEST") {
    try {
      const {result, timeout} = yield saga.orig.race({
        result: yield saga.orig.call(api.setTradingStop, action.request),
        timeout: saga.orig.delay(5000),
      });
      if (timeout) {
        yield saga.put({
          type: "BYBIT_SET_TRADING_STOP_FAILED",
          error: "timeout",
        });
      } else {
        yield saga.put({type: "BYBIT_SET_TRADING_STOP_SUCCESS"});
      }
    } catch (e) {
      yield saga.put({
        type: "BYBIT_SET_TRADING_STOP_FAILED",
        error: e,
      });
    }
  }
}

function* calculateRealizedPnL(
  position: bybitws.Position,
  api: ByBitAPI,
): Generator<*, {realizedPnL: number, executions: Array<bybitws.OrderExecution>}, *> {
  if (position.side === "None") {
    return {realizedPnL: 0, executions: []};
  }

  // Conseguimos la fecha de creacion de la posicion recorriendo la lista de ejecuciones a la inversa.
  let page = 1;
  let positionCreatedAt = 0;
  let positionSize = position.size;
  let executions = [];
  while (true) {
    const result = yield saga.orig.call(api.getExecutions, {
      symbol: position.symbol,
      order: "desc",
      page,
    });
    if (result.length > 0) {
      result
        .filter((i) => i.exec_type === "Trade")
        .find((i) => {
          executions.push(i);
          positionSize += i.closed_size > 0 ? i.closed_size : -i.exec_qty;
          if (positionSize <= 0) {
            positionCreatedAt = Math.round(new Date(i.trade_time).getTime() / 1000);
            return true;
          }
        });
      if (positionCreatedAt > 0) {
        break;
      }
      page += 1;
      if (page >= 50) {
        throw new Error(
          "Execution history for position is taking too many pages",
        );
      }
    } else {
      break;
    }
  }
  if (positionCreatedAt === 0) {
    return {realizedPnL: 0, executions: []};
  }

  // Sumamos la renta realizada.
  let realizedPnL = 0;
  page = 1;
  while (true) {
    const result = yield saga.orig.call(api.getClosedPnL, {
      symbol: position.symbol,
      order: "desc",
      page,
      start_time: positionCreatedAt,
    });
    if (result.data?.length > 0) {
      result.data.forEach((i) => {
        realizedPnL += i.closed_pnl;
      });
      page += 1;
      if (page >= 50) {
        throw new Error("PnL history for position is taking too many pages");
      }
    } else {
      break;
    }
  }
  if (realizedPnL === 0) {
    return {realizedPnL: 0, executions};
  }
  return {realizedPnL, executions};
}

function* onByBitStartConnectionRequest(action: Action) {
  if (action.type !== "BYBIT_START_CONNECTION_REQUEST")
    throw new Error("Invalid Action");

  const ws = new ByBitWebSocket({
    apiKey: action.apiKey,
    apiSecret: action.apiSecret,
    endpoint: "wss://stream.bybit.com/realtime",
  });

  const api = new ByBitAPI({
    apiKey: action.apiKey,
    apiSecret: action.apiSecret,
    endpoint: "", // proxy "https://api.bybit.com",
  });

  yield saga.orig.fork(function*() {
    let wallets = yield saga.orig.call(api.getWallets);
    yield* saga.put({
      type: "BYBIT_WALLETS_UPDATE",
      wallets,
    });
  });

  yield saga.orig.fork(function*() {
    let orders = yield saga.orig.call(api.getOrders, {symbol: action.symbol});
    yield* saga.put({
      type: "BYBIT_ORDERS_UPDATE",
      orders,
    });
  });

  yield saga.orig.fork(function*() {
    let orders = yield saga.orig.call(api.getStopOrders, {
      symbol: action.symbol,
    });
    yield* saga.put({
      type: "BYBIT_STOP_ORDERS_UPDATE",
      orders,
    });
  });

  yield saga.orig.fork(function*() {
    let position: bybitws.Position = yield saga.orig.call(api.getPositions, {
      symbol: action.symbol,
    });
    yield* saga.put({
      type: "BYBIT_POSITIONS_UPDATE",
      positions: [position],
    });
    const {realizedPnL, executions} = yield* calculateRealizedPnL(position, api);
    yield* saga.put({
      type: "BYBIT_POSITION_EXTRA_INFO_UPDATE",
      symbol: position.symbol,
      sizeValidation: position.size,
      realizedPnL,
      executions,
    });
  });

  let wsChannel = yield saga.orig.call(ws.createEventChannel);

  const channels = {
    wsEvent: saga.orig.take(wsChannel),
    subscriptionAction: saga.orig.take(
      yield saga.orig.actionChannel([
        "BYBIT_UNSUBSCRIBE_BAR_UPDATES",
        "BYBIT_SUBSCRIBE_BAR_UPDATES",
      ]),
    ),
    tradingSettingsAction: saga.orig.take(
      yield saga.orig.actionChannel(["UPDATE_TRADING_SETTINGS"]),
    ),
    apiAction: saga.orig.take(
      yield saga.orig.actionChannel([
        "BYBIT_CANCEL_ORDER_REQUEST",
        "BYBIT_CANCEL_ALL_ORDERS_REQUEST",
        "BYBIT_SET_TRADING_STOP_REQUEST",
        "BYBIT_PLACE_ORDER_REQUEST",
        "BYBIT_REPLACE_ORDER_REQUEST",
        "BYBIT_DISCONNECT_REQUEST",
        "BYBIT_PLACE_STOP_ORDER_REQUEST",
        "BYBIT_CANCEL_STOP_ORDER_REQUEST",
        "BYBIT_REPLACE_STOP_ORDER_REQUEST",
      ]),
    ),
  };

  const barSubscriptions: BarSubscriptionMap = new Map();
  const barSubscriptionIdToKey: Map<string, string> = new Map();

  // Wait for the main loop until websocket connects.
  while (true) {
    const wsEvent = yield saga.orig.take(wsChannel);
    yield* handleWSEvent(ws, wsEvent, barSubscriptions);
    break;
  }

  while (true) {
    const {
      wsEvent,
      subscriptionAction,
      tradingSettingsAction,
      apiAction,
    }: {
      wsEvent: ?bybitws.WSEvent,
      subscriptionAction: ?Action,
      apiAction: ?Action,
      tradingSettingsAction: ?Action,
    } = yield saga.orig.race(channels);
    if (wsEvent) {
      yield* handleWSEvent(ws, wsEvent, barSubscriptions);
    } else if (
      tradingSettingsAction &&
      tradingSettingsAction.type === "UPDATE_TRADING_SETTINGS"
    ) {
      yield saga.orig.fork(
        handleUpdateTradingSettings,
        api,
        tradingSettingsAction.settings,
      );
    } else if (apiAction) {
      if (apiAction.type === "BYBIT_DISCONNECT_REQUEST") {
        break;
      } else {
        yield saga.orig.fork(handleApiAction, api, apiAction);
      }
    } else if (subscriptionAction?.type === "BYBIT_SUBSCRIBE_BAR_UPDATES") {
      const key = (
        subscriptionAction.resolution +
        "." +
        subscriptionAction.symbol
      )
        .replace("1D", "D")
        .replace("1W", "W")
        .replace("1Y", "Y");
      if (!subscriptionAction.symbol.startsWith(".")) {
        // solo subscribimos si no es mark price.
        const topic = "klineV2." + key;
        ws.sendMessage({
          op: "subscribe",
          args: [topic],
        });
        currentTopicSubscriptions.push(topic);
      }
      const subs = barSubscriptions.get(key) || new Map();
      subs.set(subscriptionAction.id, subscriptionAction.callback);
      barSubscriptions.set(key, subs);
      barSubscriptionIdToKey.set(subscriptionAction.id, key);
    } else if (subscriptionAction?.type === "BYBIT_UNSUBSCRIBE_BAR_UPDATES") {
      const key = barSubscriptionIdToKey.get(subscriptionAction.id);
      if (key) {
        const subs = barSubscriptions.get(key) || new Map();
        subs.delete(subscriptionAction.id);
        barSubscriptionIdToKey.delete(subscriptionAction.id);
        if (key.indexOf("..") === -1) {
          // solo desubscribimos si no es mark price.
          const topic =
            "klineV2." +
            key
              .replace("1D", "D")
              .replace("1W", "W")
              .replace("1Y", "Y");
          ws.sendMessage({
            op: "unsubscribe",
            args: [topic],
          });
          const index = currentTopicSubscriptions.indexOf(topic);
          if (index !== -1) {
            currentTopicSubscriptions.splice(index, 1);
          }
        }
      }
    }
  }
  wsChannel.close();
}

function* handleWSEvent(
  ws: ByBitWebSocket,
  wsEvent: bybitws.WSEvent,
  barSubscriptions: BarSubscriptionMap,
): * {
  switch (wsEvent.type) {
    case "connection_opened":
      ws.sendMessage({
        op: "subscribe",
        args: [...currentTopicSubscriptions],
      });
      break;
    case "inbound_message":
      yield* handleWSInboundMessage(ws, wsEvent.payload, barSubscriptions);
      break;
    default:
      console.log("wsEvent", wsEvent);
  }
}

function* handleWSInboundMessage(
  ws: ByBitWebSocket,
  message: bybitws.InboundMessage,
  barSubscriptions: BarSubscriptionMap,
) {
  if (message.topic === ORDER_BOOK_TOPIC && message.success === undefined) {
    switch (message.type) {
      case "snapshot":
        yield* saga.put({
          type: "BYBIT_ORDER_BOOK_SNAPSHOT_RECEIVED",
          orders: message.data,
        });
        break;
      case "delta":
        yield* saga.put({
          type: "BYBIT_ORDER_BOOK_DELTA_RECEIVED",
          delete: message.data.delete,
          update: message.data.update,
          insert: message.data.insert,
        });
        break;
      default:
        (message: empty);
        console.log(
          "websocket: unexpected order book inbound message",
          message,
        );
        break;
    }
  } else if (
    message.topic === INSTRUMENT_INFO_TOPIC &&
    message.success === undefined
  ) {
    switch (message.type) {
      case "snapshot":
        yield* saga.put({
          type: "BYBIT_INSTRUMENT_INFO_SNAPSHOT_RECEIVED",
          data: message.data,
        });
        break;
      case "delta":
        yield* saga.put({
          type: "BYBIT_INSTRUMENT_INFO_DELTA_RECEIVED",
          delete: message.data.delete,
          update: message.data.update,
          insert: message.data.insert,
        });

        // Actualizamos listeners de updates de bars.
        message.data.update.forEach((instrument) => {
          const lastBar = lastBarBySymbol.get(instrument.symbol);
          if (lastBar) {
            const {bar, callbacks, resolution} = lastBar;
            const {close, low, high, confirm} = bar;
            const {mark_price_e4, last_price_e4} = instrument;
            const time = normalizeBarTimestamp(
              resolution,
              bar.timestamp / 1000,
            );

            // Actualizamos callbacks de precio normal.
            if (
              !confirm &&
              last_price_e4 &&
              Math.round(Number(close) * 1e4) !== last_price_e4
            ) {
              const price = last_price_e4 / 1e4;
              bar.close = price;
              bar.low = price < low ? price : low;
              bar.high = price > high ? price : high;
              callbacks.forEach((callback) =>
                callback({
                  time,
                  open: bar.open,
                  high: bar.high,
                  low: bar.low,
                  close: bar.close,
                  volume: bar.volume,
                }),
              );
            }

            // Actualizamos callback de mark price.
            const markSymbol = "." + instrument.symbol;
            const markCallbacks = Array.from(
              (
                barSubscriptions.get(resolution + "." + markSymbol) || new Map()
              ).values(),
            );
            let lastMarkBar = lastBarBySymbol.get(markSymbol);
            if (!lastMarkBar) {
              lastMarkBar = {
                bar: {
                  start: 0,
                  end: 0,
                  turnover: 0,
                  confirm: false,
                  cross_seq: 0,
                  timestamp: 0,
                  open: 0,
                  close: 0,
                  low: Infinity,
                  high: -Infinity,
                  volume: 0,
                },
                resolution,
                callbacks: [], // no lo usamos.
              };
              lastBarBySymbol.set(markSymbol, lastMarkBar);
            }
            const markBar = lastMarkBar.bar;
            if (
              !confirm &&
              mark_price_e4 &&
              Math.round(Number(markBar.close) * 1e4) !== mark_price_e4
            ) {
              const price = mark_price_e4 / 1e4;
              markBar.close = price;
              markBar.low = price < low ? price : low;
              markBar.high = price > high ? price : high;
              markCallbacks.forEach((callback) =>
                callback({
                  time,
                  open: markBar.open,
                  high: markBar.high,
                  low: markBar.low,
                  close: markBar.close,
                  volume: markBar.volume,
                }),
              );
            }
          }
        });

        break;
      default:
        (message: empty);
        console.log(
          "websocket: unexpected instrument info inbound message",
          message,
        );
        break;
    }
  } else if (message.topic === "order" && message.success === undefined) {
    yield* saga.put({
      type: "BYBIT_ORDERS_UPDATE",
      orders: message.data,
    });
  } else if (message.topic === "stop_order" && message.success === undefined) {
    yield* saga.put({
      type: "BYBIT_STOP_ORDERS_UPDATE",
      orders: message.data,
    });
  } else if (message.topic === "position" && message.success === undefined) {
    yield* saga.put({
      type: "BYBIT_POSITIONS_UPDATE",
      positions: message.data,
    });
    yield* message.data.map((i) =>
      saga.orig.call(updatePositionStopLossIfNeeded, i),
    );
  } else if (message.topic === "wallet" && message.success === undefined) {
    yield* saga.put({
      type: "BYBIT_WALLETS_UPDATE",
      wallets: message.data,
    });
  } else if (message.topic === "execution" && message.success === undefined) {
    yield* saga.put({
      type: "BYBIT_ORDERS_EXECUTION_UPDATE",
      executions: message.data,
    });
  } else if (message.topic && message.topic.startsWith("klineV2.")) {
    message = ((message: any): bybitws.KlineMessage);
    const data = message.data;
    const [_, resolution, symbol] = message.topic.split(".");
    const key = resolution + "." + symbol;
    const subs = barSubscriptions.get(key) || new Map();
    const resolutionAny: any = resolution;
    const callbacks = Array.from(subs.values());
    // Actualizamos listeners de updates de bars.
    data.forEach((bar) => {
      callbacks.forEach((callback) =>
        callback({
          time: normalizeBarTimestamp(resolutionAny, bar.timestamp / 1000),
          open: bar.open,
          high: bar.high,
          low: bar.low,
          close: bar.close,
          volume: bar.volume,
        }),
      );
      const lastBar = lastBarBySymbol.get(symbol);
      if (!lastBar || lastBar.bar.timestamp < bar.timestamp) {
        lastBarBySymbol.set(symbol, {
          bar,
          callbacks,
          resolution: resolutionAny,
        });
      }
    });
    // @FlowFixMe
  } else if (message.success === true) {
    // No hacemos nada
  } else if (message.success === false) {
    console.log("websocket: failed operation", message);
  } else {
    console.log("websocket: unexpected inbound message", message);
  }
}

function* mySagas(): Generator<any, any, any> {
  yield saga.orig.all([
    saga.orig.takeEvery(
      "BYBIT_START_CONNECTION_REQUEST",
      onByBitStartConnectionRequest,
    ),
  ]);
}

export default mySagas;
