import { Injectable } from '@angular/core';
import { PubSub } from '@aws-amplify/pubsub';
import { fetchAuthSession } from 'aws-amplify/auth';
import { defer, firstValueFrom, Observable, throwError, timer } from 'rxjs';
import { IoTDataPlaneClient, GetThingShadowCommand } from '@aws-sdk/client-iot-data-plane';
import { toUtf8 } from '@aws-sdk/util-utf8-browser';
import { finalize, mergeMap, retryWhen } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
import { environment } from 'src/environments/environment';

/**
 * Shape of the shadow document that `IoTClientService.getShadowViaREST` returns
 * after JSON-parsing the response payload.
 *
 * NOTE(review): the field names look like the AWS IoT Device Shadow document
 * format — confirm against the service documentation before relying on the
 * semantics described in the hedged notes below.
 *
 * @typeParam D - type of the device state object stored under each `state` section.
 */
export type DeviceShadow<D> = {
  /** Untyped metadata section; presumably per-attribute update information — TODO confirm. */
  metadata: any;
  state: {
    /** State section that is always present; presumably what the device last reported. */
    reported: D;
    /** Optional state section; presumably what a client asked the device to become. */
    desired?: D;
    /** Optional state section; presumably the difference between desired and reported. */
    delta?: D;
  };
  /** Numeric timestamp of the document; unit (seconds vs. milliseconds) is not visible here — TODO confirm. */
  timestamp: number;
  /** Numeric document version. */
  version: number;
  /** Optional client-supplied token echoed in the document — presumably for request correlation; verify. */
  clientToken?: string;
};

/**
 * Builds a notifier function for RxJS `retryWhen` that retries after a fixed delay.
 *
 * For every error emitted by the source the notifier either re-throws it (ending the
 * retry loop) or emits after `timeout` milliseconds (triggering another attempt).
 *
 * @param maxRetryAttempts maximum number of retries. `undefined` (default) retries
 *   forever; `0` or a negative value re-throws the very first error without retrying.
 * @param timeout delay in milliseconds before each retry. Default is 1000.
 * @param excludedStatusCodes values of `error.status` that must never be retried.
 * @returns operator-style function taking the stream of errors and returning the
 *   stream of retry notifications.
 */
export const genericRetryStrategy =
  ({
    maxRetryAttempts = undefined,
    timeout = 1000,
    excludedStatusCodes = [],
  }: {
    maxRetryAttempts?: number;
    timeout?: number;
    excludedStatusCodes?: number[];
  } = {}) =>
  (attempts: Observable<any>) =>
    attempts.pipe(
      mergeMap((error, i) => {
        console.debug(`Attempt ${i}:`, error);

        // `i` is the zero-based index of the failed attempt, so `i >= max` means all
        // allowed retries are used up. A null check (instead of truthiness) keeps
        // `0` meaning "no retries" rather than "retry forever".
        const retriesExhausted = maxRetryAttempts != null && i >= maxRetryAttempts;

        // `includes` (instead of a truthy `find` result) also matches status 0,
        // and `?.` tolerates errors that are null/undefined.
        const statusExcluded = excludedStatusCodes.includes(error?.status);

        if (retriesExhausted || statusExcluded) {
          return throwError(() => error);
        }

        return timer(timeout);
      }),
      finalize(() => console.debug('Retry done!')),
    );

/**
 * Application-wide service for talking to AWS IoT: MQTT topic subscriptions through
 * Amplify PubSub, device shadow reads through the IoT data plane client, and a
 * backend call that attaches the IoT policy to an identity.
 */
@Injectable({
  providedIn: 'root',
})
export class IoTClientService {
  /** True when the environment's AWS config contains a `pubsub` key. */
  public readonly isEnabled: boolean;

  /** PubSub client; stays unset until `initialize()` has run with a pubsub config. */
  private pubsub?: PubSub;

  constructor(private http: HttpClient) {
    this.isEnabled = 'pubsub' in environment.awsConfig;
  }

  /**
   * Creates the PubSub client from the environment's pubsub config
   * (`wss://<endpoint>/mqtt`). Does nothing when the feature is disabled or the
   * config entry is empty.
   */
  public async initialize(): Promise<void> {
    if (!this.isEnabled) return;

    const config = environment.awsConfig.pubsub;
    if (!config) {
      return;
    }

    this.pubsub = new PubSub({ region: config.region, endpoint: `wss://${config.endpoint}/mqtt` });
  }

  /**
   * Subscribe to topics and execute callback on message
   *
   * @param topics topics list to subscribe on
   * @param onMessage callback for processing message
   * @param onError optional error callback (e.g. network disconnect). Is called immediately (with original error) or after all retries failed (with last retry failed error).
   * @param onRetry optional retry on error callback, can be used for e.g. automatic restart. It is re-invoked (after the retry strategy's delay) for as long as it throws or rejects.
   * @param maxRetryAttempts optional number of retries (e.g. network connection is still not available). Default is 'undefined' meaning retry forever. Set to <=0 for no retry.
   * @returns the subscription on the topic stream; unsubscribe it to stop receiving messages.
   * @throws Error when called before `initialize()` has created the PubSub client.
   */
  public subscribe({
    topics,
    onMessage,
    onError,
    onRetry,
    maxRetryAttempts,
  }: {
    topics: string[];
    onMessage: (data: any) => void | Promise<void>;
    onError?: (err?: any) => void | Promise<void>;
    onRetry?: (err?: any) => void | Promise<void>;
    maxRetryAttempts?: number;
  }) {
    if (!this.pubsub) {
      // Fail with an explicit message instead of "cannot read properties of undefined".
      throw new Error('IoTClientService.subscribe() called before initialize() or while PubSub is disabled.');
    }

    // subscribe to updates and forward them to the caller
    const obs = this.pubsub.subscribe({ topics });
    const sub = obs.subscribe({
      next: (data: any) => {
        console.debug('Next:', data);
        onMessage(data);
      },
      error: (error: any) => {
        console.warn(error);
        sub.unsubscribe();

        // `undefined` means "retry forever" (as documented); only an explicit
        // value <= 0 disables retrying.
        if (onRetry && (maxRetryAttempts === undefined || maxRetryAttempts > 0)) {
          // `defer` re-runs onRetry on every re-subscription triggered by retryWhen.
          defer(async () => onRetry(error))
            .pipe(retryWhen(genericRetryStrategy({ maxRetryAttempts })))
            .subscribe({
              error: (lastRetryError) => {
                if (onError) onError(lastRetryError);
              },
            });
        } else {
          if (onError) onError(error);
        }
      },
      complete: () => console.debug('Completed:', topics),
    });
    console.debug('Subscribed:', topics);

    return sub;
  }

  /**
   * Reads a thing's shadow with `GetThingShadowCommand`, using the current auth
   * session's credentials.
   *
   * @param thingName name of the thing whose shadow is requested
   * @returns the parsed shadow document, or `null` when there is no pubsub config,
   *   no credentials, an empty response payload, or the request/parsing failed
   *   (the error is logged). Errors from fetching the auth session itself are not
   *   caught here and reject the returned promise.
   */
  public async getShadowViaREST<T>(thingName: string): Promise<DeviceShadow<T> | null> {
    const { credentials } = await fetchAuthSession();
    const config = environment.awsConfig.pubsub;

    if (!config || !credentials) {
      return null;
    }
    try {
      const client = new IoTDataPlaneClient({
        region: config.region,
        endpoint: `https://${config.endpoint}`,
        serviceId: 'iotdevicegateway',
        credentials: {
          accessKeyId: credentials.accessKeyId,
          secretAccessKey: credentials.secretAccessKey,
          sessionToken: credentials.sessionToken,
        },
      });
      const command = new GetThingShadowCommand({ thingName });
      const response = await client.send(command);
      // The payload is parsed as-is; no schema validation is performed. An empty
      // payload yields null so the result matches the declared return type.
      const payload: DeviceShadow<T> | null = response.payload ? JSON.parse(toUtf8(response.payload)) : null;
      console.debug('Shadow REST get: ', payload);

      return payload;
    } catch (err) {
      console.error(err);

      return null;
    }
  }

  /**
   * Asks the backend to attach the web app's IoT policy to the given identity.
   * Failures are logged and swallowed; the returned promise always resolves.
   *
   * @param identityId identity the policy should be attached to
   */
  public async attachIoTPolicy(identityId: string) {
    try {
      // Await the request so that failures reach the catch block and the success
      // message is only logged once the backend has actually answered.
      await firstValueFrom(
        this.http.post(`${environment.apiUrl}/${environment.apiVersion}/attach_iot_policy/`, {
          policy_name: 'terabase-webapp-policy',
          identity_id: identityId,
        }),
      );
      console.debug('IoT policy attached.');
    } catch (err) {
      console.error('Attach IoT policy failed!', err);
    }
  }
}
