diff options
-rw-r--r-- | subprojects/frontend/src/xtext/UpdateService.ts | 470 | ||||
-rw-r--r-- | subprojects/frontend/src/xtext/UpdateStateTracker.ts | 357 |
2 files changed, 495 insertions, 332 deletions
diff --git a/subprojects/frontend/src/xtext/UpdateService.ts b/subprojects/frontend/src/xtext/UpdateService.ts index 3b4ae259..94e01ca2 100644 --- a/subprojects/frontend/src/xtext/UpdateService.ts +++ b/subprojects/frontend/src/xtext/UpdateService.ts | |||
@@ -1,17 +1,16 @@ | |||
1 | import { | 1 | import type { ChangeDesc, Transaction } from '@codemirror/state'; |
2 | type ChangeDesc, | 2 | import { E_CANCELED, E_TIMEOUT } from 'async-mutex'; |
3 | ChangeSet, | ||
4 | type ChangeSpec, | ||
5 | StateEffect, | ||
6 | type Transaction, | ||
7 | } from '@codemirror/state'; | ||
8 | import { E_CANCELED, E_TIMEOUT, Mutex, withTimeout } from 'async-mutex'; | ||
9 | import { debounce } from 'lodash-es'; | 3 | import { debounce } from 'lodash-es'; |
10 | import { nanoid } from 'nanoid'; | 4 | import { nanoid } from 'nanoid'; |
11 | 5 | ||
12 | import type EditorStore from '../editor/EditorStore'; | 6 | import type EditorStore from '../editor/EditorStore'; |
13 | import getLogger from '../utils/getLogger'; | 7 | import getLogger from '../utils/getLogger'; |
14 | 8 | ||
9 | import UpdateStateTracker, { | ||
10 | type LockedState, | ||
11 | type PendingUpdate, | ||
12 | } from './UpdateStateTracker'; | ||
13 | import type { StateUpdateResult, Delta } from './UpdateStateTracker'; | ||
15 | import type XtextWebSocketClient from './XtextWebSocketClient'; | 14 | import type XtextWebSocketClient from './XtextWebSocketClient'; |
16 | import { | 15 | import { |
17 | type ContentAssistEntry, | 16 | type ContentAssistEntry, |
@@ -24,98 +23,51 @@ import { | |||
24 | 23 | ||
25 | const UPDATE_TIMEOUT_MS = 500; | 24 | const UPDATE_TIMEOUT_MS = 500; |
26 | 25 | ||
27 | const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; | ||
28 | |||
29 | const FORMAT_TEXT_RETRIES = 5; | ||
30 | |||
31 | const log = getLogger('xtext.UpdateService'); | 26 | const log = getLogger('xtext.UpdateService'); |
32 | 27 | ||
33 | /** | ||
34 | * State effect used to override the dirty changes after a transaction. | ||
35 | * | ||
36 | * If this state effect is _not_ present in a transaction, | ||
37 | * the transaction will be appended to the current dirty changes. | ||
38 | * | ||
39 | * If this state effect is present, the current dirty changes will be replaced | ||
40 | * by the value of this effect. | ||
41 | */ | ||
42 | const setDirtyChanges = StateEffect.define<ChangeSet>(); | ||
43 | |||
44 | export interface AbortSignal { | 28 | export interface AbortSignal { |
45 | aborted: boolean; | 29 | aborted: boolean; |
46 | } | 30 | } |
47 | 31 | ||
48 | export interface ContentAssistParams { | ||
49 | caretOffset: number; | ||
50 | |||
51 | proposalsLimit: number; | ||
52 | } | ||
53 | |||
54 | export type CancellableResult<T> = | 32 | export type CancellableResult<T> = |
55 | | { cancelled: false; data: T } | 33 | | { cancelled: false; data: T } |
56 | | { cancelled: true }; | 34 | | { cancelled: true }; |
57 | 35 | ||
58 | interface StateUpdateResult<T> { | 36 | export interface ContentAssistParams { |
59 | newStateId: string; | 37 | caretOffset: number; |
60 | |||
61 | data: T; | ||
62 | } | ||
63 | |||
64 | interface Delta { | ||
65 | deltaOffset: number; | ||
66 | |||
67 | deltaReplaceLength: number; | ||
68 | 38 | ||
69 | deltaText: string; | 39 | proposalsLimit: number; |
70 | } | 40 | } |
71 | 41 | ||
72 | export default class UpdateService { | 42 | export default class UpdateService { |
73 | readonly resourceName: string; | 43 | readonly resourceName: string; |
74 | 44 | ||
75 | xtextStateId: string | undefined; | 45 | private readonly tracker: UpdateStateTracker; |
76 | |||
77 | private readonly store: EditorStore; | ||
78 | |||
79 | private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); | ||
80 | |||
81 | /** | ||
82 | * The changes being synchronized to the server if a full or delta text update is running | ||
83 | * withing a `withUpdateExclusive` block, `undefined` otherwise. | ||
84 | * | ||
85 | * Must be `undefined` before and after entering the critical section of `mutex` | ||
86 | * and may only be changes in the critical section of `mutex`. | ||
87 | * | ||
88 | * Methods named with an `Exclusive` suffix in this class assume that the mutex is held | ||
89 | * and may call `withUpdateExclusive` or `doFallbackUpdateFullTextExclusive` | ||
90 | * to mutate this field. | ||
91 | * | ||
92 | * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive` | ||
93 | * block and require this field to be non-`undefined`. | ||
94 | */ | ||
95 | private pendingUpdate: ChangeSet | undefined; | ||
96 | |||
97 | /** | ||
98 | * Local changes not yet sychronized to the server and not part of the running update, if any. | ||
99 | */ | ||
100 | private dirtyChanges: ChangeSet; | ||
101 | |||
102 | private readonly webSocketClient: XtextWebSocketClient; | ||
103 | 46 | ||
104 | private readonly idleUpdateLater = debounce( | 47 | private readonly idleUpdateLater = debounce( |
105 | () => this.idleUpdate(), | 48 | () => this.idleUpdate(), |
106 | UPDATE_TIMEOUT_MS, | 49 | UPDATE_TIMEOUT_MS, |
107 | ); | 50 | ); |
108 | 51 | ||
109 | constructor(store: EditorStore, webSocketClient: XtextWebSocketClient) { | 52 | constructor( |
53 | private readonly store: EditorStore, | ||
54 | private readonly webSocketClient: XtextWebSocketClient, | ||
55 | ) { | ||
110 | this.resourceName = `${nanoid(7)}.problem`; | 56 | this.resourceName = `${nanoid(7)}.problem`; |
111 | this.store = store; | 57 | this.tracker = new UpdateStateTracker(store); |
112 | this.dirtyChanges = this.newEmptyChangeSet(); | 58 | } |
113 | this.webSocketClient = webSocketClient; | 59 | |
60 | get xtextStateId(): string | undefined { | ||
61 | return this.tracker.xtextStateId; | ||
62 | } | ||
63 | |||
64 | computeChangesSinceLastUpdate(): ChangeDesc { | ||
65 | return this.tracker.computeChangesSinceLastUpdate(); | ||
114 | } | 66 | } |
115 | 67 | ||
116 | onReconnect(): void { | 68 | onReconnect(): void { |
117 | this.xtextStateId = undefined; | 69 | this.tracker.invalidateStateId(); |
118 | this.updateFullText().catch((error) => { | 70 | this.updateFullTextOrThrow().catch((error) => { |
119 | // Let E_TIMEOUT errors propagate, since if the first update times out, | 71 | // Let E_TIMEOUT errors propagate, since if the first update times out, |
120 | // we can't use the connection. | 72 | // we can't use the connection. |
121 | if (error === E_CANCELED) { | 73 | if (error === E_CANCELED) { |
@@ -128,46 +80,17 @@ export default class UpdateService { | |||
128 | } | 80 | } |
129 | 81 | ||
130 | onTransaction(transaction: Transaction): void { | 82 | onTransaction(transaction: Transaction): void { |
131 | const setDirtyChangesEffect = transaction.effects.find((effect) => | 83 | if (this.tracker.onTransaction(transaction)) { |
132 | effect.is(setDirtyChanges), | ||
133 | ) as StateEffect<ChangeSet> | undefined; | ||
134 | if (setDirtyChangesEffect) { | ||
135 | const { value } = setDirtyChangesEffect; | ||
136 | if (this.pendingUpdate !== undefined) { | ||
137 | // Do not clear `pendingUpdate`, because that would indicate an update failure | ||
138 | // to `withUpdateExclusive`. | ||
139 | this.pendingUpdate = ChangeSet.empty(value.length); | ||
140 | } | ||
141 | this.dirtyChanges = value; | ||
142 | return; | ||
143 | } | ||
144 | if (transaction.docChanged) { | ||
145 | this.dirtyChanges = this.dirtyChanges.compose(transaction.changes); | ||
146 | this.idleUpdateLater(); | 84 | this.idleUpdateLater(); |
147 | } | 85 | } |
148 | } | 86 | } |
149 | 87 | ||
150 | /** | ||
151 | * Computes the summary of any changes happened since the last complete update. | ||
152 | * | ||
153 | * The result reflects any changes that happened since the `xtextStateId` | ||
154 | * version was uploaded to the server. | ||
155 | * | ||
156 | * @returns the summary of changes since the last update | ||
157 | */ | ||
158 | computeChangesSinceLastUpdate(): ChangeDesc { | ||
159 | return ( | ||
160 | this.pendingUpdate?.composeDesc(this.dirtyChanges.desc) ?? | ||
161 | this.dirtyChanges.desc | ||
162 | ); | ||
163 | } | ||
164 | |||
165 | private idleUpdate(): void { | 88 | private idleUpdate(): void { |
166 | if (!this.webSocketClient.isOpen || this.dirtyChanges.empty) { | 89 | if (!this.webSocketClient.isOpen || !this.tracker.hasDirtyChanges) { |
167 | return; | 90 | return; |
168 | } | 91 | } |
169 | if (!this.mutex.isLocked()) { | 92 | if (!this.tracker.locekdForUpdate) { |
170 | this.update().catch((error) => { | 93 | this.updateOrThrow().catch((error) => { |
171 | if (error === E_CANCELED || error === E_TIMEOUT) { | 94 | if (error === E_CANCELED || error === E_TIMEOUT) { |
172 | log.debug('Idle update cancelled'); | 95 | log.debug('Idle update cancelled'); |
173 | return; | 96 | return; |
@@ -178,28 +101,6 @@ export default class UpdateService { | |||
178 | this.idleUpdateLater(); | 101 | this.idleUpdateLater(); |
179 | } | 102 | } |
180 | 103 | ||
181 | private newEmptyChangeSet(): ChangeSet { | ||
182 | return ChangeSet.of([], this.store.state.doc.length); | ||
183 | } | ||
184 | |||
185 | private updateFullText(): Promise<void> { | ||
186 | return this.runExclusive(() => this.updateFullTextExclusive()); | ||
187 | } | ||
188 | |||
189 | private async updateFullTextExclusive(): Promise<void> { | ||
190 | await this.withVoidUpdateExclusive(() => this.doUpdateFullTextExclusive()); | ||
191 | } | ||
192 | |||
193 | private async doUpdateFullTextExclusive(): Promise<string> { | ||
194 | const result = await this.webSocketClient.send({ | ||
195 | resource: this.resourceName, | ||
196 | serviceType: 'update', | ||
197 | fullText: this.store.state.doc.sliceString(0), | ||
198 | }); | ||
199 | const { stateId } = DocumentStateResult.parse(result); | ||
200 | return stateId; | ||
201 | } | ||
202 | |||
203 | /** | 104 | /** |
204 | * Makes sure that the document state on the server reflects recent | 105 | * Makes sure that the document state on the server reflects recent |
205 | * local changes. | 106 | * local changes. |
@@ -209,26 +110,34 @@ export default class UpdateService { | |||
209 | * | 110 | * |
210 | * @returns a promise resolving when the update is completed | 111 | * @returns a promise resolving when the update is completed |
211 | */ | 112 | */ |
212 | private async update(): Promise<void> { | 113 | private async updateOrThrow(): Promise<void> { |
213 | // We may check here for the delta to avoid locking, | 114 | // We may check here for the delta to avoid locking, |
214 | // but we'll need to recompute the delta in the critical section, | 115 | // but we'll need to recompute the delta in the critical section, |
215 | // because it may have changed by the time we can acquire the lock. | 116 | // because it may have changed by the time we can acquire the lock. |
216 | if (this.dirtyChanges.empty) { | 117 | if ( |
118 | !this.tracker.hasDirtyChanges && | ||
119 | this.tracker.xtextStateId !== undefined | ||
120 | ) { | ||
217 | return; | 121 | return; |
218 | } | 122 | } |
219 | await this.runExclusive(() => this.updateExclusive()); | 123 | await this.tracker.runExclusive((lockedState) => |
124 | this.updateExclusive(lockedState), | ||
125 | ); | ||
220 | } | 126 | } |
221 | 127 | ||
222 | private async updateExclusive(): Promise<void> { | 128 | private async updateExclusive(lockedState: LockedState): Promise<void> { |
223 | if (this.xtextStateId === undefined) { | 129 | if (this.xtextStateId === undefined) { |
224 | await this.updateFullTextExclusive(); | 130 | await this.updateFullTextExclusive(lockedState); |
225 | } | 131 | } |
226 | const delta = this.computeDelta(); | 132 | if (!this.tracker.hasDirtyChanges) { |
227 | if (delta === undefined) { | ||
228 | return; | 133 | return; |
229 | } | 134 | } |
230 | log.trace('Editor delta', delta); | 135 | await lockedState.updateExclusive(async (pendingUpdate) => { |
231 | await this.withVoidUpdateExclusive(async () => { | 136 | const delta = pendingUpdate.prepareDeltaUpdateExclusive(); |
137 | if (delta === undefined) { | ||
138 | return undefined; | ||
139 | } | ||
140 | log.trace('Editor delta', delta); | ||
232 | const result = await this.webSocketClient.send({ | 141 | const result = await this.webSocketClient.send({ |
233 | resource: this.resourceName, | 142 | resource: this.resourceName, |
234 | serviceType: 'update', | 143 | serviceType: 'update', |
@@ -240,79 +149,50 @@ export default class UpdateService { | |||
240 | return parsedDocumentStateResult.data.stateId; | 149 | return parsedDocumentStateResult.data.stateId; |
241 | } | 150 | } |
242 | if (isConflictResult(result, 'invalidStateId')) { | 151 | if (isConflictResult(result, 'invalidStateId')) { |
243 | return this.doFallbackUpdateFullTextExclusive(); | 152 | return this.doUpdateFullTextExclusive(pendingUpdate); |
244 | } | 153 | } |
245 | throw parsedDocumentStateResult.error; | 154 | throw parsedDocumentStateResult.error; |
246 | }); | 155 | }); |
247 | } | 156 | } |
248 | 157 | ||
249 | async fetchOccurrences( | 158 | private updateFullTextOrThrow(): Promise<void> { |
250 | getCaretOffset: () => CancellableResult<number>, | 159 | return this.tracker.runExclusive((lockedState) => |
251 | ): Promise<CancellableResult<OccurrencesResult>> { | 160 | this.updateFullTextExclusive(lockedState), |
252 | try { | 161 | ); |
253 | await this.update(); | 162 | } |
254 | } catch (error) { | 163 | |
255 | if (error === E_CANCELED || error === E_TIMEOUT) { | 164 | private async updateFullTextExclusive( |
256 | return { cancelled: true }; | 165 | lockedState: LockedState, |
257 | } | 166 | ): Promise<void> { |
258 | throw error; | 167 | await lockedState.updateExclusive((pendingUpdate) => |
259 | } | 168 | this.doUpdateFullTextExclusive(pendingUpdate), |
260 | if (!this.dirtyChanges.empty || this.mutex.isLocked()) { | 169 | ); |
261 | // Just give up if another update is in progress. | 170 | } |
262 | return { cancelled: true }; | 171 | |
263 | } | 172 | private async doUpdateFullTextExclusive( |
264 | const caretOffsetResult = getCaretOffset(); | 173 | pendingUpdate: PendingUpdate, |
265 | if (caretOffsetResult.cancelled) { | 174 | ): Promise<string> { |
266 | return { cancelled: true }; | 175 | log.debug('Performing full text update'); |
267 | } | 176 | pendingUpdate.extendPendingUpdateExclusive(); |
268 | const expectedStateId = this.xtextStateId; | 177 | const result = await this.webSocketClient.send({ |
269 | const data = await this.webSocketClient.send({ | ||
270 | resource: this.resourceName, | 178 | resource: this.resourceName, |
271 | serviceType: 'occurrences', | 179 | serviceType: 'update', |
272 | caretOffset: caretOffsetResult.data, | 180 | fullText: this.store.state.doc.sliceString(0), |
273 | expectedStateId, | ||
274 | }); | 181 | }); |
275 | if ( | 182 | const { stateId } = DocumentStateResult.parse(result); |
276 | // The query must have reached the server without being conflicted with an update | 183 | return stateId; |
277 | // or cancelled server-side. | ||
278 | isConflictResult(data) || | ||
279 | // And no state update should have occurred since then. | ||
280 | this.xtextStateId !== expectedStateId || | ||
281 | // And there should be no change to the editor text since then. | ||
282 | !this.dirtyChanges.empty || | ||
283 | // And there should be no state update in progress. | ||
284 | this.mutex.isLocked() | ||
285 | ) { | ||
286 | return { cancelled: true }; | ||
287 | } | ||
288 | const parsedOccurrencesResult = OccurrencesResult.safeParse(data); | ||
289 | if (!parsedOccurrencesResult.success) { | ||
290 | log.error( | ||
291 | 'Unexpected occurences result', | ||
292 | data, | ||
293 | 'not an OccurrencesResult:', | ||
294 | parsedOccurrencesResult.error, | ||
295 | ); | ||
296 | throw parsedOccurrencesResult.error; | ||
297 | } | ||
298 | if (parsedOccurrencesResult.data.stateId !== expectedStateId) { | ||
299 | return { cancelled: true }; | ||
300 | } | ||
301 | return { cancelled: false, data: parsedOccurrencesResult.data }; | ||
302 | } | 184 | } |
303 | 185 | ||
304 | async fetchContentAssist( | 186 | async fetchContentAssist( |
305 | params: ContentAssistParams, | 187 | params: ContentAssistParams, |
306 | signal: AbortSignal, | 188 | signal: AbortSignal, |
307 | ): Promise<ContentAssistEntry[]> { | 189 | ): Promise<ContentAssistEntry[]> { |
308 | if (!this.mutex.isLocked && this.xtextStateId !== undefined) { | 190 | if (this.tracker.upToDate && this.xtextStateId !== undefined) { |
309 | return this.fetchContentAssistFetchOnly(params, this.xtextStateId); | 191 | return this.fetchContentAssistFetchOnly(params, this.xtextStateId); |
310 | } | 192 | } |
311 | // Content assist updates should have priority over other updates. | ||
312 | this.mutex.cancel(); | ||
313 | try { | 193 | try { |
314 | return await this.runExclusive(() => | 194 | return await this.tracker.runExclusiveHighPriority((lockedState) => |
315 | this.fetchContentAssistExclusive(params, signal), | 195 | this.fetchContentAssistExclusive(params, lockedState, signal), |
316 | ); | 196 | ); |
317 | } catch (error) { | 197 | } catch (error) { |
318 | if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { | 198 | if ((error === E_CANCELED || error === E_TIMEOUT) && signal.aborted) { |
@@ -324,20 +204,30 @@ export default class UpdateService { | |||
324 | 204 | ||
325 | private async fetchContentAssistExclusive( | 205 | private async fetchContentAssistExclusive( |
326 | params: ContentAssistParams, | 206 | params: ContentAssistParams, |
207 | lockedState: LockedState, | ||
327 | signal: AbortSignal, | 208 | signal: AbortSignal, |
328 | ): Promise<ContentAssistEntry[]> { | 209 | ): Promise<ContentAssistEntry[]> { |
329 | if (this.xtextStateId === undefined) { | 210 | if (this.xtextStateId === undefined) { |
330 | await this.updateFullTextExclusive(); | 211 | await this.updateFullTextExclusive(lockedState); |
331 | } | 212 | } |
332 | if (signal.aborted) { | 213 | if (signal.aborted) { |
333 | return []; | 214 | return []; |
334 | } | 215 | } |
335 | const delta = this.computeDelta(); | 216 | if (this.tracker.hasDirtyChanges) { |
336 | if (delta !== undefined) { | ||
337 | log.trace('Editor delta', delta); | ||
338 | // Try to fetch while also performing a delta update. | 217 | // Try to fetch while also performing a delta update. |
339 | const fetchUpdateEntries = await this.withUpdateExclusive(() => | 218 | const fetchUpdateEntries = await lockedState.withUpdateExclusive( |
340 | this.doFetchContentAssistWithDeltaExclusive(params, delta), | 219 | async (pendingUpdate) => { |
220 | const delta = pendingUpdate.prepareDeltaUpdateExclusive(); | ||
221 | if (delta === undefined) { | ||
222 | return { newStateId: undefined, data: undefined }; | ||
223 | } | ||
224 | log.trace('Editor delta', delta); | ||
225 | return this.doFetchContentAssistWithDeltaExclusive( | ||
226 | params, | ||
227 | pendingUpdate, | ||
228 | delta, | ||
229 | ); | ||
230 | }, | ||
341 | ); | 231 | ); |
342 | if (fetchUpdateEntries !== undefined) { | 232 | if (fetchUpdateEntries !== undefined) { |
343 | return fetchUpdateEntries; | 233 | return fetchUpdateEntries; |
@@ -354,6 +244,7 @@ export default class UpdateService { | |||
354 | 244 | ||
355 | private async doFetchContentAssistWithDeltaExclusive( | 245 | private async doFetchContentAssistWithDeltaExclusive( |
356 | params: ContentAssistParams, | 246 | params: ContentAssistParams, |
247 | pendingUpdate: PendingUpdate, | ||
357 | delta: Delta, | 248 | delta: Delta, |
358 | ): Promise<StateUpdateResult<ContentAssistEntry[] | undefined>> { | 249 | ): Promise<StateUpdateResult<ContentAssistEntry[] | undefined>> { |
359 | const fetchUpdateResult = await this.webSocketClient.send({ | 250 | const fetchUpdateResult = await this.webSocketClient.send({ |
@@ -372,7 +263,7 @@ export default class UpdateService { | |||
372 | } | 263 | } |
373 | if (isConflictResult(fetchUpdateResult, 'invalidStateId')) { | 264 | if (isConflictResult(fetchUpdateResult, 'invalidStateId')) { |
374 | log.warn('Server state invalid during content assist'); | 265 | log.warn('Server state invalid during content assist'); |
375 | const newStateId = await this.doFallbackUpdateFullTextExclusive(); | 266 | const newStateId = await this.doUpdateFullTextExclusive(pendingUpdate); |
376 | // We must finish this state update transaction to prepare for any push events | 267 | // We must finish this state update transaction to prepare for any push events |
377 | // before querying for content assist, so we just return `undefined` and will query | 268 | // before querying for content assist, so we just return `undefined` and will query |
378 | // the content assist service later. | 269 | // the content assist service later. |
@@ -402,33 +293,21 @@ export default class UpdateService { | |||
402 | return fetchOnlyEntries; | 293 | return fetchOnlyEntries; |
403 | } | 294 | } |
404 | 295 | ||
405 | async formatText(): Promise<void> { | 296 | formatText(): Promise<void> { |
406 | let retries = 0; | 297 | return this.tracker.runExclusiveWithRetries((lockedState) => |
407 | while (retries < FORMAT_TEXT_RETRIES) { | 298 | this.formatTextExclusive(lockedState), |
408 | try { | 299 | ); |
409 | // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. | ||
410 | await this.runExclusive(() => this.formatTextExclusive()); | ||
411 | return; | ||
412 | } catch (error) { | ||
413 | // Let timeout errors propagate to give up formatting on a flaky connection. | ||
414 | if (error === E_CANCELED && retries < FORMAT_TEXT_RETRIES) { | ||
415 | retries += 1; | ||
416 | } else { | ||
417 | throw error; | ||
418 | } | ||
419 | } | ||
420 | } | ||
421 | } | 300 | } |
422 | 301 | ||
423 | private async formatTextExclusive(): Promise<void> { | 302 | private async formatTextExclusive(lockedState: LockedState): Promise<void> { |
424 | await this.updateExclusive(); | 303 | await this.updateExclusive(lockedState); |
425 | let { from, to } = this.store.state.selection.main; | 304 | let { from, to } = this.store.state.selection.main; |
426 | if (to <= from) { | 305 | if (to <= from) { |
427 | from = 0; | 306 | from = 0; |
428 | to = this.store.state.doc.length; | 307 | to = this.store.state.doc.length; |
429 | } | 308 | } |
430 | log.debug('Formatting from', from, 'to', to); | 309 | log.debug('Formatting from', from, 'to', to); |
431 | await this.withVoidUpdateExclusive(async () => { | 310 | await lockedState.updateExclusive(async (pendingUpdate) => { |
432 | const result = await this.webSocketClient.send({ | 311 | const result = await this.webSocketClient.send({ |
433 | resource: this.resourceName, | 312 | resource: this.resourceName, |
434 | serviceType: 'format', | 313 | serviceType: 'format', |
@@ -436,7 +315,7 @@ export default class UpdateService { | |||
436 | selectionEnd: to, | 315 | selectionEnd: to, |
437 | }); | 316 | }); |
438 | const { stateId, formattedText } = FormattingResult.parse(result); | 317 | const { stateId, formattedText } = FormattingResult.parse(result); |
439 | this.applyBeforeDirtyChanges({ | 318 | pendingUpdate.applyBeforeDirtyChangesExclusive({ |
440 | from, | 319 | from, |
441 | to, | 320 | to, |
442 | insert: formattedText, | 321 | insert: formattedText, |
@@ -445,119 +324,46 @@ export default class UpdateService { | |||
445 | }); | 324 | }); |
446 | } | 325 | } |
447 | 326 | ||
448 | private computeDelta(): Delta | undefined { | 327 | async fetchOccurrences( |
449 | if (this.dirtyChanges.empty) { | 328 | getCaretOffset: () => CancellableResult<number>, |
450 | return undefined; | 329 | ): Promise<CancellableResult<OccurrencesResult>> { |
451 | } | ||
452 | let minFromA = Number.MAX_SAFE_INTEGER; | ||
453 | let maxToA = 0; | ||
454 | let minFromB = Number.MAX_SAFE_INTEGER; | ||
455 | let maxToB = 0; | ||
456 | this.dirtyChanges.iterChangedRanges((fromA, toA, fromB, toB) => { | ||
457 | minFromA = Math.min(minFromA, fromA); | ||
458 | maxToA = Math.max(maxToA, toA); | ||
459 | minFromB = Math.min(minFromB, fromB); | ||
460 | maxToB = Math.max(maxToB, toB); | ||
461 | }); | ||
462 | return { | ||
463 | deltaOffset: minFromA, | ||
464 | deltaReplaceLength: maxToA - minFromA, | ||
465 | deltaText: this.store.state.doc.sliceString(minFromB, maxToB), | ||
466 | }; | ||
467 | } | ||
468 | |||
469 | private applyBeforeDirtyChanges(changeSpec: ChangeSpec): void { | ||
470 | const pendingChanges = | ||
471 | this.pendingUpdate?.compose(this.dirtyChanges) ?? this.dirtyChanges; | ||
472 | const revertChanges = pendingChanges.invert(this.store.state.doc); | ||
473 | const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength); | ||
474 | const redoChanges = pendingChanges.map(applyBefore.desc); | ||
475 | const changeSet = revertChanges.compose(applyBefore).compose(redoChanges); | ||
476 | this.store.dispatch({ | ||
477 | changes: changeSet, | ||
478 | // Keep the current set of dirty changes (but update them according the re-formatting) | ||
479 | // and to not add the formatting the dirty changes. | ||
480 | effects: [setDirtyChanges.of(redoChanges)], | ||
481 | }); | ||
482 | } | ||
483 | |||
484 | private runExclusive<T>(callback: () => Promise<T>): Promise<T> { | ||
485 | return this.mutex.runExclusive(async () => { | ||
486 | if (this.pendingUpdate !== undefined) { | ||
487 | throw new Error('Update is pending before entering critical section'); | ||
488 | } | ||
489 | const result = await callback(); | ||
490 | if (this.pendingUpdate !== undefined) { | ||
491 | throw new Error('Update is pending after entering critical section'); | ||
492 | } | ||
493 | return result; | ||
494 | }); | ||
495 | } | ||
496 | |||
497 | private withVoidUpdateExclusive( | ||
498 | callback: () => Promise<string>, | ||
499 | ): Promise<void> { | ||
500 | return this.withUpdateExclusive<void>(async () => { | ||
501 | const newStateId = await callback(); | ||
502 | return { newStateId, data: undefined }; | ||
503 | }); | ||
504 | } | ||
505 | |||
506 | /** | ||
507 | * Executes an asynchronous callback that updates the state on the server. | ||
508 | * | ||
509 | * Ensures that updates happen sequentially and manages `pendingUpdate` | ||
510 | * and `dirtyChanges` to reflect changes being synchronized to the server | ||
511 | * and not yet synchronized to the server, respectively. | ||
512 | * | ||
513 | * Optionally, `callback` may return a second value that is retured by this function. | ||
514 | * | ||
515 | * Once the remote procedure call to update the server state finishes | ||
516 | * and returns the new `stateId`, `callback` must return _immediately_ | ||
517 | * to ensure that the local `stateId` is updated likewise to be able to handle | ||
518 | * push messages referring to the new `stateId` from the server. | ||
519 | * If additional work is needed to compute the second value in some cases, | ||
520 | * use `T | undefined` instead of `T` as a return type and signal the need for additional | ||
521 | * computations by returning `undefined`. Thus additional computations can be performed | ||
522 | * outside of the critical section. | ||
523 | * | ||
524 | * @param callback the asynchronous callback that updates the server state | ||
525 | * @returns a promise resolving to the second value returned by `callback` | ||
526 | */ | ||
527 | private async withUpdateExclusive<T>( | ||
528 | callback: () => Promise<StateUpdateResult<T>>, | ||
529 | ): Promise<T> { | ||
530 | if (this.pendingUpdate !== undefined) { | ||
531 | throw new Error('Delta updates are not reentrant'); | ||
532 | } | ||
533 | this.pendingUpdate = this.dirtyChanges; | ||
534 | this.dirtyChanges = this.newEmptyChangeSet(); | ||
535 | let data: T; | ||
536 | try { | 330 | try { |
537 | ({ newStateId: this.xtextStateId, data } = await callback()); | 331 | await this.updateOrThrow(); |
538 | this.pendingUpdate = undefined; | 332 | } catch (error) { |
539 | } catch (e) { | 333 | if (error === E_CANCELED || error === E_TIMEOUT) { |
540 | log.error('Error while update', e); | 334 | return { cancelled: true }; |
541 | if (this.pendingUpdate === undefined) { | ||
542 | log.error('pendingUpdate was cleared during update'); | ||
543 | } else { | ||
544 | this.dirtyChanges = this.pendingUpdate.compose(this.dirtyChanges); | ||
545 | } | 335 | } |
546 | this.pendingUpdate = undefined; | 336 | throw error; |
547 | this.webSocketClient.forceReconnectOnError(); | ||
548 | throw e; | ||
549 | } | 337 | } |
550 | return data; | 338 | if (!this.tracker.upToDate) { |
551 | } | 339 | // Just give up if another update is in progress. |
552 | 340 | return { cancelled: true }; | |
553 | private doFallbackUpdateFullTextExclusive(): Promise<string> { | 341 | } |
554 | if (this.pendingUpdate === undefined) { | 342 | const caretOffsetResult = getCaretOffset(); |
555 | throw new Error('Only a pending update can be extended'); | 343 | if (caretOffsetResult.cancelled) { |
344 | return { cancelled: true }; | ||
345 | } | ||
346 | const expectedStateId = this.xtextStateId; | ||
347 | if (expectedStateId === undefined) { | ||
348 | // If there is no state on the server, don't bother with finding occurrences. | ||
349 | return { cancelled: true }; | ||
350 | } | ||
351 | const data = await this.webSocketClient.send({ | ||
352 | resource: this.resourceName, | ||
353 | serviceType: 'occurrences', | ||
354 | caretOffset: caretOffsetResult.data, | ||
355 | expectedStateId, | ||
356 | }); | ||
357 | if ( | ||
358 | isConflictResult(data) || | ||
359 | this.tracker.hasChangesSince(expectedStateId) | ||
360 | ) { | ||
361 | return { cancelled: true }; | ||
362 | } | ||
363 | const parsedOccurrencesResult = OccurrencesResult.parse(data); | ||
364 | if (parsedOccurrencesResult.stateId !== expectedStateId) { | ||
365 | return { cancelled: true }; | ||
556 | } | 366 | } |
557 | log.warn('Delta update failed, performing full text update'); | 367 | return { cancelled: false, data: parsedOccurrencesResult }; |
558 | this.xtextStateId = undefined; | ||
559 | this.pendingUpdate = this.pendingUpdate.compose(this.dirtyChanges); | ||
560 | this.dirtyChanges = this.newEmptyChangeSet(); | ||
561 | return this.doUpdateFullTextExclusive(); | ||
562 | } | 368 | } |
563 | } | 369 | } |
diff --git a/subprojects/frontend/src/xtext/UpdateStateTracker.ts b/subprojects/frontend/src/xtext/UpdateStateTracker.ts new file mode 100644 index 00000000..04359060 --- /dev/null +++ b/subprojects/frontend/src/xtext/UpdateStateTracker.ts | |||
@@ -0,0 +1,357 @@ | |||
1 | /** | ||
2 | * @file State tracker for pushing updates to the Xtext server. | ||
3 | * | ||
4 | * This file implements complex logic to avoid missing or overwriting state updates | ||
5 | * and to avoid sending conflicting updates to the Xtext server. | ||
6 | * | ||
7 | * The `LockedState` and `PendingUpdate` objects are used as capabilities to | ||
8 | * signify whether the socket to the Xtext server is locked for updates and | ||
9 | * whether an update is in progress, respectively. | ||
10 | * Always use these objects only received as an argument of a lambda expression | ||
11 | * or method and never leak them into class field or global variables. | ||
12 | * The presence of such an objects in the scope should always imply that | ||
13 | * the corresponding condition holds. | ||
14 | */ | ||
15 | |||
16 | import { | ||
17 | type ChangeDesc, | ||
18 | ChangeSet, | ||
19 | type ChangeSpec, | ||
20 | StateEffect, | ||
21 | type Transaction, | ||
22 | } from '@codemirror/state'; | ||
23 | import { E_CANCELED, Mutex, withTimeout } from 'async-mutex'; | ||
24 | |||
25 | import type EditorStore from '../editor/EditorStore'; | ||
26 | import getLogger from '../utils/getLogger'; | ||
27 | |||
28 | const WAIT_FOR_UPDATE_TIMEOUT_MS = 1000; | ||
29 | |||
30 | const log = getLogger('xtext.UpdateStateTracker'); | ||
31 | |||
32 | /** | ||
33 | * State effect used to override the dirty changes after a transaction. | ||
34 | * | ||
35 | * If this state effect is _not_ present in a transaction, | ||
36 | * the transaction will be appended to the current dirty changes. | ||
37 | * | ||
38 | * If this state effect is present, the current dirty changes will be replaced | ||
39 | * by the value of this effect. | ||
40 | */ | ||
41 | const setDirtyChanges = StateEffect.define<ChangeSet>(); | ||
42 | |||
43 | export interface StateUpdateResult<T> { | ||
44 | /** The new state ID on the server or `undefined` if no update was performed. */ | ||
45 | newStateId: string | undefined; | ||
46 | |||
47 | /** Optional data payload received during the update. */ | ||
48 | data: T; | ||
49 | } | ||
50 | |||
51 | /** | ||
52 | * Signifies a capability that the Xtext server state is locked for update. | ||
53 | */ | ||
54 | export interface LockedState { | ||
55 | /** | ||
56 | * | ||
57 | * @param callback the asynchronous callback that updates the server state | ||
58 | * @returns a promise resolving after the update | ||
59 | */ | ||
60 | updateExclusive( | ||
61 | callback: (pendingUpdate: PendingUpdate) => Promise<string | undefined>, | ||
62 | ): Promise<void>; | ||
63 | |||
64 | /** | ||
65 | * Executes an asynchronous callback that updates the state on the server. | ||
66 | * | ||
67 | * If the callback returns `undefined` as the `newStateId`, | ||
68 | * the update is assumed to be aborted and any pending changes will be marked as dirt again. | ||
69 | * Any exceptions thrown in `callback` will also cause the update to be aborted. | ||
70 | * | ||
71 | * Ensures that updates happen sequentially and manages `pendingUpdate` | ||
72 | * and `dirtyChanges` to reflect changes being synchronized to the server | ||
73 | * and not yet synchronized to the server, respectively. | ||
74 | * | ||
75 | * Optionally, `callback` may return a second value that is retured by this function. | ||
76 | * | ||
77 | * Once the remote procedure call to update the server state finishes | ||
78 | * and returns the new `stateId`, `callback` must return _immediately_ | ||
79 | * to ensure that the local `stateId` is updated likewise to be able to handle | ||
80 | * push messages referring to the new `stateId` from the server. | ||
81 | * If additional asynchronous work is needed to compute the second value in some cases, | ||
82 | * use `T | undefined` instead of `T` as a return type and signal the need for additional | ||
83 | * computations by returning `undefined`. Thus additional computations can be performed | ||
84 | * outside of the critical section. | ||
85 | * | ||
86 | * @param callback the asynchronous callback that updates the server state | ||
87 | * @returns a promise resolving to the second value returned by `callback` | ||
88 | */ | ||
89 | withUpdateExclusive<T>( | ||
90 | callback: (pendingUpdate: PendingUpdate) => Promise<StateUpdateResult<T>>, | ||
91 | ): Promise<T>; | ||
92 | } | ||
93 | |||
94 | export interface Delta { | ||
95 | deltaOffset: number; | ||
96 | |||
97 | deltaReplaceLength: number; | ||
98 | |||
99 | deltaText: string; | ||
100 | } | ||
101 | |||
102 | /** | ||
103 | * Signifies a capability that dirty changes are being marked for uploading. | ||
104 | */ | ||
105 | export interface PendingUpdate { | ||
106 | prepareDeltaUpdateExclusive(): Delta | undefined; | ||
107 | |||
108 | extendPendingUpdateExclusive(): void; | ||
109 | |||
110 | applyBeforeDirtyChangesExclusive(changeSpec: ChangeSpec): void; | ||
111 | } | ||
112 | |||
113 | export default class UpdateStateTracker { | ||
114 | xtextStateId: string | undefined; | ||
115 | |||
116 | /** | ||
117 | * The changes being synchronized to the server if a full or delta text update is running | ||
118 | * withing a `withUpdateExclusive` block, `undefined` otherwise. | ||
119 | * | ||
120 | * Must be `undefined` before and after entering the critical section of `mutex` | ||
121 | * and may only be changes in the critical section of `mutex`. | ||
122 | * | ||
123 | * Methods named with an `Exclusive` suffix in this class assume that the mutex is held | ||
124 | * and may call `updateExclusive` or `withUpdateExclusive` to mutate this field. | ||
125 | * | ||
126 | * Methods named with a `do` suffix assume that they are called in a `withUpdateExclusive` | ||
127 | * block and require this field to be non-`undefined`. | ||
128 | */ | ||
129 | private pendingChanges: ChangeSet | undefined; | ||
130 | |||
131 | /** | ||
132 | * Local changes not yet sychronized to the server and not part of the current update, if any. | ||
133 | */ | ||
134 | private dirtyChanges: ChangeSet; | ||
135 | |||
136 | /** | ||
137 | * Locked when we try to modify the state on the server. | ||
138 | */ | ||
139 | private readonly mutex = withTimeout(new Mutex(), WAIT_FOR_UPDATE_TIMEOUT_MS); | ||
140 | |||
141 | constructor(private readonly store: EditorStore) { | ||
142 | this.dirtyChanges = this.newEmptyChangeSet(); | ||
143 | } | ||
144 | |||
145 | get locekdForUpdate(): boolean { | ||
146 | return this.mutex.isLocked(); | ||
147 | } | ||
148 | |||
149 | get hasDirtyChanges(): boolean { | ||
150 | return !this.dirtyChanges.empty; | ||
151 | } | ||
152 | |||
153 | get upToDate(): boolean { | ||
154 | return !this.locekdForUpdate && !this.hasDirtyChanges; | ||
155 | } | ||
156 | |||
157 | hasChangesSince(xtextStateId: string): boolean { | ||
158 | return ( | ||
159 | this.xtextStateId !== xtextStateId || | ||
160 | this.locekdForUpdate || | ||
161 | this.hasDirtyChanges | ||
162 | ); | ||
163 | } | ||
164 | |||
165 | /** | ||
166 | * Extends the current set of changes with `transaction`. | ||
167 | * | ||
168 | * Also determines if the transaction has made local changes | ||
169 | * that will have to be synchronized to the server | ||
170 | * | ||
171 | * @param transaction the transaction that affected the editor | ||
172 | * @returns `true` if the transaction requires and idle update, `false` otherwise | ||
173 | */ | ||
174 | onTransaction(transaction: Transaction): boolean { | ||
175 | const setDirtyChangesEffect = transaction.effects.find((effect) => | ||
176 | effect.is(setDirtyChanges), | ||
177 | ) as StateEffect<ChangeSet> | undefined; | ||
178 | if (setDirtyChangesEffect) { | ||
179 | const { value } = setDirtyChangesEffect; | ||
180 | if (this.pendingChanges !== undefined) { | ||
181 | // Do not clear `pendingUpdate`, because that would indicate an update failure | ||
182 | // to `withUpdateExclusive`. | ||
183 | this.pendingChanges = ChangeSet.empty(value.length); | ||
184 | } | ||
185 | this.dirtyChanges = value; | ||
186 | return false; | ||
187 | } | ||
188 | if (transaction.docChanged) { | ||
189 | this.dirtyChanges = this.dirtyChanges.compose(transaction.changes); | ||
190 | return true; | ||
191 | } | ||
192 | return false; | ||
193 | } | ||
194 | |||
195 | invalidateStateId(): void { | ||
196 | this.xtextStateId = undefined; | ||
197 | } | ||
198 | |||
199 | /** | ||
200 | * Computes the summary of any changes happened since the last complete update. | ||
201 | * | ||
202 | * The result reflects any changes that happened since the `xtextStateId` | ||
203 | * version was uploaded to the server. | ||
204 | * | ||
205 | * @returns the summary of changes since the last update | ||
206 | */ | ||
207 | computeChangesSinceLastUpdate(): ChangeDesc { | ||
208 | return ( | ||
209 | this.pendingChanges?.composeDesc(this.dirtyChanges.desc) ?? | ||
210 | this.dirtyChanges.desc | ||
211 | ); | ||
212 | } | ||
213 | |||
214 | private newEmptyChangeSet(): ChangeSet { | ||
215 | return ChangeSet.of([], this.store.state.doc.length); | ||
216 | } | ||
217 | |||
218 | private readonly pendingUpdate: PendingUpdate = { | ||
219 | prepareDeltaUpdateExclusive: (): Delta | undefined => { | ||
220 | this.pendingUpdate.extendPendingUpdateExclusive(); | ||
221 | if (this.pendingChanges === undefined || this.pendingChanges.empty) { | ||
222 | return undefined; | ||
223 | } | ||
224 | let minFromA = Number.MAX_SAFE_INTEGER; | ||
225 | let maxToA = 0; | ||
226 | let minFromB = Number.MAX_SAFE_INTEGER; | ||
227 | let maxToB = 0; | ||
228 | this.pendingChanges.iterChangedRanges((fromA, toA, fromB, toB) => { | ||
229 | minFromA = Math.min(minFromA, fromA); | ||
230 | maxToA = Math.max(maxToA, toA); | ||
231 | minFromB = Math.min(minFromB, fromB); | ||
232 | maxToB = Math.max(maxToB, toB); | ||
233 | }); | ||
234 | return { | ||
235 | deltaOffset: minFromA, | ||
236 | deltaReplaceLength: maxToA - minFromA, | ||
237 | deltaText: this.store.state.doc.sliceString(minFromB, maxToB), | ||
238 | }; | ||
239 | }, | ||
240 | extendPendingUpdateExclusive: (): void => { | ||
241 | if (!this.locekdForUpdate) { | ||
242 | throw new Error('Cannot update state without locking the mutex'); | ||
243 | } | ||
244 | if (this.hasDirtyChanges) { | ||
245 | this.pendingChanges = | ||
246 | this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges; | ||
247 | this.dirtyChanges = this.newEmptyChangeSet(); | ||
248 | } | ||
249 | }, | ||
250 | applyBeforeDirtyChangesExclusive: (changeSpec: ChangeSpec): void => { | ||
251 | if (!this.locekdForUpdate) { | ||
252 | throw new Error('Cannot update state without locking the mutex'); | ||
253 | } | ||
254 | const pendingChanges = | ||
255 | this.pendingChanges?.compose(this.dirtyChanges) ?? this.dirtyChanges; | ||
256 | const revertChanges = pendingChanges.invert(this.store.state.doc); | ||
257 | const applyBefore = ChangeSet.of(changeSpec, revertChanges.newLength); | ||
258 | const redoChanges = pendingChanges.map(applyBefore.desc); | ||
259 | const changeSet = revertChanges.compose(applyBefore).compose(redoChanges); | ||
260 | this.store.dispatch({ | ||
261 | changes: changeSet, | ||
262 | // Keep the current set of dirty changes (but update them according the re-formatting) | ||
263 | // and to not add the formatting the dirty changes. | ||
264 | effects: [setDirtyChanges.of(redoChanges)], | ||
265 | }); | ||
266 | }, | ||
267 | }; | ||
268 | |||
269 | private readonly lockedState: LockedState = { | ||
270 | updateExclusive: ( | ||
271 | callback: (pendingUpdate: PendingUpdate) => Promise<string | undefined>, | ||
272 | ): Promise<void> => { | ||
273 | return this.lockedState.withUpdateExclusive<void>( | ||
274 | async (pendingUpdate) => { | ||
275 | const newStateId = await callback(pendingUpdate); | ||
276 | return { newStateId, data: undefined }; | ||
277 | }, | ||
278 | ); | ||
279 | }, | ||
280 | withUpdateExclusive: async <T>( | ||
281 | callback: (pendingUpdate: PendingUpdate) => Promise<StateUpdateResult<T>>, | ||
282 | ): Promise<T> => { | ||
283 | if (!this.locekdForUpdate) { | ||
284 | throw new Error('Cannot update state without locking the mutex'); | ||
285 | } | ||
286 | if (this.pendingChanges !== undefined) { | ||
287 | throw new Error('Delta updates are not reentrant'); | ||
288 | } | ||
289 | let newStateId: string | undefined; | ||
290 | let data: T; | ||
291 | try { | ||
292 | ({ newStateId, data } = await callback(this.pendingUpdate)); | ||
293 | } catch (e) { | ||
294 | log.error('Error while update', e); | ||
295 | this.cancelUpdate(); | ||
296 | throw e; | ||
297 | } | ||
298 | if (newStateId === undefined) { | ||
299 | this.cancelUpdate(); | ||
300 | } else { | ||
301 | this.xtextStateId = newStateId; | ||
302 | this.pendingChanges = undefined; | ||
303 | } | ||
304 | return data; | ||
305 | }, | ||
306 | }; | ||
307 | |||
308 | private cancelUpdate(): void { | ||
309 | if (this.pendingChanges === undefined) { | ||
310 | return; | ||
311 | } | ||
312 | this.dirtyChanges = this.pendingChanges.compose(this.dirtyChanges); | ||
313 | this.pendingChanges = undefined; | ||
314 | } | ||
315 | |||
316 | runExclusive<T>( | ||
317 | callback: (lockedState: LockedState) => Promise<T>, | ||
318 | ): Promise<T> { | ||
319 | return this.mutex.runExclusive(async () => { | ||
320 | if (this.pendingChanges !== undefined) { | ||
321 | throw new Error('Update is pending before entering critical section'); | ||
322 | } | ||
323 | const result = await callback(this.lockedState); | ||
324 | if (this.pendingChanges !== undefined) { | ||
325 | throw new Error('Update is pending after entering critical section'); | ||
326 | } | ||
327 | return result; | ||
328 | }); | ||
329 | } | ||
330 | |||
331 | runExclusiveHighPriority<T>( | ||
332 | callback: (lockedState: LockedState) => Promise<T>, | ||
333 | ): Promise<T> { | ||
334 | this.mutex.cancel(); | ||
335 | return this.runExclusive(callback); | ||
336 | } | ||
337 | |||
338 | async runExclusiveWithRetries<T>( | ||
339 | callback: (lockedState: LockedState) => Promise<T>, | ||
340 | maxRetries = 5, | ||
341 | ): Promise<T> { | ||
342 | let retries = 0; | ||
343 | while (retries < maxRetries) { | ||
344 | try { | ||
345 | // eslint-disable-next-line no-await-in-loop -- Use a loop for sequential retries. | ||
346 | return await this.runExclusive(callback); | ||
347 | } catch (error) { | ||
348 | // Let timeout errors propagate to give up retrying on a flaky connection. | ||
349 | if (error !== E_CANCELED) { | ||
350 | throw error; | ||
351 | } | ||
352 | retries += 1; | ||
353 | } | ||
354 | } | ||
355 | throw E_CANCELED; | ||
356 | } | ||
357 | } | ||