import * as AWS from 'aws-sdk';
import {
  CONNECTION_STATE_CHANGE,
  ConnectionState,
  PubSub
} from '@aws-amplify/pubsub';
import { signIn, signOut, confirmSignIn, fetchAuthSession, getCurrentUser } from 'aws-amplify/auth';
import { Hub } from 'aws-amplify/utils';
import { Constants } from '../../../constants';
import { Injectable, OnDestroy } from '@angular/core';
import { ReplaySubject, Subscription } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { SetOperatorMqttStatus } from '../../../pages/device/device-control/store/actions/participants.actions';
import { Store } from '@ngxs/store';
import { debounceTime, delay, filter } from 'rxjs/operators';

/**
 * Contract for a component that can register a callback for a topic.
 */
export interface MqttSubscription {
  /**
   * Registers `callback` for the given `topic`.
   *
   * @param topic - Name of the topic to subscribe to.
   * @param callback - Handler associated with the topic. The type is `any`,
   *   so this interface does not enforce a signature.
   *   NOTE(review): `AwsIotCoreService` in this file invokes its callbacks as
   *   `(data, error)`; confirm other implementers follow the same convention.
   */
  subscribeTopic(topic: string, callback: any): void;
}

/**
 * Contract for a component that can publish a message to a topic.
 */
export interface MqttSendMessage {
  /**
   * Publishes `message` to `topicName`.
   *
   * @param topicName - Name of the destination topic.
   * @param message - Payload to send. Typed as `any`; the expected shape is
   *   not constrained by this interface.
   */
  publish(topicName: string, message: any): void;
}

/**
 * Contract for a service that exposes an explicit initialisation step taking
 * a device identifier and a connection identifier.
 */
export interface OnInitMqttService {
  /**
   * Initialises the service.
   *
   * @param deviceId - Identifier of the device.
   * @param connectionId - Identifier of the connection.
   *   NOTE(review): the exact meaning and format of both identifiers is not
   *   visible in this file; confirm against the implementers.
   */
  init(deviceId: string, connectionId: string): void;
}

/**
 * Application-wide service that wraps an Amplify `PubSub` client for AWS IoT
 * Core.
 *
 * Responsibilities visible in this class:
 * - signs in through Amplify Auth when no user is signed in and makes sure the
 *   IoT policy is attached to the current identity (`connect`);
 * - tracks the PubSub connection state reported on the Amplify `Hub`
 *   (`mqttStateConnection` / `mqttStateConnection$`);
 * - keeps a registry of topic subscriptions and their callbacks so they can be
 *   removed or re-created.
 *
 * Callbacks are invoked as `callback(data, null)` for messages and
 * `callback(null, error)` for subscription errors.
 */
@Injectable({
  providedIn: 'root'
})
export class AwsIotCoreService implements OnDestroy {
  // SECURITY(review): these credentials are hard-coded in client-side source and
  // therefore ship to every browser that loads the bundle. They are kept only so
  // that this change does not alter runtime behaviour. Rotate them and move the
  // sign-in to a secure flow / externally supplied configuration.
  private static readonly SERVICE_USERNAME = 'wiltonof@gmail.com';
  private static readonly SERVICE_PASSWORD = '1q2w#E4r5t';

  /** Name of the IoT policy that must be attached to the current identity. */
  private static readonly IOT_POLICY_NAME = 'rpi-robo-01-Policy';

  /** Live subscriptions, keyed by topic name. */
  private subscriptionsMap = new Map<string, Subscription>();
  /** Callback registered for each topic, keyed by topic name. */
  private subscriptionsCallback = new Map<string, any>();
  /** `true` after a `Connected` state event, `false` after `Disconnected`. */
  protected _subscribed: boolean;
  /** Client id handed to the PubSub client; generated once per service instance. */
  public clientId: string = uuidv4();
  private pubsub: PubSub;

  /** Function returned by `Hub.listen`; calling it removes the Hub listener. */
  private stopHubListener?: () => void;
  /** Subscription to the debounced "disconnected" notifications. */
  private stateSubscription?: Subscription;
  /** Set by `ngOnDestroy` so late async continuations do not re-subscribe. */
  private destroyed = false;

  /** Last connection state received from the Hub. */
  mqttStateConnection: ConnectionState;
  /** Replays the latest connection state to new subscribers. */
  mqttStateConnection$ = new ReplaySubject<ConnectionState>(1);

  constructor(protected readonly store: Store) {
    console.log('Starting AWS IOT CORE service');
    this._subscribed = false;

    // Keep the stop-callback so the listener can be removed in ngOnDestroy.
    this.stopHubListener = Hub.listen('pubsub', (data: any) => {
      const { payload } = data;
      if (payload.event !== CONNECTION_STATE_CHANGE) {
        return;
      }
      this.mqttStateConnection = payload.data
        .connectionState as ConnectionState;
      this.mqttStateConnection$.next(this.mqttStateConnection);
      if (this.mqttStateConnection === ConnectionState.Disconnected) {
        this._subscribed = false;
      } else if (this.mqttStateConnection === ConnectionState.Connected) {
        this._subscribed = true;
      }
    });

    // connect() handles its own errors, so this promise does not reject.
    void this.connect().then(() => {
      if (this.destroyed) {
        return;
      }
      // Currently this pipeline only logs a (debounced, delayed) notice when
      // the connection is reported as disconnected.
      this.stateSubscription = this.mqttStateConnection$
        .pipe(
          filter((state) => state === ConnectionState.Disconnected),
          debounceTime(1500),
          delay(10000)
        )
        .subscribe(() => {
          console.log('MQTT Status disconnected');
        });
    });
  }

  /**
   * Reports whether Amplify Auth currently has a signed-in user.
   *
   * @returns `true` when `getCurrentUser()` resolves, `false` when it rejects.
   */
  async isSignedIn(): Promise<boolean> {
    try {
      await getCurrentUser();
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Releases everything this service registered: the Hub listener, the
   * connection-state subscription and all topic subscriptions. Finally marks
   * the operator MQTT status as `false` in the store.
   */
  async ngOnDestroy(): Promise<void> {
    this.destroyed = true;

    this.stopHubListener?.();
    this.stopHubListener = undefined;

    this.stateSubscription?.unsubscribe();
    this.stateSubscription = undefined;

    await this.unsubscribeAll();
    this.store.dispatch(new SetOperatorMqttStatus(false));
  }

  /**
   * Tears down all subscriptions, signs out, connects again and then restores
   * the subscriptions that were registered before the reconnection.
   *
   * The topic/callback pairs are captured up front because `unsubscribeAll`
   * removes them from the registry.
   *
   * NOTE(review): nothing inside this class calls this method at the moment.
   */
  private async reconnect() {
    console.log('Trying AWS IOT CORE reconnection');
    const previous = [...this.subscriptionsCallback];
    await this.unsubscribeAll();
    await this.signOutFromCognito();
    await this.connect();
    for (const [topicName, cb] of previous) {
      this.openSubscription(topicName, cb);
    }
  }

  /**
   * Creates a new PubSub client from the configured region, endpoint and this
   * instance's client id. Failures are logged and leave the previous client
   * (if any) in place.
   */
  createPubsub() {
    try {
      this.pubsub = new PubSub({
        region: Constants.AWS_REGION,
        endpoint: Constants.AWS_IOT_CORE_ENDPOINT,
        clientId: this.clientId
      });
    } catch (e) {
      console.log('AwsIotCoreService::createPubsub(): Error:', e);
    }
  }

  /**
   * Subscribes `cb` to `topicName`. An existing subscription for the same
   * topic is removed first, so each topic has at most one callback.
   *
   * @param topicName - Topic to subscribe to.
   * @param cb - Invoked as `cb(data, null)` on messages and `cb(null, error)`
   *   on subscription errors.
   * @throws Error when no PubSub client could be created.
   */
  subscribeTopic(topicName: string, cb: any): void {
    if (this.subscriptionsCallback.has(topicName)) {
      console.log(`Topic already ${topicName} subscribed`);
      this.unsubscribe(topicName);
    }
    this.openSubscription(topicName, cb);
  }

  /** Opens a subscription for `topicName` and records it with its callback. */
  private openSubscription(topicName: string, cb: any): void {
    const observable = this.ensurePubsub().subscribe({ topics: topicName });
    this.subscriptionsCallback.set(topicName, cb);
    this.subscriptionsMap.set(topicName, this.registerCallBack(observable, cb));
  }

  /** Returns the PubSub client, creating it on demand; throws if that fails. */
  private ensurePubsub(): PubSub {
    if (!this.pubsub) {
      this.createPubsub();
    }
    if (!this.pubsub) {
      throw new Error('AwsIotCoreService: PubSub client is not available');
    }
    return this.pubsub;
  }

  /**
   * Subscribes to `observable` and forwards its notifications to `callback`
   * using the `(data, error)` convention.
   */
  private registerCallBack(observable: any, callback: any): Subscription {
    return observable.subscribe({
      next: (data) => {
        callback(data, null);
      },
      error: (error) => {
        console.error(error);
        callback(null, error);
      },
      complete: () => console.info('Subscription complete!')
    });
  }

  /**
   * Prepares the connection: creates the PubSub client, signs in when needed
   * and makes sure the IoT policy is attached to the current identity.
   *
   * Never rejects: any failure is logged and reported to the store as
   * `SetOperatorMqttStatus(false)`.
   */
  private async connect(): Promise<void> {
    try {
      // Create the client before the first await so it already exists when the
      // constructor returns and callers start subscribing.
      this.createPubsub();

      const signedIn = await this.isSignedIn();
      if (!signedIn) {
        const { nextStep } = await signIn({
          username: AwsIotCoreService.SERVICE_USERNAME,
          password: AwsIotCoreService.SERVICE_PASSWORD
        });
        if (nextStep.signInStep === 'CONFIRM_SIGN_IN_WITH_NEW_PASSWORD_REQUIRED') {
          await confirmSignIn({
            challengeResponse: AwsIotCoreService.SERVICE_PASSWORD,
            options: {
              clientMetadata: {
                name: 'Wilton Ferreira',
                profile: 'ADMINISTRATOR'
              }
            },
          });
        }
      }

      const authSession = await fetchAuthSession();
      if (!authSession.credentials) {
        throw new Error('Could not get current AWS Amplify credentials');
      }

      const target = authSession.identityId;
      if (!target) {
        throw new Error('Could not get current AWS Amplify identity id');
      }

      // Use the same region as the PubSub client; the policy has to be attached
      // in the region the client connects to.
      const iot = new AWS.Iot({
        region: Constants.AWS_REGION,
        credentials: authSession.credentials
      });

      const policyName = AwsIotCoreService.IOT_POLICY_NAME;
      const { policies } = await iot.listAttachedPolicies({ target }).promise();

      // `policies` may be absent in the response; treat that as "none attached".
      const alreadyAttached = (policies ?? []).some(
        (policy) => policy.policyName === policyName
      );
      if (!alreadyAttached) {
        await iot.attachPolicy({ policyName, target }).promise();
      }
    } catch (e) {
      console.error('Error in iot core connection.', e);
      this.store.dispatch(new SetOperatorMqttStatus(false));
    }
  }

  /** Signs out of Amplify Auth; failures are logged and otherwise ignored. */
  private async signOutFromCognito() {
    try {
      await signOut();
    } catch (error) {
      console.log('error signing out: ', error);
    }
  }

  /**
   * Re-creates the subscription of every topic that has a registered callback.
   * Any subscription still held for a topic is closed before the new one is
   * opened.
   */
  async subscribeAll(): Promise<void> {
    // Snapshot first: the registry is modified while re-subscribing.
    const registered = [...this.subscriptionsCallback];
    for (const [topicName, cb] of registered) {
      this.subscriptionsMap.get(topicName)?.unsubscribe();
      this.subscriptionsMap.delete(topicName);
      this.openSubscription(topicName, cb);
    }
  }

  /** Removes every subscription currently held by this service. */
  async unsubscribeAll(): Promise<void> {
    // Snapshot the keys because unsubscribe() deletes from the map.
    for (const topicName of [...this.subscriptionsMap.keys()]) {
      this.unsubscribe(topicName);
    }
  }

  /**
   * Removes the subscriptions of the given topics.
   *
   * @param topics - Topic names to unsubscribe from.
   */
  async unsubscribeTopics(topics: string[]): Promise<void> {
    for (const topic of topics) {
      this.unsubscribe(topic);
    }
  }

  /**
   * Closes the subscription of `topicName`, if there is one, and forgets its
   * callback. Errors are logged and not rethrown.
   *
   * @param topicName - Topic to unsubscribe from.
   */
  unsubscribe(topicName: string) {
    try {
      const subscription = this.subscriptionsMap.get(topicName);
      if (subscription) {
        subscription.unsubscribe();
        this.subscriptionsMap.delete(topicName);
        this.subscriptionsCallback.delete(topicName);
      }
    } catch (e) {
      console.error(`Error: failed to unsubscribe topic ${topicName}!`, e);
    }
  }

  /**
   * Publishes `message` to `topicName`. Failures are logged and not rethrown.
   *
   * @param topicName - Destination topic.
   * @param message - Payload handed to the PubSub client.
   */
  async publish(topicName: string, message: any): Promise<void> {
    try {
      await this.ensurePubsub().publish({ topics: topicName, message });
    } catch (e) {
      console.error('Failed to publish MQTT message.', e);
    }
  }

  /**
   * @returns The names of all topics that currently have a registered callback.
   */
  getSubscribedTopics(): string[] {
    return [...this.subscriptionsCallback.keys()];
  }
}
