aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/frontend/src/xtext/UpdateStateTracker.ts
diff options
context:
space:
mode:
authorLibravatar Kristóf Marussy <kristof@marussy.com>2022-08-25 20:03:41 +0200
committerLibravatar Kristóf Marussy <kristof@marussy.com>2022-08-25 21:54:27 +0200
commitd774c3d2c4fc5948483438d8304af5baa6bb7a91 (patch)
treea2d7a7704dabb806e3b4578abc90b143ca668080 /subprojects/frontend/src/xtext/UpdateStateTracker.ts
parentfix(frontend): UpdateService synchronization (diff)
downloadrefinery-d774c3d2c4fc5948483438d8304af5baa6bb7a91.tar.gz
refinery-d774c3d2c4fc5948483438d8304af5baa6bb7a91.tar.zst
refinery-d774c3d2c4fc5948483438d8304af5baa6bb7a91.zip
refactor(frontend): extract xtextStateId tracking
Diffstat (limited to 'subprojects/frontend/src/xtext/UpdateStateTracker.ts')
-rw-r--r--subprojects/frontend/src/xtext/UpdateStateTracker.ts357
1 files changed, 357 insertions, 0 deletions
diff --git a/subprojects/frontend/src/xtext/UpdateStateTracker.ts b/subprojects/frontend/src/xtext/UpdateStateTracker.ts
new file mode 100644
index 00000000..04359060
--- /dev/null
+++ b/subprojects/frontend/src/xtext/UpdateStateTracker.ts
@@ -0,0 +1,357 @@
1/**
2 * @file State tracker for pushing updates to the Xtext server.
3 *
4 * This file implements complex logic to avoid missing or overwriting state updates
5 * and to avoid sending conflicting updates to the Xtext server.
6 *
7 * The `LockedState` and `PendingUpdate` objects are used as capabilities to
8 * signify whether the socket to the Xtext server is locked for updates and
9 * whether an update is in progress, respectively.
10 * Always use these objects only received as an argument of a lambda expression
11 * or method and never leak them into class field or global variables.
12 * The presence of such an objects in the scope should always imply that
13 * the corresponding condition holds.
14 */
15
16import {
17 type ChangeDesc,
18 ChangeSet,
19 type ChangeSpec,
20 StateEffect,
21 type Transaction,
22} from '@codemirror/state';
23import { E_CANCELED, Mutex, withTimeout } from 'async-mutex';
24
25import type EditorStore from '../editor/EditorStore';
26import getLogger from '../utils/getLogger';
27
28const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000;
29
30const log = getLogger('xtext.UpdateStateTracker');
31
32/**
33 * State effect used to override the dirty changes after a transaction.
34 *
35 * If this state effect is _not_ present in a transaction,
36 * the transaction will be appended to the current dirty changes.
37 *
38 * If this state effect is present, the current dirty changes will be replaced
39 * by the value of this effect.
40 */
41const setDirtyChanges = StateEffect.define<ChangeSet>();
42
43export interface StateUpdateResult<T> {
44 /** The new state ID on the server or `undefined` if no update was performed. */
45 newStateId: string | undefined;
46
47 /** Optional data payload received during the update. */
48 data: T;
49}
50
51/**
52 * Signifies a capability that the Xtext server state is locked for update.
53 */
54export interface LockedState {
55 /**
56 *
57 * @param callback the asynchronous callback that updates the server state
58 * @returns a promise resolving after the update
59 */
60 updateExclusive(
61 callback: (pendingUpdate: PendingUpdate) => Promise<string | undefined>,
62 ): Promise<void>;
63
64 /**
65 * Executes an asynchronous callback that updates the state on the server.
66 *
67 * If the callback returns `undefined` as the `newStateId`,
68 * the update is assumed to be aborted and any pending changes will be marked as dirt again.
69 * Any exceptions thrown in `callback` will also cause the update to be aborted.
70 *
71 * Ensures that updates happen sequentially and manages `pendingUpdate`
72 * and `dirtyChanges` to reflect changes being synchronized to the server
73 * and not yet synchronized to the server, respectively.
74 *
75 * Optionally, `callback` may return a second value that is retured by this function.
76 *
77 * Once the remote procedure call to update the server state finishes
78 * and returns the new `stateId`, `callback` must return _immediately_
79 * to ensure that the local `stateId` is updated likewise to be able to handle
80 * push messages referring to the new `stateId` from the server.
81 * If additional asynchronous work is needed to compute the second value in some cases,
82 * use `T | undefined` instead of `T` as a return type and signal the need for additional
83 * computations by returning `undefined`. Thus additional computations can be performed
84 * outside of the critical section.
85 *
86 * @param callback the asynchronous callback that updates the server state
87 * @returns a promise resolving to the second value returned by `callback`
88 */
89 withUpdateExclusive<T>(
90 callback: (pendingUpdate: PendingUpdate) => Promise<StateUpdateResult<T>>,
91 ): Promise<T>;
92}
93
94export interface Delta {
95 deltaOffset: number;
96
97 deltaReplaceLength: number;
98
99 deltaText: string;
100}
101
102/**
103 * Signifies a capability that dirty changes are being marked for uploading.
104 */
105export interface PendingUpdate {
106 prepareDeltaUpdateExclusive(): Delta | undefined;
107
108 extendPendingUpdateExclusive(): void;
109
110 applyBeforeDirtyChangesExclusive(changeSpec: ChangeSpec): void;
111}
112
113export default class UpdateStateTracker {
114 xtextStateId: string | undefined;
115
116 /**
117 * The changes being synchronized to the server if a full or delta text update is running
118 * withing a `withUpdateExclusive` block, `undefined` otherwise.
119 *
120 * Must be `undefined` before and after entering the critical section of `mutex`
121 * and may only be changes in the critical section of `mutex`.
122 *
123 * Methods named with an `Exclusive` suffix in this class assume that the mutex is held
124 * and may call `updateExclusive` or `withUpdateExclusive` to mutate this field.
125 *
126 * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive`
127 * block and require this field to be non-`undefined`.
128 */
129 private pendingChanges: ChangeSet | undefined;
130
131 /**
132 * Local changes not yet sychronized to the server and not part of the current update, if any.
133 */
134 private dirtyChanges: ChangeSet;
135
136 /**
137 * Locked when we try to modify the state on the server.
138 */
139 private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS);
140
141 constructor(private readonly store: EditorStore) {
142 this.dirtyChanges = this.newEmptyChangeSet();
143 }
144
145 get locekdForUpdate(): boolean {
146 return this.mutex.isLocked();
147 }
148
149 get hasDirtyChanges(): boolean {
150 return !this.dirtyChanges.empty;
151 }
152
153 get upToDate(): boolean {
154 return !this.locekdForUpdate && !this.hasDirtyChanges;
155 }
156
157 hasChangesSince(xtextStateId: string): boolean {
158 return (
159 this.xtextStateId !== xtextStateId ||
160 this.locekdForUpdate ||
161 this.hasDirtyChanges
162 );
163 }
164
165 /**
166 * Extends the current set of changes with `transaction`.
167 *
168 * Also determines if the transaction has made local changes
169 * that will have to be synchronized to the server
170 *
171 * @param transaction the transaction that affected the editor
172 * @returns `true` if the transaction requires and idle update, `false` otherwise
173 */
174 onTransaction(transaction: Transaction): boolean {
175 const setDirtyChangesEffect = transaction.effects.find((effect) =>
176 effect.is(setDirtyChanges),
177 ) as StateEffect<ChangeSet> | undefined;
178 if (setDirtyChangesEffect) {
179 const { value } = setDirtyChangesEffect;
180 if (this.pendingChanges !== undefined) {
181 // Do not clear `pendingUpdate`, because that would indicate an update failure
182 // to `withUpdateExclusive`.
183 this.pendingChanges = ChangeSet.empty(value.length);
184 }
185 this.dirtyChanges = value;
186 return false;
187 }
188 if (transaction.docChanged) {
189 this.dirtyChanges = this.dirtyChanges.compose(transaction.changes);
190 return true;
191 }
192 return false;
193 }
194
195 invalidateStateId(): void {
196 this.xtextStateId = undefined;
197 }
198
199 /**
200 * Computes the summary of any changes happened since the last complete update.
201 *
202 * The result reflects any changes that happened since the `xtextStateId`
203 * version was uploaded to the server.
204 *
205 * @returns the summary of changes since the last update
206 */
207 computeChangesSinceLastUpdate(): ChangeDesc {
208 return (
209 this.pendingChanges?.composeDesc(this.dirtyChanges.desc) ??
210 this.dirtyChanges.desc
211 );
212 }
213
214 private newEmptyChangeSet(): ChangeSet {
215 return ChangeSet.of([], this.store.state.doc.length);
216 }
217
218 private readonly pendingUpdate: PendingUpdate = {
219 prepareDeltaUpdateExclusive: (): Delta | undefined => {
220 this.pendingUpdate.extendPendingUpdateExclusive();
221 if (this.pendingChanges === undefined || this.pendingChanges.empty) {
222 return undefined;
223 }
224 let minFromA = Number.MAX_SAFE_INTEGER;
225 let maxToA = 0;
226 let minFromB = Number.MAX_SAFE_INTEGER;
227 let maxToB = 0;
228 this.pendingChanges.iterChangedRanges((fromA, toA, fromB, toB) => {
229 minFromA = Math.min(minFromA, fromA);
230 maxToA = Math.max(maxToA, toA);
231 minFromB = Math.min(minFromB, fromB);
232 maxToB = Math.max(maxToB, toB);
233 });
234 return {
235 deltaOffset: minFromA,
236 deltaReplaceLength: maxToA - minFromA,
237 deltaText: this.store.state.doc.sliceString(minFromB, maxToB),
238 };
239 },
240 extendPendingUpdateExclusive: (): void => {
241 if (!this.locekdForUpdate) {
242 throw new Error('Cannot update state without locking the mutex');
243 }
244 if (this.hasDirtyChanges) {
245 this.pendingChanges =
246 this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges;
247 this.dirtyChanges = this.newEmptyChangeSet();
248 }
249 },
250 applyBeforeDirtyChangesExclusive: (changeSpec: ChangeSpec): void => {
251 if (!this.locekdForUpdate) {
252 throw new Error('Cannot update state without locking the mutex');
253 }
254 const pendingChanges =
255 this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges;
256 const revertChanges = pendingChanges.invert(this.store.state.doc);
257 const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength);
258 const redoChanges = pendingChanges.map(applyBefore.desc);
259 const changeSet = revertChanges.compose(applyBefore).compose(redoChanges);
260 this.store.dispatch({
261 changes: changeSet,
262 // Keep the current set of dirty changes (but update them according the re-formatting)
263 // and to not add the formatting the dirty changes.
264 effects: [setDirtyChanges.of(redoChanges)],
265 });
266 },
267 };
268
269 private readonly lockedState: LockedState = {
270 updateExclusive: (
271 callback: (pendingUpdate: PendingUpdate) => Promise<string | undefined>,
272 ): Promise<void> => {
273 return this.lockedState.withUpdateExclusive<void>(
274 async (pendingUpdate) => {
275 const newStateId = await callback(pendingUpdate);
276 return { newStateId, data: undefined };
277 },
278 );
279 },
280 withUpdateExclusive: async <T>(
281 callback: (pendingUpdate: PendingUpdate) => Promise<StateUpdateResult<T>>,
282 ): Promise<T> => {
283 if (!this.locekdForUpdate) {
284 throw new Error('Cannot update state without locking the mutex');
285 }
286 if (this.pendingChanges !== undefined) {
287 throw new Error('Delta updates are not reentrant');
288 }
289 let newStateId: string | undefined;
290 let data: T;
291 try {
292 ({ newStateId, data } = await callback(this.pendingUpdate));
293 } catch (e) {
294 log.error('Error while update', e);
295 this.cancelUpdate();
296 throw e;
297 }
298 if (newStateId === undefined) {
299 this.cancelUpdate();
300 } else {
301 this.xtextStateId = newStateId;
302 this.pendingChanges = undefined;
303 }
304 return data;
305 },
306 };
307
308 private cancelUpdate(): void {
309 if (this.pendingChanges === undefined) {
310 return;
311 }
312 this.dirtyChanges = this.pendingChanges.compose(this.dirtyChanges);
313 this.pendingChanges = undefined;
314 }
315
316 runExclusive<T>(
317 callback: (lockedState: LockedState) => Promise<T>,
318 ): Promise<T> {
319 return this.mutex.runExclusive(async () => {
320 if (this.pendingChanges !== undefined) {
321 throw new Error('Update is pending before entering critical section');
322 }
323 const result = await callback(this.lockedState);
324 if (this.pendingChanges !== undefined) {
325 throw new Error('Update is pending after entering critical section');
326 }
327 return result;
328 });
329 }
330
331 runExclusiveHighPriority<T>(
332 callback: (lockedState: LockedState) => Promise<T>,
333 ): Promise<T> {
334 this.mutex.cancel();
335 return this.runExclusive(callback);
336 }
337
338 async runExclusiveWithRetries<T>(
339 callback: (lockedState: LockedState) => Promise<T>,
340 maxRetries = 5,
341 ): Promise<T> {
342 let retries = 0;
343 while (retries < maxRetries) {
344 try {
345 // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries.
346 return await this.runExclusive(callback);
347 } catch (error) {
348 // Let timeout errors propagate to give up retrying on a flaky connection.
349 if (error !== E_CANCELED) {
350 throw error;
351 }
352 retries += 1;
353 }
354 }
355 throw E_CANCELED;
356 }
357}