aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/TransactionExecutor.java
blob: a3792bac9180e7ac67264ae0c6289453f2e0be21 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/*
 * SPDX-FileCopyrightText: 2021-2023 The Refinery Authors <https://refinery.tools/>
 *
 * SPDX-License-Identifier: EPL-2.0
 */
package tools.refinery.language.web.xtext.server;

import com.google.common.base.Strings;
import com.google.inject.Injector;
import org.eclipse.emf.common.util.URI;
import org.eclipse.xtext.resource.IResourceServiceProvider;
import org.eclipse.xtext.util.IDisposable;
import org.eclipse.xtext.web.server.*;
import org.eclipse.xtext.web.server.InvalidRequestException.UnknownLanguageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tools.refinery.language.web.xtext.server.message.*;
import tools.refinery.language.web.xtext.server.push.PrecomputationListener;
import tools.refinery.language.web.xtext.server.push.PushWebDocument;
import tools.refinery.language.web.xtext.servlet.SimpleServiceContext;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Executes Xtext web service requests on behalf of a single {@link ISession} and forwards both the
 * responses and any precomputation push messages to the configured {@link ResponseHandler}.
 * <p>
 * While a request is being served, push messages are queued and only delivered after the response
 * has been handed to the response handler (see {@link #onPrecomputedServiceResult}).
 * <p>
 * {@code callPending} and {@code pendingPushMessages} are only accessed while holding
 * {@code callPendingLock}; {@code disposed} is {@code volatile}. The {@code subscriptions} map and
 * {@code responseHandler} are not synchronized by this class.
 */
public class TransactionExecutor implements IDisposable, PrecomputationListener {
	private static final Logger LOG = LoggerFactory.getLogger(TransactionExecutor.class);

	private final ISession session;

	private final IResourceServiceProvider.Registry resourceServiceProviderRegistry;

	// Documents this executor is subscribed to, keyed by resource id. Held weakly so that this
	// map alone does not keep a document reachable.
	private final Map<String, WeakReference<PushWebDocument>> subscriptions = new HashMap<>();

	// Must be set with setResponseHandler before any request or push message is processed.
	private ResponseHandler responseHandler;

	// Guards callPending and pendingPushMessages.
	private final Object callPendingLock = new Object();

	// True between the start of a (non-ping) request and the flush of its delayed push messages.
	private boolean callPending;

	// Push messages that arrived while callPending was true, in arrival order.
	private final List<XtextWebPushMessage> pendingPushMessages = new ArrayList<>();

	// Set once by dispose(); afterwards no further message is passed to the response handler.
	private volatile boolean disposed;

	/**
	 * Creates an executor bound to the given session.
	 *
	 * @param session                         the session passed to every service context
	 * @param resourceServiceProviderRegistry the registry used to look up the language of a request
	 */
	public TransactionExecutor(ISession session, IResourceServiceProvider.Registry resourceServiceProviderRegistry) {
		this.session = session;
		this.resourceServiceProviderRegistry = resourceServiceProviderRegistry;
	}

	/**
	 * Sets the handler that receives all responses and push messages produced by this executor.
	 *
	 * @param responseHandler the handler to deliver messages to
	 */
	public void setResponseHandler(ResponseHandler responseHandler) {
		this.responseHandler = responseHandler;
	}

	/**
	 * Serves a single request and delivers exactly one response for it to the response handler,
	 * unless this executor has been disposed.
	 * <p>
	 * A request carrying a {@code ping} parameter is answered immediately with a {@link PongResult}.
	 * Any other request is dispatched to the Xtext service selected by the request data; an
	 * {@link InvalidRequestException} is reported as a {@code REQUEST_ERROR} and any other
	 * {@link RuntimeException} as a {@code SERVER_ERROR}. Push messages that arrived while the
	 * request was being served are delivered afterwards.
	 *
	 * @param request the request to serve
	 * @throws ResponseHandlerException if the response handler fails while delivering the response
	 */
	public void handleRequest(XtextWebRequest request) throws ResponseHandlerException {
		if (disposed) {
			return;
		}
		var serviceContext = new SimpleServiceContext(session, request.getRequestData());
		var ping = serviceContext.getParameter("ping");
		if (ping != null) {
			// Pings never touch a document, so they bypass the pending-call bookkeeping.
			onResponse(new XtextWebOkResponse(request, new PongResult(ping)));
			return;
		}
		synchronized (callPendingLock) {
			// Both conditions indicate a broken invariant; they are logged but do not abort the request.
			if (callPending) {
				LOG.error("Reentrant request detected");
			}
			if (!pendingPushMessages.isEmpty()) {
				LOG.error("{} push messages got stuck without a pending request", pendingPushMessages.size());
			}
			callPending = true;
		}
		try {
			var injector = getInjector(serviceContext);
			var serviceDispatcher = injector.getInstance(XtextServiceDispatcher.class);
			var service = serviceDispatcher.getService(new SubscribingServiceContext(serviceContext, this));
			var serviceResult = service.getService().apply();
			onResponse(new XtextWebOkResponse(request, serviceResult));
		} catch (InvalidRequestException e) {
			onResponse(new XtextWebErrorResponse(request, XtextWebErrorKind.REQUEST_ERROR, e));
		} catch (RuntimeException e) {
			onResponse(new XtextWebErrorResponse(request, XtextWebErrorKind.SERVER_ERROR, e));
		} finally {
			// Runs even if delivering the response failed, so callPending is always reset.
			flushPendingPushMessages();
		}
	}

	/**
	 * Delivers a response to the response handler unless this executor has been disposed.
	 *
	 * @param response the response to deliver
	 * @throws ResponseHandlerException if the response handler fails
	 */
	private void onResponse(XtextWebResponse response) throws ResponseHandlerException {
		if (!disposed) {
			responseHandler.onResponse(response);
		}
	}

	/**
	 * Delivers the push messages delayed during the current request and marks the request as
	 * finished.
	 * <p>
	 * Delivery failures are logged and do not stop the remaining messages from being sent.
	 * Delivery stops as soon as the executor is disposed, but the queue is always emptied and
	 * {@code callPending} is always reset, so no stale state survives this call.
	 */
	private void flushPendingPushMessages() {
		synchronized (callPendingLock) {
			for (var message : pendingPushMessages) {
				if (disposed) {
					// Stop sending, but still fall through to the cleanup below.
					break;
				}
				try {
					responseHandler.onResponse(message);
				} catch (ResponseHandlerException | RuntimeException e) {
					LOG.error("Error while flushing push message", e);
				}
			}
			pendingPushMessages.clear();
			callPending = false;
		}
	}

	/**
	 * Forwards a precomputed service result as a push message, or queues it if a request is
	 * currently being served. Does nothing once this executor has been disposed.
	 *
	 * @param resourceId    the id of the resource the result belongs to
	 * @param stateId       the document state id the result was computed for
	 * @param serviceName   the name of the service that produced the result
	 * @param serviceResult the precomputed result
	 * @throws ResponseHandlerException if the response handler fails while delivering the message
	 */
	@Override
	public void onPrecomputedServiceResult(String resourceId, String stateId, String serviceName,
			IServiceResult serviceResult) throws ResponseHandlerException {
		if (disposed) {
			// Same guard as every other delivery path: never send or queue after dispose().
			return;
		}
		var message = new XtextWebPushMessage(resourceId, stateId, serviceName, serviceResult);
		synchronized (callPendingLock) {
			// If we're currently responding to a call we must delay any push messages until
			// the reply is sent, because push messages relating to the new state id must be
			// sent after the response with the new state id so that the client knows about
			// the new state when it receives the push message.
			if (callPending) {
				pendingPushMessages.add(message);
			} else {
				responseHandler.onResponse(message);
			}
		}
	}

	/**
	 * Records that this executor is subscribed to {@code document} under {@code resourceId}.
	 * <p>
	 * If a different, still reachable document was previously recorded for the same resource id,
	 * this executor is removed from that document's precomputation listeners. This method itself
	 * does not add the executor as a listener of the new document.
	 *
	 * @param resourceId the id of the resource being subscribed to
	 * @param document   the document now backing that resource id
	 */
	@Override
	public void onSubscribeToPrecomputationEvents(String resourceId, PushWebDocument document) {
		PushWebDocument previousDocument = null;
		var previousSubscription = subscriptions.get(resourceId);
		if (previousSubscription != null) {
			// May still be null if the previously subscribed document has been garbage collected.
			previousDocument = previousSubscription.get();
		}
		if (previousDocument == document) {
			return;
		}
		if (previousDocument != null) {
			previousDocument.removePrecomputationListener(this);
		}
		subscriptions.put(resourceId, new WeakReference<>(document));
	}

	/**
	 * Get the injector to satisfy the request in the {@code context}.
	 * Based on {@link org.eclipse.xtext.web.servlet.XtextServlet#getInjector}.
	 * <p>
	 * The language is looked up by the {@code contentType} parameter if it is present and
	 * non-empty, and by the {@code resource} parameter alone otherwise.
	 *
	 * @param context the Xtext service context of the request
	 * @return the injector for the Xtext language in the request
	 * @throws UnknownLanguageException if the Xtext language cannot be determined
	 */
	protected Injector getInjector(IServiceContext context) {
		IResourceServiceProvider resourceServiceProvider;
		var resourceName = context.getParameter("resource");
		if (resourceName == null) {
			resourceName = "";
		}
		var emfURI = URI.createURI(resourceName);
		var contentType = context.getParameter("contentType");
		if (Strings.isNullOrEmpty(contentType)) {
			resourceServiceProvider = resourceServiceProviderRegistry.getResourceServiceProvider(emfURI);
			if (resourceServiceProvider == null) {
				if (emfURI.toString().isEmpty()) {
					throw new UnknownLanguageException(
							"Unable to identify the Xtext language: missing parameter 'resource' or 'contentType'.");
				} else {
					throw new UnknownLanguageException(
							"Unable to identify the Xtext language for resource " + emfURI + ".");
				}
			}
		} else {
			resourceServiceProvider = resourceServiceProviderRegistry.getResourceServiceProvider(emfURI, contentType);
			if (resourceServiceProvider == null) {
				throw new UnknownLanguageException(
						"Unable to identify the Xtext language for contentType " + contentType + ".");
			}
		}
		return resourceServiceProvider.get(Injector.class);
	}

	/**
	 * Marks this executor as disposed, then unsubscribes from and disposes every subscribed
	 * document that is still reachable. After this call no further responses or push messages are
	 * passed to the response handler.
	 */
	@Override
	public void dispose() {
		disposed = true;
		for (var subscription : subscriptions.values()) {
			var document = subscription.get();
			if (document != null) {
				document.removePrecomputationListener(this);
				// NOTE(review): assumes the document is owned exclusively by this executor — confirm
				// that no other session or listener still uses it.
				document.dispose();
			}
		}
	}
}