"use strict";

/**
 * Drives a generator function as if it were an async function.
 *
 * Reuses an `__awaiter` already present on the surrounding `this` when there is
 * one; otherwise falls back to the implementation below.
 *
 * @param thisArg    `this` value the generator function is invoked with.
 * @param _arguments Arguments (array or `arguments` object) for the generator
 *                   function; a falsy value means "no arguments".
 * @param P          Promise constructor to use; falls back to the global `Promise`.
 * @param generator  Generator function whose yielded values are awaited.
 * @returns A promise settled with the generator's return value, or rejected
 *          with whatever the generator throws.
 */
var __awaiter = this && this.__awaiter || function (thisArg, _arguments, P, generator) {
  // Wrap plain values so that every yielded value can be chained with `.then`.
  const toPromise = value => {
    if (value instanceof P) {
      return value;
    }
    return new P(settle => {
      settle(value);
    });
  };
  return new (P || (P = Promise))((resolve, reject) => {
    // Either finish with the generator's return value or wait for the yielded one.
    const advance = result => {
      if (result.done) {
        resolve(result.value);
        return;
      }
      toPromise(result.value).then(onFulfilled, onRejected);
    };
    // Resume the generator with the awaited value.
    const onFulfilled = value => {
      try {
        advance(generator.next(value));
      } catch (e) {
        reject(e);
      }
    };
    // Re-throw the awaited rejection inside the generator so it can catch it.
    const onRejected = reason => {
      try {
        advance(generator["throw"](reason));
      } catch (e) {
        reject(e);
      }
    };
    // Instantiate the generator and run it up to its first yield.
    advance((generator = generator.apply(thisArg, _arguments || [])).next());
  });
};
// Flag the CommonJS `exports` object with `__esModule: true` (the conventional
// marker transpilers emit for code that was authored as an ES module).
Object.defineProperty(exports, "__esModule", {
  value: true
});
// Pre-declare the export; it receives the actual class at the bottom of the file.
exports.BbitWebSocketClient = void 0;
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const log_1 = require("../log/log");
const utils_1 = require("../utils/utils");
const interfaces_1 = require("./interfaces");
// Fallback for acknowledgement/disconnect timeouts when the config has no `timeoutInMs`.
const DEFAULT_TIMEOUT_IN_MS = 15000;
// Delay between two PING messages sent while the socket is connected.
const PING_INTERVAL_IN_MS = 10000;

/**
 * WebSocket client driven by a stream of connection configs.
 *
 * It (re)connects whenever a usable config arrives, sends a PING message
 * periodically while connected, and lets callers subscribe to topics whose
 * messages are delivered as observables over the single shared socket.
 */
class BbitWebSocketClient {
  /**
   * @param config       Object with a `subscribe(fn)` method that delivers config
   *                     objects; the handler reads `url`, `token` and `timeoutInMs`.
   * @param unsubscriber Optional subject used as teardown signal. Its emissions are
   *                     forwarded to the internal `_unsubscribe` subject and end the
   *                     internal connection-state and message-logging subscriptions.
   */
  constructor(config, unsubscriber) {
    this._log = log_1.BbitLog.scope({
      class: 'BbitWebSocketClient'
    });
    this._isConnectedSubject = new rxjs_1.BehaviorSubject(false);
    this._connectingPromise = null;
    this._messageSubject = new rxjs_1.Subject();
    this._unsubscribe = new rxjs_1.Subject();
    this._subscribedTopics = {};
    // Start with the default so `timeout()` never receives `undefined` when a
    // method runs before the first valid config has been processed.
    this._timeoutInMs = DEFAULT_TIMEOUT_IN_MS;
    /**
     * Sends a SUBSCRIBE message for `topic` and waits for the message echoing
     * the same `_id` and type.
     *
     * NOTE: `complete` also fires when `_unsubscribe` emits before the
     * acknowledgement arrives, so the promise resolves in that case too.
     */
    this._wsSubscribe = topic => __awaiter(this, void 0, void 0, function* () {
      const _id = utils_1.BbitUtils.makeId();
      // Plain (non-async) executor: anything thrown in here rejects the promise
      // instead of being lost as an unhandled rejection.
      yield new Promise((resolve, reject) => {
        const ackSubscription = this._messageSubject.asObservable().pipe((0, operators_1.filter)(o => {
          return o._id === _id && o.type === interfaces_1.BbitWebSocketMessageType.SUBSCRIBE;
        }), (0, operators_1.timeout)(this._timeoutInMs), (0, operators_1.take)(1), (0, operators_1.takeUntil)(this._unsubscribe)).subscribe({
          error: err => {
            this._log.error('Error while subscribing', {
              _id,
              topic,
              err
            });
            reject(err);
          },
          complete: () => {
            this._log.success('Successfully subscribed', {
              _id,
              topic
            });
            resolve();
          }
        });
        try {
          this._send({
            _id,
            type: interfaces_1.BbitWebSocketMessageType.SUBSCRIBE,
            topic
          });
        } catch (err) {
          // The request never left, so stop waiting for its acknowledgement.
          ackSubscription.unsubscribe();
          this._log.error('Error while subscribing', {
            _id,
            topic,
            err
          });
          reject(err);
        }
      });
    });
    if (!unsubscriber) {
      unsubscriber = new rxjs_1.Subject();
    }
    unsubscriber.subscribe(this._unsubscribe);
    config.subscribe(c => __awaiter(this, void 0, void 0, function* () {
      if (this._connectingPromise) {
        this._log.warning('Websocket client is currently connecting, new config will be ignored', c);
        return;
      }
      if (!c || !c.url || !c.token) {
        if (this._isConnectedSubject.value) {
          // Intentionally not awaited so the credentials below are cleared right
          // away; disconnect() logs its own failure, the catch only prevents an
          // unhandled rejection.
          this.disconnect().catch(() => undefined);
        }
        this._url = null;
        this._token = null;
        return;
      }
      this._url = c.url;
      this._token = c.token;
      this._timeoutInMs = c.timeoutInMs || DEFAULT_TIMEOUT_IN_MS;
      if (this._isConnectedSubject.value) {
        try {
          yield this.reconnect(true);
        } catch (err) {
          this._log.error('Could not reconnect websocket connection', err);
        }
      } else {
        try {
          yield this.connect();
        } catch (err) {
          // Nobody awaits this handler, so a failed connect must be handled here.
          this._log.error('Could not establish websocket connection', err);
        }
      }
    }));
    this._isConnectedSubject.pipe((0, operators_1.takeUntil)(unsubscriber)).subscribe(isConnected => {
      this._connectingPromise = null;
      // Always drop the previous timer first so two timers can never run in parallel.
      if (this._pingInterval) {
        clearInterval(this._pingInterval);
        this._pingInterval = null;
      }
      if (isConnected) {
        this._pingInterval = setInterval(() => {
          this._send({
            type: interfaces_1.BbitWebSocketMessageType.PING,
            topic: 'ping'
          });
        }, PING_INTERVAL_IN_MS);
      }
    });
    this._messageSubject.pipe((0, operators_1.takeUntil)(unsubscriber)).subscribe(msg => {
      if (msg.type !== interfaces_1.BbitWebSocketMessageType.PING) {
        this._log.info('Received message', msg);
      }
    });
  }
  /**
   * Opens the websocket unless it is already connected or connecting.
   *
   * @param reconnectCurrentSubject When true (default), a SUBSCRIBE message is
   *        sent for every registered topic once the socket is open.
   * @returns Promise resolved once the socket is open (and topics are
   *          re-subscribed). Rejects with the string 'Url is not defined' when
   *          no url is set, with the socket's error event, with a
   *          re-subscription error, or with an Error when the socket closes
   *          before the promise has settled.
   */
  connect() {
    return __awaiter(this, arguments, void 0, function* (reconnectCurrentSubject = true) {
      if (!this._url) {
        const errorMsg = 'Url is not defined';
        this._log.error(errorMsg);
        return Promise.reject(errorMsg);
      }
      if (this._isConnectedSubject.value) {
        return;
      }
      if (this._connectingPromise !== null) {
        return yield this._connectingPromise;
      }
      const connecting = new Promise((resolve, reject) => {
        this._log.info('Connect websocket');
        this._ws = new WebSocket(this._url);
        this._ws.onopen = () => __awaiter(this, void 0, void 0, function* () {
          this._log.success('Successfully connected websocket');
          this._isConnectedSubject.next(true);
          try {
            if (reconnectCurrentSubject) {
              for (const key of Object.keys(this._subscribedTopics)) {
                yield this._wsSubscribe(key);
              }
            }
            resolve();
          } catch (err) {
            // Without this the connect promise would stay pending forever.
            reject(err);
          }
        });
        this._ws.onmessage = msg => {
          try {
            this._messageSubject.next(JSON.parse(msg.data));
          } catch (err) {
            this._log.warning('Could not parse websocket message', msg.data);
          }
        };
        this._ws.onclose = () => {
          this._log.info('Closed websocket connection');
          this._isConnectedSubject.next(false);
          // No-op when the promise has already settled; otherwise it unblocks
          // callers waiting on a connection that closed before it was usable.
          reject(new Error('Websocket connection was closed before it was established'));
        };
        this._ws.onerror = err => {
          this._log.error('Error on websocket connection', err);
          this._isConnectedSubject.next(false);
          reject(err);
        };
      });
      this._connectingPromise = connecting;
      try {
        return yield connecting;
      } finally {
        // The connection-state subscription normally resets this, but not when
        // `new WebSocket` throws or after that subscription has been torn down.
        if (this._connectingPromise === connecting) {
          this._connectingPromise = null;
        }
      }
    });
  }
  /** @returns The current connection state. */
  isConnected() {
    return this._isConnectedSubject.value;
  }
  /** @returns Observable of the connection state, starting with the current value. */
  observeIsConnected() {
    return this._isConnectedSubject.asObservable();
  }
  /**
   * Stamps `message` with a timestamp, the current token and (if missing) an
   * `_id`, then writes it to the socket as JSON. The passed object is mutated.
   * Throws when no socket has been created yet.
   */
  _send(message) {
    message.timestamp = new Date();
    message.token = this._token;
    if (!message._id) {
      message._id = utils_1.BbitUtils.makeId();
    }
    if (message.type !== interfaces_1.BbitWebSocketMessageType.PING) {
      this._log.info('Send websocket message', message);
    }
    this._ws.send(JSON.stringify(message));
  }
  /**
   * Connects if necessary and sends a MESSAGE for `topic` carrying `data`.
   *
   * @returns Promise resolved once the message has been handed to the socket.
   */
  send(topic, data) {
    return __awaiter(this, void 0, void 0, function* () {
      yield this.connect();
      this._send({
        type: interfaces_1.BbitWebSocketMessageType.MESSAGE,
        topic,
        data
      });
    });
  }
  /**
   * Closes the socket and waits until the connection state turns false.
   *
   * @param keepCurrentSubscriptions When false (default), `_unsubscribe` is
   *        triggered and the topic registry is cleared after the socket closed.
   * @returns Promise resolved after the close was observed, or immediately when
   *          the client is not connected (nothing is changed in that case).
   *          Rejects when the close is not observed within the timeout.
   */
  disconnect() {
    return __awaiter(this, arguments, void 0, function* (keepCurrentSubscriptions = false) {
      return yield new Promise((resolve, reject) => {
        if (!this._isConnectedSubject.value) {
          // Nothing to close. Resolving here keeps disconnect()/reconnect()
          // from waiting forever on a client that is already disconnected.
          resolve();
          return;
        }
        this._isConnectedSubject.pipe((0, operators_1.filter)(isConnected => {
          return !isConnected;
        }), (0, operators_1.timeout)(this._timeoutInMs), (0, operators_1.take)(1)).subscribe({
          complete: () => {
            if (!keepCurrentSubscriptions) {
              this._unsubscribe.next(undefined);
              this._subscribedTopics = {};
            }
            resolve();
          },
          error: err => {
            this._log.error('Error on disconnect', err);
            reject(err);
          }
        });
        this._ws.close();
      });
    });
  }
  /**
   * Disconnects and connects again.
   *
   * @param keepCurrentSubscriptions When true (default), registered topics are
   *        kept across the disconnect and re-subscribed after connecting.
   */
  reconnect() {
    return __awaiter(this, arguments, void 0, function* (keepCurrentSubscriptions = true) {
      yield this.disconnect(keepCurrentSubscriptions);
      yield this.connect(keepCurrentSubscriptions);
    });
  }
  /**
   * Returns an observable of messages whose `topic` starts with `topic`.
   *
   * The first call for a topic registers it and returns an observable that,
   * once subscribed, connects, sends SUBSCRIBE and then forwards matching
   * messages. Later calls only increase the usage count and return the shared
   * message observable directly.
   */
  subscribe(topic) {
    if (!this._subscribedTopics[topic]) {
      this._log.info('Subscribe to websocket topic', topic);
      const unsubscriber = new rxjs_1.Subject();
      const observable = this._messageSubject.asObservable().pipe((0, operators_1.filter)(o => {
        // Messages without a topic never match.
        return o.topic == null ? undefined : o.topic.startsWith(topic);
      }), (0, operators_1.takeUntil)((0, rxjs_1.merge)(this._unsubscribe, unsubscriber)));
      this._subscribedTopics[topic] = {
        count: 1,
        unsubscriber,
        observable
      };
      return (0, rxjs_1.of)(null).pipe((0, operators_1.mergeMap)(() => __awaiter(this, void 0, void 0, function* () {
        yield this.connect();
        yield this._wsSubscribe(topic);
      })), (0, operators_1.catchError)(err => {
        this._log.error('Error while subscribing', {
          topic,
          err
        });
        return (0, rxjs_1.throwError)(() => err);
      }), (0, operators_1.mergeMap)(() => observable));
    }
    this._log.success('Topic is already subscribed, active Observable will be used', topic);
    this._subscribedTopics[topic].count++;
    return this._subscribedTopics[topic].observable;
  }
  /**
   * Sends UNSUBSCRIBE for `topic` and waits for the message echoing the same
   * `_id` and type. Afterwards the usage count is decreased; when the last user
   * is gone the topic's observable is completed and the topic is removed.
   *
   * @returns Promise rejected when sending fails or no acknowledgement arrives
   *          within the timeout.
   */
  unsubscribe(topic) {
    return __awaiter(this, void 0, void 0, function* () {
      this._log.info('Unsubscribe from websocket', {
        topic
      });
      // Plain (non-async) executor: anything thrown in here rejects the promise
      // instead of being lost as an unhandled rejection.
      yield new Promise((resolve, reject) => {
        const _id = utils_1.BbitUtils.makeId();
        const ackSubscription = this._messageSubject.asObservable().pipe((0, operators_1.filter)(o => {
          return o._id === _id && o.type === interfaces_1.BbitWebSocketMessageType.UNSUBSCRIBE;
        }), (0, operators_1.timeout)(this._timeoutInMs), (0, operators_1.take)(1), (0, operators_1.takeUntil)(this._unsubscribe)).subscribe({
          error: err => {
            this._log.error('Error while unsubscribing', {
              _id,
              topic,
              err
            });
            reject(err);
          },
          complete: () => {
            const currentSub = this._subscribedTopics[topic];
            if (currentSub) {
              if (currentSub.count === 1) {
                currentSub.unsubscriber.next();
                currentSub.unsubscriber.complete();
                delete this._subscribedTopics[topic];
              } else {
                currentSub.count = currentSub.count - 1;
              }
            }
            this._log.success('Successfully unsubscribed', {
              _id,
              topic
            });
            resolve();
          }
        });
        try {
          this._send({
            _id,
            type: interfaces_1.BbitWebSocketMessageType.UNSUBSCRIBE,
            topic
          });
        } catch (err) {
          // The request never left, so stop waiting for its acknowledgement.
          ackSubscription.unsubscribe();
          this._log.error('Error while unsubscribing', {
            _id,
            topic,
            err
          });
          reject(err);
        }
      });
    });
  }
}
// Publish the class under the export name that was pre-declared at the top of the module.
exports.BbitWebSocketClient = BbitWebSocketClient;
