import {check} from './checks';

interface RunnableSlot<T, ARGS> {
  run: (opts: ARGS) => Promise<T>;
}

/**
 * Creates a promise queue to coordinate running concurrent tasks.
 *
 * Important properties:
 *
 * - Concurrency limit: a max number of inlight promises
 * - FIFO: promises are executed in the order they are enqueued
 * - Each task executed in this promise has access to task local storage (TLS)
 *   that can be used to share allocations between tasks
 * - Backpressure: when the concurrency limit has been met, clients can await
 *   their turn
 */
export function promiseQueue<T, ARGS, TLS>({
  limit,
  work,
}: {
  limit: number;
  work: (opt: ARGS, tls?: Partial<TLS>) => Promise<T>;
}) {
  type Slot = {tls: Partial<TLS>};
  const idle: Slot[] = new Array(limit).fill(0).map(() => ({tls: {}}));

  // Queue of promises waiting their turn to start
  const queue: ((slot: Slot) => void)[] = [];

  // Slot the next task in the queue or start idling
  const startNext = (slot: Slot) => {
    const next = queue.shift();
    if (next !== undefined) {
      next(slot);
    } else {
      idle.push(slot);
    }
  };

  return {
    /**
     * Waits until there is space available to execute a promise, and then
     * returns a slot allows the caller to start their task.
     */
    async take(): Promise<RunnableSlot<T, ARGS>> {
      const slot = idle.pop() ?? (await new Promise<Slot>((res) => queue.push(res)));
      let attempts = 0;
      return {
        run: async (args: ARGS) => {
          try {
            check((attempts += 1) === 1, 'Reattempting run without queuing');
            return await work(args, slot.tls);
          } finally {
            startNext(slot);
          }
        },
      };
    },
  };
}
