blob: c1215c763af82fe8a78d65394870c19cbed0fd3b (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
/*
* SPDX-FileCopyrightText: 2021-2023 The Refinery Authors <https://refinery.tools/>
*
* SPDX-License-Identifier: EPL-2.0
*/
import CancelledError from './CancelledError';
import PendingTask from './PendingTask';
import getLogger from './getLogger';
// Module-level logger obtained from getLogger under the name
// 'utils.PriorityMutex'; PriorityMutex uses it to report a timed-out waiter
// that can no longer be found in its queue.
const log = getLogger('utils.PriorityMutex');
/**
 * Asynchronous mutual-exclusion lock with two priority levels.
 *
 * At most one caller holds the lock at a time. Callers that find the lock
 * taken wait in one of two FIFO queues. On release the lock is handed to the
 * oldest high-priority waiter if there is one, otherwise to the oldest
 * low-priority waiter.
 */
export default class PriorityMutex {
  /** Waiters that are served only when no high-priority waiter is queued. */
  private readonly lowPriorityQueue: PendingTask<void>[] = [];

  /** Waiters that are served before any low-priority waiter. */
  private readonly highPriorityQueue: PendingTask<void>[] = [];

  /** Whether some caller currently owns the lock. */
  private _locked = false;

  /**
   * @param timeout - Forwarded unchanged to every `PendingTask` created for a
   *   queued waiter. NOTE(review): presumably the maximum wait in
   *   milliseconds; confirm against `PendingTask`.
   */
  constructor(private readonly timeout: number) {}

  /** `true` while the lock is held, including while it is being handed over. */
  get locked(): boolean {
    return this._locked;
  }

  /**
   * Runs `callback` while holding the lock and releases the lock afterwards,
   * whether `callback` succeeds or fails.
   *
   * @param callback - Work to perform while the lock is held.
   * @param highPriority - When `true`, wait in the high-priority queue, which
   *   is drained before the low-priority one.
   * @returns The value `callback` resolves to.
   * @throws Whatever waiting for the lock rejects with (for example the
   *   `CancelledError` raised by {@link cancelAllWaiting}); in that case
   *   `callback` is never invoked. Also rethrows any failure of `callback`.
   */
  async runExclusive<T>(
    callback: () => Promise<T>,
    highPriority = false,
  ): Promise<T> {
    // Deliberately outside the `try`: if acquiring fails we never owned the
    // lock, so we must not release it.
    await this.acquire(highPriority);
    try {
      return await callback();
    } finally {
      this.release();
    }
  }

  /**
   * Rejects every queued waiter with a `CancelledError` and empties both
   * queues. The current holder of the lock (if any) is not affected.
   */
  cancelAllWaiting(): void {
    [this.highPriorityQueue, this.lowPriorityQueue].forEach((queue) => {
      // Remove the waiters from the queue *before* rejecting them. A rejected
      // waiter never reaches `release()`, so if it stayed queued, `release()`
      // would later hand the lock to it and the mutex would remain locked
      // forever with no owner.
      // NOTE(review): this relies on `PendingTask` not invoking its timeout
      // callback once the task has been rejected; otherwise that callback
      // would log that the task is already gone. Confirm in `PendingTask`.
      const cancelled = queue.splice(0, queue.length);
      cancelled.forEach((task) => task.reject(new CancelledError()));
    });
  }

  /**
   * Takes the lock immediately if it is free; otherwise enqueues a waiter.
   *
   * @param highPriority - Selects the queue the waiter is appended to.
   * @returns A promise that resolves once the caller owns the lock and
   *   rejects if the queued waiter is rejected.
   */
  private acquire(highPriority: boolean): Promise<void> {
    if (!this.locked) {
      this._locked = true;
      return Promise.resolve();
    }
    const queue = highPriority ? this.highPriorityQueue : this.lowPriorityQueue;
    return new Promise((resolve, reject) => {
      // The last argument is the callback handed to `PendingTask` for the
      // timeout case: it drops the waiter from its queue so that `release()`
      // never hands the lock to a waiter that has given up.
      const task = new PendingTask(resolve, reject, this.timeout, () => {
        const index = queue.indexOf(task);
        if (index < 0) {
          log.error('Timed out task already removed from queue');
          return;
        }
        queue.splice(index, 1);
      });
      queue.push(task);
    });
  }

  /**
   * Gives up the lock. If a waiter is queued, ownership passes directly to it
   * (the lock stays marked as locked); otherwise the lock becomes free.
   *
   * @throws Error if the lock is not currently held.
   */
  private release(): void {
    if (!this.locked) {
      throw new Error('Trying to release already released mutext');
    }
    const task =
      this.highPriorityQueue.shift() ?? this.lowPriorityQueue.shift();
    if (task === undefined) {
      this._locked = false;
      return;
    }
    // Hand-over: `_locked` intentionally stays `true` for the next owner.
    task.resolve();
  }
}
|