From b7a46b805bd7fbb3b21a48a035698ab11fadcb7c Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Sat, 19 Aug 2023 14:39:39 +0200 Subject: feat: interruptible VIATRA engine Reduce server load by introducing a timeout for semantics analysis. --- .../language/web/semantics/SemanticsService.java | 129 +++++--------------- .../language/web/semantics/SemanticsWorker.java | 133 +++++++++++++++++++++ .../web/xtext/server/TransactionExecutor.java | 46 +++++-- .../web/xtext/server/push/PushWebDocument.java | 13 +- .../xtext/server/push/PushWebDocumentProvider.java | 9 +- .../language/web/xtext/servlet/XtextWebSocket.java | 14 ++- 6 files changed, 219 insertions(+), 125 deletions(-) create mode 100644 subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsWorker.java (limited to 'subprojects/language-web') diff --git a/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsService.java b/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsService.java index 2495430e..39191162 100644 --- a/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsService.java +++ b/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsService.java @@ -5,8 +5,6 @@ */ package tools.refinery.language.web.semantics; -import com.google.gson.JsonArray; -import com.google.gson.JsonObject; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; @@ -14,43 +12,29 @@ import org.eclipse.xtext.service.OperationCanceledManager; import org.eclipse.xtext.util.CancelIndicator; import org.eclipse.xtext.web.server.model.AbstractCachedService; import org.eclipse.xtext.web.server.model.IXtextWebDocument; -import org.eclipse.xtext.web.server.model.XtextWebDocument; import org.eclipse.xtext.web.server.validation.ValidationService; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import tools.refinery.language.model.problem.Problem; -import tools.refinery.language.semantics.model.ModelInitializer; -import tools.refinery.language.semantics.model.SemanticsUtils; -import tools.refinery.store.model.Model; -import tools.refinery.store.model.ModelStore; -import tools.refinery.store.query.viatra.ViatraModelQueryAdapter; -import tools.refinery.store.reasoning.ReasoningAdapter; -import tools.refinery.store.reasoning.ReasoningStoreAdapter; -import tools.refinery.store.reasoning.literal.Concreteness; -import tools.refinery.store.reasoning.representation.PartialRelation; -import tools.refinery.store.representation.TruthValue; -import tools.refinery.store.tuple.Tuple; +import tools.refinery.language.web.xtext.server.push.PushWebDocument; -import java.util.Arrays; -import java.util.List; -import java.util.TreeMap; +import java.util.concurrent.*; @Singleton public class SemanticsService extends AbstractCachedService { private static final Logger LOG = LoggerFactory.getLogger(SemanticsService.class); @Inject - private SemanticsUtils semanticsUtils; + private Provider workerProvider; @Inject - private ValidationService validationService; + private OperationCanceledManager operationCanceledManager; @Inject - private Provider initializerProvider; + private ValidationService validationService; - @Inject - private OperationCanceledManager operationCanceledManager; + private final ExecutorService executorService = Executors.newCachedThreadPool(); @Override public SemanticsResult compute(IXtextWebDocument doc, CancelIndicator cancelIndicator) { @@ -58,44 +42,42 @@ public class SemanticsService extends AbstractCachedService { if (LOG.isTraceEnabled()) { start = System.currentTimeMillis(); } - Problem problem = getProblem(doc, cancelIndicator); + var problem = getProblem(doc, cancelIndicator); if (problem == null) { return null; } - var initializer = initializerProvider.get(); - var builder = ModelStore.builder() - .with(ViatraModelQueryAdapter.builder()) - .with(ReasoningAdapter.builder() - .requiredInterpretations(Concreteness.PARTIAL)); - operationCanceledManager.checkCanceled(cancelIndicator); + var worker = workerProvider.get(); + worker.setProblem(problem,cancelIndicator); + var future = executorService.submit(worker); + SemanticsResult result = null; try { - var modelSeed = initializer.createModel(problem, builder); - operationCanceledManager.checkCanceled(cancelIndicator); - var nodeTrace = getNodeTrace(initializer); - operationCanceledManager.checkCanceled(cancelIndicator); - var store = builder.build(); - operationCanceledManager.checkCanceled(cancelIndicator); - var model = store.getAdapter(ReasoningStoreAdapter.class).createInitialModel(modelSeed); - operationCanceledManager.checkCanceled(cancelIndicator); - var partialInterpretation = getPartialInterpretation(initializer, model, cancelIndicator); - if (LOG.isTraceEnabled()) { - long end = System.currentTimeMillis(); - LOG.trace("Computed semantics for {} ({}) in {}ms", doc.getResourceId(), doc.getStateId(), - end - start); - } - return new SemanticsSuccessResult(nodeTrace, partialInterpretation); - } catch (RuntimeException e) { - LOG.debug("Error while computing semantics", e); - return new SemanticsErrorResult(e.getMessage()); + result = future.get(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + future.cancel(true); + LOG.error("Semantics service interrupted", e); + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + operationCanceledManager.propagateAsErrorIfCancelException(e.getCause()); + throw new IllegalStateException(e); + } catch (TimeoutException e) { + future.cancel(true); + LOG.trace("Semantics service timeout", e); + return new SemanticsErrorResult("Partial interpretation timed out"); } + if (LOG.isTraceEnabled()) { + long end = System.currentTimeMillis(); + LOG.trace("Computed semantics for {} ({}) in {}ms", doc.getResourceId(), doc.getStateId(), + end - start); + } + return result; } @Nullable private Problem getProblem(IXtextWebDocument doc, CancelIndicator cancelIndicator) { - if (!(doc instanceof XtextWebDocument webDoc)) { + if (!(doc instanceof PushWebDocument pushDoc)) { throw new IllegalArgumentException("Unexpected IXtextWebDocument: " + doc); } - var validationResult = webDoc.getCachedServiceResult(validationService, cancelIndicator, true); + var validationResult = pushDoc.getCachedServiceResult(validationService, cancelIndicator, true); boolean hasError = validationResult.getIssues().stream() .anyMatch(issue -> "error".equals(issue.getSeverity())); if (hasError) { @@ -111,53 +93,4 @@ public class SemanticsService extends AbstractCachedService { } return problem; } - - private List getNodeTrace(ModelInitializer initializer) { - var nodeTrace = new String[initializer.getNodeCount()]; - for (var entry : initializer.getNodeTrace().keyValuesView()) { - var node = entry.getOne(); - var index = entry.getTwo(); - nodeTrace[index] = semanticsUtils.getName(node).orElse(null); - } - return Arrays.asList(nodeTrace); - } - - private JsonObject getPartialInterpretation(ModelInitializer initializer, Model model, - CancelIndicator cancelIndicator) { - var adapter = model.getAdapter(ReasoningAdapter.class); - var json = new JsonObject(); - for (var entry : initializer.getRelationTrace().entrySet()) { - var relation = entry.getKey(); - var partialSymbol = entry.getValue(); - var tuples = getTuplesJson(adapter, partialSymbol); - var name = semanticsUtils.getName(relation).orElse(partialSymbol.name()); - json.add(name, tuples); - operationCanceledManager.checkCanceled(cancelIndicator); - } - return json; - } - - private static JsonArray getTuplesJson(ReasoningAdapter adapter, PartialRelation partialSymbol) { - var interpretation = adapter.getPartialInterpretation(Concreteness.PARTIAL, partialSymbol); - var cursor = interpretation.getAll(); - var map = new TreeMap(); - while (cursor.move()) { - map.put(cursor.getKey(), cursor.getValue()); - } - var tuples = new JsonArray(); - for (var entry : map.entrySet()) { - tuples.add(toArray(entry.getKey(), entry.getValue())); - } - return tuples; - } - - private static JsonArray toArray(Tuple tuple, TruthValue value) { - int arity = tuple.getSize(); - var json = new JsonArray(arity + 1); - for (int i = 0; i < arity; i++) { - json.add(tuple.get(i)); - } - json.add(value.toString()); - return json; - } } diff --git a/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsWorker.java b/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsWorker.java new file mode 100644 index 00000000..25589260 --- /dev/null +++ b/subprojects/language-web/src/main/java/tools/refinery/language/web/semantics/SemanticsWorker.java @@ -0,0 +1,133 @@ +/* + * SPDX-FileCopyrightText: 2023 The Refinery Authors + * + * SPDX-License-Identifier: EPL-2.0 + */ +package tools.refinery.language.web.semantics; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.inject.Inject; +import org.eclipse.xtext.service.OperationCanceledManager; +import org.eclipse.xtext.util.CancelIndicator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tools.refinery.language.model.problem.Problem; +import tools.refinery.language.semantics.model.ModelInitializer; +import tools.refinery.language.semantics.model.SemanticsUtils; +import tools.refinery.store.model.Model; +import tools.refinery.store.model.ModelStore; +import tools.refinery.store.query.viatra.ViatraModelQueryAdapter; +import tools.refinery.store.reasoning.ReasoningAdapter; +import tools.refinery.store.reasoning.ReasoningStoreAdapter; +import tools.refinery.store.reasoning.literal.Concreteness; +import tools.refinery.store.reasoning.representation.PartialRelation; +import tools.refinery.store.representation.TruthValue; +import tools.refinery.store.tuple.Tuple; +import tools.refinery.viatra.runtime.CancellationToken; + +import java.util.Arrays; +import java.util.List; +import java.util.TreeMap; +import java.util.concurrent.Callable; + +class SemanticsWorker implements Callable { + private static final Logger LOG = LoggerFactory.getLogger(SemanticsWorker.class); + + @Inject + private SemanticsUtils semanticsUtils; + + @Inject + private OperationCanceledManager operationCanceledManager; + + @Inject + private ModelInitializer initializer; + + private Problem problem; + + private CancellationToken cancellationToken; + + public void setProblem(Problem problem, CancelIndicator parentIndicator) { + this.problem = problem; + cancellationToken = () -> { + if (Thread.interrupted() || parentIndicator.isCanceled()) { + operationCanceledManager.throwOperationCanceledException(); + } + }; + } + + @Override + public SemanticsResult call() { + var builder = ModelStore.builder() + .with(ViatraModelQueryAdapter.builder() + .cancellationToken(cancellationToken)) + .with(ReasoningAdapter.builder() + .requiredInterpretations(Concreteness.PARTIAL)); + cancellationToken.checkCancelled(); + try { + var modelSeed = initializer.createModel(problem, builder); + cancellationToken.checkCancelled(); + var nodeTrace = getNodeTrace(initializer); + cancellationToken.checkCancelled(); + var store = builder.build(); + cancellationToken.checkCancelled(); + var model = store.getAdapter(ReasoningStoreAdapter.class).createInitialModel(modelSeed); + cancellationToken.checkCancelled(); + var partialInterpretation = getPartialInterpretation(initializer, model); + + return new SemanticsSuccessResult(nodeTrace, partialInterpretation); + } catch (RuntimeException e) { + LOG.debug("Error while computing semantics", e); + var message = e.getMessage(); + return new SemanticsErrorResult(message == null ? "Partial interpretation error" : e.getMessage()); + } + } + + private List getNodeTrace(ModelInitializer initializer) { + var nodeTrace = new String[initializer.getNodeCount()]; + for (var entry : initializer.getNodeTrace().keyValuesView()) { + var node = entry.getOne(); + var index = entry.getTwo(); + nodeTrace[index] = semanticsUtils.getName(node).orElse(null); + } + return Arrays.asList(nodeTrace); + } + + private JsonObject getPartialInterpretation(ModelInitializer initializer, Model model) { + var adapter = model.getAdapter(ReasoningAdapter.class); + var json = new JsonObject(); + for (var entry : initializer.getRelationTrace().entrySet()) { + var relation = entry.getKey(); + var partialSymbol = entry.getValue(); + var tuples = getTuplesJson(adapter, partialSymbol); + var name = semanticsUtils.getName(relation).orElse(partialSymbol.name()); + json.add(name, tuples); + cancellationToken.checkCancelled(); + } + return json; + } + + private static JsonArray getTuplesJson(ReasoningAdapter adapter, PartialRelation partialSymbol) { + var interpretation = adapter.getPartialInterpretation(Concreteness.PARTIAL, partialSymbol); + var cursor = interpretation.getAll(); + var map = new TreeMap(); + while (cursor.move()) { + map.put(cursor.getKey(), cursor.getValue()); + } + var tuples = new JsonArray(); + for (var entry : map.entrySet()) { + tuples.add(toArray(entry.getKey(), entry.getValue())); + } + return tuples; + } + + private static JsonArray toArray(Tuple tuple, TruthValue value) { + int arity = tuple.getSize(); + var json = new JsonArray(arity + 1); + for (int i = 0; i < arity; i++) { + json.add(tuple.get(i)); + } + json.add(value.toString()); + return json; + } +} diff --git a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/TransactionExecutor.java b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/TransactionExecutor.java index 2c0e9329..74456604 100644 --- a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/TransactionExecutor.java +++ b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/TransactionExecutor.java @@ -42,6 +42,8 @@ public class TransactionExecutor implements IDisposable, PrecomputationListener private final List pendingPushMessages = new ArrayList<>(); + private volatile boolean disposed; + public TransactionExecutor(ISession session, IResourceServiceProvider.Registry resourceServiceProviderRegistry) { this.session = session; this.resourceServiceProviderRegistry = resourceServiceProviderRegistry; @@ -52,10 +54,13 @@ public class TransactionExecutor implements IDisposable, PrecomputationListener } public void handleRequest(XtextWebRequest request) throws ResponseHandlerException { + if (disposed) { + return; + } var serviceContext = new SimpleServiceContext(session, request.getRequestData()); var ping = serviceContext.getParameter("ping"); if (ping != null) { - responseHandler.onResponse(new XtextWebOkResponse(request, new PongResult(ping))); + onResponse(new XtextWebOkResponse(request, new PongResult(ping))); return; } synchronized (callPendingLock) { @@ -72,23 +77,36 @@ public class TransactionExecutor implements IDisposable, PrecomputationListener var serviceDispatcher = injector.getInstance(XtextServiceDispatcher.class); var service = serviceDispatcher.getService(new SubscribingServiceContext(serviceContext, this)); var serviceResult = service.getService().apply(); - responseHandler.onResponse(new XtextWebOkResponse(request, serviceResult)); + onResponse(new XtextWebOkResponse(request, serviceResult)); } catch (InvalidRequestException e) { - responseHandler.onResponse(new XtextWebErrorResponse(request, XtextWebErrorKind.REQUEST_ERROR, e)); + onResponse(new XtextWebErrorResponse(request, XtextWebErrorKind.REQUEST_ERROR, e)); } catch (RuntimeException e) { - responseHandler.onResponse(new XtextWebErrorResponse(request, XtextWebErrorKind.SERVER_ERROR, e)); + onResponse(new XtextWebErrorResponse(request, XtextWebErrorKind.SERVER_ERROR, e)); } finally { - synchronized (callPendingLock) { - for (var message : pendingPushMessages) { - try { - responseHandler.onResponse(message); - } catch (ResponseHandlerException | RuntimeException e) { - LOG.error("Error while flushing push message", e); - } + flushPendingPushMessages(); + } + } + + private void onResponse(XtextWebResponse response) throws ResponseHandlerException { + if (!disposed) { + responseHandler.onResponse(response); + } + } + + private void flushPendingPushMessages() { + synchronized (callPendingLock) { + for (var message : pendingPushMessages) { + if (disposed) { + return; + } + try { + responseHandler.onResponse(message); + } catch (ResponseHandlerException | RuntimeException e) { + LOG.error("Error while flushing push message", e); } - pendingPushMessages.clear(); - callPending = false; } + pendingPushMessages.clear(); + callPending = false; } } @@ -164,10 +182,12 @@ public class TransactionExecutor implements IDisposable, PrecomputationListener @Override public void dispose() { + disposed = true; for (var subscription : subscriptions.values()) { var document = subscription.get(); if (document != null) { document.removePrecomputationListener(this); + document.cancelBackgroundWork(); } } } diff --git a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocument.java b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocument.java index dfbd4878..1542c694 100644 --- a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocument.java +++ b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocument.java @@ -27,11 +27,11 @@ public class PushWebDocument extends XtextWebDocument { private final Map, IServiceResult> precomputedServices = new HashMap<>(); + private final DocumentSynchronizer synchronizer; + public PushWebDocument(String resourceId, DocumentSynchronizer synchronizer) { super(resourceId, synchronizer); - if (resourceId == null) { - throw new IllegalArgumentException("resourceId must not be null"); - } + this.synchronizer = synchronizer; } public void addPrecomputationListener(PrecomputationListener listener) { @@ -63,6 +63,9 @@ public class PushWebDocument extends XtextWebDocument { private void notifyPrecomputationListeners(String serviceName, T result) { var resourceId = getResourceId(); + if (resourceId == null) { + return; + } var stateId = getStateId(); List copyOfListeners; synchronized (precomputationListeners) { @@ -83,4 +86,8 @@ public class PushWebDocument extends XtextWebDocument { } } } + + public void cancelBackgroundWork() { + synchronizer.setCanceled(true); + } } diff --git a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocumentProvider.java b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocumentProvider.java index b6f4fb43..ec6204ef 100644 --- a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocumentProvider.java +++ b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/server/push/PushWebDocumentProvider.java @@ -27,12 +27,7 @@ public class PushWebDocumentProvider implements IWebDocumentProvider { @Override public XtextWebDocument get(String resourceId, IServiceContext serviceContext) { - if (resourceId == null) { - return new XtextWebDocument(null, synchronizerProvider.get()); - } else { - // We only need to send push messages if a resourceId is specified. - return new PushWebDocument(resourceId, - serviceContext.getSession().get(DocumentSynchronizer.class, () -> this.synchronizerProvider.get())); - } + return new PushWebDocument(resourceId, + serviceContext.getSession().get(DocumentSynchronizer.class, () -> this.synchronizerProvider.get())); } } diff --git a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/servlet/XtextWebSocket.java b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/servlet/XtextWebSocket.java index 043d318c..923fecd6 100644 --- a/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/servlet/XtextWebSocket.java +++ b/subprojects/language-web/src/main/java/tools/refinery/language/web/xtext/servlet/XtextWebSocket.java @@ -70,10 +70,11 @@ public class XtextWebSocket implements ResponseHandler { @OnWebSocketError public void onError(Throwable error) { + executor.dispose(); if (webSocketSession == null) { return; } - LOG.error("Internal websocket error in connection from" + webSocketSession.getRemoteSocketAddress(), error); + LOG.error("Internal websocket error in connection from " + webSocketSession.getRemoteSocketAddress(), error); } @OnWebSocketMessage @@ -86,14 +87,18 @@ public class XtextWebSocket implements ResponseHandler { try { request = gson.fromJson(reader, XtextWebRequest.class); } catch (JsonIOException e) { - LOG.error("Cannot read from websocket from" + webSocketSession.getRemoteSocketAddress(), e); + LOG.error("Cannot read from websocket from " + webSocketSession.getRemoteSocketAddress(), e); if (webSocketSession.isOpen()) { + executor.dispose(); webSocketSession.close(StatusCode.SERVER_ERROR, "Cannot read payload", Callback.NOOP); } return; } catch (JsonParseException e) { - LOG.warn("Malformed websocket request from" + webSocketSession.getRemoteSocketAddress(), e); - webSocketSession.close(XtextStatusCode.INVALID_JSON, "Invalid JSON payload", Callback.NOOP); + LOG.warn("Malformed websocket request from " + webSocketSession.getRemoteSocketAddress(), e); + if (webSocketSession.isOpen()) { + executor.dispose(); + webSocketSession.close(XtextStatusCode.INVALID_JSON, "Invalid JSON payload", Callback.NOOP); + } return; } try { @@ -101,6 +106,7 @@ public class XtextWebSocket implements ResponseHandler { } catch (ResponseHandlerException e) { LOG.warn("Cannot write websocket response", e); if (webSocketSession.isOpen()) { + executor.dispose(); webSocketSession.close(StatusCode.SERVER_ERROR, "Cannot write response", Callback.NOOP); } } -- cgit v1.2.3-54-g00ecf