/*
* SPDX-FileCopyrightText: 2021-2023 The Refinery Authors
*
* SPDX-License-Identifier: EPL-2.0
*/
import CancelledError from './CancelledError';
import PendingTask from './PendingTask';
import getLogger from './getLogger';
// Module-level logger; used below to report a timed out task that is no longer in its queue.
const log = getLogger('utils.PriorityMutex');
/**
 * Asynchronous mutual exclusion lock with two waiting queues.
 *
 * At most one callback passed to {@link PriorityMutex.runExclusive} runs at a
 * time. When the lock is released it is handed over to the oldest high
 * priority waiter, and only if there is none, to the oldest low priority
 * waiter.
 */
export default class PriorityMutex {
  /** Waiters that are served only when no high priority waiter is queued. */
  private readonly lowPriorityQueue: PendingTask[] = [];

  /** Waiters that are always served before any low priority waiter. */
  private readonly highPriorityQueue: PendingTask[] = [];

  /** Whether the lock is currently owned (or being handed over to a waiter). */
  private _locked = false;

  /**
   * @param timeout Timeout forwarded to the `PendingTask` created for every
   *   waiter (presumably milliseconds; confirm against `PendingTask`).
   */
  constructor(private readonly timeout: number) {}

  /** `true` while some caller owns the lock. */
  get locked(): boolean {
    return this._locked;
  }

  /**
   * Runs `callback` while holding the lock and releases the lock afterwards,
   * even if `callback` rejects.
   *
   * @param callback The asynchronous operation to run exclusively.
   * @param highPriority Whether to wait in the high priority queue.
   * @returns The value `callback` resolves with.
   * @throws Rejects with `CancelledError` if {@link PriorityMutex.cancelAllWaiting}
   *   is called while this call is still waiting for the lock, and with
   *   whatever `callback` rejects with otherwise.
   */
  async runExclusive<T>(
    callback: () => Promise<T>,
    highPriority = false,
  ): Promise<T> {
    await this.acquire(highPriority);
    try {
      return await callback();
    } finally {
      this.release();
    }
  }

  /**
   * Rejects every caller that is still waiting for the lock with a
   * `CancelledError`. The current owner of the lock is not affected.
   */
  cancelAllWaiting(): void {
    [this.highPriorityQueue, this.lowPriorityQueue].forEach((queue) => {
      // Empty the queue before rejecting: a promise can only be settled once,
      // so if a rejected waiter stayed queued, `release()` would hand the lock
      // to it and the mutex would stay locked without an owner.
      const cancelled = queue.splice(0, queue.length);
      cancelled.forEach((task) => task.reject(new CancelledError()));
    });
  }

  /**
   * Takes the lock immediately if it is free, otherwise enqueues a waiter.
   *
   * @param highPriority Whether to enqueue into the high priority queue.
   * @returns A promise resolved once the caller owns the lock.
   */
  private acquire(highPriority: boolean): Promise<void> {
    if (!this.locked) {
      this._locked = true;
      return Promise.resolve();
    }
    const queue = highPriority ? this.highPriorityQueue : this.lowPriorityQueue;
    return new Promise<void>((resolve, reject) => {
      // The last argument is the callback `PendingTask` invokes on timeout
      // (NOTE(review): presumably after rejecting the promise; confirm in
      // `PendingTask`). It drops the waiter so it can never receive the lock.
      const task = new PendingTask(resolve, reject, this.timeout, () => {
        const index = queue.indexOf(task);
        if (index < 0) {
          log.error('Timed out task already removed from queue');
          return;
        }
        queue.splice(index, 1);
      });
      queue.push(task);
    });
  }

  /**
   * Releases the lock, handing it directly to the next waiter if there is one.
   *
   * @throws Error if the lock is not currently held.
   */
  private release(): void {
    if (!this.locked) {
      throw new Error('Trying to release already released mutext');
    }
    const task =
      this.highPriorityQueue.shift() ?? this.lowPriorityQueue.shift();
    if (task === undefined) {
      this._locked = false;
      return;
    }
    // `_locked` deliberately stays `true`: ownership passes straight to `task`.
    task.resolve();
  }
}