From d774c3d2c4fc5948483438d8304af5baa6bb7a91 Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Thu, 25 Aug 2022 20:03:41 +0200 Subject: refactor(frontend): extract xtextStateId tracking --- subprojects/frontend/src/xtext/UpdateService.ts | 470 ++++++--------------- .../frontend/src/xtext/UpdateStateTracker.ts | 357 ++++++++++++++++ 2 files changed, 495 insertions(+), 332 deletions(-) create mode 100644 subprojects/frontend/src/xtext/UpdateStateTracker.ts diff --git a/subprojects/frontend/src/xtext/UpdateService.ts b/subprojects/frontend/src/xtext/UpdateService.ts index 3b4ae259..94e01ca2 100644 --- a/subprojects/frontend/src/xtext/UpdateService.ts +++ b/subprojects/frontend/src/xtext/UpdateService.ts @@ -1,17 +1,16 @@ -import { - type ChangeDesc, - ChangeSet, - type ChangeSpec, - StateEffect, - type Transaction, -} from '@codemirror/state'; -import { E_CANCELED, E_TIMEOUT, Mutex, withTimeout } from 'async-mutex'; +import type { ChangeDesc, Transaction } from '@codemirror/state'; +import { E_CANCELED, E_TIMEOUT } from 'async-mutex'; import { debounce } from 'lodash-es'; import { nanoid } from 'nanoid'; import type EditorStore from '../editor/EditorStore'; import getLogger from '../utils/getLogger'; +import UpdateStateTracker, { + type LockedState, + type PendingUpdate, +} from './UpdateStateTracker'; +import type { StateUpdateResult, Delta } from './UpdateStateTracker'; import type XtextWebSocketClient from './XtextWebSocketClient'; import { type ContentAssistEntry, @@ -24,98 +23,51 @@ import { const UPDATE_TIMEOUT_MS = 500; -const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; - -const FORMAT_TEXT_RETRIES = 5; - const log = getLogger('xtext.UpdateService'); -/** - * State effect used to override the dirty changes after a transaction. - * - * If this state effect is _not_ present in a transaction, - * the transaction will be appended to the current dirty changes. - * - * If this state effect is present, the current dirty changes will be replaced - * by the value of this effect. - */ -const setDirtyChanges = StateEffect.define(); - export interface AbortSignal { aborted: boolean; } -export interface ContentAssistParams { - caretOffset: number; - - proposalsLimit: number; -} - export type CancellableResult = | { cancelled: false; data: T } | { cancelled: true }; -interface StateUpdateResult { - newStateId: string; - - data: T; -} - -interface Delta { - deltaOffset: number; - - deltaReplaceLength: number; +export interface ContentAssistParams { + caretOffset: number; - deltaText: string; + proposalsLimit: number; } export default class UpdateService { readonly resourceName: string; - xtextStateId: string | undefined; - - private readonly store: EditorStore; - - private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); - - /** - * The changes being synchronized to the server if a full or delta text update is running - * withing a `withUpdateExclusive` block, `undefined` otherwise. - * - * Must be `undefined` before and after entering the critical section of `mutex` - * and may only be changes in the critical section of `mutex`. - * - * Methods named with an `Exclusive` suffix in this class assume that the mutex is held - * and may call `withUpdateExclusive` or `doFallbackUpdateFullTextExclusive` - * to mutate this field. - * - * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive` - * block and require this field to be non-`undefined`. - */ - private pendingUpdate: ChangeSet | undefined; - - /** - * Local changes not yet sychronized to the server and not part of the running update, if any. - */ - private dirtyChanges: ChangeSet; - - private readonly webSocketClient: XtextWebSocketClient; + private readonly tracker: UpdateStateTracker; private readonly idleUpdateLater = debounce( () => this.idleUpdate(), UPDATE_TIMEOUT_MS, ); - constructor(store: EditorStore, webSocketClient: XtextWebSocketClient) { + constructor( + private readonly store: EditorStore, + private readonly webSocketClient: XtextWebSocketClient, + ) { this.resourceName = `${nanoid(7)}.problem`; - this.store = store; - this.dirtyChanges = this.newEmptyChangeSet(); - this.webSocketClient = webSocketClient; + this.tracker = new UpdateStateTracker(store); + } + + get xtextStateId(): string | undefined { + return this.tracker.xtextStateId; + } + + computeChangesSinceLastUpdate(): ChangeDesc { + return this.tracker.computeChangesSinceLastUpdate(); } onReconnect(): void { - this.xtextStateId = undefined; - this.updateFullText().catch((error) => { + this.tracker.invalidateStateId(); + this.updateFullTextOrThrow().catch((error) => { // Let E_TIMEOUT errors propagate, since if the first update times out, // we can't use the connection. if (error === E_CANCELED) { @@ -128,46 +80,17 @@ export default class UpdateService { } onTransaction(transaction: Transaction): void { - const setDirtyChangesEffect = transaction.effects.find((effect) => - effect.is(setDirtyChanges), - ) as StateEffect | undefined; - if (setDirtyChangesEffect) { - const { value } = setDirtyChangesEffect; - if (this.pendingUpdate !== undefined) { - // Do not clear `pendingUpdate`, because that would indicate an update failure - // to `withUpdateExclusive`. - this.pendingUpdate = ChangeSet.empty(value.length); - } - this.dirtyChanges = value; - return; - } - if (transaction.docChanged) { - this.dirtyChanges = this.dirtyChanges.compose(transaction.changes); + if (this.tracker.onTransaction(transaction)) { this.idleUpdateLater(); } } - /** - * Computes the summary of any changes happened since the last complete update. - * - * The result reflects any changes that happened since the `xtextStateId` - * version was uploaded to the server. - * - * @returns the summary of changes since the last update - */ - computeChangesSinceLastUpdate(): ChangeDesc { - return ( - this.pendingUpdate?.composeDesc(this.dirtyChanges.desc) ?? - this.dirtyChanges.desc - ); - } - private idleUpdate(): void { - if (!this.webSocketClient.isOpen || this.dirtyChanges.empty) { + if (!this.webSocketClient.isOpen || !this.tracker.hasDirtyChanges) { return; } - if (!this.mutex.isLocked()) { - this.update().catch((error) => { + if (!this.tracker.locekdForUpdate) { + this.updateOrThrow().catch((error) => { if (error === E_CANCELED || error === E_TIMEOUT) { log.debug('Idle update cancelled'); return; @@ -178,28 +101,6 @@ export default class UpdateService { this.idleUpdateLater(); } - private newEmptyChangeSet(): ChangeSet { - return ChangeSet.of([], this.store.state.doc.length); - } - - private updateFullText(): Promise { - return this.runExclusive(() => this.updateFullTextExclusive()); - } - - private async updateFullTextExclusive(): Promise { - await this.withVoidUpdateExclusive(() => this.doUpdateFullTextExclusive()); - } - - private async doUpdateFullTextExclusive(): Promise { - const result = await this.webSocketClient.send({ - resource: this.resourceName, - serviceType: 'update', - fullText: this.store.state.doc.sliceString(0), - }); - const { stateId } = DocumentStateResult.parse(result); - return stateId; - } - /** * Makes sure that the document state on the server reflects recent * local changes. @@ -209,26 +110,34 @@ export default class UpdateService { * * @returns a promise resolving when the update is completed */ - private async update(): Promise { + private async updateOrThrow(): Promise { // We may check here for the delta to avoid locking, // but we'll need to recompute the delta in the critical section, // because it may have changed by the time we can acquire the lock. - if (this.dirtyChanges.empty) { + if ( + !this.tracker.hasDirtyChanges && + this.tracker.xtextStateId !== undefined + ) { return; } - await this.runExclusive(() => this.updateExclusive()); + await this.tracker.runExclusive((lockedState) => + this.updateExclusive(lockedState), + ); } - private async updateExclusive(): Promise { + private async updateExclusive(lockedState: LockedState): Promise { if (this.xtextStateId === undefined) { - await this.updateFullTextExclusive(); + await this.updateFullTextExclusive(lockedState); } - const delta = this.computeDelta(); - if (delta === undefined) { + if (!this.tracker.hasDirtyChanges) { return; } - log.trace('Editor delta', delta); - await this.withVoidUpdateExclusive(async () => { + await lockedState.updateExclusive(async (pendingUpdate) => { + const delta = pendingUpdate.prepareDeltaUpdateExclusive(); + if (delta === undefined) { + return undefined; + } + log.trace('Editor delta', delta); const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'update', @@ -240,79 +149,50 @@ export default class UpdateService { return parsedDocumentStateResult.data.stateId; } if (isConflictResult(result, 'invalidStateId')) { - return this.doFallbackUpdateFullTextExclusive(); + return this.doUpdateFullTextExclusive(pendingUpdate); } throw parsedDocumentStateResult.error; }); } - async fetchOccurrences( - getCaretOffset: () => CancellableResult, - ): Promise> { - try { - await this.update(); - } catch (error) { - if (error === E_CANCELED || error === E_TIMEOUT) { - return { cancelled: true }; - } - throw error; - } - if (!this.dirtyChanges.empty || this.mutex.isLocked()) { - // Just give up if another update is in progress. - return { cancelled: true }; - } - const caretOffsetResult = getCaretOffset(); - if (caretOffsetResult.cancelled) { - return { cancelled: true }; - } - const expectedStateId = this.xtextStateId; - const data = await this.webSocketClient.send({ + private updateFullTextOrThrow(): Promise { + return this.tracker.runExclusive((lockedState) => + this.updateFullTextExclusive(lockedState), + ); + } + + private async updateFullTextExclusive( + lockedState: LockedState, + ): Promise { + await lockedState.updateExclusive((pendingUpdate) => + this.doUpdateFullTextExclusive(pendingUpdate), + ); + } + + private async doUpdateFullTextExclusive( + pendingUpdate: PendingUpdate, + ): Promise { + log.debug('Performing full text update'); + pendingUpdate.extendPendingUpdateExclusive(); + const result = await this.webSocketClient.send({ resource: this.resourceName, - serviceType: 'occurrences', - caretOffset: caretOffsetResult.data, - expectedStateId, + serviceType: 'update', + fullText: this.store.state.doc.sliceString(0), }); - if ( - // The query must have reached the server without being conflicted with an update - // or cancelled server-side. - isConflictResult(data) || - // And no state update should have occurred since then. - this.xtextStateId !== expectedStateId || - // And there should be no change to the editor text since then. - !this.dirtyChanges.empty || - // And there should be no state update in progress. - this.mutex.isLocked() - ) { - return { cancelled: true }; - } - const parsedOccurrencesResult = OccurrencesResult.safeParse(data); - if (!parsedOccurrencesResult.success) { - log.error( - 'Unexpected occurences result', - data, - 'not an OccurrencesResult:', - parsedOccurrencesResult.error, - ); - throw parsedOccurrencesResult.error; - } - if (parsedOccurrencesResult.data.stateId !== expectedStateId) { - return { cancelled: true }; - } - return { cancelled: false, data: parsedOccurrencesResult.data }; + const { stateId } = DocumentStateResult.parse(result); + return stateId; } async fetchContentAssist( params: ContentAssistParams, signal: AbortSignal, ): Promise { - if (!this.mutex.isLocked && this.xtextStateId !== undefined) { + if (this.tracker.upToDate && this.xtextStateId !== undefined) { return this.fetchContentAssistFetchOnly(params, this.xtextStateId); } - // Content assist updates should have priority over other updates. - this.mutex.cancel(); try { - return await this.runExclusive(() => - this.fetchContentAssistExclusive(params, signal), + return await this.tracker.runExclusiveHighPriority((lockedState) => + this.fetchContentAssistExclusive(params, lockedState, signal), ); } catch (error) { if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { @@ -324,20 +204,30 @@ export default class UpdateService { private async fetchContentAssistExclusive( params: ContentAssistParams, + lockedState: LockedState, signal: AbortSignal, ): Promise { if (this.xtextStateId === undefined) { - await this.updateFullTextExclusive(); + await this.updateFullTextExclusive(lockedState); } if (signal.aborted) { return []; } - const delta = this.computeDelta(); - if (delta !== undefined) { - log.trace('Editor delta', delta); + if (this.tracker.hasDirtyChanges) { // Try to fetch while also performing a delta update. - const fetchUpdateEntries = await this.withUpdateExclusive(() => - this.doFetchContentAssistWithDeltaExclusive(params, delta), + const fetchUpdateEntries = await lockedState.withUpdateExclusive( + async (pendingUpdate) => { + const delta = pendingUpdate.prepareDeltaUpdateExclusive(); + if (delta === undefined) { + return { newStateId: undefined, data: undefined }; + } + log.trace('Editor delta', delta); + return this.doFetchContentAssistWithDeltaExclusive( + params, + pendingUpdate, + delta, + ); + }, ); if (fetchUpdateEntries !== undefined) { return fetchUpdateEntries; @@ -354,6 +244,7 @@ export default class UpdateService { private async doFetchContentAssistWithDeltaExclusive( params: ContentAssistParams, + pendingUpdate: PendingUpdate, delta: Delta, ): Promise> { const fetchUpdateResult = await this.webSocketClient.send({ @@ -372,7 +263,7 @@ export default class UpdateService { } if (isConflictResult(fetchUpdateResult, 'invalidStateId')) { log.warn('Server state invalid during content assist'); - const newStateId = await this.doFallbackUpdateFullTextExclusive(); + const newStateId = await this.doUpdateFullTextExclusive(pendingUpdate); // We must finish this state update transaction to prepare for any push events // before querying for content assist, so we just return `undefined` and will query // the content assist service later. @@ -402,33 +293,21 @@ export default class UpdateService { return fetchOnlyEntries; } - async formatText(): Promise { - let retries = 0; - while (retries < FORMAT_TEXT_RETRIES) { - try { - // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. - await this.runExclusive(() => this.formatTextExclusive()); - return; - } catch (error) { - // Let timeout errors propagate to give up formatting on a flaky connection. - if (error === E_CANCELED && retries < FORMAT_TEXT_RETRIES) { - retries += 1; - } else { - throw error; - } - } - } + formatText(): Promise { + return this.tracker.runExclusiveWithRetries((lockedState) => + this.formatTextExclusive(lockedState), + ); } - private async formatTextExclusive(): Promise { - await this.updateExclusive(); + private async formatTextExclusive(lockedState: LockedState): Promise { + await this.updateExclusive(lockedState); let { from, to } = this.store.state.selection.main; if (to <= from) { from = 0; to = this.store.state.doc.length; } log.debug('Formatting from', from, 'to', to); - await this.withVoidUpdateExclusive(async () => { + await lockedState.updateExclusive(async (pendingUpdate) => { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'format', @@ -436,7 +315,7 @@ export default class UpdateService { selectionEnd: to, }); const { stateId, formattedText } = FormattingResult.parse(result); - this.applyBeforeDirtyChanges({ + pendingUpdate.applyBeforeDirtyChangesExclusive({ from, to, insert: formattedText, @@ -445,119 +324,46 @@ export default class UpdateService { }); } - private computeDelta(): Delta | undefined { - if (this.dirtyChanges.empty) { - return undefined; - } - let minFromA = Number.MAX_SAFE_INTEGER; - let maxToA = 0; - let minFromB = Number.MAX_SAFE_INTEGER; - let maxToB = 0; - this.dirtyChanges.iterChangedRanges((fromA, toA, fromB, toB) => { - minFromA = Math.min(minFromA, fromA); - maxToA = Math.max(maxToA, toA); - minFromB = Math.min(minFromB, fromB); - maxToB = Math.max(maxToB, toB); - }); - return { - deltaOffset: minFromA, - deltaReplaceLength: maxToA - minFromA, - deltaText: this.store.state.doc.sliceString(minFromB, maxToB), - }; - } - - private applyBeforeDirtyChanges(changeSpec: ChangeSpec): void { - const pendingChanges = - this.pendingUpdate?.compose(this.dirtyChanges) ?? this.dirtyChanges; - const revertChanges = pendingChanges.invert(this.store.state.doc); - const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength); - const redoChanges = pendingChanges.map(applyBefore.desc); - const changeSet = revertChanges.compose(applyBefore).compose(redoChanges); - this.store.dispatch({ - changes: changeSet, - // Keep the current set of dirty changes (but update them according the re-formatting) - // and to not add the formatting the dirty changes. - effects: [setDirtyChanges.of(redoChanges)], - }); - } - - private runExclusive(callback: () => Promise): Promise { - return this.mutex.runExclusive(async () => { - if (this.pendingUpdate !== undefined) { - throw new Error('Update is pending before entering critical section'); - } - const result = await callback(); - if (this.pendingUpdate !== undefined) { - throw new Error('Update is pending after entering critical section'); - } - return result; - }); - } - - private withVoidUpdateExclusive( - callback: () => Promise, - ): Promise { - return this.withUpdateExclusive(async () => { - const newStateId = await callback(); - return { newStateId, data: undefined }; - }); - } - - /** - * Executes an asynchronous callback that updates the state on the server. - * - * Ensures that updates happen sequentially and manages `pendingUpdate` - * and `dirtyChanges` to reflect changes being synchronized to the server - * and not yet synchronized to the server, respectively. - * - * Optionally, `callback` may return a second value that is retured by this function. - * - * Once the remote procedure call to update the server state finishes - * and returns the new `stateId`, `callback` must return _immediately_ - * to ensure that the local `stateId` is updated likewise to be able to handle - * push messages referring to the new `stateId` from the server. - * If additional work is needed to compute the second value in some cases, - * use `T | undefined` instead of `T` as a return type and signal the need for additional - * computations by returning `undefined`. Thus additional computations can be performed - * outside of the critical section. - * - * @param callback the asynchronous callback that updates the server state - * @returns a promise resolving to the second value returned by `callback` - */ - private async withUpdateExclusive( - callback: () => Promise>, - ): Promise { - if (this.pendingUpdate !== undefined) { - throw new Error('Delta updates are not reentrant'); - } - this.pendingUpdate = this.dirtyChanges; - this.dirtyChanges = this.newEmptyChangeSet(); - let data: T; + async fetchOccurrences( + getCaretOffset: () => CancellableResult, + ): Promise> { try { - ({ newStateId: this.xtextStateId, data } = await callback()); - this.pendingUpdate = undefined; - } catch (e) { - log.error('Error while update', e); - if (this.pendingUpdate === undefined) { - log.error('pendingUpdate was cleared during update'); - } else { - this.dirtyChanges = this.pendingUpdate.compose(this.dirtyChanges); + await this.updateOrThrow(); + } catch (error) { + if (error === E_CANCELED || error === E_TIMEOUT) { + return { cancelled: true }; } - this.pendingUpdate = undefined; - this.webSocketClient.forceReconnectOnError(); - throw e; + throw error; } - return data; - } - - private doFallbackUpdateFullTextExclusive(): Promise { - if (this.pendingUpdate === undefined) { - throw new Error('Only a pending update can be extended'); + if (!this.tracker.upToDate) { + // Just give up if another update is in progress. + return { cancelled: true }; + } + const caretOffsetResult = getCaretOffset(); + if (caretOffsetResult.cancelled) { + return { cancelled: true }; + } + const expectedStateId = this.xtextStateId; + if (expectedStateId === undefined) { + // If there is no state on the server, don't bother with finding occurrences. + return { cancelled: true }; + } + const data = await this.webSocketClient.send({ + resource: this.resourceName, + serviceType: 'occurrences', + caretOffset: caretOffsetResult.data, + expectedStateId, + }); + if ( + isConflictResult(data) || + this.tracker.hasChangesSince(expectedStateId) + ) { + return { cancelled: true }; + } + const parsedOccurrencesResult = OccurrencesResult.parse(data); + if (parsedOccurrencesResult.stateId !== expectedStateId) { + return { cancelled: true }; } - log.warn('Delta update failed, performing full text update'); - this.xtextStateId = undefined; - this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges); - this.dirtyChanges = this.newEmptyChangeSet(); - return this.doUpdateFullTextExclusive(); + return { cancelled: false, data: parsedOccurrencesResult }; } } diff --git a/subprojects/frontend/src/xtext/UpdateStateTracker.ts b/subprojects/frontend/src/xtext/UpdateStateTracker.ts new file mode 100644 index 00000000..04359060 --- /dev/null +++ b/subprojects/frontend/src/xtext/UpdateStateTracker.ts @@ -0,0 +1,357 @@ +/** + * @file State tracker for pushing updates to the Xtext server. + * + * This file implements complex logic to avoid missing or overwriting state updates + * and to avoid sending conflicting updates to the Xtext server. + * + * The `LockedState` and `PendingUpdate` objects are used as capabilities to + * signify whether the socket to the Xtext server is locked for updates and + * whether an update is in progress, respectively. + * Always use these objects only received as an argument of a lambda expression + * or method and never leak them into class field or global variables. + * The presence of such an objects in the scope should always imply that + * the corresponding condition holds. + */ + +import { + type ChangeDesc, + ChangeSet, + type ChangeSpec, + StateEffect, + type Transaction, +} from '@codemirror/state'; +import { E_CANCELED, Mutex, withTimeout } from 'async-mutex'; + +import type EditorStore from '../editor/EditorStore'; +import getLogger from '../utils/getLogger'; + +const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; + +const log = getLogger('xtext.UpdateStateTracker'); + +/** + * State effect used to override the dirty changes after a transaction. + * + * If this state effect is _not_ present in a transaction, + * the transaction will be appended to the current dirty changes. + * + * If this state effect is present, the current dirty changes will be replaced + * by the value of this effect. + */ +const setDirtyChanges = StateEffect.define(); + +export interface StateUpdateResult { + /** The new state ID on the server or `undefined` if no update was performed. */ + newStateId: string | undefined; + + /** Optional data payload received during the update. */ + data: T; +} + +/** + * Signifies a capability that the Xtext server state is locked for update. + */ +export interface LockedState { + /** + * + * @param callback the asynchronous callback that updates the server state + * @returns a promise resolving after the update + */ + updateExclusive( + callback: (pendingUpdate: PendingUpdate) => Promise, + ): Promise; + + /** + * Executes an asynchronous callback that updates the state on the server. + * + * If the callback returns `undefined` as the `newStateId`, + * the update is assumed to be aborted and any pending changes will be marked as dirt again. + * Any exceptions thrown in `callback` will also cause the update to be aborted. + * + * Ensures that updates happen sequentially and manages `pendingUpdate` + * and `dirtyChanges` to reflect changes being synchronized to the server + * and not yet synchronized to the server, respectively. + * + * Optionally, `callback` may return a second value that is retured by this function. + * + * Once the remote procedure call to update the server state finishes + * and returns the new `stateId`, `callback` must return _immediately_ + * to ensure that the local `stateId` is updated likewise to be able to handle + * push messages referring to the new `stateId` from the server. + * If additional asynchronous work is needed to compute the second value in some cases, + * use `T | undefined` instead of `T` as a return type and signal the need for additional + * computations by returning `undefined`. Thus additional computations can be performed + * outside of the critical section. + * + * @param callback the asynchronous callback that updates the server state + * @returns a promise resolving to the second value returned by `callback` + */ + withUpdateExclusive( + callback: (pendingUpdate: PendingUpdate) => Promise>, + ): Promise; +} + +export interface Delta { + deltaOffset: number; + + deltaReplaceLength: number; + + deltaText: string; +} + +/** + * Signifies a capability that dirty changes are being marked for uploading. + */ +export interface PendingUpdate { + prepareDeltaUpdateExclusive(): Delta | undefined; + + extendPendingUpdateExclusive(): void; + + applyBeforeDirtyChangesExclusive(changeSpec: ChangeSpec): void; +} + +export default class UpdateStateTracker { + xtextStateId: string | undefined; + + /** + * The changes being synchronized to the server if a full or delta text update is running + * withing a `withUpdateExclusive` block, `undefined` otherwise. + * + * Must be `undefined` before and after entering the critical section of `mutex` + * and may only be changes in the critical section of `mutex`. + * + * Methods named with an `Exclusive` suffix in this class assume that the mutex is held + * and may call `updateExclusive` or `withUpdateExclusive` to mutate this field. + * + * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive` + * block and require this field to be non-`undefined`. + */ + private pendingChanges: ChangeSet | undefined; + + /** + * Local changes not yet sychronized to the server and not part of the current update, if any. + */ + private dirtyChanges: ChangeSet; + + /** + * Locked when we try to modify the state on the server. + */ + private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); + + constructor(private readonly store: EditorStore) { + this.dirtyChanges = this.newEmptyChangeSet(); + } + + get locekdForUpdate(): boolean { + return this.mutex.isLocked(); + } + + get hasDirtyChanges(): boolean { + return !this.dirtyChanges.empty; + } + + get upToDate(): boolean { + return !this.locekdForUpdate && !this.hasDirtyChanges; + } + + hasChangesSince(xtextStateId: string): boolean { + return ( + this.xtextStateId !== xtextStateId || + this.locekdForUpdate || + this.hasDirtyChanges + ); + } + + /** + * Extends the current set of changes with `transaction`. + * + * Also determines if the transaction has made local changes + * that will have to be synchronized to the server + * + * @param transaction the transaction that affected the editor + * @returns `true` if the transaction requires and idle update, `false` otherwise + */ + onTransaction(transaction: Transaction): boolean { + const setDirtyChangesEffect = transaction.effects.find((effect) => + effect.is(setDirtyChanges), + ) as StateEffect | undefined; + if (setDirtyChangesEffect) { + const { value } = setDirtyChangesEffect; + if (this.pendingChanges !== undefined) { + // Do not clear `pendingUpdate`, because that would indicate an update failure + // to `withUpdateExclusive`. + this.pendingChanges = ChangeSet.empty(value.length); + } + this.dirtyChanges = value; + return false; + } + if (transaction.docChanged) { + this.dirtyChanges = this.dirtyChanges.compose(transaction.changes); + return true; + } + return false; + } + + invalidateStateId(): void { + this.xtextStateId = undefined; + } + + /** + * Computes the summary of any changes happened since the last complete update. + * + * The result reflects any changes that happened since the `xtextStateId` + * version was uploaded to the server. + * + * @returns the summary of changes since the last update + */ + computeChangesSinceLastUpdate(): ChangeDesc { + return ( + this.pendingChanges?.composeDesc(this.dirtyChanges.desc) ?? + this.dirtyChanges.desc + ); + } + + private newEmptyChangeSet(): ChangeSet { + return ChangeSet.of([], this.store.state.doc.length); + } + + private readonly pendingUpdate: PendingUpdate = { + prepareDeltaUpdateExclusive: (): Delta | undefined => { + this.pendingUpdate.extendPendingUpdateExclusive(); + if (this.pendingChanges === undefined || this.pendingChanges.empty) { + return undefined; + } + let minFromA = Number.MAX_SAFE_INTEGER; + let maxToA = 0; + let minFromB = Number.MAX_SAFE_INTEGER; + let maxToB = 0; + this.pendingChanges.iterChangedRanges((fromA, toA, fromB, toB) => { + minFromA = Math.min(minFromA, fromA); + maxToA = Math.max(maxToA, toA); + minFromB = Math.min(minFromB, fromB); + maxToB = Math.max(maxToB, toB); + }); + return { + deltaOffset: minFromA, + deltaReplaceLength: maxToA - minFromA, + deltaText: this.store.state.doc.sliceString(minFromB, maxToB), + }; + }, + extendPendingUpdateExclusive: (): void => { + if (!this.locekdForUpdate) { + throw new Error('Cannot update state without locking the mutex'); + } + if (this.hasDirtyChanges) { + this.pendingChanges = + this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges; + this.dirtyChanges = this.newEmptyChangeSet(); + } + }, + applyBeforeDirtyChangesExclusive: (changeSpec: ChangeSpec): void => { + if (!this.locekdForUpdate) { + throw new Error('Cannot update state without locking the mutex'); + } + const pendingChanges = + this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges; + const revertChanges = pendingChanges.invert(this.store.state.doc); + const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength); + const redoChanges = pendingChanges.map(applyBefore.desc); + const changeSet = revertChanges.compose(applyBefore).compose(redoChanges); + this.store.dispatch({ + changes: changeSet, + // Keep the current set of dirty changes (but update them according the re-formatting) + // and to not add the formatting the dirty changes. + effects: [setDirtyChanges.of(redoChanges)], + }); + }, + }; + + private readonly lockedState: LockedState = { + updateExclusive: ( + callback: (pendingUpdate: PendingUpdate) => Promise, + ): Promise => { + return this.lockedState.withUpdateExclusive( + async (pendingUpdate) => { + const newStateId = await callback(pendingUpdate); + return { newStateId, data: undefined }; + }, + ); + }, + withUpdateExclusive: async ( + callback: (pendingUpdate: PendingUpdate) => Promise>, + ): Promise => { + if (!this.locekdForUpdate) { + throw new Error('Cannot update state without locking the mutex'); + } + if (this.pendingChanges !== undefined) { + throw new Error('Delta updates are not reentrant'); + } + let newStateId: string | undefined; + let data: T; + try { + ({ newStateId, data } = await callback(this.pendingUpdate)); + } catch (e) { + log.error('Error while update', e); + this.cancelUpdate(); + throw e; + } + if (newStateId === undefined) { + this.cancelUpdate(); + } else { + this.xtextStateId = newStateId; + this.pendingChanges = undefined; + } + return data; + }, + }; + + private cancelUpdate(): void { + if (this.pendingChanges === undefined) { + return; + } + this.dirtyChanges = this.pendingChanges.compose(this.dirtyChanges); + this.pendingChanges = undefined; + } + + runExclusive( + callback: (lockedState: LockedState) => Promise, + ): Promise { + return this.mutex.runExclusive(async () => { + if (this.pendingChanges !== undefined) { + throw new Error('Update is pending before entering critical section'); + } + const result = await callback(this.lockedState); + if (this.pendingChanges !== undefined) { + throw new Error('Update is pending after entering critical section'); + } + return result; + }); + } + + runExclusiveHighPriority( + callback: (lockedState: LockedState) => Promise, + ): Promise { + this.mutex.cancel(); + return this.runExclusive(callback); + } + + async runExclusiveWithRetries( + callback: (lockedState: LockedState) => Promise, + maxRetries = 5, + ): Promise { + let retries = 0; + while (retries < maxRetries) { + try { + // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. + return await this.runExclusive(callback); + } catch (error) { + // Let timeout errors propagate to give up retrying on a flaky connection. + if (error !== E_CANCELED) { + throw error; + } + retries += 1; + } + } + throw E_CANCELED; + } +} -- cgit v1.2.3-54-g00ecf