/*
 * SPDX-FileCopyrightText: 2021-2023 The Refinery Authors
 *
 * SPDX-License-Identifier: EPL-2.0
 */

import { createAtom, makeAutoObservable, observable } from 'mobx';
import ms from 'ms';
import { nanoid } from 'nanoid';
import { interpret } from 'xstate';

import CancelledError from '../utils/CancelledError';
import PendingTask from '../utils/PendingTask';
import getLogger from '../utils/getLogger';

import fetchBackendConfig from './fetchBackendConfig';
import webSocketMachine from './webSocketMachine';
import {
  type XtextWebPushService,
  XtextResponse,
  type XtextWebRequest,
} from './xtextMessages';
import { PongResult } from './xtextServiceResults';

/** WebSocket subprotocol requested from, and expected to be echoed by, the server. */
const XTEXT_SUBPROTOCOL_V1 = 'tools.refinery.language.web.xtext.v1';

// Use a large enough timeout so that a request can complete successfully
// even if the browser has throttled the background tab.
const REQUEST_TIMEOUT = ms('5s');

const log = getLogger('xtext.XtextWebSocketClient');

/** Callback run by the `notifyReconnect` action of the state machine. */
export type ReconnectHandler = () => void;

/** Callback run by the `notifyDisconnect` action of the state machine. */
export type DisconnectHandler = () => void;

/** Callback invoked for every server message that carries a `service` field. */
export type PushHandler = (
  resourceId: string,
  stateId: string,
  service: XtextWebPushService,
  data: unknown,
) => void;

/**
 * WebSocket client for the Xtext web service.
 *
 * The connection lifecycle is driven by `webSocketMachine`; this class supplies
 * the machine's actions (opening and closing the socket, notifying the owner,
 * cancelling requests) and its ping service, forwards browser and socket events
 * to the machine, and matches responses to pending requests by their `id`.
 *
 * The current machine snapshot is exposed to MobX through {@link state} with
 * the help of a manually managed atom.
 */
export default class XtextWebSocketClient {
  /** Atom that is reported as changed on every state machine transition. */
  private readonly stateAtom = createAtom('state');

  /** The current socket, or `undefined` when no socket has been created. */
  private webSocket: WebSocket | undefined;

  /** Requests that were sent but not yet answered, keyed by request id. */
  private readonly pendingRequests = new Map<string, PendingTask<unknown>>();

  private readonly interpreter = interpret(
    webSocketMachine.withConfig({
      actions: {
        openWebSocket: () => this.openWebSocket(),
        closeWebSocket: () => this.closeWebSocket(),
        notifyReconnect: () => this.onReconnect(),
        notifyDisconnect: () => this.onDisconnect(),
        cancelPendingRequests: () => this.cancelPendingRequests(),
      },
      services: {
        pingService: () => this.sendPing(),
      },
    }),
    {
      logger: log.log.bind(log),
    },
  );

  /**
   * Handles the socket `open` event: reports `OPENED` when the negotiated
   * subprotocol is the expected one, and an `ERROR` otherwise.
   */
  private readonly openListener = () => {
    if (this.webSocket === undefined) {
      throw new Error('Open listener called without a WebSocket');
    }
    const {
      webSocket: { protocol },
    } = this;
    if (protocol === XTEXT_SUBPROTOCOL_V1) {
      this.interpreter.send('OPENED');
    } else {
      this.interpreter.send({
        type: 'ERROR',
        message: `Unknown subprotocol ${protocol}`,
      });
    }
  };

  /** Handles the socket `error` event by logging it and reporting an `ERROR`. */
  private readonly errorListener = (event: Event) => {
    log.error('WebSocket error', event);
    this.interpreter.send({ type: 'ERROR', message: 'WebSocket error' });
  };

  /**
   * Handles the socket `close` event. This listener is detached in
   * `closeWebSocket` before the client closes the socket itself, so a close
   * observed here is reported as an `ERROR`.
   */
  private readonly closeListener = ({ code, reason }: CloseEvent) =>
    this.interpreter.send({
      type: 'ERROR',
      message: `Socket closed unexpectedly: ${code} ${reason}`,
    });

  /**
   * Handles the socket `message` event.
   *
   * Non-string payloads, invalid JSON and payloads rejected by `XtextResponse`
   * are reported as an `ERROR`. Messages with a `service` field are forwarded
   * to the push handler; all other messages settle the pending request with
   * the same `id`.
   */
  private readonly messageListener = ({ data }: MessageEvent<unknown>) => {
    if (typeof data !== 'string') {
      this.interpreter.send({
        type: 'ERROR',
        message: 'Unexpected message format',
      });
      return;
    }
    let json: unknown;
    try {
      json = JSON.parse(data);
    } catch (error) {
      log.error('JSON parse error', error);
      this.interpreter.send({ type: 'ERROR', message: 'Malformed message' });
      return;
    }
    const responseResult = XtextResponse.safeParse(json);
    if (!responseResult.success) {
      log.error('Xtext response', json, 'is malformed:', responseResult.error);
      this.interpreter.send({ type: 'ERROR', message: 'Malformed message' });
      return;
    }
    const { data: response } = responseResult;
    if ('service' in response) {
      // `XtextWebPushMessage.push` is optional, but `service` is not.
      const { resource, stateId, service, push } = response;
      this.onPush(resource, stateId, service, push);
      return;
    }
    const { id } = response;
    const task = this.pendingRequests.get(id);
    if (task === undefined) {
      log.warn('Task', id, 'has been already resolved');
      return;
    }
    // Remove the task before settling it so that it cannot be settled twice.
    this.removeTask(id);
    if ('error' in response) {
      const formattedMessage = `${response.error} error: ${response.message}`;
      log.error('Task', id, formattedMessage);
      task.reject(new Error(formattedMessage));
    } else {
      task.resolve(response.response);
    }
  };

  /**
   * @param onReconnect - Run by the `notifyReconnect` machine action.
   * @param onDisconnect - Run by the `notifyDisconnect` machine action.
   * @param onPush - Called for each message that has a `service` field.
   */
  constructor(
    private readonly onReconnect: ReconnectHandler,
    private readonly onDisconnect: DisconnectHandler,
    private readonly onPush: PushHandler,
  ) {
    // Members set to `false` are excluded from MobX instrumentation;
    // `webSocket` is tracked by reference only.
    makeAutoObservable<
      XtextWebSocketClient,
      | 'stateAtom'
      | 'webSocket'
      | 'interpreter'
      | 'openListener'
      | 'openWebSocket'
      | 'errorListener'
      | 'closeListener'
      | 'messageListener'
      | 'sendPing'
    >(this, {
      stateAtom: false,
      webSocket: observable.ref,
      interpreter: false,
      openListener: false,
      openWebSocket: false,
      errorListener: false,
      closeListener: false,
      messageListener: false,
      sendPing: false,
    });
  }

  /**
   * Starts the state machine, seeds it with the current network and tab
   * visibility status, subscribes to the browser events that keep this status
   * up to date, and finally requests a connection.
   *
   * The registered `window` and `document` listeners are never removed.
   */
  start(): void {
    this.interpreter
      .onTransition((state, event) => {
        log.trace('WebSocket state transition', state.value, 'on event', event);
        this.stateAtom.reportChanged();
      })
      .start();

    this.interpreter.send(window.navigator.onLine ? 'ONLINE' : 'OFFLINE');
    window.addEventListener('offline', () => this.interpreter.send('OFFLINE'));
    window.addEventListener('online', () => this.interpreter.send('ONLINE'));

    this.updateVisibility();
    document.addEventListener('visibilitychange', () =>
      this.updateVisibility(),
    );
    window.addEventListener('pagehide', () =>
      this.interpreter.send('PAGE_HIDE'),
    );
    window.addEventListener('pageshow', () => {
      this.updateVisibility();
      this.interpreter.send('PAGE_SHOW');
    });

    // https://developer.chrome.com/blog/page-lifecycle-api/#new-features-added-in-chrome-68
    if ('wasDiscarded' in document) {
      document.addEventListener('freeze', () =>
        this.interpreter.send('PAGE_FREEZE'),
      );
      document.addEventListener('resume', () =>
        this.interpreter.send('PAGE_RESUME'),
      );
    }

    this.interpreter.send('CONNECT');
  }

  /** Current state machine snapshot; reading it is reported to MobX. */
  get state() {
    this.stateAtom.reportObserved();
    return this.interpreter.getSnapshot();
  }

  /** Whether the machine is in `connection.socketCreated.open.opening`. */
  get opening(): boolean {
    return this.state.matches('connection.socketCreated.open.opening');
  }

  /** Whether the machine is in `connection.socketCreated.open.opened`. */
  get opened(): boolean {
    return this.state.matches('connection.socketCreated.open.opened');
  }

  /** Whether the machine is in `connection.disconnected`. */
  get disconnectedByUser(): boolean {
    return this.state.matches('connection.disconnected');
  }

  /**
   * Whether the machine is in `connection.temporarilyOffline`, or is in both
   * `connection.disconnected` and `network.offline`.
   */
  get networkMissing(): boolean {
    return (
      this.state.matches('connection.temporarilyOffline') ||
      (this.disconnectedByUser && this.state.matches('network.offline'))
    );
  }

  /** Error messages stored in the state machine context. */
  get errors(): string[] {
    return this.state.context.errors;
  }

  /** Sends `CONNECT` to the state machine. */
  connect(): void {
    this.interpreter.send('CONNECT');
  }

  /** Sends `DISCONNECT` to the state machine. */
  disconnect(): void {
    this.interpreter.send('DISCONNECT');
  }

  /** Sends an `ERROR` event with the message `Client error` to the machine. */
  forceReconnectOnError(): void {
    this.interpreter.send({
      type: 'ERROR',
      message: 'Client error',
    });
  }

  /**
   * Sends a request over the socket under a freshly generated id.
   *
   * @param request - Payload to send; it is serialized with `JSON.stringify`.
   * @returns A promise that is resolved with the `response` field of the
   *   matching server message, or rejected if the server reports an error or
   *   the pending requests are cancelled.
   * @throws Error synchronously (not as a rejection) when the connection is
   *   not in the opened state or there is no socket.
   */
  send(request: unknown): Promise<unknown> {
    if (!this.opened || this.webSocket === undefined) {
      throw new Error('Not connected');
    }

    const id = nanoid();

    const promise = new Promise<unknown>((resolve, reject) => {
      // The callback is handed to `PendingTask` together with `REQUEST_TIMEOUT`
      // (presumably it runs when the request times out -- see `PendingTask`).
      const task = new PendingTask(resolve, reject, REQUEST_TIMEOUT, () => {
        this.interpreter.send({
          type: 'ERROR',
          message: 'Connection timed out',
        });
        this.removeTask(id);
      });
      this.pendingRequests.set(id, task);
    });

    const webRequest: XtextWebRequest = { id, request };
    const json = JSON.stringify(webRequest);
    this.webSocket.send(json);

    return promise;
  }

  /**
   * Sends `GENERATION_STARTED` (when `keepAlive` is `true`) or
   * `GENERATION_ENDED` (when it is `false`) to the state machine.
   */
  setKeepAlive(keepAlive: boolean): void {
    this.interpreter.send({
      type: keepAlive ? 'GENERATION_STARTED' : 'GENERATION_ENDED',
    });
  }

  /** Reports the current value of `document.hidden` to the state machine. */
  private updateVisibility(): void {
    this.interpreter.send(document.hidden ? 'TAB_HIDDEN' : 'TAB_VISIBLE');
  }

  /**
   * Fetches the backend configuration and opens a socket to the configured
   * URL, falling back to `/xtext-service` on the current origin when no URL is
   * configured. Failures are reported to the state machine as an `ERROR`.
   *
   * NOTE(review): `this.webSocket` is only assigned after the configuration
   * fetch completes, while `closeWebSocket` returns early if it is still
   * `undefined`. If the machine runs `closeWebSocket` during the fetch, the
   * socket appears to be created afterwards anyway -- confirm against
   * `webSocketMachine` whether this can happen.
   *
   * @throws Error if a socket already exists.
   */
  private openWebSocket(): void {
    if (this.webSocket !== undefined) {
      throw new Error('WebSocket already open');
    }

    log.debug('Creating WebSocket');

    (async () => {
      let { webSocketURL } = await fetchBackendConfig();
      if (webSocketURL === undefined) {
        webSocketURL = `${window.origin.replace(/^http/, 'ws')}/xtext-service`;
      }
      this.openWebSocketWithURL(webSocketURL);
    })().catch((error) => {
      log.error('Error while initializing connection', error);
      const message = error instanceof Error ? error.message : String(error);
      this.interpreter.send({
        type: 'ERROR',
        message,
      });
    });
  }

  /** Creates the socket for `webSocketURL` and attaches all event listeners. */
  private openWebSocketWithURL(webSocketURL: string): void {
    this.webSocket = new WebSocket(webSocketURL, XTEXT_SUBPROTOCOL_V1);
    this.webSocket.addEventListener('open', this.openListener);
    this.webSocket.addEventListener('close', this.closeListener);
    this.webSocket.addEventListener('error', this.errorListener);
    this.webSocket.addEventListener('message', this.messageListener);
  }

  /**
   * Detaches all event listeners from the current socket, closes it with
   * code 1000 and forgets it. Does nothing if there is no socket.
   */
  private closeWebSocket() {
    if (this.webSocket === undefined) {
      // We might get here when there is a network error before the socket is initialized
      // and we don't have to do anything to close it.
      return;
    }
    log.debug('Closing WebSocket');
    // Detach the listeners first so that our own close is not reported as an error.
    this.webSocket.removeEventListener('open', this.openListener);
    this.webSocket.removeEventListener('close', this.closeListener);
    this.webSocket.removeEventListener('error', this.errorListener);
    this.webSocket.removeEventListener('message', this.messageListener);
    this.webSocket.close(1000, 'Closing connection');
    this.webSocket = undefined;
  }

  /** Forgets the pending request with the given id without settling it. */
  private removeTask(id: string): void {
    this.pendingRequests.delete(id);
  }

  /** Rejects every pending request with a `CancelledError` and forgets them. */
  private cancelPendingRequests(): void {
    this.pendingRequests.forEach((task) =>
      task.reject(new CancelledError('Closing connection')),
    );
    this.pendingRequests.clear();
  }

  /**
   * Sends a ping with a random token and checks that the reply echoes it.
   *
   * @throws Error if the reply carries a different token; also propagates
   *   failures of `send` and of `PongResult.parse`.
   */
  private async sendPing(): Promise<void> {
    const ping = nanoid();
    const result = await this.send({ ping });
    const { pong } = PongResult.parse(result);
    if (ping !== pong) {
      throw new Error(`Expected pong ${ping} but got ${pong} instead`);
    }
  }
}