import { Inject, Injectable } from '@angular/core';

// 3rd party
import {
  catchError,
  filter,
  map,
  pairwise,
  startWith,
  switchMap,
  take,
  tap
} from 'rxjs/operators';
import { BehaviorSubject, combineLatest, EMPTY, Subject } from 'rxjs';
import { io, Socket } from 'socket.io-client';

// App
import {
  RealtimeServerSocketMessage,
  SocketConnectionState,
  SocketMessage
} from './types';
import { ApiService } from '../api';
import { DeviceService } from '../device';
import { AuthService } from '../auth';
import { DISCONNECTED_STATE } from './constants';
import { ISocketService } from './socket.service.interface';
import { ApiSurfaces, SOCKET_SERVER_URL_TOKEN } from 'models';

// Owns the application's socket.io connection.
//
// The connection is (re)built every time the auth state or the current
// device slug emits: the state is reset to disconnected, a fresh socket
// token is requested from the API, and a new socket is opened with it.
// Connection state and incoming 'event' messages are exposed as
// observables; outgoing messages are deferred until the socket is connected.
@Injectable({
  providedIn: 'root'
})
export class SocketService implements ISocketService {
  // uid that the most recently published connection state belongs to
  private _uid: string;

  // Current socket.io client; unset until the first socket has been created
  socket: Socket;

  // Internal stream of socket connection state
  private _connectionStateChanged: BehaviorSubject<SocketConnectionState> =
    new BehaviorSubject(DISCONNECTED_STATE);

  // Public version of socket state stream
  // Only actually emit if the uid or connected state have changed.
  // Each state is paired with the one before it (seeded with
  // DISCONNECTED_STATE) so the two can be compared; a change that only
  // touches `error` is therefore not re-emitted.
  readonly connectionStateChanged$ = this._connectionStateChanged
    .asObservable()
    .pipe(
      startWith(DISCONNECTED_STATE),
      pairwise(),
      filter(([a, b]) => a?.uid !== b?.uid || a?.connected !== b?.connected),
      map(([, b]) => b)
    );

  // Internal stream of real time events coming over the socket
  private _socketMessages$: Subject<RealtimeServerSocketMessage> =
    new Subject();

  // Public version of socket stream
  readonly realtimeSocketServerMessages$ = this._socketMessages$.asObservable();

  constructor(
    @Inject(SOCKET_SERVER_URL_TOKEN) private _socketServerUrl: string,
    private _api: ApiService,
    private _auth: AuthService,
    private _device: DeviceService
  ) {
    // Whenever the user state changes or the current slug changes,
    // reset state to disconnected, fetch a new socket token, and
    // reinitialize the socket connection
    combineLatest([this._auth.authState$, this._device.currentSlug$])
      .pipe(
        // Pass an explicit null when there is no user. `undefined` would
        // trigger the default parameter of _setConnected and silently keep
        // the previous user's uid in the published state.
        tap(([user]) => this._setConnected(false, null, user?.uid ?? null)),
        // No socket for signed-out users
        filter(([user]) => !!user?.uid),
        // switchMap drops an in-flight token request when a newer
        // auth/slug emission arrives
        switchMap(() =>
          this._api
            .post<{ token: string }>(ApiSurfaces.AUTH, '/auth/socket_token')
            .pipe(
              // Handle the failure on the inner observable: an error that
              // reaches the outer stream would terminate it, and later
              // auth/slug changes would never reconnect the socket.
              catchError((e) => {
                this._setConnected(false, e);
                return EMPTY;
              })
            )
        )
      )
      .subscribe((res) => this._initSocket(res?.token));
  }

  // Emits `message` with `payload` on the socket.
  //
  // If the socket is disconnected for any reason, messages will
  // be queued up and fired off when the socket comes back online.
  // Each call waits for the first connection state that is connected and
  // error-free (immediately, if that is the current state), sends once,
  // and then unsubscribes.
  sendMessage({ message, payload }: SocketMessage) {
    this._connectionStateChanged
      .pipe(
        filter(({ connected, error }) => connected && !error),
        take(1)
      )
      .subscribe(() => this.socket?.emit(message, payload));
  }

  // Records the uid and publishes a new connection state.
  //
  // connected: whether the socket is currently connected
  // error:     error to attach to the state, or null
  // uid:       user the state belongs to; when omitted (or `undefined`)
  //            the previously recorded uid is kept
  private _setConnected(
    connected: boolean,
    error: Error = null,
    uid = this._uid
  ) {
    this._uid = uid;

    if (!connected) {
      // NOTE(review): this also runs for socket-originated 'disconnect' and
      // 'connect_error' events. Per the socket.io client docs a manual
      // close suppresses automatic reconnection - confirm that waiting for
      // the next auth/slug change is the intended recovery path.
      this.socket?.close();
    }

    this._connectionStateChanged.next({
      connected,
      error,
      uid
    });
  }

  // Opens a new socket authenticated with `token` and wires its events
  // into the service's streams. Does nothing without a token or a uid.
  private _initSocket(token: string) {
    if (!token || !this._uid) {
      return;
    }

    const slug = this._device.currentSlug;
    this.socket = io(this._socketServerUrl, {
      path: '/transport',
      transports: ['websocket'],
      auth: { token },
      query: { token, slug }
    });

    // Set up socket callbacks
    // Publish events that come over the socket to the observable stream
    // Handle connect, disconnect, and errors events by setting the
    // connection state as appropriate
    this.socket.on('event', (args) => this._socketMessages$.next(args));
    this.socket.on('connect', () => this._setConnected(true));
    this.socket.on('disconnect', () => this._setConnected(false));
    this.socket.on('connect_error', (e) => this._setConnected(false, e));
  }
}
