From d510b07aededd59443e877c4e7c7b6e2b9822dfe Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Fri, 26 Aug 2022 17:19:36 +0200 Subject: refactor(frontend): custom mutex implementation Lets us track priorities of tasks without cancellation. --- subprojects/frontend/src/utils/CancelledError.ts | 5 ++ subprojects/frontend/src/utils/PendingTask.ts | 9 ++- subprojects/frontend/src/utils/PriorityMutex.ts | 69 ++++++++++++++++++++++ subprojects/frontend/src/utils/TimeoutError.ts | 5 ++ subprojects/frontend/src/xtext/UpdateService.ts | 23 ++++---- .../frontend/src/xtext/UpdateStateTracker.ts | 48 +++++---------- 6 files changed, 111 insertions(+), 48 deletions(-) create mode 100644 subprojects/frontend/src/utils/CancelledError.ts create mode 100644 subprojects/frontend/src/utils/PriorityMutex.ts create mode 100644 subprojects/frontend/src/utils/TimeoutError.ts (limited to 'subprojects/frontend/src') diff --git a/subprojects/frontend/src/utils/CancelledError.ts b/subprojects/frontend/src/utils/CancelledError.ts new file mode 100644 index 00000000..8d3e55d8 --- /dev/null +++ b/subprojects/frontend/src/utils/CancelledError.ts @@ -0,0 +1,5 @@ +export default class CancelledError extends Error { + constructor() { + super('Operation cancelled'); + } +} diff --git a/subprojects/frontend/src/utils/PendingTask.ts b/subprojects/frontend/src/utils/PendingTask.ts index 205c8452..d0b24c1f 100644 --- a/subprojects/frontend/src/utils/PendingTask.ts +++ b/subprojects/frontend/src/utils/PendingTask.ts @@ -1,3 +1,4 @@ +import TimeoutError from './TimeoutError'; import getLogger from './getLogger'; const log = getLogger('utils.PendingTask'); @@ -15,16 +16,14 @@ export default class PendingTask { resolveCallback: (value: T) => void, rejectCallback: (reason?: unknown) => void, timeoutMs: number | undefined, - timeoutCallback: () => void | undefined, + timeoutCallback?: (() => void) | undefined, ) { this.resolveCallback = resolveCallback; this.rejectCallback = rejectCallback; this.timeout = setTimeout(() => { if (!this.resolved) { - this.reject(new Error('Request timed out')); - if (timeoutCallback) { - timeoutCallback(); - } + this.reject(new TimeoutError()); + timeoutCallback?.(); } }, timeoutMs); } diff --git a/subprojects/frontend/src/utils/PriorityMutex.ts b/subprojects/frontend/src/utils/PriorityMutex.ts new file mode 100644 index 00000000..78736141 --- /dev/null +++ b/subprojects/frontend/src/utils/PriorityMutex.ts @@ -0,0 +1,69 @@ +import CancelledError from './CancelledError'; +import PendingTask from './PendingTask'; +import getLogger from './getLogger'; + +const log = getLogger('utils.PriorityMutex'); + +export default class PriorityMutex { + private readonly lowPriorityQueue: PendingTask[] = []; + + private readonly highPriorityQueue: PendingTask[] = []; + + private _locked = false; + + constructor(private readonly timeout: number) {} + + get locked(): boolean { + return this._locked; + } + + async runExclusive( + callback: () => Promise, + highPriority = false, + ): Promise { + await this.acquire(highPriority); + try { + return await callback(); + } finally { + this.release(); + } + } + + cancelAllWaiting(): void { + [this.highPriorityQueue, this.lowPriorityQueue].forEach((queue) => + queue.forEach((task) => task.reject(new CancelledError())), + ); + } + + private acquire(highPriority: boolean): Promise { + if (!this.locked) { + this._locked = true; + return Promise.resolve(); + } + const queue = highPriority ? this.highPriorityQueue : this.lowPriorityQueue; + return new Promise((resolve, reject) => { + const task = new PendingTask(resolve, reject, this.timeout, () => { + const index = queue.indexOf(task); + if (index < 0) { + log.error('Timed out task already removed from queue'); + return; + } + queue.splice(index, 1); + }); + queue.push(task); + }); + } + + private release(): void { + if (!this.locked) { + throw new Error('Trying to release already released mutext'); + } + const task = + this.highPriorityQueue.shift() ?? this.lowPriorityQueue.shift(); + if (task === undefined) { + this._locked = false; + return; + } + task.resolve(); + } +} diff --git a/subprojects/frontend/src/utils/TimeoutError.ts b/subprojects/frontend/src/utils/TimeoutError.ts new file mode 100644 index 00000000..eb800f40 --- /dev/null +++ b/subprojects/frontend/src/utils/TimeoutError.ts @@ -0,0 +1,5 @@ +export default class TimeoutError extends Error { + constructor() { + super('Operation timed out'); + } +} diff --git a/subprojects/frontend/src/xtext/UpdateService.ts b/subprojects/frontend/src/xtext/UpdateService.ts index d8782d90..f1abce52 100644 --- a/subprojects/frontend/src/xtext/UpdateService.ts +++ b/subprojects/frontend/src/xtext/UpdateService.ts @@ -1,9 +1,10 @@ import type { ChangeDesc, Transaction } from '@codemirror/state'; -import { E_CANCELED, E_TIMEOUT } from 'async-mutex'; import { debounce } from 'lodash-es'; import { nanoid } from 'nanoid'; import type EditorStore from '../editor/EditorStore'; +import CancelledError from '../utils/CancelledError'; +import TimeoutError from '../utils/TimeoutError'; import getLogger from '../utils/getLogger'; import UpdateStateTracker from './UpdateStateTracker'; @@ -66,7 +67,7 @@ export default class UpdateService { this.updateFullTextOrThrow().catch((error) => { // Let E_TIMEOUT errors propagate, since if the first update times out, // we can't use the connection. - if (error === E_CANCELED) { + if (error instanceof CancelledError) { // Content assist will perform a full-text update anyways. log.debug('Full text update cancelled'); return; @@ -87,7 +88,7 @@ export default class UpdateService { } if (!this.tracker.lockedForUpdate) { this.updateOrThrow().catch((error) => { - if (error === E_CANCELED || error === E_TIMEOUT) { + if (error instanceof CancelledError || error instanceof TimeoutError) { log.debug('Idle update cancelled'); return; } @@ -163,11 +164,15 @@ export default class UpdateService { return this.fetchContentAssistFetchOnly(params, this.xtextStateId); } try { - return await this.tracker.runExclusiveHighPriority(() => - this.fetchContentAssistExclusive(params, signal), + return await this.tracker.runExclusive( + () => this.fetchContentAssistExclusive(params, signal), + true, ); } catch (error) { - if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { + if ( + (error instanceof CancelledError || error instanceof TimeoutError) && + signal.aborted + ) { return []; } throw error; @@ -261,9 +266,7 @@ export default class UpdateService { } formatText(): Promise { - return this.tracker.runExclusiveWithRetries(() => - this.formatTextExclusive(), - ); + return this.tracker.runExclusive(() => this.formatTextExclusive()); } private async formatTextExclusive(): Promise { @@ -294,7 +297,7 @@ export default class UpdateService { try { await this.updateOrThrow(); } catch (error) { - if (error === E_CANCELED || error === E_TIMEOUT) { + if (error instanceof CancelledError || error instanceof TimeoutError) { return { cancelled: true }; } throw error; diff --git a/subprojects/frontend/src/xtext/UpdateStateTracker.ts b/subprojects/frontend/src/xtext/UpdateStateTracker.ts index a529f9a0..5d4ce49e 100644 --- a/subprojects/frontend/src/xtext/UpdateStateTracker.ts +++ b/subprojects/frontend/src/xtext/UpdateStateTracker.ts @@ -5,9 +5,9 @@ import { StateEffect, type Transaction, } from '@codemirror/state'; -import { E_CANCELED, Mutex, withTimeout } from 'async-mutex'; import type EditorStore from '../editor/EditorStore'; +import PriorityMutex from '../utils/PriorityMutex'; const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; @@ -31,7 +31,7 @@ export interface Delta { } export default class UpdateStateTracker { - xtextStateId: string | undefined; + private _xtextStateId: string | undefined; /** * The changes marked for synchronization to the server if a full or delta text update @@ -54,12 +54,16 @@ export default class UpdateStateTracker { /** * Locked when we try to modify the state on the server. */ - private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); + private readonly mutex = new PriorityMutex(WAIT_FOR_UPDATE_TIMEOUT_MS); constructor(private readonly store: EditorStore) { this.dirtyChanges = this.newEmptyChangeSet(); } + get xtextStateId(): string | undefined { + return this._xtextStateId; + } + private get hasDirtyChanges(): boolean { return !this.dirtyChanges.empty; } @@ -69,7 +73,7 @@ export default class UpdateStateTracker { } get lockedForUpdate(): boolean { - return this.mutex.isLocked(); + return this.mutex.locked; } get hasPendingChanges(): boolean { @@ -111,7 +115,7 @@ export default class UpdateStateTracker { } invalidateStateId(): void { - this.xtextStateId = undefined; + this._xtextStateId = undefined; } /** @@ -180,7 +184,7 @@ export default class UpdateStateTracker { if (remoteChanges !== undefined) { this.applyRemoteChangesExclusive(remoteChanges); } - this.xtextStateId = newStateId; + this._xtextStateId = newStateId; this.pendingChanges = undefined; } @@ -205,7 +209,10 @@ export default class UpdateStateTracker { } } - runExclusive(callback: () => Promise): Promise { + runExclusive( + callback: () => Promise, + highPriority = false, + ): Promise { return this.mutex.runExclusive(async () => { try { return await callback(); @@ -215,31 +222,6 @@ export default class UpdateStateTracker { this.pendingChanges = undefined; } } - }); - } - - runExclusiveHighPriority(callback: () => Promise): Promise { - this.mutex.cancel(); - return this.runExclusive(callback); - } - - async runExclusiveWithRetries( - callback: () => Promise, - maxRetries = 5, - ): Promise { - let retries = 0; - while (retries < maxRetries) { - try { - // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. - return await this.runExclusive(callback); - } catch (error) { - // Let timeout errors propagate to give up retrying on a flaky connection. - if (error !== E_CANCELED) { - throw error; - } - retries += 1; - } - } - throw E_CANCELED; + }, highPriority); } } -- cgit v1.2.3-54-g00ecf