import type {Dropbox} from '@dropbox/api-v2-client';

import type {FileUpload} from './types';
import {
  BLOCK_SIZE,
  MAX_PARALLEL_BLOCK_UPLOAD_REQUESTS,
  MAX_PARALLEL_DIRECT_UPLOADS,
  UploadErrorType,
} from './types';
import {dropboxSdk} from '../client';
import {ReplayError, ReplayErrorCategory, reportException} from '../error_reporting';
import {Mutex} from '../mutex';
import {promiseQueue} from '../promises';
import {chunks} from '../streams';
import withRetries from '../with_retries';

// Control how many 4MB chunks can be uploading at once
const chunkQueue = promiseQueue({limit: MAX_PARALLEL_BLOCK_UPLOAD_REQUESTS, work: appendChunk});

// Control how many files are uploading chunks at a given time
const uploadQueue = promiseQueue({limit: 2, work: uploadFileData});

// Control how many files can be in an upload session (starting and finishing an
// upload take some time and don't suck up bandwidth, so we increase parallelism
// at this level)
const sessionQueue = promiseQueue({limit: MAX_PARALLEL_DIRECT_UPLOADS, work: uploadFile});

async function uploadFile({
  upload,
  onProgress,
  throwIfAborted,
}: {
  upload: FileUpload;
  throwIfAborted: () => void;
} & Pick<UploadParameters, 'onProgress'>) {
  throwIfAborted();
  const client = dropboxSdk.withPathRoot(upload.nsId);
  const startSession = withRetries(
    () =>
      client.filesUploadSessionStart({
        contents: '',
        session_type: {'.tag': 'concurrent'},
      }),
    3,
  );
  const session = await startSession();
  const sessionId = session.result.session_id;
  throwIfAborted();

  // Wait for our turn to start uploading
  const file = streamableFile(upload);
  const uploadSlot = await uploadQueue.take();
  await uploadSlot.run({
    client,
    sessionId,
    onProgress: (updatedBytesUploaded) =>
      onProgress(
        upload.uploadId,
        99 * Math.min(1, updatedBytesUploaded / file.size),
        updatedBytesUploaded,
      ),
    file,
    throwIfAborted,
  });

  throwIfAborted();
  const finishSession = withRetries(
    () =>
      client.filesUploadSessionFinish({
        cursor: {
          session_id: sessionId,
          offset: file.size,
        },
        commit: {
          path: `ns:${file.nsId}/${file.name}`, // name is ok as we get rename automatically
          autorename: true,
          mute: true,
          strict_conflict: false,
          mode: {'.tag': 'add'},
        },
      }),
    3,
  );

  return await finishSession();
}

async function appendChunk(
  {
    chunk,
    sessionId,
    client,
    throwIfAborted,
  }: {
    chunk: {buffer: ArrayBuffer; offset: number};
    client: Dropbox;
    sessionId: string;
    throwIfAborted: () => void;
  },
  tls?: Partial<{buffer: ArrayBuffer}>,
) {
  const backingBuffer = new Uint8Array(chunk.buffer);
  const contentLength = chunk.buffer.byteLength;

  // Instead of allocating new memory for every append, we leverage the existing
  // Task Local Storage (tls), and copy to that.
  const contents = tls ? (tls.buffer ??= new ArrayBuffer(BLOCK_SIZE)) : new ArrayBuffer(BLOCK_SIZE);

  // We must make a copy of the chunk's contents as every file read reuses the
  // provided buffer.
  new Uint8Array(contents, 0, contentLength).set(backingBuffer);

  throwIfAborted();
  const append = withRetries(
    () =>
      client.filesUploadSessionAppendV2({
        contents: contents.slice(0, contentLength),
        cursor: {
          session_id: sessionId,
          offset: chunk.offset,
        },
        close: chunk.buffer.byteLength < BLOCK_SIZE,
      }),
    3,
  );
  return await append();
}

type StreamableFile = {
  nsId: number;
  name: string;
  size: number;
  newStream: () => Promise<ReadableStreamBYOBReader> | ReadableStreamBYOBReader;
};

export function streamableFile(file: FileUpload): StreamableFile {
  const s = () =>
    'newStream' in file.file
      ? file.file.newStream().then((x) => x.getReader({mode: 'byob'}))
      : file.file.stream().getReader({mode: 'byob'});
  return {
    nsId: file.nsId,
    name: file.file.name,
    size: file.file.size,
    newStream: s,
  };
}

export class UploadCancelledError extends Error {
  constructor() {
    super('upload cancelled');
  }
}

export async function uploadFileData({
  file,
  sessionId,
  onProgress,
  client,
  throwIfAborted: throwIfCancelled,
}: {
  file: StreamableFile;
  sessionId: string;
  onProgress?: (updatedBytesUploaded: number) => any;
  client: Dropbox;
  throwIfAborted: () => void;
}) {
  const failure = new AbortController();
  const tasks: Promise<unknown>[] = [];

  // A poor man's way to combine two signals when cancellation isn't a signal.
  const throwIfAborted = () => {
    if (failure.signal.aborted) {
      throw new failure.signal.reason();
    }
    throwIfCancelled();
  };

  throwIfAborted();
  const stream = await file.newStream();
  let uploaded = 0;
  for await (const chunk of chunks(stream, {signal: failure.signal})) {
    const bytes = chunk.buffer.byteLength;
    throwIfAborted();
    const slot = await chunkQueue.take();
    const task = slot.run({chunk, sessionId, client, throwIfAborted}).then(
      () => {
        if (!failure.signal.aborted) {
          uploaded += bytes;
          onProgress?.(uploaded);
        }
      },
      (e) => failure.abort(e),
    );
    tasks.push(task);
  }

  throwIfAborted();
  await Promise.all(tasks);
}

type UploadParameters = {
  kind: 'streaming' | 'synchronous';
  uploads: FileUpload[];
  isUploadCanceled: (uploadId: string) => boolean;
  onStart: (uploadId: string) => void;
  onProgress: (uploadId: string, percentage: number, uploadedSize: number) => void;
  onComplete: (uploadId: string, fileId: string) => void;
  onError: (uploadId: string, errorType: UploadErrorType) => void;
};

const parallelFileUpload = async ({
  kind,
  isUploadCanceled,
  uploads,
  onError,
  onStart,
  onProgress,
  onComplete,
}: UploadParameters) => {
  if (kind === 'synchronous') {
    return parallelFileUploadSynchronous({
      kind,
      isUploadCanceled,
      uploads,
      onError,
      onStart,
      onProgress,
      onComplete,
    });
  }

  // Loop through all uploads batched in groups of MAX_PARALLEL_DIRECT_UPLOADS.
  uploads.sort(({uploadId: uploadIdA}, {uploadId: uploadIdB}) => (uploadIdA > uploadIdB ? 1 : -1));

  const tasks = [];
  for (const upload of uploads) {
    const throwIfAborted = () => {
      if (isUploadCanceled(upload.uploadId)) {
        throw new UploadCancelledError();
      }
    };

    const slot = await sessionQueue.take();
    const task = slot
      .run({
        upload,
        onProgress,
        throwIfAborted,
      })
      .then(
        (result) => {
          onComplete(upload.uploadId, result.result.id);
        },
        (ex) => {
          if (!(ex instanceof UploadCancelledError)) {
            reportException(
              new ReplayError({
                severity: 'non-critical',
                category: ReplayErrorCategory.Upload,
                error: ex,
              }),
            );
            onError(upload.uploadId, UploadErrorType.UploadFailed);
          }
        },
      );

    onStart(upload.uploadId);
    tasks.push(task);
  }

  await Promise.all(tasks);

  return true;
};

enum QueueChunkStatus {
  Init,
  Active,
  Canceled,
  Done,
}

type QueueChunk = {
  client: Dropbox;
  sessionId: string;
  upload: FileUpload;
  offset: number;
  size: number;
  status: QueueChunkStatus;
};

const parallelFileUploadSynchronous = async ({
  isUploadCanceled,
  uploads,
  onError,
  onStart,
  onProgress,
  onComplete,
}: UploadParameters) => {
  const uploadQueue: QueueChunk[] = [];
  const bytesUploaded: Record<string, number> = {};
  const byteLock = new Mutex();

  const startUpload = async (upload: FileUpload) => {
    const client = dropboxSdk.withPathRoot(upload.nsId);
    const {uploadId, file} = upload;
    let sessionId = '';

    if (file.size === 0) {
      onError(uploadId, UploadErrorType.FileEmpty);
      return;
    }

    onStart(uploadId);

    // Create upload session for file
    try {
      const session = await withRetries(
        client.filesUploadSessionStart.bind(client),
        3,
      )({
        contents: '',
        session_type: {'.tag': 'concurrent'},
      });

      sessionId = session.result.session_id;
    } catch (err) {
      reportException(
        new ReplayError({
          severity: 'non-critical',
          category: ReplayErrorCategory.Upload,
          error: err,
        }),
      );
      onError(uploadId, UploadErrorType.UploadFailed);
      return;
    }

    const numberOfBlocks = Math.ceil(file.size / BLOCK_SIZE);
    const newChunks = Array.from({length: numberOfBlocks}, (_, i) => ({
      client,
      sessionId,
      upload,
      offset: i * BLOCK_SIZE,
      size: BLOCK_SIZE,
      status: QueueChunkStatus.Init,
    }));

    uploadQueue.push(...newChunks);
  };

  const finalizeUpload = async ({
    client,
    sessionId,
    upload: {uploadId, file, nsId},
  }: QueueChunk) => {
    // Commit session
    try {
      const result = await withRetries(
        client.filesUploadSessionFinish.bind(client),
        3,
      )({
        cursor: {
          session_id: sessionId,
          offset: file.size,
        },
        commit: {
          path: `ns:${nsId}/${file.name}`, // name is ok as we get rename automatically
          autorename: true,
          mute: true,
          strict_conflict: false,
          mode: {'.tag': 'add'},
        },
      });

      onComplete(uploadId, result.result.id);
    } catch (err) {
      reportException(
        new ReplayError({
          severity: 'non-critical',
          category: ReplayErrorCategory.Upload,
          error: err,
        }),
      );
      onError(uploadId, UploadErrorType.UploadFailed);
    }
  };

  const uploadChunk = async (chunk: QueueChunk) => {
    const {file} = chunk.upload;
    if (!('slice' in file)) {
      throw new Error('Synchronous uploader requires a slice');
    }

    const contents = file.slice(chunk.offset, Math.min(file.size, chunk.offset + chunk.size));
    const shouldClose = chunk.offset + chunk.size >= file.size;
    await withRetries(
      chunk.client.filesUploadSessionAppendV2.bind(chunk.client),
      3,
      true,
    )({
      contents,
      cursor: {
        session_id: chunk.sessionId,
        offset: chunk.offset,
      },
      close: shouldClose,
    });
  };

  const queueWorker = async () => {
    while (uploadQueue.some(({status}) => status === QueueChunkStatus.Init)) {
      const itemIndex = uploadQueue.findIndex((chunk) => chunk.status === QueueChunkStatus.Init);
      const item = uploadQueue[itemIndex];

      if (!item || item.status === QueueChunkStatus.Canceled) continue;

      const {uploadId, file} = item.upload;

      if (isUploadCanceled(uploadId)) {
        // Cancel all other chunks for this file
        uploadQueue.forEach(({upload}, index) => {
          if (upload.uploadId === uploadId) {
            uploadQueue[index].status = QueueChunkStatus.Canceled;
          }
        });
        continue;
      }

      uploadQueue[itemIndex].status = QueueChunkStatus.Active;

      try {
        await uploadChunk(item);
      } catch (err) {
        reportException(
          new ReplayError({
            severity: 'non-critical',
            category: ReplayErrorCategory.Upload,
            error: err,
          }),
        );
        onError(uploadId, UploadErrorType.UploadFailed);

        // Cancel all other chunks for this file
        uploadQueue.forEach(({upload}, index) => {
          if (upload.uploadId === uploadId) {
            uploadQueue[index].status = QueueChunkStatus.Canceled;
          }
        });

        continue;
      }

      uploadQueue[itemIndex].status = QueueChunkStatus.Done;

      byteLock.dispatch(async () => {
        const updatedBytesUploaded = (bytesUploaded[uploadId] || 0) + item.size;
        bytesUploaded[uploadId] = updatedBytesUploaded;

        if (isUploadCanceled(uploadId)) {
          // Cancel all other chunks for this file
          uploadQueue.forEach(({upload}, index) => {
            if (upload.uploadId === uploadId) {
              uploadQueue[index].status = QueueChunkStatus.Canceled;
            }
          });
        } else {
          // Setting max to 99 shows the file as not quite uploaded until the finish request returns
          onProgress(
            uploadId,
            99 * Math.min(1, updatedBytesUploaded / file.size),
            updatedBytesUploaded,
          );
        }
      });

      // If every chunk with this items uploadId is done, then we should finalize the upload
      const fileCompletelyUploaded = uploadQueue.every(
        ({upload, status}) => upload.uploadId !== uploadId || status === QueueChunkStatus.Done,
      );

      if (fileCompletelyUploaded) {
        await finalizeUpload(item);
      }
    }
  };

  // Loop through all uploads batched in groups of MAX_PARALLEL_DIRECT_UPLOADS.
  uploads.sort(({uploadId: uploadIdA}, {uploadId: uploadIdB}) => (uploadIdA > uploadIdB ? 1 : -1));

  while (uploads.length > 0) {
    const currentUploads = uploads.splice(0, MAX_PARALLEL_DIRECT_UPLOADS);

    await Promise.all(currentUploads.map(startUpload));

    // Upload chunks in parallel
    const workers: Promise<void>[] = [];

    for (let i = 0; i < MAX_PARALLEL_BLOCK_UPLOAD_REQUESTS; i += 1) {
      workers.push(queueWorker());
    }

    await Promise.all(workers);
  }

  return true;
};

export default parallelFileUpload;
