// import { Observable } from 'rxjs/Observable'
import { Subscription, Subject, PartialObserver } from 'rxjs'
import { filter, sampleTime, map } from 'rxjs/operators'

import { Event as MT4Message, MT4Connector, Tick } from '@bdswiss/mt4-connector'

import { Bar, LibrarySymbolInfo, SubscribeBarsCallback } from '../types/datafeed-api'
import { SymbolSessions, convertSymbolSessions, resolution2Seconds, removeServerOffset } from '../utils'

interface DataSubscriber {
  symbol: string
  resolution: string
  sessions: SymbolSessions
  subscription: Subscription
  lastBarTime?: number
}
interface DataSubscribers {
  [guid: string]: DataSubscriber
}

export class DataPulseProvider {
  private static instance: DataPulseProvider
  private readonly subscribers: DataSubscribers = {}
  private readonly mt4: MT4Connector = MT4Connector.Instance

  public static get Instance(): DataPulseProvider {
    return this.instance || (this.instance = new this())
  }

  public subscribeBars(
    symbolInfo: LibrarySymbolInfo,
    resolution: string,
    newDataCallback: SubscribeBarsCallback,
    listenerGuid: string,
    lastBar: Bar,
  ): void {
    if (this.subscribers.hasOwnProperty(listenerGuid)) {
      // console.log(`DataPulseProvider: already has subscription with id=${listenerGuid}`);
      return
    }

    const seconds = resolution2Seconds(resolution)
    const symbol = symbolInfo.name
    // const ticksObservable: Observable<Tick> = this.mt4.emitAndSubscribe(MT4Command.SUBSCRIBE, { symbol }, MT4Message.TICK);
    const ticksObservable = new Subject<Tick>()
    const subscription = ticksObservable
      .pipe(
        filter((tick) => tick.s === symbol),
        sampleTime(200),
        map((tick) => (seconds < 86400 ? Object.assign(tick, { t: removeServerOffset(tick.t) }) : tick)),
      )
      .subscribe((data) => {
        // set lastBarTime
        const { high, low, time } = lastBar
        let { lastBarTime } = this.subscribers[listenerGuid]
        if (!lastBarTime) this.subscribers[listenerGuid].lastBarTime = lastBarTime = time

        // Price calculation taken from android implementation
        const newPrice = seconds === 0 ? (data.mb + data.ma) / 2 : data.b
        // no volume in Ticks so zeroing it
        lastBar.volume = undefined
        lastBar.close = newPrice
        const nextBarTime = lastBarTime + seconds * 1000
        // console.log('LAST', new Date(lastBar.time).toISOString())
        // console.log('STORED_LAST', new Date(lastBarTime).toISOString())
        // console.log('NEXT', new Date(nextBarTime).toISOString())
        // console.log('CURRENT', new Date(data.t * 1000).toISOString())
        if (nextBarTime > data.t * 1000) {
          if (high < newPrice) {
            lastBar.high = newPrice
          }
          if (low > newPrice) {
            lastBar.low = newPrice
          }
        } else {
          // console.log('==SET NEXT')
          lastBar.high = lastBar.low = lastBar.open = newPrice
          lastBar.time = this.subscribers[listenerGuid].lastBarTime = data.t * 1000
        }
        newDataCallback(lastBar)
      })
    this.mt4.onMessage(MT4Message.TICK).subscribe(ticksObservable as PartialObserver<unknown>)
    this.mt4.subscribeToSymbol(symbol)

    this.subscribers[listenerGuid] = {
      symbol,
      resolution,
      sessions: convertSymbolSessions(symbolInfo),
      subscription,
    }

    // console.log(`DataPulseProvider: subscribed #${listenerGuid} for {${symbolInfo.name}, ${resolution}}`);
  }

  public unsubscribeBars(listenerGuid: string): void {
    const subscriber = this.subscribers[listenerGuid]
    subscriber.subscription.unsubscribe()
    this.mt4.unsubscribeFromSymbol(subscriber.symbol)
    delete this.subscribers[listenerGuid]
    // console.log(`DataPulseProvider: unsubscribed #${listenerGuid} for {${subscriber.symbol}, ${subscriber.resolution}}`);
  }
}
