import { Amplify, Auth, Hub, PubSub } from 'aws-amplify';
import * as AWS from 'aws-sdk';
import {
  AWSIoTProvider,
  CONNECTION_STATE_CHANGE,
  ConnectionState
} from '@aws-amplify/pubsub';
import { Constants } from '../../../constants';
import { Injectable, OnDestroy } from '@angular/core';
import { ReplaySubject, Subscription } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { SetOperatorMqttStatus } from '../../../pages/device/device-control/store/actions/participants.actions';
import { Store } from '@ngxs/store';
import { debounceTime, delay, filter } from 'rxjs/operators';

/**
 * Contract for objects that can subscribe a callback to an MQTT topic.
 *
 * `AwsIotCoreService` in this file exposes a structurally compatible
 * `subscribeTopic` method; it invokes the callback as `callback(data, null)`
 * for every received message and as `callback(null, error)` on failure.
 */
export interface MqttSubscription {
  subscribeTopic(topic: string, callback: any): void;
}

/**
 * Contract for objects that can publish a message to a named MQTT topic.
 *
 * The payload type is left open (`any`); this declaration does not constrain
 * its shape.
 */
export interface MqttSendMessage {
  publish(topicName: string, message: any): void;
}

/**
 * Contract for MQTT-backed services that need an explicit initialisation step
 * parameterised by a device id and a connection id.
 *
 * NOTE(review): no implementer is visible in this file — confirm the expected
 * semantics of both ids against the implementing services.
 */
export interface OnInitMqttService {
  init(deviceId: string, connectionId: string): void;
}

/**
 * Root-scoped service that wraps AWS Amplify PubSub over AWS IoT Core.
 *
 * Responsibilities visible in this class:
 *  - registers the `AWSIoTProvider` pluggable and signs in to Cognito,
 *  - makes sure the IoT policy is attached to the current identity,
 *  - tracks the PubSub connection state reported on the Amplify `Hub`,
 *  - keeps a registry of topic subscriptions and their callbacks so they can
 *    be closed and re-opened.
 */
@Injectable({
  providedIn: 'root'
})
export class AwsIotCoreService implements OnDestroy {
  /** Live PubSub subscriptions, keyed by topic name. */
  private subscriptionsMap = new Map<string, Subscription>();
  /** Callback registered for each topic; the source of truth for re-subscribing. */
  private subscriptionsCallback = new Map<string, any>();
  /** Mirrors the last Connected/Disconnected state seen on the Hub. */
  protected _subscribed: boolean;
  /** MQTT client id handed to the IoT provider; random per service instance. */
  public clientId: string = uuidv4();

  /** Last connection state received from the Hub. */
  mqttStateConnection: ConnectionState;
  /** Replays the most recent connection state to late subscribers. */
  mqttStateConnection$ = new ReplaySubject<ConnectionState>(1);

  /** Subscription of the "disconnected" watcher created after the first connect. */
  private disconnectWatcher?: Subscription;
  /** Set once `ngOnDestroy` ran, so late async continuations do not re-register. */
  private destroyed = false;

  /**
   * Hub listener kept as a field so the very same reference can be passed to
   * `Hub.remove` on destroy.
   */
  private readonly hubListener = (data: any): void => {
    const { payload } = data;
    if (payload.event !== CONNECTION_STATE_CHANGE) {
      return;
    }
    this.mqttStateConnection = payload.data.connectionState as ConnectionState;
    this.mqttStateConnection$.next(this.mqttStateConnection);
    if (this.mqttStateConnection === ConnectionState.Disconnected) {
      this._subscribed = false;
    }
    if (this.mqttStateConnection === ConnectionState.Connected) {
      this._subscribed = true;
    }
  };

  /**
   * Starts listening for PubSub connection-state events and kicks off the
   * initial connection.
   *
   * @param store NGXS store used to publish the operator MQTT status.
   */
  constructor(protected readonly store: Store) {
    console.log('Starting AWS IOT CORE service');
    this._subscribed = false;
    Hub.listen('pubsub', this.hubListener);

    // connect() handles its own errors, so this promise does not reject.
    this.connect().then(() => {
      if (this.destroyed) {
        return;
      }
      this.disconnectWatcher = this.mqttStateConnection$
        .pipe(
          filter((state) => state === ConnectionState.Disconnected),
          debounceTime(1500),
          delay(10000)
        )
        .subscribe(() => {
          console.log('MQTT Status disconnected');
        });
    });
  }

  /**
   * Releases everything the service registered: the Hub listener, the
   * disconnect watcher, all topic subscriptions and the IoT pluggable.
   * The operator MQTT status is reset even if part of the cleanup fails.
   */
  async ngOnDestroy(): Promise<void> {
    this.destroyed = true;
    Hub.remove('pubsub', this.hubListener);
    if (this.disconnectWatcher) {
      this.disconnectWatcher.unsubscribe();
      this.disconnectWatcher = undefined;
    }
    try {
      await this.unsubscribeAll();
      await this.removePluggable();
    } catch (e) {
      console.error('AwsIotCoreService::ngOnDestroy(): Error:', e);
    } finally {
      this.store.dispatch(new SetOperatorMqttStatus(false));
    }
  }

  /**
   * Tears the connection down and builds it up again, restoring every topic
   * subscription that was active before the reconnect.
   */
  private async reconnect(): Promise<void> {
    console.log('Trying AWS IOT CORE reconnection');
    // unsubscribeAll() forgets the callbacks, so remember them first.
    const rememberedCallbacks = new Map(this.subscriptionsCallback);
    await this.unsubscribeAll();
    await this.removePluggable();
    await this.signOutFromCognito();
    // connect() registers the pluggable itself; adding it here as well would
    // register the provider twice.
    await this.connect();
    rememberedCallbacks.forEach((callback, topicName) =>
      this.subscriptionsCallback.set(topicName, callback)
    );
    await this.subscribeAll();
  }

  /**
   * Registers the AWS IoT provider with Amplify using this instance's client
   * id. Failures are logged and not rethrown.
   */
  async addPluggable(): Promise<void> {
    try {
      Amplify.addPluggable(
        new AWSIoTProvider({
          aws_pubsub_region: Constants.AWS_REGION,
          aws_pubsub_endpoint: Constants.AWS_IOT_CORE_ENDPOINT,
          clientId: this.clientId
        })
      );
    } catch (e) {
      console.log('AwsIotCoreService::addPluggable(): Error:', e);
    }
  }

  /** Removes the AWS IoT provider from Amplify PubSub. */
  async removePluggable(): Promise<void> {
    console.log('Remove pluggable');
    await Amplify.PubSub.removePluggable('AWSIoTProvider');
  }

  /**
   * Subscribes `cb` to `topicName`. An existing subscription for the same
   * topic is closed and replaced.
   *
   * @param topicName MQTT topic to subscribe to.
   * @param cb Invoked as `cb(data, null)` per message and `cb(null, error)` on error.
   */
  subscribeTopic(topicName: string, cb: any): void {
    if (this.subscriptionsCallback.has(topicName)) {
      console.log(`Topic already ${topicName} subscribed`);
      this.unsubscribe(topicName);
    }
    const observable = PubSub.subscribe(topicName);
    this.subscriptionsCallback.set(topicName, cb);
    this.subscriptionsMap.set(topicName, this.registerCallBack(observable, cb));
  }

  /**
   * Attaches `callback` to a PubSub observable using the
   * `(data, error)` calling convention.
   *
   * @param observable Observable returned by `PubSub.subscribe`.
   * @param callback Consumer callback.
   * @returns The subscription handle, used later to unsubscribe.
   */
  private registerCallBack(observable: any, callback: any): Subscription {
    return observable.subscribe({
      next: (data: any) => {
        callback(data, null);
      },
      error: (error: any) => {
        console.error(error);
        callback(null, error);
      },
      complete: () => console.info('Subscription complete!')
    });
  }

  /**
   * Registers the IoT provider, signs in to Cognito and makes sure the IoT
   * policy is attached to the current identity. Any failure is logged and
   * reported as `SetOperatorMqttStatus(false)`; nothing is rethrown.
   */
  private async connect(): Promise<void> {
    try {
      await this.addPluggable();
      // SECURITY(review): credentials are hard-coded in source. They should be
      // rotated and supplied from a secure configuration/user input instead;
      // left in place here because no alternative source is visible in this file.
      const data: any = await Auth.signIn('wiltonof@gmail.com', '1q2w#E4r5t');
      if (data.challengeName === 'NEW_PASSWORD_REQUIRED') {
        // The authenticated user object is deliberately not logged.
        await Auth.completeNewPassword(data, '1q2w#E4r5t', {
          name: 'Wilton Ferreira',
          profile: 'ADMINISTRATOR'
        });
      }

      const credentials = await Auth.currentCredentials();
      if (!credentials) {
        throw new Error('Could not get current AWS Amplify credentials');
      }

      // NOTE(review): region is hard-coded while addPluggable() uses
      // Constants.AWS_REGION — confirm both are meant to be the same region.
      const iot = new AWS.Iot({
        region: 'us-east-1',
        credentials: Auth.essentialCredentials(credentials)
      });

      const policyName = 'rpi-robo-01-Policy';
      const target = credentials.identityId;

      const { policies } = await iot.listAttachedPolicies({ target }).promise();

      // `policies` is optional in the response; treat a missing list as empty.
      const alreadyAttached = (policies || []).some(
        (policy) => policy.policyName === policyName
      );
      if (!alreadyAttached) {
        await iot.attachPolicy({ policyName, target }).promise();
      }
    } catch (e) {
      console.error('Error in iot core connection.', e);
      this.store.dispatch(new SetOperatorMqttStatus(false));
    }
  }

  /** Signs the current user out of Cognito; failures are logged only. */
  private async signOutFromCognito(): Promise<void> {
    try {
      await Auth.signOut();
    } catch (error) {
      console.log('error signing out: ', error);
    }
  }

  /**
   * (Re)opens a PubSub subscription for every topic that has a registered
   * callback. A subscription that is still open for a topic is closed first
   * so the callback is never attached twice.
   */
  async subscribeAll(): Promise<void> {
    for (const [topicName, callback] of this.subscriptionsCallback) {
      const current = this.subscriptionsMap.get(topicName);
      if (current) {
        current.unsubscribe();
      }
      // A fresh observable is required: the stored value is a subscription
      // handle and cannot be subscribed to again.
      this.subscriptionsMap.set(
        topicName,
        this.registerCallBack(PubSub.subscribe(topicName), callback)
      );
    }
  }

  /** Closes every topic subscription and forgets the associated callbacks. */
  async unsubscribeAll(): Promise<void> {
    for (const value of this.subscriptionsMap.keys()) {
      this.unsubscribe(value);
    }
  }

  /**
   * Closes the subscriptions of the given topics.
   *
   * @param topics Topic names to unsubscribe from.
   */
  async unsubscribeTopics(topics: string[]): Promise<void> {
    topics.forEach((topic) => this.unsubscribe(topic));
  }

  /**
   * Closes the subscription of one topic and forgets its callback. Unknown
   * topics are ignored; errors are logged and not rethrown.
   *
   * @param topicName Topic to unsubscribe from.
   */
  unsubscribe(topicName: string): void {
    try {
      const subscription = this.subscriptionsMap.get(topicName);
      if (subscription) {
        subscription.unsubscribe();
        this.subscriptionsMap.delete(topicName);
        this.subscriptionsCallback.delete(topicName);
      }
    } catch (e) {
      console.error(`Error: failed to unsubscribe topic ${topicName}!`, e);
    }
  }

  /**
   * Publishes `message` to `topicName`. Failures are logged and not rethrown.
   *
   * @param topicName Destination topic.
   * @param message Payload handed to `PubSub.publish`.
   */
  async publish(topicName: string, message: any): Promise<void> {
    try {
      await PubSub.publish(topicName, message);
    } catch (e) {
      console.error('Failed to publish MQTT message.', e);
    }
  }

  /** @returns The names of all topics that currently have a registered callback. */
  getSubscribedTopics(): string[] {
    return [...this.subscriptionsCallback.keys()];
  }
}
