import { AwsIotCoreService } from '../iot/aws-iot-core.service';
import { ConnectionState } from '@aws-amplify/pubsub';
import { debounceTime, filter, take } from 'rxjs/operators';

/**
 * Created by Wilton Oliveira Ferreira on 23/10/2022
 */

export abstract class BaseMqttService {
  protected _deviceId: string;
  protected _connectionId: string;

  protected constructor(
    protected readonly awsIotCoreService: AwsIotCoreService
  ) {}

  protected abstract subscribe(...params): void;

  async init(deviceId: string, connectionId: string = null): Promise<void> {
    this._deviceId = deviceId;
    this._connectionId = connectionId;
    this.subscribe();
  }

  subscribeTopic(topicName: string, callback: any): void {
    this.verifyTopicName(topicName);
    this.awsIotCoreService.mqttStateConnection$
      .pipe(
        filter((state: ConnectionState) => state === ConnectionState.Connected),
        debounceTime(200),
        take(1)
      )
      .subscribe(() => {
        this.awsIotCoreService.subscribeTopic(topicName, callback);
      });
  }

  private verifyTopicName(topicName: string): void {
    if (topicName.includes('undefined')) {
      console.warn(
        `Topic name constructed with undefined variable: ${topicName}.`
      );
    }
  }

  async publish(topicName: string, message: any): Promise<void> {
    this.verifyTopicName(topicName);
    try {
      await this.awsIotCoreService.publish(topicName, message);
    } catch (error) {
      console.error(
        'Failed to publish message in topic',
        topicName,
        'Message:',
        message
      );
    }
  }

  schedulePublish(topicName: string, message: any): void {
    this.verifyTopicName(topicName);
    this.awsIotCoreService.mqttStateConnection$
      .pipe(
        filter((state) => state === ConnectionState.Connected),
        debounceTime(200),
        take(1)
      )
      .subscribe(async () => await this.publish(topicName, message));
  }
}
