diff options
Diffstat (limited to 'subprojects/frontend/src/utils/PriorityMutex.ts')
-rw-r--r-- | subprojects/frontend/src/utils/PriorityMutex.ts | 69 |
1 files changed, 69 insertions, 0 deletions
diff --git a/subprojects/frontend/src/utils/PriorityMutex.ts b/subprojects/frontend/src/utils/PriorityMutex.ts new file mode 100644 index 00000000..78736141 --- /dev/null +++ b/subprojects/frontend/src/utils/PriorityMutex.ts | |||
@@ -0,0 +1,69 @@ | |||
1 | import CancelledError from './CancelledError'; | ||
2 | import PendingTask from './PendingTask'; | ||
3 | import getLogger from './getLogger'; | ||
4 | |||
5 | const log = getLogger('utils.PriorityMutex'); | ||
6 | |||
7 | export default class PriorityMutex { | ||
8 | private readonly lowPriorityQueue: PendingTask<void>[] = []; | ||
9 | |||
10 | private readonly highPriorityQueue: PendingTask<void>[] = []; | ||
11 | |||
12 | private _locked = false; | ||
13 | |||
14 | constructor(private readonly timeout: number) {} | ||
15 | |||
16 | get locked(): boolean { | ||
17 | return this._locked; | ||
18 | } | ||
19 | |||
20 | async runExclusive<T>( | ||
21 | callback: () => Promise<T>, | ||
22 | highPriority = false, | ||
23 | ): Promise<T> { | ||
24 | await this.acquire(highPriority); | ||
25 | try { | ||
26 | return await callback(); | ||
27 | } finally { | ||
28 | this.release(); | ||
29 | } | ||
30 | } | ||
31 | |||
32 | cancelAllWaiting(): void { | ||
33 | [this.highPriorityQueue, this.lowPriorityQueue].forEach((queue) => | ||
34 | queue.forEach((task) => task.reject(new CancelledError())), | ||
35 | ); | ||
36 | } | ||
37 | |||
38 | private acquire(highPriority: boolean): Promise<void> { | ||
39 | if (!this.locked) { | ||
40 | this._locked = true; | ||
41 | return Promise.resolve(); | ||
42 | } | ||
43 | const queue = highPriority ? this.highPriorityQueue : this.lowPriorityQueue; | ||
44 | return new Promise((resolve, reject) => { | ||
45 | const task = new PendingTask(resolve, reject, this.timeout, () => { | ||
46 | const index = queue.indexOf(task); | ||
47 | if (index < 0) { | ||
48 | log.error('Timed out task already removed from queue'); | ||
49 | return; | ||
50 | } | ||
51 | queue.splice(index, 1); | ||
52 | }); | ||
53 | queue.push(task); | ||
54 | }); | ||
55 | } | ||
56 | |||
57 | private release(): void { | ||
58 | if (!this.locked) { | ||
59 | throw new Error('Trying to release already released mutext'); | ||
60 | } | ||
61 | const task = | ||
62 | this.highPriorityQueue.shift() ?? this.lowPriorityQueue.shift(); | ||
63 | if (task === undefined) { | ||
64 | this._locked = false; | ||
65 | return; | ||
66 | } | ||
67 | task.resolve(); | ||
68 | } | ||
69 | } | ||