import { callAsync, watchAsync } from '@insights-gaming/saga-utils';
import { Predicate, Task } from '@redux-saga/types';
import { PayloadAction } from '@reduxjs/toolkit';
import { StartPublicSession_Mutation, StartSession_Mutation } from 'apollo/mutations';
import { StartPublicSessionMutation_startPublicSession } from 'apollo/mutations/types/StartPublicSessionMutation';
import { StartSessionMutation_startSession } from 'apollo/mutations/types/StartSessionMutation';
import { GetUserProfileQuery_me } from 'apollo/queries/types/GetUserProfileQuery';
import { createGrantAction } from 'factories/kmsessionEventFactory';
import { colorGenerator } from 'helpers/colorGenerator';
import { watchAsyncMutation } from 'helpers/saga/effects';
import flow from 'lodash/flow';
import once from 'lodash/once';
import { END, MulticastChannel, multicastChannel, SagaIterator, TakeableChannel } from 'redux-saga';
import { all, call, cancel, cancelled, put, race, select, spawn, take, takeEvery, takeMaybe } from 'redux-saga/effects';
import { getMe } from 'selectors/getMe';
import { IErrorMessage, IRegistration, KMSessionIncomingEvent, KMSessionOutgoingAction } from 'types/pigeon/kmsession';

import { getLiveSessionGrantsType, getLiveSessionState } from './live-session-selector';
import { clearLiveSessionAC, ConnectAndRegisterParams, liveSessionAnnotationActionAC, liveSessionAnnotationEventAC, liveSessionColorActionAC, liveSessionColorEventAC, liveSessionConnectAC, liveSessionConnectAndRegisterAC, liveSessionDisconnectAC, liveSessionGrantActionAC, liveSessionGrantEventAC, liveSessionGrantsEventAC, liveSessionHostChangeEventAC, liveSessionIncomingMessageEventAC, liveSessionMessageActionAC, liveSessionPingEventAC, liveSessionRegisteredEventAC, liveSessionRegistrationActionAC, liveSessionRegistrationErrorAC, liveSessionSendActionAC, liveSessionSendActionWithIdAC, LiveSessionState, liveSessionStateEventAC, liveSessionTerminatedEventAC, liveSessionUserConnectedEventAC, liveSessionUserDisconnectedEventAC, liveSessionUserKickedEventAC, liveSessionVideoActionAC, liveSessionVideoEventAC, liveSessionVideoLoadEventAC, reconnectLiveSessionAC, setLiveSessionErrorAC, startPrivateSessionAsyncAC, startPublicSessionAsyncAC, startSessionAsyncAC, StartSessionAsyncParams } from './live-session-slice';

/**
 * An incoming session event that may additionally carry an `id`.
 *
 * In this file the `id` is compared against the `id` attached to an outgoing
 * action, so that a reply can be matched to the action that was sent.
 */
type IncomingEventWithId = KMSessionIncomingEvent & { id?: any };

/**
 * Builds the absolute URL of a live session endpoint on the current host.
 *
 * The page's own protocol decides whether the secure variant of `proto` is
 * used: everything after the first four characters of
 * `window.location.protocol` is `:` for `http:` and `s:` for `https:`, so
 * `'ws'` becomes `ws:` or `wss:` and `'http'` becomes `http:` or `https:`.
 *
 * @param proto Scheme without the secure suffix (`'http'` or `'ws'` in this file).
 * @param token Session token appended to the `/session/` path.
 * @returns The session URL, e.g. `wss://host/session/<token>`.
 */
function makeSessionUrl(proto: string, token: string): string {
  // `slice` replaces the deprecated `String.prototype.substr`; for a
  // non-negative start index both return the same suffix.
  const schemeSuffix = window.location.protocol.slice(4);
  return `${proto}${schemeSuffix}//${window.location.host}/session/${token}`;
}

/**
 * Issues a `HEAD` request against the HTTP(S) URL of the session.
 *
 * @param token Session token to probe.
 * @returns Resolves to the `ok` flag of the response.
 */
async function checkSessionHead(token: string): Promise<boolean> {
  const url = makeSessionUrl('http', token);
  const response = await fetch(url, { method: 'HEAD' });
  return response.ok;
}

/**
 * Opens a websocket to the session and waits until it is open.
 *
 * @param token Session token used to build the websocket URL.
 * @returns Resolves with the open socket; rejects with
 *   `Error('failed to connect')` if the socket reports an error before opening.
 */
function websocketConnect(token: string): Promise<WebSocket> {
  return new Promise<WebSocket>((resolve, reject) => {
    const socket = new WebSocket(makeSessionUrl('ws', token));

    const handleError = (): void => {
      reject(new Error('failed to connect'));
    };

    const handleOpen = (): void => {
      // Once the socket is open, the connect-time error handler is removed.
      socket.onerror = null;
      resolve(socket);
    };

    socket.onerror = handleError;
    socket.onopen = handleOpen;
  });
}

/**
 * Creates a multicast channel that is fed by a callback-style subscription.
 *
 * `subscribe` is invoked once, immediately, with an `emit` callback. Emitting
 * `END` closes the channel; any other value is put on the channel.
 *
 * The returned object is the underlying `multicastChannel` itself with its
 * `close` and `take` methods replaced (see the inline notes).
 *
 * @param subscribe Registers `emit` with the event source and returns the
 *   function that tears the subscription down again.
 * @returns The patched multicast channel.
 */
function multicastEventChannel<T>(subscribe: (emit: (value: T | END) => void) => () => void): MulticastChannel<T> {
  const chan = multicastChannel<T>();
  const unsubscribe = subscribe((value) => {
    if (value === END) {
      // `chan.close` is looked up at the time END is emitted. Once the
      // `Object.assign` below has run, that is the wrapped `close`, which
      // also calls `unsubscribe`.
      chan.close();
    } else {
      chan.put(value);
    }
  });
  // Capture the original `take` before it is replaced below.
  const originalTake = chan.take.bind(chan);
  return Object.assign(chan, {
    // Calls `unsubscribe` and then the channel's original `close` (bound here,
    // before the override is installed). `once` makes repeated calls no-ops.
    close: once(flow(unsubscribe, chan.close.bind(chan))),
    take: (cb: (message: T | END) => void, matcher?: Predicate<T>) => {
      // NOTE: This is a workaround for multicast channels not working properly
      //       with take effects: a matcher that is neither a function nor
      //       undefined is dropped before delegating to the original `take`.
      //       This should probably be fixed upstream, but submitting such a
      //       pull request is out of scope.
      if (typeof matcher !== 'function' && typeof matcher !== 'undefined') {
        matcher = undefined;
      }

      return originalTake(cb, matcher);
    },
  });
}

/**
 * Wraps a websocket in a multicast channel of parsed incoming events.
 *
 * Every message is JSON-parsed and emitted on the channel; when the socket
 * closes, `END` is emitted. Tearing down the subscription closes the socket.
 *
 * @param websocket The socket whose `onmessage`/`onclose` handlers are taken over.
 * @returns A multicast channel of incoming events.
 */
function createWebSocketMulticastChannel(websocket: WebSocket): MulticastChannel<IncomingEventWithId> {
  return multicastEventChannel((emit) => {
    websocket.onmessage = (message: MessageEvent) => {
      emit(JSON.parse(message.data));
    };

    websocket.onclose = () => {
      emit(END);
    };

    return () => {
      websocket.close();
    };
  });
}

/**
 * Lookup table from the `type` of an outgoing session action to the action
 * creator that is dispatched locally before the action is sent.
 */
const actionTypeToACDict = {
  annotation: liveSessionAnnotationActionAC,
  color: liveSessionColorActionAC,
  grant: liveSessionGrantActionAC,
  message: liveSessionMessageActionAC,
  video: liveSessionVideoActionAC,
  // An action without a `type` indexes this table with `undefined`, which is
  // coerced to the property name 'undefined' and lands on this entry.
  undefined: liveSessionRegistrationActionAC,
};

/**
 * Handles outgoing session actions for one connection.
 *
 * Two watchers are started:
 *  - one reacts to `liveSessionSendActionAC`, dispatches the matching local
 *    action (if any) and forwards the action, tagged with a fresh numeric id,
 *    through `liveSessionSendActionWithIdAC`;
 *  - one reacts to `liveSessionSendActionWithIdAC`, hands the action to `send`
 *    and waits on `chan` for the event carrying the same id, throwing if that
 *    event reports an error.
 *
 * After starting them the saga blocks on a `takeMaybe` whose matcher never
 * accepts an event, and cancels both watchers once that effect resumes.
 *
 * @param chan Channel of incoming events used to match replies by id.
 * @param send Callback that transmits an outgoing action.
 */
function* watchSendSessionActionAC(
  chan: TakeableChannel<IncomingEventWithId>,
  send: (data: any) => void,
): SagaIterator {
  // Most recently issued id; the first outgoing action gets id 1.
  let lastId = 0;

  const tagOutgoingActions = watchAsync(liveSessionSendActionAC, function* (action) {
    const createAction = actionTypeToACDict[action.type!];
    if (createAction) {
      yield put(createAction(action));
    }

    lastId += 1;
    yield callAsync(liveSessionSendActionWithIdAC, { ...action, id: lastId });
  });

  const sendAndAwaitReply = watchAsync(liveSessionSendActionWithIdAC, function* (action) {
    yield call(send, action);

    // Only the event that echoes this action's id counts as its reply.
    const isReply = (event: IncomingEventWithId): boolean => event.id === action.id;
    const { error }: IErrorMessage = yield take(chan, isReply);
    if (error) {
      throw new Error(error);
    }
  });

  const watchers: Task[] = yield all([tagOutgoingActions, sendAndAwaitReply]);

  yield takeMaybe(chan, () => false);
  yield cancel(watchers);
}

/** Connected channel, the auto-registration params (if any) and the `meta` of the triggering action. */
type WaitForLiveSessionConnectResult = [LiveSessionConnectResult[0], ConnectAndRegisterParams | undefined, any];

/**
 * Determines which session to connect to and establishes the connection.
 *
 * The target comes from the current live-session state when it is
 * `reconnecting` (token plus a registration built from the stored secret and
 * guest name) or `pending` (token only); otherwise the saga waits for a
 * `liveSessionConnectAC.started` or `liveSessionConnectAndRegisterAC.started`
 * action and reads the token (and optional registration) from its payload.
 *
 * @returns The connection tuple, or `null` when `liveSessionConnect` yields no
 *   result or when a connect attempt triggered by an action fails (in which
 *   case the matching `failed` action is dispatched).
 * @throws Rethrows the connection error when the target came from the
 *   `reconnecting`/`pending` state.
 */
function* waitForLiveSessionConnect(): SagaIterator<WaitForLiveSessionConnectResult | null> {
  let token: string;
  let registration: IRegistration | undefined;
  let meta: any;
  let reconnecting: boolean;

  const state: LiveSessionState | undefined = yield select(getLiveSessionState);
  if (state?.type === 'reconnecting') {
    reconnecting = true;
    token = state.token;
    registration = { token: state.secret, name: state.guestName };
  } else if (state?.type === 'pending') {
    reconnecting = true;
    token = state.token;
  } else {
    reconnecting = false;

    const request: { payload: string | ConnectAndRegisterParams; meta: any } = yield take([
      liveSessionConnectAC.started,
      liveSessionConnectAndRegisterAC.started,
    ]);
    const { payload } = request;
    meta = request.meta;

    if (typeof payload === 'string') {
      token = payload;
    } else {
      token = payload.token;
      registration = payload.registration;
    }
  }

  try {
    const connection: LiveSessionConnectResult | null = yield call(liveSessionConnect, token, registration);
    if (!connection) {
      return null;
    }

    const [chan, requiresSecret] = connection;

    if (registration) {
      // Registration is in flight; completion is reported by the worker later.
      const params: ConnectAndRegisterParams = { token, registration };
      return [chan, params, meta] as WaitForLiveSessionConnectResult;
    }

    yield put(liveSessionConnectAC.done({ params: token, result: requiresSecret }));
    return [chan, undefined, meta] as WaitForLiveSessionConnectResult;
  } catch (error) {
    if (reconnecting) {
      throw error;
    }

    const failure = registration
      ? liveSessionConnectAndRegisterAC.failed({ params: { token, registration }, error }, meta)
      : liveSessionConnectAC.failed({ params: token, error }, meta);
    yield put(failure);

    return null;
  }
}

/**
 * Channel of incoming events (tagged with the session token it was opened
 * for) together with the `secret` value of the server's `hello` event.
 */
type LiveSessionConnectResult = [MulticastChannel<IncomingEventWithId> & { token: string }, boolean];

/**
 * Connects to a live session and waits for the server's `hello` event.
 *
 * Steps, in order: probe the session with a `HEAD` request, open the
 * websocket, wrap it in a multicast channel, spawn the saga that handles
 * outgoing actions, then read events until `hello` arrives.
 *
 * @param token Session token to connect to.
 * @param registration When given, it is dispatched through
 *   `liveSessionSendActionAC.started` as soon as `hello` is received.
 * @returns The channel (with `token` attached) and `event.secret` from the
 *   `hello` event. The declared type also allows `null`, although no code
 *   path here returns it explicitly.
 * @throws If the `HEAD` probe is not ok, if the websocket fails to open, if
 *   an `error` event with a message arrives before `hello`, or if the saga is
 *   cancelled while waiting. In the latter two cases the websocket is closed
 *   before the error propagates.
 */
function* liveSessionConnect(
  token: string,
  registration?: IRegistration,
): SagaIterator<LiveSessionConnectResult | null> {
  const isValid = yield call(checkSessionHead, token);
  if (!isValid) {
    throw new Error('connection failed: live session not found');
  }

  const ws: WebSocket = yield call(websocketConnect, token);

  const chan = createWebSocketMulticastChannel(ws);

  // Spawned (detached) so that outgoing actions keep being handled after this
  // saga has returned the channel to its caller.
  yield spawn(watchSendSessionActionAC, chan, (data: any) => ws.send(JSON.stringify(data)));

  try {
    try {
      for (; ;) {
        const event: IncomingEventWithId = yield take(chan);

        if (event.type === 'hello') {
          if (registration) {
            yield put(liveSessionSendActionAC.started(registration));
          }

          return [Object.assign(chan, { token }), event.secret] as LiveSessionConnectResult;
        } else if (event.type === 'error' && event.error) {
          throw new Error(event.error);
        }
        // Any other event received before `hello` is ignored.
      }
    } finally {
      // Turn a cancellation into an error so the outer catch closes the socket.
      if ((yield cancelled()) as boolean) {
        throw new Error('connection closed unexpectedly');
      }
    }
  } catch (error) {
    ws.close();
    throw error;
  }
}

/**
 * Lookup table for incoming events that need no special handling: the
 * event's `type` selects the action creator the event is passed to.
 */
const simpleEventTypeToACDict = {
  annotation: liveSessionAnnotationEventAC,
  color: liveSessionColorEventAC,
  disconnected: liveSessionUserDisconnectedEventAC,
  grant: liveSessionGrantEventAC,
  grants: liveSessionGrantsEventAC,
  host_change: liveSessionHostChangeEventAC,
  kicked: liveSessionUserKickedEventAC,
  message: liveSessionIncomingMessageEventAC,
  ping: liveSessionPingEventAC,
  state: liveSessionStateEventAC,
  video: liveSessionVideoEventAC,
  video_load: liveSessionVideoLoadEventAC,
};

/**
 * Processes the incoming events of one live-session connection until the
 * session ends, the user disconnects, or the connection is lost.
 *
 * Each iteration races the next channel event against
 * `liveSessionDisconnectAC`:
 *  - If the disconnect action wins, the live session is cleared and the
 *    worker returns.
 *  - If the channel has closed (`END`) after registration, a reconnect is
 *    requested and the worker cancels itself.
 *  - If the channel has closed before registration, the worker waits for the
 *    next outgoing action, tries to connect again (abortable by the
 *    disconnect action), and on success switches to the new channel and
 *    re-dispatches that outgoing action. Otherwise it cancels itself.
 *  - Before registration only `error` and `registered` events are handled.
 *  - After registration `terminated` ends the worker, `connected` announces
 *    the user and grants them the `annotate` capability, and every other
 *    known event type is dispatched via `simpleEventTypeToACDict`.
 *
 * @param chan Channel of incoming events, tagged with the session token.
 * @param autoRegisterParams Params of a connect-and-register request, if the
 *   connection was started by one; used to report its completion or failure.
 * @param meta `meta` of the action that started the connection; attached to
 *   the `done`/`failed` action of the connect-and-register request.
 * @throws Rethrows any error raised while processing events, unless it
 *   happened before registration of a connect-and-register request (then the
 *   request's `failed` action is dispatched instead).
 */
function* liveSessionConnectionWorker(
  chan: MulticastChannel<IncomingEventWithId> & { token: string },
  autoRegisterParams?: ConnectAndRegisterParams,
  meta?: any,
): SagaIterator<ConnectAndRegisterParams | void> {
  // Source of fallback colors for users that arrive without one.
  const colors = colorGenerator();
  let registered = false;
  try {
    for (; ;) {
      const [event]: [IncomingEventWithId | END | undefined] = yield race([
        takeMaybe(chan),
        take(liveSessionDisconnectAC),
      ]);

      if (event === END) {
        if (registered) {
          yield put(reconnectLiveSessionAC());
        } else {
          // Not registered yet: only reconnect once the user tries to send
          // something, then replay that action on the new connection.
          const sendAction: PayloadAction<KMSessionOutgoingAction> = yield take(liveSessionSendActionAC.started);

          const [result]: [WaitForLiveSessionConnectResult | null | undefined] = yield race([
            call(waitForLiveSessionConnect),
            take(liveSessionDisconnectAC),
          ]);

          if (result) {
            // NOTE(review): the replacement channel is only held in this local
            // variable, while the caller in this file closes the channel it
            // originally passed in. Confirm the replacement is closed somewhere.
            [chan] = result;
            yield put(sendAction);
            continue;
          }
        }

        yield cancel();
        return;
      } else if (!event) {
        // The disconnect action won the race.
        yield put(clearLiveSessionAC());
        return;
      }

      if (!registered) {
        switch (event.type) {
          case 'error':
            if (event.error) {
              yield put(liveSessionRegistrationErrorAC(event.error));
            }
            break;

          case 'registered':
            yield put(liveSessionRegisteredEventAC({
              ...event,
              token: chan.token,
              color: event.color || colors.next().value,
              users: event.users.map((user) => ({
                ...user,
                color: user.color || colors.next().value,
              })),
            }));
            registered = true;
            if (autoRegisterParams) {
              yield put(liveSessionConnectAndRegisterAC.done({ params: autoRegisterParams }, meta));
            }
        }
      } else {
        switch (event.type) {
          case 'terminated':
            yield put(liveSessionTerminatedEventAC(event));
            return;

          case 'connected':
            yield put(liveSessionUserConnectedEventAC({ ...event, color: event.user.color || colors.next().value }));
            yield put(liveSessionSendActionAC.started(createGrantAction({
              action: 'grant',
              targets: [event.user.id],
              capabilities: ['annotate'],
            })));
            break;

          default:
            if (event.type in simpleEventTypeToACDict) {
              yield put(simpleEventTypeToACDict[event.type](event));
            }
        }
      }
    }
  } catch (error) {
    if (!registered && autoRegisterParams) {
      // Pass `meta` like the matching `done` above, so the failure carries
      // the same metadata as the request that started it.
      yield put(liveSessionConnectAndRegisterAC.failed({ params: autoRegisterParams, error }, meta));
    } else {
      throw error;
    }
  }
}

/**
 * Top-level connection loop: waits for a connection, runs the connection
 * worker on it, closes the channel afterwards, and starts over.
 *
 * An error thrown by the worker is reported via `setLiveSessionErrorAC`
 * together with the session token; an error from anywhere else in the loop is
 * reported without a token. Whenever this saga exits, for any reason, a fresh
 * copy of it is spawned.
 */
function* watchConnectSessionAC(): SagaIterator {
  try {
    while (true) {
      const connection: WaitForLiveSessionConnectResult | null = yield call(waitForLiveSessionConnect);
      if (connection) {
        const [chan] = connection;

        try {
          yield call(liveSessionConnectionWorker, ...connection);
        } catch (error) {
          yield put(setLiveSessionErrorAC({ token: chan.token, error }));
        } finally {
          chan.close();
        }
      }
    }
  } catch (error) {
    yield put(setLiveSessionErrorAC({ error }));
  } finally {
    // make sure this never goes away
    yield spawn(watchConnectSessionAC);
  }
}

/**
 * On every host-change event, asks the server for the current grants when the
 * local user is the new host or the stored grants type is `'host'`.
 *
 * Nothing is sent when the current user or the grants type is unavailable.
 */
function* watchHostChangeEventAC() {
  yield takeEvery(liveSessionHostChangeEventAC, function* ({ payload }) {
    const me: GetUserProfileQuery_me | undefined = yield select(getMe);
    const grantsType: string | undefined = yield select(getLiveSessionGrantsType);

    if (me && grantsType && (payload.host === me.id || grantsType === 'host')) {
      yield put(liveSessionSendActionAC.started({ type: 'query', query: 'grants' }));
    }
  });
}

/**
 * Starts a session by delegating to the async action that matches
 * `params.type`.
 *
 * @param params Discriminated by `type`: `'public'` or `'private'`, each with
 *   its own `input`.
 * @returns The result of the delegated async action.
 */
function* startSessionWorker(
  params: StartSessionAsyncParams,
): SagaIterator<StartSessionMutation_startSession | StartPublicSessionMutation_startPublicSession> {
  switch (params.type) {
    case 'public': {
      const session = yield callAsync(startPublicSessionAsyncAC, params.input);
      return session;
    }

    case 'private': {
      const session = yield callAsync(startPrivateSessionAsyncAC, params.input);
      return session;
    }
  }
}

/**
 * Root saga of the live-session feature: runs the connection loop, the
 * watchers for the start-session mutations and the host-change watcher
 * together.
 */
export default function* liveSessionSaga() {
  const watchers = [
    watchConnectSessionAC(),
    watchAsyncMutation(startPublicSessionAsyncAC, StartPublicSession_Mutation, ['startPublicSession']),
    watchAsyncMutation(startPrivateSessionAsyncAC, StartSession_Mutation, ['startSession']),
    watchAsync(startSessionAsyncAC, startSessionWorker),
    watchHostChangeEventAC(),
  ];

  yield all(watchers);
}
