aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/frontend/src/xtext
diff options
context:
space:
mode:
authorLibravatar Kristóf Marussy <kristof@marussy.com>2022-08-25 20:03:41 +0200
committerLibravatar Kristóf Marussy <kristof@marussy.com>2022-08-25 21:54:27 +0200
commitd774c3d2c4fc5948483438d8304af5baa6bb7a91 (patch)
treea2d7a7704dabb806e3b4578abc90b143ca668080 /subprojects/frontend/src/xtext
parentfix(frontend): UpdateService synchronization (diff)
downloadrefinery-d774c3d2c4fc5948483438d8304af5baa6bb7a91.tar.gz
refinery-d774c3d2c4fc5948483438d8304af5baa6bb7a91.tar.zst
refinery-d774c3d2c4fc5948483438d8304af5baa6bb7a91.zip
refactor(frontend): extract xtextStateId tracking
Diffstat (limited to 'subprojects/frontend/src/xtext')
-rw-r--r--subprojects/frontend/src/xtext/UpdateService.ts470
-rw-r--r--subprojects/frontend/src/xtext/UpdateStateTracker.ts357
2 files changed, 495 insertions, 332 deletions
diff --git a/subprojects/frontend/src/xtext/UpdateService.ts b/subprojects/frontend/src/xtext/UpdateService.ts
index 3b4ae259..94e01ca2 100644
--- a/subprojects/frontend/src/xtext/UpdateService.ts
+++ b/subprojects/frontend/src/xtext/UpdateService.ts
@@ -1,17 +1,16 @@
1import { 1import type { ChangeDesc, Transaction } from '@codemirror/state';
2 type ChangeDesc, 2import { E_CANCELED, E_TIMEOUT } from 'async-mutex';
3 ChangeSet,
4 type ChangeSpec,
5 StateEffect,
6 type Transaction,
7} from '@codemirror/state';
8import { E_CANCELED, E_TIMEOUT, Mutex, withTimeout } from 'async-mutex';
9import { debounce } from 'lodash-es'; 3import { debounce } from 'lodash-es';
10import { nanoid } from 'nanoid'; 4import { nanoid } from 'nanoid';
11 5
12import type EditorStore from '../editor/EditorStore'; 6import type EditorStore from '../editor/EditorStore';
13import getLogger from '../utils/getLogger'; 7import getLogger from '../utils/getLogger';
14 8
9import UpdateStateTracker, {
10 type LockedState,
11 type PendingUpdate,
12} from './UpdateStateTracker';
13import type { StateUpdateResult, Delta } from './UpdateStateTracker';
15import type XtextWebSocketClient from './XtextWebSocketClient'; 14import type XtextWebSocketClient from './XtextWebSocketClient';
16import { 15import {
17 type ContentAssistEntry, 16 type ContentAssistEntry,
@@ -24,98 +23,51 @@ import {
24 23
25const UPDATE_TIMEOUT_MS = 500; 24const UPDATE_TIMEOUT_MS = 500;
26 25
27const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000;
28
29const FORMAT_TEXT_RETRIES = 5;
30
31const log = getLogger('xtext.UpdateService'); 26const log = getLogger('xtext.UpdateService');
32 27
33/**
34 * State effect used to override the dirty changes after a transaction.
35 *
36 * If this state effect is _not_ present in a transaction,
37 * the transaction will be appended to the current dirty changes.
38 *
39 * If this state effect is present, the current dirty changes will be replaced
40 * by the value of this effect.
41 */
42const setDirtyChanges = StateEffect.define<ChangeSet>();
43
44export interface AbortSignal { 28export interface AbortSignal {
45 aborted: boolean; 29 aborted: boolean;
46} 30}
47 31
48export interface ContentAssistParams {
49 caretOffset: number;
50
51 proposalsLimit: number;
52}
53
54export type CancellableResult<T> = 32export type CancellableResult<T> =
55 | { cancelled: false; data: T } 33 | { cancelled: false; data: T }
56 | { cancelled: true }; 34 | { cancelled: true };
57 35
58interface StateUpdateResult<T> { 36export interface ContentAssistParams {
59 newStateId: string; 37 caretOffset: number;
60
61 data: T;
62}
63
64interface Delta {
65 deltaOffset: number;
66
67 deltaReplaceLength: number;
68 38
69 deltaText: string; 39 proposalsLimit: number;
70} 40}
71 41
72export default class UpdateService { 42export default class UpdateService {
73 readonly resourceName: string; 43 readonly resourceName: string;
74 44
75 xtextStateId: string | undefined; 45 private readonly tracker: UpdateStateTracker;
76
77 private readonly store: EditorStore;
78
79 private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS);
80
81 /**
82 * The changes being synchronized to the server if a full or delta text update is running
83 * withing a `withUpdateExclusive` block, `undefined` otherwise.
84 *
85 * Must be `undefined` before and after entering the critical section of `mutex`
86 * and may only be changes in the critical section of `mutex`.
87 *
88 * Methods named with an `Exclusive` suffix in this class assume that the mutex is held
89 * and may call `withUpdateExclusive` or `doFallbackUpdateFullTextExclusive`
90 * to mutate this field.
91 *
92 * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive`
93 * block and require this field to be non-`undefined`.
94 */
95 private pendingUpdate: ChangeSet | undefined;
96
97 /**
98 * Local changes not yet sychronized to the server and not part of the running update, if any.
99 */
100 private dirtyChanges: ChangeSet;
101
102 private readonly webSocketClient: XtextWebSocketClient;
103 46
104 private readonly idleUpdateLater = debounce( 47 private readonly idleUpdateLater = debounce(
105 () => this.idleUpdate(), 48 () => this.idleUpdate(),
106 UPDATE_TIMEOUT_MS, 49 UPDATE_TIMEOUT_MS,
107 ); 50 );
108 51
109 constructor(store: EditorStore, webSocketClient: XtextWebSocketClient) { 52 constructor(
53 private readonly store: EditorStore,
54 private readonly webSocketClient: XtextWebSocketClient,
55 ) {
110 this.resourceName = `${nanoid(7)}.problem`; 56 this.resourceName = `${nanoid(7)}.problem`;
111 this.store = store; 57 this.tracker = new UpdateStateTracker(store);
112 this.dirtyChanges = this.newEmptyChangeSet(); 58 }
113 this.webSocketClient = webSocketClient; 59
60 get xtextStateId(): string | undefined {
61 return this.tracker.xtextStateId;
62 }
63
64 computeChangesSinceLastUpdate(): ChangeDesc {
65 return this.tracker.computeChangesSinceLastUpdate();
114 } 66 }
115 67
116 onReconnect(): void { 68 onReconnect(): void {
117 this.xtextStateId = undefined; 69 this.tracker.invalidateStateId();
118 this.updateFullText().catch((error) => { 70 this.updateFullTextOrThrow().catch((error) => {
119 // Let E_TIMEOUT errors propagate, since if the first update times out, 71 // Let E_TIMEOUT errors propagate, since if the first update times out,
120 // we can't use the connection. 72 // we can't use the connection.
121 if (error === E_CANCELED) { 73 if (error === E_CANCELED) {
@@ -128,46 +80,17 @@ export default class UpdateService {
128 } 80 }
129 81
130 onTransaction(transaction: Transaction): void { 82 onTransaction(transaction: Transaction): void {
131 const setDirtyChangesEffect = transaction.effects.find((effect) => 83 if (this.tracker.onTransaction(transaction)) {
132 effect.is(setDirtyChanges),
133 ) as StateEffect<ChangeSet> | undefined;
134 if (setDirtyChangesEffect) {
135 const { value } = setDirtyChangesEffect;
136 if (this.pendingUpdate !== undefined) {
137 // Do not clear `pendingUpdate`, because that would indicate an update failure
138 // to `withUpdateExclusive`.
139 this.pendingUpdate = ChangeSet.empty(value.length);
140 }
141 this.dirtyChanges = value;
142 return;
143 }
144 if (transaction.docChanged) {
145 this.dirtyChanges = this.dirtyChanges.compose(transaction.changes);
146 this.idleUpdateLater(); 84 this.idleUpdateLater();
147 } 85 }
148 } 86 }
149 87
150 /**
151 * Computes the summary of any changes happened since the last complete update.
152 *
153 * The result reflects any changes that happened since the `xtextStateId`
154 * version was uploaded to the server.
155 *
156 * @returns the summary of changes since the last update
157 */
158 computeChangesSinceLastUpdate(): ChangeDesc {
159 return (
160 this.pendingUpdate?.composeDesc(this.dirtyChanges.desc) ??
161 this.dirtyChanges.desc
162 );
163 }
164
165 private idleUpdate(): void { 88 private idleUpdate(): void {
166 if (!this.webSocketClient.isOpen || this.dirtyChanges.empty) { 89 if (!this.webSocketClient.isOpen || !this.tracker.hasDirtyChanges) {
167 return; 90 return;
168 } 91 }
169 if (!this.mutex.isLocked()) { 92 if (!this.tracker.locekdForUpdate) {
170 this.update().catch((error) => { 93 this.updateOrThrow().catch((error) => {
171 if (error === E_CANCELED || error === E_TIMEOUT) { 94 if (error === E_CANCELED || error === E_TIMEOUT) {
172 log.debug('Idle update cancelled'); 95 log.debug('Idle update cancelled');
173 return; 96 return;
@@ -178,28 +101,6 @@ export default class UpdateService {
178 this.idleUpdateLater(); 101 this.idleUpdateLater();
179 } 102 }
180 103
181 private newEmptyChangeSet(): ChangeSet {
182 return ChangeSet.of([], this.store.state.doc.length);
183 }
184
185 private updateFullText(): Promise<void> {
186 return this.runExclusive(() => this.updateFullTextExclusive());
187 }
188
189 private async updateFullTextExclusive(): Promise<void> {
190 await this.withVoidUpdateExclusive(() => this.doUpdateFullTextExclusive());
191 }
192
193 private async doUpdateFullTextExclusive(): Promise<string> {
194 const result = await this.webSocketClient.send({
195 resource: this.resourceName,
196 serviceType: 'update',
197 fullText: this.store.state.doc.sliceString(0),
198 });
199 const { stateId } = DocumentStateResult.parse(result);
200 return stateId;
201 }
202
203 /** 104 /**
204 * Makes sure that the document state on the server reflects recent 105 * Makes sure that the document state on the server reflects recent
205 * local changes. 106 * local changes.
@@ -209,26 +110,34 @@ export default class UpdateService {
209 * 110 *
210 * @returns a promise resolving when the update is completed 111 * @returns a promise resolving when the update is completed
211 */ 112 */
212 private async update(): Promise<void> { 113 private async updateOrThrow(): Promise<void> {
213 // We may check here for the delta to avoid locking, 114 // We may check here for the delta to avoid locking,
214 // but we'll need to recompute the delta in the critical section, 115 // but we'll need to recompute the delta in the critical section,
215 // because it may have changed by the time we can acquire the lock. 116 // because it may have changed by the time we can acquire the lock.
216 if (this.dirtyChanges.empty) { 117 if (
118 !this.tracker.hasDirtyChanges &&
119 this.tracker.xtextStateId !== undefined
120 ) {
217 return; 121 return;
218 } 122 }
219 await this.runExclusive(() => this.updateExclusive()); 123 await this.tracker.runExclusive((lockedState) =>
124 this.updateExclusive(lockedState),
125 );
220 } 126 }
221 127
222 private async updateExclusive(): Promise<void> { 128 private async updateExclusive(lockedState: LockedState): Promise<void> {
223 if (this.xtextStateId === undefined) { 129 if (this.xtextStateId === undefined) {
224 await this.updateFullTextExclusive(); 130 await this.updateFullTextExclusive(lockedState);
225 } 131 }
226 const delta = this.computeDelta(); 132 if (!this.tracker.hasDirtyChanges) {
227 if (delta === undefined) {
228 return; 133 return;
229 } 134 }
230 log.trace('Editor delta', delta); 135 await lockedState.updateExclusive(async (pendingUpdate) => {
231 await this.withVoidUpdateExclusive(async () => { 136 const delta = pendingUpdate.prepareDeltaUpdateExclusive();
137 if (delta === undefined) {
138 return undefined;
139 }
140 log.trace('Editor delta', delta);
232 const result = await this.webSocketClient.send({ 141 const result = await this.webSocketClient.send({
233 resource: this.resourceName, 142 resource: this.resourceName,
234 serviceType: 'update', 143 serviceType: 'update',
@@ -240,79 +149,50 @@ export default class UpdateService {
240 return parsedDocumentStateResult.data.stateId; 149 return parsedDocumentStateResult.data.stateId;
241 } 150 }
242 if (isConflictResult(result, 'invalidStateId')) { 151 if (isConflictResult(result, 'invalidStateId')) {
243 return this.doFallbackUpdateFullTextExclusive(); 152 return this.doUpdateFullTextExclusive(pendingUpdate);
244 } 153 }
245 throw parsedDocumentStateResult.error; 154 throw parsedDocumentStateResult.error;
246 }); 155 });
247 } 156 }
248 157
249 async fetchOccurrences( 158 private updateFullTextOrThrow(): Promise<void> {
250 getCaretOffset: () => CancellableResult<number>, 159 return this.tracker.runExclusive((lockedState) =>
251 ): Promise<CancellableResult<OccurrencesResult>> { 160 this.updateFullTextExclusive(lockedState),
252 try { 161 );
253 await this.update(); 162 }
254 } catch (error) { 163
255 if (error === E_CANCELED || error === E_TIMEOUT) { 164 private async updateFullTextExclusive(
256 return { cancelled: true }; 165 lockedState: LockedState,
257 } 166 ): Promise<void> {
258 throw error; 167 await lockedState.updateExclusive((pendingUpdate) =>
259 } 168 this.doUpdateFullTextExclusive(pendingUpdate),
260 if (!this.dirtyChanges.empty || this.mutex.isLocked()) { 169 );
261 // Just give up if another update is in progress. 170 }
262 return { cancelled: true }; 171
263 } 172 private async doUpdateFullTextExclusive(
264 const caretOffsetResult = getCaretOffset(); 173 pendingUpdate: PendingUpdate,
265 if (caretOffsetResult.cancelled) { 174 ): Promise<string> {
266 return { cancelled: true }; 175 log.debug('Performing full text update');
267 } 176 pendingUpdate.extendPendingUpdateExclusive();
268 const expectedStateId = this.xtextStateId; 177 const result = await this.webSocketClient.send({
269 const data = await this.webSocketClient.send({
270 resource: this.resourceName, 178 resource: this.resourceName,
271 serviceType: 'occurrences', 179 serviceType: 'update',
272 caretOffset: caretOffsetResult.data, 180 fullText: this.store.state.doc.sliceString(0),
273 expectedStateId,
274 }); 181 });
275 if ( 182 const { stateId } = DocumentStateResult.parse(result);
276 // The query must have reached the server without being conflicted with an update 183 return stateId;
277 // or cancelled server-side.
278 isConflictResult(data) ||
279 // And no state update should have occurred since then.
280 this.xtextStateId !== expectedStateId ||
281 // And there should be no change to the editor text since then.
282 !this.dirtyChanges.empty ||
283 // And there should be no state update in progress.
284 this.mutex.isLocked()
285 ) {
286 return { cancelled: true };
287 }
288 const parsedOccurrencesResult = OccurrencesResult.safeParse(data);
289 if (!parsedOccurrencesResult.success) {
290 log.error(
291 'Unexpected occurences result',
292 data,
293 'not an OccurrencesResult:',
294 parsedOccurrencesResult.error,
295 );
296 throw parsedOccurrencesResult.error;
297 }
298 if (parsedOccurrencesResult.data.stateId !== expectedStateId) {
299 return { cancelled: true };
300 }
301 return { cancelled: false, data: parsedOccurrencesResult.data };
302 } 184 }
303 185
304 async fetchContentAssist( 186 async fetchContentAssist(
305 params: ContentAssistParams, 187 params: ContentAssistParams,
306 signal: AbortSignal, 188 signal: AbortSignal,
307 ): Promise<ContentAssistEntry[]> { 189 ): Promise<ContentAssistEntry[]> {
308 if (!this.mutex.isLocked && this.xtextStateId !== undefined) { 190 if (this.tracker.upToDate && this.xtextStateId !== undefined) {
309 return this.fetchContentAssistFetchOnly(params, this.xtextStateId); 191 return this.fetchContentAssistFetchOnly(params, this.xtextStateId);
310 } 192 }
311 // Content assist updates should have priority over other updates.
312 this.mutex.cancel();
313 try { 193 try {
314 return await this.runExclusive(() => 194 return await this.tracker.runExclusiveHighPriority((lockedState) =>
315 this.fetchContentAssistExclusive(params, signal), 195 this.fetchContentAssistExclusive(params, lockedState, signal),
316 ); 196 );
317 } catch (error) { 197 } catch (error) {
318 if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { 198 if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) {
@@ -324,20 +204,30 @@ export default class UpdateService {
324 204
325 private async fetchContentAssistExclusive( 205 private async fetchContentAssistExclusive(
326 params: ContentAssistParams, 206 params: ContentAssistParams,
207 lockedState: LockedState,
327 signal: AbortSignal, 208 signal: AbortSignal,
328 ): Promise<ContentAssistEntry[]> { 209 ): Promise<ContentAssistEntry[]> {
329 if (this.xtextStateId === undefined) { 210 if (this.xtextStateId === undefined) {
330 await this.updateFullTextExclusive(); 211 await this.updateFullTextExclusive(lockedState);
331 } 212 }
332 if (signal.aborted) { 213 if (signal.aborted) {
333 return []; 214 return [];
334 } 215 }
335 const delta = this.computeDelta(); 216 if (this.tracker.hasDirtyChanges) {
336 if (delta !== undefined) {
337 log.trace('Editor delta', delta);
338 // Try to fetch while also performing a delta update. 217 // Try to fetch while also performing a delta update.
339 const fetchUpdateEntries = await this.withUpdateExclusive(() => 218 const fetchUpdateEntries = await lockedState.withUpdateExclusive(
340 this.doFetchContentAssistWithDeltaExclusive(params, delta), 219 async (pendingUpdate) => {
220 const delta = pendingUpdate.prepareDeltaUpdateExclusive();
221 if (delta === undefined) {
222 return { newStateId: undefined, data: undefined };
223 }
224 log.trace('Editor delta', delta);
225 return this.doFetchContentAssistWithDeltaExclusive(
226 params,
227 pendingUpdate,
228 delta,
229 );
230 },
341 ); 231 );
342 if (fetchUpdateEntries !== undefined) { 232 if (fetchUpdateEntries !== undefined) {
343 return fetchUpdateEntries; 233 return fetchUpdateEntries;
@@ -354,6 +244,7 @@ export default class UpdateService {
354 244
355 private async doFetchContentAssistWithDeltaExclusive( 245 private async doFetchContentAssistWithDeltaExclusive(
356 params: ContentAssistParams, 246 params: ContentAssistParams,
247 pendingUpdate: PendingUpdate,
357 delta: Delta, 248 delta: Delta,
358 ): Promise<StateUpdateResult<ContentAssistEntry[] | undefined>> { 249 ): Promise<StateUpdateResult<ContentAssistEntry[] | undefined>> {
359 const fetchUpdateResult = await this.webSocketClient.send({ 250 const fetchUpdateResult = await this.webSocketClient.send({
@@ -372,7 +263,7 @@ export default class UpdateService {
372 } 263 }
373 if (isConflictResult(fetchUpdateResult, 'invalidStateId')) { 264 if (isConflictResult(fetchUpdateResult, 'invalidStateId')) {
374 log.warn('Server state invalid during content assist'); 265 log.warn('Server state invalid during content assist');
375 const newStateId = await this.doFallbackUpdateFullTextExclusive(); 266 const newStateId = await this.doUpdateFullTextExclusive(pendingUpdate);
376 // We must finish this state update transaction to prepare for any push events 267 // We must finish this state update transaction to prepare for any push events
377 // before querying for content assist, so we just return `undefined` and will query 268 // before querying for content assist, so we just return `undefined` and will query
378 // the content assist service later. 269 // the content assist service later.
@@ -402,33 +293,21 @@ export default class UpdateService {
402 return fetchOnlyEntries; 293 return fetchOnlyEntries;
403 } 294 }
404 295
405 async formatText(): Promise<void> { 296 formatText(): Promise<void> {
406 let retries = 0; 297 return this.tracker.runExclusiveWithRetries((lockedState) =>
407 while (retries < FORMAT_TEXT_RETRIES) { 298 this.formatTextExclusive(lockedState),
408 try { 299 );
409 // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries.
410 await this.runExclusive(() => this.formatTextExclusive());
411 return;
412 } catch (error) {
413 // Let timeout errors propagate to give up formatting on a flaky connection.
414 if (error === E_CANCELED && retries < FORMAT_TEXT_RETRIES) {
415 retries += 1;
416 } else {
417 throw error;
418 }
419 }
420 }
421 } 300 }
422 301
423 private async formatTextExclusive(): Promise<void> { 302 private async formatTextExclusive(lockedState: LockedState): Promise<void> {
424 await this.updateExclusive(); 303 await this.updateExclusive(lockedState);
425 let { from, to } = this.store.state.selection.main; 304 let { from, to } = this.store.state.selection.main;
426 if (to <= from) { 305 if (to <= from) {
427 from = 0; 306 from = 0;
428 to = this.store.state.doc.length; 307 to = this.store.state.doc.length;
429 } 308 }
430 log.debug('Formatting from', from, 'to', to); 309 log.debug('Formatting from', from, 'to', to);
431 await this.withVoidUpdateExclusive(async () => { 310 await lockedState.updateExclusive(async (pendingUpdate) => {
432 const result = await this.webSocketClient.send({ 311 const result = await this.webSocketClient.send({
433 resource: this.resourceName, 312 resource: this.resourceName,
434 serviceType: 'format', 313 serviceType: 'format',
@@ -436,7 +315,7 @@ export default class UpdateService {
436 selectionEnd: to, 315 selectionEnd: to,
437 }); 316 });
438 const { stateId, formattedText } = FormattingResult.parse(result); 317 const { stateId, formattedText } = FormattingResult.parse(result);
439 this.applyBeforeDirtyChanges({ 318 pendingUpdate.applyBeforeDirtyChangesExclusive({
440 from, 319 from,
441 to, 320 to,
442 insert: formattedText, 321 insert: formattedText,
@@ -445,119 +324,46 @@ export default class UpdateService {
445 }); 324 });
446 } 325 }
447 326
448 private computeDelta(): Delta | undefined { 327 async fetchOccurrences(
449 if (this.dirtyChanges.empty) { 328 getCaretOffset: () => CancellableResult<number>,
450 return undefined; 329 ): Promise<CancellableResult<OccurrencesResult>> {
451 }
452 let minFromA = Number.MAX_SAFE_INTEGER;
453 let maxToA = 0;
454 let minFromB = Number.MAX_SAFE_INTEGER;
455 let maxToB = 0;
456 this.dirtyChanges.iterChangedRanges((fromA, toA, fromB, toB) => {
457 minFromA = Math.min(minFromA, fromA);
458 maxToA = Math.max(maxToA, toA);
459 minFromB = Math.min(minFromB, fromB);
460 maxToB = Math.max(maxToB, toB);
461 });
462 return {
463 deltaOffset: minFromA,
464 deltaReplaceLength: maxToA - minFromA,
465 deltaText: this.store.state.doc.sliceString(minFromB, maxToB),
466 };
467 }
468
469 private applyBeforeDirtyChanges(changeSpec: ChangeSpec): void {
470 const pendingChanges =
471 this.pendingUpdate?.compose(this.dirtyChanges) ?? this.dirtyChanges;
472 const revertChanges = pendingChanges.invert(this.store.state.doc);
473 const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength);
474 const redoChanges = pendingChanges.map(applyBefore.desc);
475 const changeSet = revertChanges.compose(applyBefore).compose(redoChanges);
476 this.store.dispatch({
477 changes: changeSet,
478 // Keep the current set of dirty changes (but update them according the re-formatting)
479 // and to not add the formatting the dirty changes.
480 effects: [setDirtyChanges.of(redoChanges)],
481 });
482 }
483
484 private runExclusive<T>(callback: () => Promise<T>): Promise<T> {
485 return this.mutex.runExclusive(async () => {
486 if (this.pendingUpdate !== undefined) {
487 throw new Error('Update is pending before entering critical section');
488 }
489 const result = await callback();
490 if (this.pendingUpdate !== undefined) {
491 throw new Error('Update is pending after entering critical section');
492 }
493 return result;
494 });
495 }
496
497 private withVoidUpdateExclusive(
498 callback: () => Promise<string>,
499 ): Promise<void> {
500 return this.withUpdateExclusive<void>(async () => {
501 const newStateId = await callback();
502 return { newStateId, data: undefined };
503 });
504 }
505
506 /**
507 * Executes an asynchronous callback that updates the state on the server.
508 *
509 * Ensures that updates happen sequentially and manages `pendingUpdate`
510 * and `dirtyChanges` to reflect changes being synchronized to the server
511 * and not yet synchronized to the server, respectively.
512 *
513 * Optionally, `callback` may return a second value that is retured by this function.
514 *
515 * Once the remote procedure call to update the server state finishes
516 * and returns the new `stateId`, `callback` must return _immediately_
517 * to ensure that the local `stateId` is updated likewise to be able to handle
518 * push messages referring to the new `stateId` from the server.
519 * If additional work is needed to compute the second value in some cases,
520 * use `T | undefined` instead of `T` as a return type and signal the need for additional
521 * computations by returning `undefined`. Thus additional computations can be performed
522 * outside of the critical section.
523 *
524 * @param callback the asynchronous callback that updates the server state
525 * @returns a promise resolving to the second value returned by `callback`
526 */
527 private async withUpdateExclusive<T>(
528 callback: () => Promise<StateUpdateResult<T>>,
529 ): Promise<T> {
530 if (this.pendingUpdate !== undefined) {
531 throw new Error('Delta updates are not reentrant');
532 }
533 this.pendingUpdate = this.dirtyChanges;
534 this.dirtyChanges = this.newEmptyChangeSet();
535 let data: T;
536 try { 330 try {
537 ({ newStateId: this.xtextStateId, data } = await callback()); 331 await this.updateOrThrow();
538 this.pendingUpdate = undefined; 332 } catch (error) {
539 } catch (e) { 333 if (error === E_CANCELED || error === E_TIMEOUT) {
540 log.error('Error while update', e); 334 return { cancelled: true };
541 if (this.pendingUpdate === undefined) {
542 log.error('pendingUpdate was cleared during update');
543 } else {
544 this.dirtyChanges = this.pendingUpdate.compose(this.dirtyChanges);
545 } 335 }
546 this.pendingUpdate = undefined; 336 throw error;
547 this.webSocketClient.forceReconnectOnError();
548 throw e;
549 } 337 }
550 return data; 338 if (!this.tracker.upToDate) {
551 } 339 // Just give up if another update is in progress.
552 340 return { cancelled: true };
553 private doFallbackUpdateFullTextExclusive(): Promise<string> { 341 }
554 if (this.pendingUpdate === undefined) { 342 const caretOffsetResult = getCaretOffset();
555 throw new Error('Only a pending update can be extended'); 343 if (caretOffsetResult.cancelled) {
344 return { cancelled: true };
345 }
346 const expectedStateId = this.xtextStateId;
347 if (expectedStateId === undefined) {
348 // If there is no state on the server, don't bother with finding occurrences.
349 return { cancelled: true };
350 }
351 const data = await this.webSocketClient.send({
352 resource: this.resourceName,
353 serviceType: 'occurrences',
354 caretOffset: caretOffsetResult.data,
355 expectedStateId,
356 });
357 if (
358 isConflictResult(data) ||
359 this.tracker.hasChangesSince(expectedStateId)
360 ) {
361 return { cancelled: true };
362 }
363 const parsedOccurrencesResult = OccurrencesResult.parse(data);
364 if (parsedOccurrencesResult.stateId !== expectedStateId) {
365 return { cancelled: true };
556 } 366 }
557 log.warn('Delta update failed, performing full text update'); 367 return { cancelled: false, data: parsedOccurrencesResult };
558 this.xtextStateId = undefined;
559 this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges);
560 this.dirtyChanges = this.newEmptyChangeSet();
561 return this.doUpdateFullTextExclusive();
562 } 368 }
563} 369}
diff --git a/subprojects/frontend/src/xtext/UpdateStateTracker.ts b/subprojects/frontend/src/xtext/UpdateStateTracker.ts
new file mode 100644
index 00000000..04359060
--- /dev/null
+++ b/subprojects/frontend/src/xtext/UpdateStateTracker.ts
@@ -0,0 +1,357 @@
1/**
2 * @file State tracker for pushing updates to the Xtext server.
3 *
4 * This file implements complex logic to avoid missing or overwriting state updates
5 * and to avoid sending conflicting updates to the Xtext server.
6 *
7 * The `LockedState` and `PendingUpdate` objects are used as capabilities to
8 * signify whether the socket to the Xtext server is locked for updates and
9 * whether an update is in progress, respectively.
10 * Always use these objects only received as an argument of a lambda expression
11 * or method and never leak them into class field or global variables.
12 * The presence of such an objects in the scope should always imply that
13 * the corresponding condition holds.
14 */
15
16import {
17 type ChangeDesc,
18 ChangeSet,
19 type ChangeSpec,
20 StateEffect,
21 type Transaction,
22} from '@codemirror/state';
23import { E_CANCELED, Mutex, withTimeout } from 'async-mutex';
24
25import type EditorStore from '../editor/EditorStore';
26import getLogger from '../utils/getLogger';
27
28const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000;
29
30const log = getLogger('xtext.UpdateStateTracker');
31
32/**
33 * State effect used to override the dirty changes after a transaction.
34 *
35 * If this state effect is _not_ present in a transaction,
36 * the transaction will be appended to the current dirty changes.
37 *
38 * If this state effect is present, the current dirty changes will be replaced
39 * by the value of this effect.
40 */
41const setDirtyChanges = StateEffect.define<ChangeSet>();
42
43export interface StateUpdateResult<T> {
44 /** The new state ID on the server or `undefined` if no update was performed. */
45 newStateId: string | undefined;
46
47 /** Optional data payload received during the update. */
48 data: T;
49}
50
51/**
52 * Signifies a capability that the Xtext server state is locked for update.
53 */
54export interface LockedState {
55 /**
56 *
57 * @param callback the asynchronous callback that updates the server state
58 * @returns a promise resolving after the update
59 */
60 updateExclusive(
61 callback: (pendingUpdate: PendingUpdate) => Promise<string | undefined>,
62 ): Promise<void>;
63
64 /**
65 * Executes an asynchronous callback that updates the state on the server.
66 *
67 * If the callback returns `undefined` as the `newStateId`,
68 * the update is assumed to be aborted and any pending changes will be marked as dirt again.
69 * Any exceptions thrown in `callback` will also cause the update to be aborted.
70 *
71 * Ensures that updates happen sequentially and manages `pendingUpdate`
72 * and `dirtyChanges` to reflect changes being synchronized to the server
73 * and not yet synchronized to the server, respectively.
74 *
75 * Optionally, `callback` may return a second value that is retured by this function.
76 *
77 * Once the remote procedure call to update the server state finishes
78 * and returns the new `stateId`, `callback` must return _immediately_
79 * to ensure that the local `stateId` is updated likewise to be able to handle
80 * push messages referring to the new `stateId` from the server.
81 * If additional asynchronous work is needed to compute the second value in some cases,
82 * use `T | undefined` instead of `T` as a return type and signal the need for additional
83 * computations by returning `undefined`. Thus additional computations can be performed
84 * outside of the critical section.
85 *
86 * @param callback the asynchronous callback that updates the server state
87 * @returns a promise resolving to the second value returned by `callback`
88 */
89 withUpdateExclusive<T>(
90 callback: (pendingUpdate: PendingUpdate) => Promise<StateUpdateResult<T>>,
91 ): Promise<T>;
92}
93
94export interface Delta {
95 deltaOffset: number;
96
97 deltaReplaceLength: number;
98
99 deltaText: string;
100}
101
102/**
103 * Signifies a capability that dirty changes are being marked for uploading.
104 */
105export interface PendingUpdate {
106 prepareDeltaUpdateExclusive(): Delta | undefined;
107
108 extendPendingUpdateExclusive(): void;
109
110 applyBeforeDirtyChangesExclusive(changeSpec: ChangeSpec): void;
111}
112
113export default class UpdateStateTracker {
114 xtextStateId: string | undefined;
115
116 /**
117 * The changes being synchronized to the server if a full or delta text update is running
118 * withing a `withUpdateExclusive` block, `undefined` otherwise.
119 *
120 * Must be `undefined` before and after entering the critical section of `mutex`
121 * and may only be changes in the critical section of `mutex`.
122 *
123 * Methods named with an `Exclusive` suffix in this class assume that the mutex is held
124 * and may call `updateExclusive` or `withUpdateExclusive` to mutate this field.
125 *
126 * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive`
127 * block and require this field to be non-`undefined`.
128 */
129 private pendingChanges: ChangeSet | undefined;
130
131 /**
132 * Local changes not yet sychronized to the server and not part of the current update, if any.
133 */
134 private dirtyChanges: ChangeSet;
135
136 /**
137 * Locked when we try to modify the state on the server.
138 */
139 private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS);
140
141 constructor(private readonly store: EditorStore) {
142 this.dirtyChanges = this.newEmptyChangeSet();
143 }
144
145 get locekdForUpdate(): boolean {
146 return this.mutex.isLocked();
147 }
148
149 get hasDirtyChanges(): boolean {
150 return !this.dirtyChanges.empty;
151 }
152
153 get upToDate(): boolean {
154 return !this.locekdForUpdate && !this.hasDirtyChanges;
155 }
156
157 hasChangesSince(xtextStateId: string): boolean {
158 return (
159 this.xtextStateId !== xtextStateId ||
160 this.locekdForUpdate ||
161 this.hasDirtyChanges
162 );
163 }
164
165 /**
166 * Extends the current set of changes with `transaction`.
167 *
168 * Also determines if the transaction has made local changes
169 * that will have to be synchronized to the server
170 *
171 * @param transaction the transaction that affected the editor
172 * @returns `true` if the transaction requires and idle update, `false` otherwise
173 */
174 onTransaction(transaction: Transaction): boolean {
175 const setDirtyChangesEffect = transaction.effects.find((effect) =>
176 effect.is(setDirtyChanges),
177 ) as StateEffect<ChangeSet> | undefined;
178 if (setDirtyChangesEffect) {
179 const { value } = setDirtyChangesEffect;
180 if (this.pendingChanges !== undefined) {
181 // Do not clear `pendingUpdate`, because that would indicate an update failure
182 // to `withUpdateExclusive`.
183 this.pendingChanges = ChangeSet.empty(value.length);
184 }
185 this.dirtyChanges = value;
186 return false;
187 }
188 if (transaction.docChanged) {
189 this.dirtyChanges = this.dirtyChanges.compose(transaction.changes);
190 return true;
191 }
192 return false;
193 }
194
195 invalidateStateId(): void {
196 this.xtextStateId = undefined;
197 }
198
199 /**
200 * Computes the summary of any changes happened since the last complete update.
201 *
202 * The result reflects any changes that happened since the `xtextStateId`
203 * version was uploaded to the server.
204 *
205 * @returns the summary of changes since the last update
206 */
207 computeChangesSinceLastUpdate(): ChangeDesc {
208 return (
209 this.pendingChanges?.composeDesc(this.dirtyChanges.desc) ??
210 this.dirtyChanges.desc
211 );
212 }
213
214 private newEmptyChangeSet(): ChangeSet {
215 return ChangeSet.of([], this.store.state.doc.length);
216 }
217
218 private readonly pendingUpdate: PendingUpdate = {
219 prepareDeltaUpdateExclusive: (): Delta | undefined => {
220 this.pendingUpdate.extendPendingUpdateExclusive();
221 if (this.pendingChanges === undefined || this.pendingChanges.empty) {
222 return undefined;
223 }
224 let minFromA = Number.MAX_SAFE_INTEGER;
225 let maxToA = 0;
226 let minFromB = Number.MAX_SAFE_INTEGER;
227 let maxToB = 0;
228 this.pendingChanges.iterChangedRanges((fromA, toA, fromB, toB) => {
229 minFromA = Math.min(minFromA, fromA);
230 maxToA = Math.max(maxToA, toA);
231 minFromB = Math.min(minFromB, fromB);
232 maxToB = Math.max(maxToB, toB);
233 });
234 return {
235 deltaOffset: minFromA,
236 deltaReplaceLength: maxToA - minFromA,
237 deltaText: this.store.state.doc.sliceString(minFromB, maxToB),
238 };
239 },
240 extendPendingUpdateExclusive: (): void => {
241 if (!this.locekdForUpdate) {
242 throw new Error('Cannot update state without locking the mutex');
243 }
244 if (this.hasDirtyChanges) {
245 this.pendingChanges =
246 this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges;
247 this.dirtyChanges = this.newEmptyChangeSet();
248 }
249 },
250 applyBeforeDirtyChangesExclusive: (changeSpec: ChangeSpec): void => {
251 if (!this.locekdForUpdate) {
252 throw new Error('Cannot update state without locking the mutex');
253 }
254 const pendingChanges =
255 this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges;
256 const revertChanges = pendingChanges.invert(this.store.state.doc);
257 const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength);
258 const redoChanges = pendingChanges.map(applyBefore.desc);
259 const changeSet = revertChanges.compose(applyBefore).compose(redoChanges);
260 this.store.dispatch({
261 changes: changeSet,
262 // Keep the current set of dirty changes (but update them according the re-formatting)
263 // and to not add the formatting the dirty changes.
264 effects: [setDirtyChanges.of(redoChanges)],
265 });
266 },
267 };
268
269 private readonly lockedState: LockedState = {
270 updateExclusive: (
271 callback: (pendingUpdate: PendingUpdate) => Promise<string | undefined>,
272 ): Promise<void> => {
273 return this.lockedState.withUpdateExclusive<void>(
274 async (pendingUpdate) => {
275 const newStateId = await callback(pendingUpdate);
276 return { newStateId, data: undefined };
277 },
278 );
279 },
280 withUpdateExclusive: async <T>(
281 callback: (pendingUpdate: PendingUpdate) => Promise<StateUpdateResult<T>>,
282 ): Promise<T> => {
283 if (!this.locekdForUpdate) {
284 throw new Error('Cannot update state without locking the mutex');
285 }
286 if (this.pendingChanges !== undefined) {
287 throw new Error('Delta updates are not reentrant');
288 }
289 let newStateId: string | undefined;
290 let data: T;
291 try {
292 ({ newStateId, data } = await callback(this.pendingUpdate));
293 } catch (e) {
294 log.error('Error while update', e);
295 this.cancelUpdate();
296 throw e;
297 }
298 if (newStateId === undefined) {
299 this.cancelUpdate();
300 } else {
301 this.xtextStateId = newStateId;
302 this.pendingChanges = undefined;
303 }
304 return data;
305 },
306 };
307
308 private cancelUpdate(): void {
309 if (this.pendingChanges === undefined) {
310 return;
311 }
312 this.dirtyChanges = this.pendingChanges.compose(this.dirtyChanges);
313 this.pendingChanges = undefined;
314 }
315
316 runExclusive<T>(
317 callback: (lockedState: LockedState) => Promise<T>,
318 ): Promise<T> {
319 return this.mutex.runExclusive(async () => {
320 if (this.pendingChanges !== undefined) {
321 throw new Error('Update is pending before entering critical section');
322 }
323 const result = await callback(this.lockedState);
324 if (this.pendingChanges !== undefined) {
325 throw new Error('Update is pending after entering critical section');
326 }
327 return result;
328 });
329 }
330
331 runExclusiveHighPriority<T>(
332 callback: (lockedState: LockedState) => Promise<T>,
333 ): Promise<T> {
334 this.mutex.cancel();
335 return this.runExclusive(callback);
336 }
337
338 async runExclusiveWithRetries<T>(
339 callback: (lockedState: LockedState) => Promise<T>,
340 maxRetries = 5,
341 ): Promise<T> {
342 let retries = 0;
343 while (retries < maxRetries) {
344 try {
345 // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries.
346 return await this.runExclusive(callback);
347 } catch (error) {
348 // Let timeout errors propagate to give up retrying on a flaky connection.
349 if (error !== E_CANCELED) {
350 throw error;
351 }
352 retries += 1;
353 }
354 }
355 throw E_CANCELED;
356 }
357}