From f6de69943df6c669c81b10279bbfaf3158f1fd61 Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Thu, 25 Aug 2022 01:25:40 +0200 Subject: fix(frontend): UpdateService synchronization Also bumps frontend dependencies --- subprojects/frontend/src/xtext/UpdateService.ts | 296 ++++++++++++++++++------ 1 file changed, 223 insertions(+), 73 deletions(-) (limited to 'subprojects/frontend/src/xtext/UpdateService.ts') diff --git a/subprojects/frontend/src/xtext/UpdateService.ts b/subprojects/frontend/src/xtext/UpdateService.ts index f8b71160..3b4ae259 100644 --- a/subprojects/frontend/src/xtext/UpdateService.ts +++ b/subprojects/frontend/src/xtext/UpdateService.ts @@ -5,11 +5,11 @@ import { StateEffect, type Transaction, } from '@codemirror/state'; +import { E_CANCELED, E_TIMEOUT, Mutex, withTimeout } from 'async-mutex'; +import { debounce } from 'lodash-es'; import { nanoid } from 'nanoid'; import type EditorStore from '../editor/EditorStore'; -import ConditionVariable from '../utils/ConditionVariable'; -import Timer from '../utils/Timer'; import getLogger from '../utils/getLogger'; import type XtextWebSocketClient from './XtextWebSocketClient'; @@ -19,12 +19,15 @@ import { DocumentStateResult, FormattingResult, isConflictResult, + OccurrencesResult, } from './xtextServiceResults'; const UPDATE_TIMEOUT_MS = 500; const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; +const FORMAT_TEXT_RETRIES = 5; + const log = getLogger('xtext.UpdateService'); /** @@ -38,10 +41,20 @@ const log = getLogger('xtext.UpdateService'); */ const setDirtyChanges = StateEffect.define(); -export interface IAbortSignal { +export interface AbortSignal { aborted: boolean; } +export interface ContentAssistParams { + caretOffset: number; + + proposalsLimit: number; +} + +export type CancellableResult = + | { cancelled: false; data: T } + | { cancelled: true }; + interface StateUpdateResult { newStateId: string; @@ -57,15 +70,27 @@ interface Delta { } export default class UpdateService { - resourceName: string; + readonly resourceName: string; xtextStateId: string | undefined; private readonly store: EditorStore; + private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); + /** - * The changes being synchronized to the server if a full or delta text update is running, - * `undefined` otherwise. + * The changes being synchronized to the server if a full or delta text update is running + * withing a `withUpdateExclusive` block, `undefined` otherwise. + * + * Must be `undefined` before and after entering the critical section of `mutex` + * and may only be changes in the critical section of `mutex`. + * + * Methods named with an `Exclusive` suffix in this class assume that the mutex is held + * and may call `withUpdateExclusive` or `doFallbackUpdateFullTextExclusive` + * to mutate this field. + * + * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive` + * block and require this field to be non-`undefined`. */ private pendingUpdate: ChangeSet | undefined; @@ -76,15 +101,11 @@ export default class UpdateService { private readonly webSocketClient: XtextWebSocketClient; - private readonly updatedCondition = new ConditionVariable( - () => this.pendingUpdate === undefined && this.xtextStateId !== undefined, - WAIT_FOR_UPDATE_TIMEOUT_MS, + private readonly idleUpdateLater = debounce( + () => this.idleUpdate(), + UPDATE_TIMEOUT_MS, ); - private readonly idleUpdateTimer = new Timer(() => { - this.handleIdleUpdate(); - }, UPDATE_TIMEOUT_MS); - constructor(store: EditorStore, webSocketClient: XtextWebSocketClient) { this.resourceName = `${nanoid(7)}.problem`; this.store = store; @@ -95,6 +116,13 @@ export default class UpdateService { onReconnect(): void { this.xtextStateId = undefined; this.updateFullText().catch((error) => { + // Let E_TIMEOUT errors propagate, since if the first update times out, + // we can't use the connection. + if (error === E_CANCELED) { + // Content assist will perform a full-text update anyways. + log.debug('Full text update cancelled'); + return; + } log.error('Unexpected error during initial update', error); }); } @@ -106,6 +134,8 @@ export default class UpdateService { if (setDirtyChangesEffect) { const { value } = setDirtyChangesEffect; if (this.pendingUpdate !== undefined) { + // Do not clear `pendingUpdate`, because that would indicate an update failure + // to `withUpdateExclusive`. this.pendingUpdate = ChangeSet.empty(value.length); } this.dirtyChanges = value; @@ -113,7 +143,7 @@ export default class UpdateService { } if (transaction.docChanged) { this.dirtyChanges = this.dirtyChanges.compose(transaction.changes); - this.idleUpdateTimer.reschedule(); + this.idleUpdateLater(); } } @@ -132,34 +162,42 @@ export default class UpdateService { ); } - private handleIdleUpdate(): void { + private idleUpdate(): void { if (!this.webSocketClient.isOpen || this.dirtyChanges.empty) { return; } - if (this.pendingUpdate === undefined) { + if (!this.mutex.isLocked()) { this.update().catch((error) => { + if (error === E_CANCELED || error === E_TIMEOUT) { + log.debug('Idle update cancelled'); + return; + } log.error('Unexpected error during scheduled update', error); }); } - this.idleUpdateTimer.reschedule(); + this.idleUpdateLater(); } private newEmptyChangeSet(): ChangeSet { return ChangeSet.of([], this.store.state.doc.length); } - async updateFullText(): Promise { - await this.withUpdate(() => this.doUpdateFullText()); + private updateFullText(): Promise { + return this.runExclusive(() => this.updateFullTextExclusive()); } - private async doUpdateFullText(): Promise> { + private async updateFullTextExclusive(): Promise { + await this.withVoidUpdateExclusive(() => this.doUpdateFullTextExclusive()); + } + + private async doUpdateFullTextExclusive(): Promise { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'update', fullText: this.store.state.doc.sliceString(0), }); const { stateId } = DocumentStateResult.parse(result); - return { newStateId: stateId, data: undefined }; + return stateId; } /** @@ -171,14 +209,26 @@ export default class UpdateService { * * @returns a promise resolving when the update is completed */ - async update(): Promise { - await this.prepareForDeltaUpdate(); + private async update(): Promise { + // We may check here for the delta to avoid locking, + // but we'll need to recompute the delta in the critical section, + // because it may have changed by the time we can acquire the lock. + if (this.dirtyChanges.empty) { + return; + } + await this.runExclusive(() => this.updateExclusive()); + } + + private async updateExclusive(): Promise { + if (this.xtextStateId === undefined) { + await this.updateFullTextExclusive(); + } const delta = this.computeDelta(); if (delta === undefined) { return; } log.trace('Editor delta', delta); - await this.withUpdate(async () => { + await this.withVoidUpdateExclusive(async () => { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'update', @@ -187,34 +237,98 @@ export default class UpdateService { }); const parsedDocumentStateResult = DocumentStateResult.safeParse(result); if (parsedDocumentStateResult.success) { - return { - newStateId: parsedDocumentStateResult.data.stateId, - data: undefined, - }; + return parsedDocumentStateResult.data.stateId; } if (isConflictResult(result, 'invalidStateId')) { - return this.doFallbackToUpdateFullText(); + return this.doFallbackUpdateFullTextExclusive(); } throw parsedDocumentStateResult.error; }); } - private doFallbackToUpdateFullText(): Promise> { - if (this.pendingUpdate === undefined) { - throw new Error('Only a pending update can be extended'); + async fetchOccurrences( + getCaretOffset: () => CancellableResult, + ): Promise> { + try { + await this.update(); + } catch (error) { + if (error === E_CANCELED || error === E_TIMEOUT) { + return { cancelled: true }; + } + throw error; } - log.warn('Delta update failed, performing full text update'); - this.xtextStateId = undefined; - this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges); - this.dirtyChanges = this.newEmptyChangeSet(); - return this.doUpdateFullText(); + if (!this.dirtyChanges.empty || this.mutex.isLocked()) { + // Just give up if another update is in progress. + return { cancelled: true }; + } + const caretOffsetResult = getCaretOffset(); + if (caretOffsetResult.cancelled) { + return { cancelled: true }; + } + const expectedStateId = this.xtextStateId; + const data = await this.webSocketClient.send({ + resource: this.resourceName, + serviceType: 'occurrences', + caretOffset: caretOffsetResult.data, + expectedStateId, + }); + if ( + // The query must have reached the server without being conflicted with an update + // or cancelled server-side. + isConflictResult(data) || + // And no state update should have occurred since then. + this.xtextStateId !== expectedStateId || + // And there should be no change to the editor text since then. + !this.dirtyChanges.empty || + // And there should be no state update in progress. + this.mutex.isLocked() + ) { + return { cancelled: true }; + } + const parsedOccurrencesResult = OccurrencesResult.safeParse(data); + if (!parsedOccurrencesResult.success) { + log.error( + 'Unexpected occurences result', + data, + 'not an OccurrencesResult:', + parsedOccurrencesResult.error, + ); + throw parsedOccurrencesResult.error; + } + if (parsedOccurrencesResult.data.stateId !== expectedStateId) { + return { cancelled: true }; + } + return { cancelled: false, data: parsedOccurrencesResult.data }; } async fetchContentAssist( - params: Record, - signal: IAbortSignal, + params: ContentAssistParams, + signal: AbortSignal, + ): Promise { + if (!this.mutex.isLocked && this.xtextStateId !== undefined) { + return this.fetchContentAssistFetchOnly(params, this.xtextStateId); + } + // Content assist updates should have priority over other updates. + this.mutex.cancel(); + try { + return await this.runExclusive(() => + this.fetchContentAssistExclusive(params, signal), + ); + } catch (error) { + if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { + return []; + } + throw error; + } + } + + private async fetchContentAssistExclusive( + params: ContentAssistParams, + signal: AbortSignal, ): Promise { - await this.prepareForDeltaUpdate(); + if (this.xtextStateId === undefined) { + await this.updateFullTextExclusive(); + } if (signal.aborted) { return []; } @@ -222,8 +336,8 @@ export default class UpdateService { if (delta !== undefined) { log.trace('Editor delta', delta); // Try to fetch while also performing a delta update. - const fetchUpdateEntries = await this.withUpdate(() => - this.doFetchContentAssistWithDelta(params, delta), + const fetchUpdateEntries = await this.withUpdateExclusive(() => + this.doFetchContentAssistWithDeltaExclusive(params, delta), ); if (fetchUpdateEntries !== undefined) { return fetchUpdateEntries; @@ -235,15 +349,17 @@ export default class UpdateService { if (this.xtextStateId === undefined) { throw new Error('failed to obtain Xtext state id'); } - return this.doFetchContentAssistFetchOnly(params, this.xtextStateId); + return this.fetchContentAssistFetchOnly(params, this.xtextStateId); } - private async doFetchContentAssistWithDelta( - params: Record, + private async doFetchContentAssistWithDeltaExclusive( + params: ContentAssistParams, delta: Delta, ): Promise> { const fetchUpdateResult = await this.webSocketClient.send({ ...params, + resource: this.resourceName, + serviceType: 'assist', requiredStateId: this.xtextStateId, ...delta, }); @@ -256,7 +372,7 @@ export default class UpdateService { } if (isConflictResult(fetchUpdateResult, 'invalidStateId')) { log.warn('Server state invalid during content assist'); - const { newStateId } = await this.doFallbackToUpdateFullText(); + const newStateId = await this.doFallbackUpdateFullTextExclusive(); // We must finish this state update transaction to prepare for any push events // before querying for content assist, so we just return `undefined` and will query // the content assist service later. @@ -265,14 +381,16 @@ export default class UpdateService { throw parsedContentAssistResult.error; } - private async doFetchContentAssistFetchOnly( - params: Record, + private async fetchContentAssistFetchOnly( + params: ContentAssistParams, requiredStateId: string, ): Promise { // Fallback to fetching without a delta update. const fetchOnlyResult = await this.webSocketClient.send({ ...params, - requiredStateId: this.xtextStateId, + resource: this.resourceName, + serviceType: 'assist', + requiredStateId, }); const { stateId, entries: fetchOnlyEntries } = ContentAssistResult.parse(fetchOnlyResult); @@ -285,14 +403,32 @@ export default class UpdateService { } async formatText(): Promise { - await this.update(); + let retries = 0; + while (retries < FORMAT_TEXT_RETRIES) { + try { + // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. + await this.runExclusive(() => this.formatTextExclusive()); + return; + } catch (error) { + // Let timeout errors propagate to give up formatting on a flaky connection. + if (error === E_CANCELED && retries < FORMAT_TEXT_RETRIES) { + retries += 1; + } else { + throw error; + } + } + } + } + + private async formatTextExclusive(): Promise { + await this.updateExclusive(); let { from, to } = this.store.state.selection.main; if (to <= from) { from = 0; to = this.store.state.doc.length; } log.debug('Formatting from', from, 'to', to); - await this.withUpdate(async () => { + await this.withVoidUpdateExclusive(async () => { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'format', @@ -305,7 +441,7 @@ export default class UpdateService { to, insert: formattedText, }); - return { newStateId: stateId, data: undefined }; + return stateId; }); } @@ -345,6 +481,28 @@ export default class UpdateService { }); } + private runExclusive(callback: () => Promise): Promise { + return this.mutex.runExclusive(async () => { + if (this.pendingUpdate !== undefined) { + throw new Error('Update is pending before entering critical section'); + } + const result = await callback(); + if (this.pendingUpdate !== undefined) { + throw new Error('Update is pending after entering critical section'); + } + return result; + }); + } + + private withVoidUpdateExclusive( + callback: () => Promise, + ): Promise { + return this.withUpdateExclusive(async () => { + const newStateId = await callback(); + return { newStateId, data: undefined }; + }); + } + /** * Executes an asynchronous callback that updates the state on the server. * @@ -366,20 +524,18 @@ export default class UpdateService { * @param callback the asynchronous callback that updates the server state * @returns a promise resolving to the second value returned by `callback` */ - private async withUpdate( + private async withUpdateExclusive( callback: () => Promise>, ): Promise { if (this.pendingUpdate !== undefined) { - throw new Error('Another update is pending, will not perform update'); + throw new Error('Delta updates are not reentrant'); } this.pendingUpdate = this.dirtyChanges; this.dirtyChanges = this.newEmptyChangeSet(); + let data: T; try { - const { newStateId, data } = await callback(); - this.xtextStateId = newStateId; + ({ newStateId: this.xtextStateId, data } = await callback()); this.pendingUpdate = undefined; - this.updatedCondition.notifyAll(); - return data; } catch (e) { log.error('Error while update', e); if (this.pendingUpdate === undefined) { @@ -389,25 +545,19 @@ export default class UpdateService { } this.pendingUpdate = undefined; this.webSocketClient.forceReconnectOnError(); - this.updatedCondition.rejectAll(e); throw e; } + return data; } - /** - * Ensures that there is some state available on the server (`xtextStateId`) - * and that there is no pending update. - * - * After this function resolves, a delta text update is possible. - * - * @returns a promise resolving when there is a valid state id but no pending update - */ - private async prepareForDeltaUpdate(): Promise { - // If no update is pending, but the full text hasn't been uploaded to the server yet, - // we must start a full text upload. - if (this.pendingUpdate === undefined && this.xtextStateId === undefined) { - await this.updateFullText(); + private doFallbackUpdateFullTextExclusive(): Promise { + if (this.pendingUpdate === undefined) { + throw new Error('Only a pending update can be extended'); } - await this.updatedCondition.waitFor(); + log.warn('Delta update failed, performing full text update'); + this.xtextStateId = undefined; + this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges); + this.dirtyChanges = this.newEmptyChangeSet(); + return this.doUpdateFullTextExclusive(); } } -- cgit v1.2.3-70-g09d2