From f6de69943df6c669c81b10279bbfaf3158f1fd61 Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Thu, 25 Aug 2022 01:25:40 +0200 Subject: fix(frontend): UpdateService synchronization Also bumps frontend dependencies --- .../frontend/src/utils/ConditionVariable.ts | 64 ----- subprojects/frontend/src/utils/PendingTask.ts | 20 +- .../frontend/src/xtext/ContentAssistService.ts | 8 +- .../frontend/src/xtext/HighlightingService.ts | 12 +- .../frontend/src/xtext/OccurrencesService.ts | 119 ++++----- subprojects/frontend/src/xtext/UpdateService.ts | 296 ++++++++++++++++----- .../frontend/src/xtext/ValidationService.ts | 12 +- subprojects/frontend/src/xtext/XtextClient.ts | 6 +- .../frontend/src/xtext/xtextServiceResults.ts | 5 +- 9 files changed, 291 insertions(+), 251 deletions(-) delete mode 100644 subprojects/frontend/src/utils/ConditionVariable.ts (limited to 'subprojects/frontend/src') diff --git a/subprojects/frontend/src/utils/ConditionVariable.ts b/subprojects/frontend/src/utils/ConditionVariable.ts deleted file mode 100644 index 1d3431f7..00000000 --- a/subprojects/frontend/src/utils/ConditionVariable.ts +++ /dev/null @@ -1,64 +0,0 @@ -import PendingTask from './PendingTask'; -import getLogger from './getLogger'; - -const log = getLogger('utils.ConditionVariable'); - -export type Condition = () => boolean; - -export default class ConditionVariable { - private readonly condition: Condition; - - private readonly defaultTimeout: number; - - private listeners: PendingTask[] = []; - - constructor(condition: Condition, defaultTimeout = 0) { - this.condition = condition; - this.defaultTimeout = defaultTimeout; - } - - async waitFor(timeoutMs?: number | undefined): Promise { - if (this.condition()) { - return; - } - const timeoutOrDefault = timeoutMs ?? this.defaultTimeout; - let nowMs = Date.now(); - const endMs = nowMs + timeoutOrDefault; - while (!this.condition() && nowMs < endMs) { - const remainingMs = endMs - nowMs; - const promise = new Promise((resolve, reject) => { - if (this.condition()) { - resolve(); - return; - } - const task = new PendingTask(resolve, reject, remainingMs); - this.listeners.push(task); - }); - // We must keep waiting until the update has completed, - // so the tasks can't be started in parallel. - // eslint-disable-next-line no-await-in-loop - await promise; - nowMs = Date.now(); - } - if (!this.condition()) { - log.error('Condition still does not hold after', timeoutOrDefault, 'ms'); - throw new Error('Failed to wait for condition'); - } - } - - notifyAll(): void { - this.clearListenersWith((listener) => listener.resolve()); - } - - rejectAll(error: unknown): void { - this.clearListenersWith((listener) => listener.reject(error)); - } - - private clearListenersWith(callback: (listener: PendingTask) => void) { - // Copy `listeners` so that we don't get into a race condition - // if one of the listeners adds another listener. - const { listeners } = this; - this.listeners = []; - listeners.forEach(callback); - } -} diff --git a/subprojects/frontend/src/utils/PendingTask.ts b/subprojects/frontend/src/utils/PendingTask.ts index 3976bdf9..205c8452 100644 --- a/subprojects/frontend/src/utils/PendingTask.ts +++ b/subprojects/frontend/src/utils/PendingTask.ts @@ -14,21 +14,19 @@ export default class PendingTask { constructor( resolveCallback: (value: T) => void, rejectCallback: (reason?: unknown) => void, - timeoutMs?: number | undefined, - timeoutCallback?: () => void | undefined, + timeoutMs: number | undefined, + timeoutCallback: () => void | undefined, ) { this.resolveCallback = resolveCallback; this.rejectCallback = rejectCallback; - if (timeoutMs) { - this.timeout = setTimeout(() => { - if (!this.resolved) { - this.reject(new Error('Request timed out')); - if (timeoutCallback) { - timeoutCallback(); - } + this.timeout = setTimeout(() => { + if (!this.resolved) { + this.reject(new Error('Request timed out')); + if (timeoutCallback) { + timeoutCallback(); } - }, timeoutMs); - } + } + }, timeoutMs); } resolve(value: T): void { diff --git a/subprojects/frontend/src/xtext/ContentAssistService.ts b/subprojects/frontend/src/xtext/ContentAssistService.ts index 39042812..9e41f57b 100644 --- a/subprojects/frontend/src/xtext/ContentAssistService.ts +++ b/subprojects/frontend/src/xtext/ContentAssistService.ts @@ -104,13 +104,9 @@ function createCompletion(entry: ContentAssistEntry): Completion { } export default class ContentAssistService { - private readonly updateService: UpdateService; - private lastCompletion: CompletionResult | undefined; - constructor(updateService: UpdateService) { - this.updateService = updateService; - } + constructor(private readonly updateService: UpdateService) {} onTransaction(transaction: Transaction): void { if (this.shouldInvalidateCachedCompletion(transaction)) { @@ -159,8 +155,6 @@ export default class ContentAssistService { this.lastCompletion = undefined; const entries = await this.updateService.fetchContentAssist( { - resource: this.updateService.resourceName, - serviceType: 'assist', caretOffset: context.pos, proposalsLimit: PROPOSALS_LIMIT, }, diff --git a/subprojects/frontend/src/xtext/HighlightingService.ts b/subprojects/frontend/src/xtext/HighlightingService.ts index cf618b96..f9ab7b7e 100644 --- a/subprojects/frontend/src/xtext/HighlightingService.ts +++ b/subprojects/frontend/src/xtext/HighlightingService.ts @@ -5,14 +5,10 @@ import type UpdateService from './UpdateService'; import { highlightingResult } from './xtextServiceResults'; export default class HighlightingService { - private readonly store: EditorStore; - - private readonly updateService: UpdateService; - - constructor(store: EditorStore, updateService: UpdateService) { - this.store = store; - this.updateService = updateService; - } + constructor( + private readonly store: EditorStore, + private readonly updateService: UpdateService, + ) {} onPush(push: unknown): void { const { regions } = highlightingResult.parse(push); diff --git a/subprojects/frontend/src/xtext/OccurrencesService.ts b/subprojects/frontend/src/xtext/OccurrencesService.ts index 35913f43..c8d6fd7b 100644 --- a/subprojects/frontend/src/xtext/OccurrencesService.ts +++ b/subprojects/frontend/src/xtext/OccurrencesService.ts @@ -1,20 +1,15 @@ import { Transaction } from '@codemirror/state'; +import { debounce } from 'lodash-es'; import type EditorStore from '../editor/EditorStore'; import { type IOccurrence, isCursorWithinOccurence, } from '../editor/findOccurrences'; -import Timer from '../utils/Timer'; import getLogger from '../utils/getLogger'; import type UpdateService from './UpdateService'; -import type XtextWebSocketClient from './XtextWebSocketClient'; -import { - isConflictResult, - OccurrencesResult, - type TextRegion, -} from './xtextServiceResults'; +import type { TextRegion } from './xtextServiceResults'; const FIND_OCCURRENCES_TIMEOUT_MS = 1000; @@ -34,38 +29,23 @@ function transformOccurrences(regions: TextRegion[]): IOccurrence[] { } export default class OccurrencesService { - private readonly store: EditorStore; - - private readonly webSocketClient: XtextWebSocketClient; - - private readonly updateService: UpdateService; - private hasOccurrences = false; - private readonly findOccurrencesTimer = new Timer(() => { - this.handleFindOccurrences(); - }, FIND_OCCURRENCES_TIMEOUT_MS); - - private readonly clearOccurrencesTimer = new Timer(() => { - this.clearOccurrences(); - }); + private readonly findOccurrencesLater = debounce( + () => this.findOccurrences(), + FIND_OCCURRENCES_TIMEOUT_MS, + ); constructor( - store: EditorStore, - webSocketClient: XtextWebSocketClient, - updateService: UpdateService, - ) { - this.store = store; - this.webSocketClient = webSocketClient; - this.updateService = updateService; - } + private readonly store: EditorStore, + private readonly updateService: UpdateService, + ) {} onTransaction(transaction: Transaction): void { if (transaction.docChanged) { // Must clear occurrences asynchronously from `onTransaction`, // because we must not emit a conflicting transaction when handling the pending transaction. - this.clearOccurrencesTimer.schedule(); - this.findOccurrencesTimer.reschedule(); + this.clearAndFindOccurrencesLater(); return; } if (!transaction.isUserEvent('select')) { @@ -73,11 +53,10 @@ export default class OccurrencesService { } if (this.needsOccurrences) { if (!isCursorWithinOccurence(this.store.state)) { - this.clearOccurrencesTimer.schedule(); - this.findOccurrencesTimer.reschedule(); + this.clearAndFindOccurrencesLater(); } } else { - this.clearOccurrencesTimer.schedule(); + this.clearOccurrencesLater(); } } @@ -85,8 +64,26 @@ export default class OccurrencesService { return this.store.state.selection.main.empty; } - private handleFindOccurrences() { - this.clearOccurrencesTimer.cancel(); + private clearAndFindOccurrencesLater(): void { + this.clearOccurrencesLater(); + this.findOccurrencesLater(); + } + + /** + * Clears the occurences from a new immediate task to let the current editor transaction finish. + */ + private clearOccurrencesLater() { + setTimeout(() => this.clearOccurrences(), 0); + } + + private clearOccurrences() { + if (this.hasOccurrences) { + this.store.updateOccurrences([], []); + this.hasOccurrences = false; + } + } + + private findOccurrences() { this.updateOccurrences().catch((error) => { log.error('Unexpected error while updating occurrences', error); this.clearOccurrences(); @@ -98,43 +95,26 @@ export default class OccurrencesService { this.clearOccurrences(); return; } - await this.updateService.update(); - const result = await this.webSocketClient.send({ - resource: this.updateService.resourceName, - serviceType: 'occurrences', - expectedStateId: this.updateService.xtextStateId, - caretOffset: this.store.state.selection.main.head, + const fetchResult = await this.updateService.fetchOccurrences(() => { + return this.needsOccurrences + ? { + cancelled: false, + data: this.store.state.selection.main.head, + } + : { cancelled: true }; }); - const allChanges = this.updateService.computeChangesSinceLastUpdate(); - if (!allChanges.empty || isConflictResult(result, 'canceled')) { + if (fetchResult.cancelled) { // Stale occurrences result, the user already made some changes. // We can safely ignore the occurrences and schedule a new find occurrences call. this.clearOccurrences(); - this.findOccurrencesTimer.schedule(); - return; - } - const parsedOccurrencesResult = OccurrencesResult.safeParse(result); - if (!parsedOccurrencesResult.success) { - log.error( - 'Unexpected occurences result', - result, - 'not an OccurrencesResult: ', - parsedOccurrencesResult.error, - ); - this.clearOccurrences(); - return; - } - const { stateId, writeRegions, readRegions } = parsedOccurrencesResult.data; - if (stateId !== this.updateService.xtextStateId) { - log.error( - 'Unexpected state id, expected:', - this.updateService.xtextStateId, - 'got:', - stateId, - ); - this.clearOccurrences(); + if (this.needsOccurrences) { + this.findOccurrencesLater(); + } return; } + const { + data: { writeRegions, readRegions }, + } = fetchResult; const write = transformOccurrences(writeRegions); const read = transformOccurrences(readRegions); this.hasOccurrences = write.length > 0 || read.length > 0; @@ -147,11 +127,4 @@ export default class OccurrencesService { ); this.store.updateOccurrences(write, read); } - - private clearOccurrences() { - if (this.hasOccurrences) { - this.store.updateOccurrences([], []); - this.hasOccurrences = false; - } - } } diff --git a/subprojects/frontend/src/xtext/UpdateService.ts b/subprojects/frontend/src/xtext/UpdateService.ts index f8b71160..3b4ae259 100644 --- a/subprojects/frontend/src/xtext/UpdateService.ts +++ b/subprojects/frontend/src/xtext/UpdateService.ts @@ -5,11 +5,11 @@ import { StateEffect, type Transaction, } from '@codemirror/state'; +import { E_CANCELED, E_TIMEOUT, Mutex, withTimeout } from 'async-mutex'; +import { debounce } from 'lodash-es'; import { nanoid } from 'nanoid'; import type EditorStore from '../editor/EditorStore'; -import ConditionVariable from '../utils/ConditionVariable'; -import Timer from '../utils/Timer'; import getLogger from '../utils/getLogger'; import type XtextWebSocketClient from './XtextWebSocketClient'; @@ -19,12 +19,15 @@ import { DocumentStateResult, FormattingResult, isConflictResult, + OccurrencesResult, } from './xtextServiceResults'; const UPDATE_TIMEOUT_MS = 500; const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; +const FORMAT_TEXT_RETRIES = 5; + const log = getLogger('xtext.UpdateService'); /** @@ -38,10 +41,20 @@ const log = getLogger('xtext.UpdateService'); */ const setDirtyChanges = StateEffect.define(); -export interface IAbortSignal { +export interface AbortSignal { aborted: boolean; } +export interface ContentAssistParams { + caretOffset: number; + + proposalsLimit: number; +} + +export type CancellableResult = + | { cancelled: false; data: T } + | { cancelled: true }; + interface StateUpdateResult { newStateId: string; @@ -57,15 +70,27 @@ interface Delta { } export default class UpdateService { - resourceName: string; + readonly resourceName: string; xtextStateId: string | undefined; private readonly store: EditorStore; + private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); + /** - * The changes being synchronized to the server if a full or delta text update is running, - * `undefined` otherwise. + * The changes being synchronized to the server if a full or delta text update is running + * withing a `withUpdateExclusive` block, `undefined` otherwise. + * + * Must be `undefined` before and after entering the critical section of `mutex` + * and may only be changes in the critical section of `mutex`. + * + * Methods named with an `Exclusive` suffix in this class assume that the mutex is held + * and may call `withUpdateExclusive` or `doFallbackUpdateFullTextExclusive` + * to mutate this field. + * + * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive` + * block and require this field to be non-`undefined`. */ private pendingUpdate: ChangeSet | undefined; @@ -76,15 +101,11 @@ export default class UpdateService { private readonly webSocketClient: XtextWebSocketClient; - private readonly updatedCondition = new ConditionVariable( - () => this.pendingUpdate === undefined && this.xtextStateId !== undefined, - WAIT_FOR_UPDATE_TIMEOUT_MS, + private readonly idleUpdateLater = debounce( + () => this.idleUpdate(), + UPDATE_TIMEOUT_MS, ); - private readonly idleUpdateTimer = new Timer(() => { - this.handleIdleUpdate(); - }, UPDATE_TIMEOUT_MS); - constructor(store: EditorStore, webSocketClient: XtextWebSocketClient) { this.resourceName = `${nanoid(7)}.problem`; this.store = store; @@ -95,6 +116,13 @@ export default class UpdateService { onReconnect(): void { this.xtextStateId = undefined; this.updateFullText().catch((error) => { + // Let E_TIMEOUT errors propagate, since if the first update times out, + // we can't use the connection. + if (error === E_CANCELED) { + // Content assist will perform a full-text update anyways. + log.debug('Full text update cancelled'); + return; + } log.error('Unexpected error during initial update', error); }); } @@ -106,6 +134,8 @@ export default class UpdateService { if (setDirtyChangesEffect) { const { value } = setDirtyChangesEffect; if (this.pendingUpdate !== undefined) { + // Do not clear `pendingUpdate`, because that would indicate an update failure + // to `withUpdateExclusive`. this.pendingUpdate = ChangeSet.empty(value.length); } this.dirtyChanges = value; @@ -113,7 +143,7 @@ export default class UpdateService { } if (transaction.docChanged) { this.dirtyChanges = this.dirtyChanges.compose(transaction.changes); - this.idleUpdateTimer.reschedule(); + this.idleUpdateLater(); } } @@ -132,34 +162,42 @@ export default class UpdateService { ); } - private handleIdleUpdate(): void { + private idleUpdate(): void { if (!this.webSocketClient.isOpen || this.dirtyChanges.empty) { return; } - if (this.pendingUpdate === undefined) { + if (!this.mutex.isLocked()) { this.update().catch((error) => { + if (error === E_CANCELED || error === E_TIMEOUT) { + log.debug('Idle update cancelled'); + return; + } log.error('Unexpected error during scheduled update', error); }); } - this.idleUpdateTimer.reschedule(); + this.idleUpdateLater(); } private newEmptyChangeSet(): ChangeSet { return ChangeSet.of([], this.store.state.doc.length); } - async updateFullText(): Promise { - await this.withUpdate(() => this.doUpdateFullText()); + private updateFullText(): Promise { + return this.runExclusive(() => this.updateFullTextExclusive()); } - private async doUpdateFullText(): Promise> { + private async updateFullTextExclusive(): Promise { + await this.withVoidUpdateExclusive(() => this.doUpdateFullTextExclusive()); + } + + private async doUpdateFullTextExclusive(): Promise { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'update', fullText: this.store.state.doc.sliceString(0), }); const { stateId } = DocumentStateResult.parse(result); - return { newStateId: stateId, data: undefined }; + return stateId; } /** @@ -171,14 +209,26 @@ export default class UpdateService { * * @returns a promise resolving when the update is completed */ - async update(): Promise { - await this.prepareForDeltaUpdate(); + private async update(): Promise { + // We may check here for the delta to avoid locking, + // but we'll need to recompute the delta in the critical section, + // because it may have changed by the time we can acquire the lock. + if (this.dirtyChanges.empty) { + return; + } + await this.runExclusive(() => this.updateExclusive()); + } + + private async updateExclusive(): Promise { + if (this.xtextStateId === undefined) { + await this.updateFullTextExclusive(); + } const delta = this.computeDelta(); if (delta === undefined) { return; } log.trace('Editor delta', delta); - await this.withUpdate(async () => { + await this.withVoidUpdateExclusive(async () => { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'update', @@ -187,34 +237,98 @@ export default class UpdateService { }); const parsedDocumentStateResult = DocumentStateResult.safeParse(result); if (parsedDocumentStateResult.success) { - return { - newStateId: parsedDocumentStateResult.data.stateId, - data: undefined, - }; + return parsedDocumentStateResult.data.stateId; } if (isConflictResult(result, 'invalidStateId')) { - return this.doFallbackToUpdateFullText(); + return this.doFallbackUpdateFullTextExclusive(); } throw parsedDocumentStateResult.error; }); } - private doFallbackToUpdateFullText(): Promise> { - if (this.pendingUpdate === undefined) { - throw new Error('Only a pending update can be extended'); + async fetchOccurrences( + getCaretOffset: () => CancellableResult, + ): Promise> { + try { + await this.update(); + } catch (error) { + if (error === E_CANCELED || error === E_TIMEOUT) { + return { cancelled: true }; + } + throw error; } - log.warn('Delta update failed, performing full text update'); - this.xtextStateId = undefined; - this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges); - this.dirtyChanges = this.newEmptyChangeSet(); - return this.doUpdateFullText(); + if (!this.dirtyChanges.empty || this.mutex.isLocked()) { + // Just give up if another update is in progress. + return { cancelled: true }; + } + const caretOffsetResult = getCaretOffset(); + if (caretOffsetResult.cancelled) { + return { cancelled: true }; + } + const expectedStateId = this.xtextStateId; + const data = await this.webSocketClient.send({ + resource: this.resourceName, + serviceType: 'occurrences', + caretOffset: caretOffsetResult.data, + expectedStateId, + }); + if ( + // The query must have reached the server without being conflicted with an update + // or cancelled server-side. + isConflictResult(data) || + // And no state update should have occurred since then. + this.xtextStateId !== expectedStateId || + // And there should be no change to the editor text since then. + !this.dirtyChanges.empty || + // And there should be no state update in progress. + this.mutex.isLocked() + ) { + return { cancelled: true }; + } + const parsedOccurrencesResult = OccurrencesResult.safeParse(data); + if (!parsedOccurrencesResult.success) { + log.error( + 'Unexpected occurences result', + data, + 'not an OccurrencesResult:', + parsedOccurrencesResult.error, + ); + throw parsedOccurrencesResult.error; + } + if (parsedOccurrencesResult.data.stateId !== expectedStateId) { + return { cancelled: true }; + } + return { cancelled: false, data: parsedOccurrencesResult.data }; } async fetchContentAssist( - params: Record, - signal: IAbortSignal, + params: ContentAssistParams, + signal: AbortSignal, + ): Promise { + if (!this.mutex.isLocked && this.xtextStateId !== undefined) { + return this.fetchContentAssistFetchOnly(params, this.xtextStateId); + } + // Content assist updates should have priority over other updates. + this.mutex.cancel(); + try { + return await this.runExclusive(() => + this.fetchContentAssistExclusive(params, signal), + ); + } catch (error) { + if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { + return []; + } + throw error; + } + } + + private async fetchContentAssistExclusive( + params: ContentAssistParams, + signal: AbortSignal, ): Promise { - await this.prepareForDeltaUpdate(); + if (this.xtextStateId === undefined) { + await this.updateFullTextExclusive(); + } if (signal.aborted) { return []; } @@ -222,8 +336,8 @@ export default class UpdateService { if (delta !== undefined) { log.trace('Editor delta', delta); // Try to fetch while also performing a delta update. - const fetchUpdateEntries = await this.withUpdate(() => - this.doFetchContentAssistWithDelta(params, delta), + const fetchUpdateEntries = await this.withUpdateExclusive(() => + this.doFetchContentAssistWithDeltaExclusive(params, delta), ); if (fetchUpdateEntries !== undefined) { return fetchUpdateEntries; @@ -235,15 +349,17 @@ export default class UpdateService { if (this.xtextStateId === undefined) { throw new Error('failed to obtain Xtext state id'); } - return this.doFetchContentAssistFetchOnly(params, this.xtextStateId); + return this.fetchContentAssistFetchOnly(params, this.xtextStateId); } - private async doFetchContentAssistWithDelta( - params: Record, + private async doFetchContentAssistWithDeltaExclusive( + params: ContentAssistParams, delta: Delta, ): Promise> { const fetchUpdateResult = await this.webSocketClient.send({ ...params, + resource: this.resourceName, + serviceType: 'assist', requiredStateId: this.xtextStateId, ...delta, }); @@ -256,7 +372,7 @@ export default class UpdateService { } if (isConflictResult(fetchUpdateResult, 'invalidStateId')) { log.warn('Server state invalid during content assist'); - const { newStateId } = await this.doFallbackToUpdateFullText(); + const newStateId = await this.doFallbackUpdateFullTextExclusive(); // We must finish this state update transaction to prepare for any push events // before querying for content assist, so we just return `undefined` and will query // the content assist service later. @@ -265,14 +381,16 @@ export default class UpdateService { throw parsedContentAssistResult.error; } - private async doFetchContentAssistFetchOnly( - params: Record, + private async fetchContentAssistFetchOnly( + params: ContentAssistParams, requiredStateId: string, ): Promise { // Fallback to fetching without a delta update. const fetchOnlyResult = await this.webSocketClient.send({ ...params, - requiredStateId: this.xtextStateId, + resource: this.resourceName, + serviceType: 'assist', + requiredStateId, }); const { stateId, entries: fetchOnlyEntries } = ContentAssistResult.parse(fetchOnlyResult); @@ -285,14 +403,32 @@ export default class UpdateService { } async formatText(): Promise { - await this.update(); + let retries = 0; + while (retries < FORMAT_TEXT_RETRIES) { + try { + // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. + await this.runExclusive(() => this.formatTextExclusive()); + return; + } catch (error) { + // Let timeout errors propagate to give up formatting on a flaky connection. + if (error === E_CANCELED && retries < FORMAT_TEXT_RETRIES) { + retries += 1; + } else { + throw error; + } + } + } + } + + private async formatTextExclusive(): Promise { + await this.updateExclusive(); let { from, to } = this.store.state.selection.main; if (to <= from) { from = 0; to = this.store.state.doc.length; } log.debug('Formatting from', from, 'to', to); - await this.withUpdate(async () => { + await this.withVoidUpdateExclusive(async () => { const result = await this.webSocketClient.send({ resource: this.resourceName, serviceType: 'format', @@ -305,7 +441,7 @@ export default class UpdateService { to, insert: formattedText, }); - return { newStateId: stateId, data: undefined }; + return stateId; }); } @@ -345,6 +481,28 @@ export default class UpdateService { }); } + private runExclusive(callback: () => Promise): Promise { + return this.mutex.runExclusive(async () => { + if (this.pendingUpdate !== undefined) { + throw new Error('Update is pending before entering critical section'); + } + const result = await callback(); + if (this.pendingUpdate !== undefined) { + throw new Error('Update is pending after entering critical section'); + } + return result; + }); + } + + private withVoidUpdateExclusive( + callback: () => Promise, + ): Promise { + return this.withUpdateExclusive(async () => { + const newStateId = await callback(); + return { newStateId, data: undefined }; + }); + } + /** * Executes an asynchronous callback that updates the state on the server. * @@ -366,20 +524,18 @@ export default class UpdateService { * @param callback the asynchronous callback that updates the server state * @returns a promise resolving to the second value returned by `callback` */ - private async withUpdate( + private async withUpdateExclusive( callback: () => Promise>, ): Promise { if (this.pendingUpdate !== undefined) { - throw new Error('Another update is pending, will not perform update'); + throw new Error('Delta updates are not reentrant'); } this.pendingUpdate = this.dirtyChanges; this.dirtyChanges = this.newEmptyChangeSet(); + let data: T; try { - const { newStateId, data } = await callback(); - this.xtextStateId = newStateId; + ({ newStateId: this.xtextStateId, data } = await callback()); this.pendingUpdate = undefined; - this.updatedCondition.notifyAll(); - return data; } catch (e) { log.error('Error while update', e); if (this.pendingUpdate === undefined) { @@ -389,25 +545,19 @@ export default class UpdateService { } this.pendingUpdate = undefined; this.webSocketClient.forceReconnectOnError(); - this.updatedCondition.rejectAll(e); throw e; } + return data; } - /** - * Ensures that there is some state available on the server (`xtextStateId`) - * and that there is no pending update. - * - * After this function resolves, a delta text update is possible. - * - * @returns a promise resolving when there is a valid state id but no pending update - */ - private async prepareForDeltaUpdate(): Promise { - // If no update is pending, but the full text hasn't been uploaded to the server yet, - // we must start a full text upload. - if (this.pendingUpdate === undefined && this.xtextStateId === undefined) { - await this.updateFullText(); + private doFallbackUpdateFullTextExclusive(): Promise { + if (this.pendingUpdate === undefined) { + throw new Error('Only a pending update can be extended'); } - await this.updatedCondition.waitFor(); + log.warn('Delta update failed, performing full text update'); + this.xtextStateId = undefined; + this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges); + this.dirtyChanges = this.newEmptyChangeSet(); + return this.doUpdateFullTextExclusive(); } } diff --git a/subprojects/frontend/src/xtext/ValidationService.ts b/subprojects/frontend/src/xtext/ValidationService.ts index a0b27251..e78318f7 100644 --- a/subprojects/frontend/src/xtext/ValidationService.ts +++ b/subprojects/frontend/src/xtext/ValidationService.ts @@ -6,14 +6,10 @@ import type UpdateService from './UpdateService'; import { ValidationResult } from './xtextServiceResults'; export default class ValidationService { - private readonly store: EditorStore; - - private readonly updateService: UpdateService; - - constructor(store: EditorStore, updateService: UpdateService) { - this.store = store; - this.updateService = updateService; - } + constructor( + private readonly store: EditorStore, + private readonly updateService: UpdateService, + ) {} onPush(push: unknown): void { const { issues } = ValidationResult.parse(push); diff --git a/subprojects/frontend/src/xtext/XtextClient.ts b/subprojects/frontend/src/xtext/XtextClient.ts index 7297c674..6351c9fd 100644 --- a/subprojects/frontend/src/xtext/XtextClient.ts +++ b/subprojects/frontend/src/xtext/XtextClient.ts @@ -43,11 +43,7 @@ export default class XtextClient { this.updateService, ); this.validationService = new ValidationService(store, this.updateService); - this.occurrencesService = new OccurrencesService( - store, - this.webSocketClient, - this.updateService, - ); + this.occurrencesService = new OccurrencesService(store, this.updateService); } onTransaction(transaction: Transaction): void { diff --git a/subprojects/frontend/src/xtext/xtextServiceResults.ts b/subprojects/frontend/src/xtext/xtextServiceResults.ts index 4cfb9c33..e93c6714 100644 --- a/subprojects/frontend/src/xtext/xtextServiceResults.ts +++ b/subprojects/frontend/src/xtext/xtextServiceResults.ts @@ -26,12 +26,13 @@ export type ServiceConflictResult = z.infer; export function isConflictResult( result: unknown, - conflictType: Conflict, + conflictType?: Conflict | undefined, ): boolean { const parsedConflictResult = ServiceConflictResult.safeParse(result); return ( parsedConflictResult.success && - parsedConflictResult.data.conflict === conflictType + (conflictType === undefined || + parsedConflictResult.data.conflict === conflictType) ); } -- cgit v1.2.3-54-g00ecf