diff options
author | Kristóf Marussy <kristof@marussy.com> | 2022-08-26 17:19:36 +0200 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2022-08-26 17:45:20 +0200 |
commit | d510b07aededd59443e877c4e7c7b6e2b9822dfe (patch) | |
tree | 64e4675ac200d4d85f5818472acd908666758969 /subprojects/frontend/src/utils | |
parent | refactor(frontend): simplify UpdateService further (diff) | |
download | refinery-d510b07aededd59443e877c4e7c7b6e2b9822dfe.tar.gz refinery-d510b07aededd59443e877c4e7c7b6e2b9822dfe.tar.zst refinery-d510b07aededd59443e877c4e7c7b6e2b9822dfe.zip |
refactor(frontend): custom mutex implementation
Lets us track priorities of tasks without cancellation.
Diffstat (limited to 'subprojects/frontend/src/utils')
-rw-r--r-- | subprojects/frontend/src/utils/CancelledError.ts | 5 | ||||
-rw-r--r-- | subprojects/frontend/src/utils/PendingTask.ts | 9 | ||||
-rw-r--r-- | subprojects/frontend/src/utils/PriorityMutex.ts | 69 | ||||
-rw-r--r-- | subprojects/frontend/src/utils/TimeoutError.ts | 5 |
4 files changed, 83 insertions, 5 deletions
diff --git a/subprojects/frontend/src/utils/CancelledError.ts b/subprojects/frontend/src/utils/CancelledError.ts new file mode 100644 index 00000000..8d3e55d8 --- /dev/null +++ b/subprojects/frontend/src/utils/CancelledError.ts | |||
@@ -0,0 +1,5 @@ | |||
1 | export default class CancelledError extends Error { | ||
2 | constructor() { | ||
3 | super('Operation cancelled'); | ||
4 | } | ||
5 | } | ||
diff --git a/subprojects/frontend/src/utils/PendingTask.ts b/subprojects/frontend/src/utils/PendingTask.ts index 205c8452..d0b24c1f 100644 --- a/subprojects/frontend/src/utils/PendingTask.ts +++ b/subprojects/frontend/src/utils/PendingTask.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | import TimeoutError from './TimeoutError'; | ||
1 | import getLogger from './getLogger'; | 2 | import getLogger from './getLogger'; |
2 | 3 | ||
3 | const log = getLogger('utils.PendingTask'); | 4 | const log = getLogger('utils.PendingTask'); |
@@ -15,16 +16,14 @@ export default class PendingTask<T> { | |||
15 | resolveCallback: (value: T) => void, | 16 | resolveCallback: (value: T) => void, |
16 | rejectCallback: (reason?: unknown) => void, | 17 | rejectCallback: (reason?: unknown) => void, |
17 | timeoutMs: number | undefined, | 18 | timeoutMs: number | undefined, |
18 | timeoutCallback: () => void | undefined, | 19 | timeoutCallback?: (() => void) | undefined, |
19 | ) { | 20 | ) { |
20 | this.resolveCallback = resolveCallback; | 21 | this.resolveCallback = resolveCallback; |
21 | this.rejectCallback = rejectCallback; | 22 | this.rejectCallback = rejectCallback; |
22 | this.timeout = setTimeout(() => { | 23 | this.timeout = setTimeout(() => { |
23 | if (!this.resolved) { | 24 | if (!this.resolved) { |
24 | this.reject(new Error('Request timed out')); | 25 | this.reject(new TimeoutError()); |
25 | if (timeoutCallback) { | 26 | timeoutCallback?.(); |
26 | timeoutCallback(); | ||
27 | } | ||
28 | } | 27 | } |
29 | }, timeoutMs); | 28 | }, timeoutMs); |
30 | } | 29 | } |
diff --git a/subprojects/frontend/src/utils/PriorityMutex.ts b/subprojects/frontend/src/utils/PriorityMutex.ts new file mode 100644 index 00000000..78736141 --- /dev/null +++ b/subprojects/frontend/src/utils/PriorityMutex.ts | |||
@@ -0,0 +1,69 @@ | |||
1 | import CancelledError from './CancelledError'; | ||
2 | import PendingTask from './PendingTask'; | ||
3 | import getLogger from './getLogger'; | ||
4 | |||
5 | const log = getLogger('utils.PriorityMutex'); | ||
6 | |||
7 | export default class PriorityMutex { | ||
8 | private readonly lowPriorityQueue: PendingTask<void>[] = []; | ||
9 | |||
10 | private readonly highPriorityQueue: PendingTask<void>[] = []; | ||
11 | |||
12 | private _locked = false; | ||
13 | |||
14 | constructor(private readonly timeout: number) {} | ||
15 | |||
16 | get locked(): boolean { | ||
17 | return this._locked; | ||
18 | } | ||
19 | |||
20 | async runExclusive<T>( | ||
21 | callback: () => Promise<T>, | ||
22 | highPriority = false, | ||
23 | ): Promise<T> { | ||
24 | await this.acquire(highPriority); | ||
25 | try { | ||
26 | return await callback(); | ||
27 | } finally { | ||
28 | this.release(); | ||
29 | } | ||
30 | } | ||
31 | |||
32 | cancelAllWaiting(): void { | ||
33 | [this.highPriorityQueue, this.lowPriorityQueue].forEach((queue) => | ||
34 | queue.forEach((task) => task.reject(new CancelledError())), | ||
35 | ); | ||
36 | } | ||
37 | |||
38 | private acquire(highPriority: boolean): Promise<void> { | ||
39 | if (!this.locked) { | ||
40 | this._locked = true; | ||
41 | return Promise.resolve(); | ||
42 | } | ||
43 | const queue = highPriority ? this.highPriorityQueue : this.lowPriorityQueue; | ||
44 | return new Promise((resolve, reject) => { | ||
45 | const task = new PendingTask(resolve, reject, this.timeout, () => { | ||
46 | const index = queue.indexOf(task); | ||
47 | if (index < 0) { | ||
48 | log.error('Timed out task already removed from queue'); | ||
49 | return; | ||
50 | } | ||
51 | queue.splice(index, 1); | ||
52 | }); | ||
53 | queue.push(task); | ||
54 | }); | ||
55 | } | ||
56 | |||
57 | private release(): void { | ||
58 | if (!this.locked) { | ||
59 | throw new Error('Trying to release already released mutext'); | ||
60 | } | ||
61 | const task = | ||
62 | this.highPriorityQueue.shift() ?? this.lowPriorityQueue.shift(); | ||
63 | if (task === undefined) { | ||
64 | this._locked = false; | ||
65 | return; | ||
66 | } | ||
67 | task.resolve(); | ||
68 | } | ||
69 | } | ||
diff --git a/subprojects/frontend/src/utils/TimeoutError.ts b/subprojects/frontend/src/utils/TimeoutError.ts new file mode 100644 index 00000000..eb800f40 --- /dev/null +++ b/subprojects/frontend/src/utils/TimeoutError.ts | |||
@@ -0,0 +1,5 @@ | |||
1 | export default class TimeoutError extends Error { | ||
2 | constructor() { | ||
3 | super('Operation timed out'); | ||
4 | } | ||
5 | } | ||