aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java108
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java171
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java31
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java408
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java30
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java62
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java376
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java346
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java39
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java28
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java85
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java34
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java14
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java729
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java123
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java82
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java19
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java31
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java103
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java467
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java19
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java32
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java34
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java124
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java164
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java86
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java149
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java36
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java171
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java216
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java81
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java102
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java48
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java81
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java27
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java30
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java74
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java95
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java33
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java32
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java30
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java78
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java23
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java109
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java117
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java163
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java218
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java135
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java150
49 files changed, 5943 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java
new file mode 100644
index 00000000..2469d6bd
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java
@@ -0,0 +1,108 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2012, Bergmann Gabor, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network;
10
11import java.util.Collections;
12import java.util.HashSet;
13import java.util.Set;
14import java.util.TreeSet;
15
16import tools.refinery.viatra.runtime.rete.traceability.PatternTraceInfo;
17import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;
18
19/**
20 * Base implementation for a Rete node.
21 *
22 * @author Bergmann Gabor
23 *
24 */
25public abstract class BaseNode implements Node {
26
27 protected ReteContainer reteContainer;
28 protected long nodeId;
29 protected Object tag;
30 protected Set<TraceInfo> traceInfos;
31
32 /**
33 * @param reteContainer
34 * the container to create this node in
35 */
36 public BaseNode(ReteContainer reteContainer) {
37 super();
38 this.reteContainer = reteContainer;
39 this.nodeId = reteContainer.registerNode(this);
40 this.traceInfos = new HashSet<TraceInfo>();
41 }
42
43 @Override
44 public String toString() {
45 if (tag != null)
46 return toStringCore() + "->" + getTraceInfoPatternsEnumerated() + "{" + tag.toString() + "}";
47 else
48 return toStringCore() + "->" + getTraceInfoPatternsEnumerated();
49 }
50
51 /**
52 * clients should override this to append before the tag / trace indicators
53 */
54 protected String toStringCore() {
55 return "[" + nodeId + "]" + getClass().getSimpleName();
56 }
57
58 @Override
59 public ReteContainer getContainer() {
60 return reteContainer;
61 }
62
63 @Override
64 public long getNodeId() {
65 return nodeId;
66 }
67
68 @Override
69 public Object getTag() {
70 return tag;
71 }
72
73 @Override
74 public void setTag(Object tag) {
75 this.tag = tag;
76 }
77
78 @Override
79 public Set<TraceInfo> getTraceInfos() {
80 return Collections.unmodifiableSet(traceInfos);
81 }
82
83 @Override
84 public void assignTraceInfo(TraceInfo traceInfo) {
85 traceInfos.add(traceInfo);
86 traceInfo.assignNode(this);
87 }
88
89 @Override
90 public void acceptPropagatedTraceInfo(TraceInfo traceInfo) {
91 assignTraceInfo(traceInfo);
92 }
93
94 /**
95 * Descendants should use this in e.g. logging
96 */
97 protected String getTraceInfoPatternsEnumerated() {
98 TreeSet<String> patternNames = new TreeSet<String>();
99 for (TraceInfo trInfo : traceInfos) {
100 if (trInfo instanceof PatternTraceInfo) {
101 final String pName = ((PatternTraceInfo) trInfo).getPatternName();
102 patternNames.add(pName);
103 }
104 }
105 return patternNames.toString();
106 }
107
108} \ No newline at end of file
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java
new file mode 100644
index 00000000..b261d19d
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java
@@ -0,0 +1,171 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2014, Bergmann Gabor, Istvan Rath and Daniel Varro
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v. 2.0 which is available at
6 * http://www.eclipse.org/legal/epl-v20.html.
7 *
8 * SPDX-License-Identifier: EPL-2.0
9 *******************************************************************************/
10package tools.refinery.viatra.runtime.rete.network;
11
12import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
13import tools.refinery.viatra.runtime.rete.aggregation.IndexerBasedAggregatorNode;
14import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
15import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode;
16import tools.refinery.viatra.runtime.rete.index.DualInputNode;
17import tools.refinery.viatra.runtime.rete.index.Indexer;
18import tools.refinery.viatra.runtime.rete.index.IterableIndexer;
19import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
20import tools.refinery.viatra.runtime.rete.recipes.*;
21import tools.refinery.viatra.runtime.rete.remote.Address;
22import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo;
23
24import java.util.ArrayList;
25import java.util.Collection;
26import java.util.List;
27
28/**
29 * Class responsible for connecting freshly instantiating Rete nodes to their parents.
30 *
31 * @author Bergmann Gabor
32 *
33 */
34class ConnectionFactory {
35 ReteContainer reteContainer;
36
37 public ConnectionFactory(ReteContainer reteContainer) {
38 super();
39 this.reteContainer = reteContainer;
40 }
41
42 // TODO move to node implementation instead?
43 private boolean isStateful(ReteNodeRecipe recipe) {
44 return recipe instanceof ProjectionIndexerRecipe || recipe instanceof IndexerBasedAggregatorRecipe
45 || recipe instanceof SingleColumnAggregatorRecipe || recipe instanceof ExpressionEnforcerRecipe
46 || recipe instanceof TransitiveClosureRecipe || recipe instanceof ProductionRecipe
47 || recipe instanceof UniquenessEnforcerRecipe || recipe instanceof RelationEvaluationRecipe;
48
49 }
50
51 /**
52 * PRE: nodes for parent recipes must already be created and registered
53 * <p>
54 * PRE: must not be an input node (for which {@link InputConnector} is responsible)
55 */
56 public void connectToParents(RecipeTraceInfo recipeTrace, Node freshNode) {
57 final ReteNodeRecipe recipe = recipeTrace.getRecipe();
58 if (recipe instanceof ConstantRecipe) {
59 // NO-OP
60 } else if (recipe instanceof InputRecipe) {
61 throw new IllegalArgumentException(
62 ConnectionFactory.class.getSimpleName() + " not intended for input connection: " + recipe);
63 } else if (recipe instanceof SingleParentNodeRecipe) {
64 final Receiver receiver = (Receiver) freshNode;
65 ReteNodeRecipe parentRecipe = ((SingleParentNodeRecipe) recipe).getParent();
66 connectToParent(recipe, receiver, parentRecipe);
67 } else if (recipe instanceof RelationEvaluationRecipe) {
68 List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents();
69 List<Supplier> parentSuppliers = new ArrayList<Supplier>();
70 for (final ReteNodeRecipe parentRecipe : parentRecipes) {
71 parentSuppliers.add(getSupplierForRecipe(parentRecipe));
72 }
73 ((RelationEvaluatorNode) freshNode).connectToParents(parentSuppliers);
74 } else if (recipe instanceof BetaRecipe) {
75 final DualInputNode beta = (DualInputNode) freshNode;
76 final ArrayList<RecipeTraceInfo> parentTraces = new ArrayList<RecipeTraceInfo>(
77 recipeTrace.getParentRecipeTraces());
78 Slots slots = avoidActiveNodeConflict(parentTraces.get(0), parentTraces.get(1));
79 beta.connectToIndexers(slots.primary, slots.secondary);
80 } else if (recipe instanceof IndexerBasedAggregatorRecipe) {
81 final IndexerBasedAggregatorNode aggregator = (IndexerBasedAggregatorNode) freshNode;
82 final IndexerBasedAggregatorRecipe aggregatorRecipe = (IndexerBasedAggregatorRecipe) recipe;
83 aggregator.initializeWith((ProjectionIndexer) resolveIndexer(aggregatorRecipe.getParent()));
84 } else if (recipe instanceof MultiParentNodeRecipe) {
85 final Receiver receiver = (Receiver) freshNode;
86 List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents();
87 for (ReteNodeRecipe parentRecipe : parentRecipes) {
88 connectToParent(recipe, receiver, parentRecipe);
89 }
90 }
91 }
92
93 private Indexer resolveIndexer(final IndexerRecipe indexerRecipe) {
94 final Address<? extends Node> address = reteContainer.getNetwork().getExistingNodeByRecipe(indexerRecipe);
95 return (Indexer) reteContainer.resolveLocal(address);
96 }
97
98 private void connectToParent(ReteNodeRecipe recipe, Receiver freshNode, ReteNodeRecipe parentRecipe) {
99 final Supplier parentSupplier = getSupplierForRecipe(parentRecipe);
100
101 // special synch
102 if (freshNode instanceof ReinitializedNode) {
103 Collection<Tuple> tuples = new ArrayList<Tuple>();
104 parentSupplier.pullInto(tuples, true);
105 ((ReinitializedNode) freshNode).reinitializeWith(tuples);
106 reteContainer.connect(parentSupplier, freshNode);
107 } else { // default case
108 // stateless nodes do not have to be synced with contents UNLESS they already have children (recursive
109 // corner case)
110 if (isStateful(recipe)
111 || ((freshNode instanceof Supplier) && !((Supplier) freshNode).getReceivers().isEmpty())) {
112 reteContainer.connectAndSynchronize(parentSupplier, freshNode);
113 } else {
114 // stateless node, no synch
115 reteContainer.connect(parentSupplier, freshNode);
116 }
117 }
118 }
119
120 private Supplier getSupplierForRecipe(ReteNodeRecipe recipe) {
121 @SuppressWarnings("unchecked")
122 final Address<? extends Supplier> parentAddress = (Address<? extends Supplier>) reteContainer.getNetwork()
123 .getExistingNodeByRecipe(recipe);
124 final Supplier supplier = reteContainer.getProvisioner().asSupplier(parentAddress);
125 return supplier;
126 }
127
128 /**
129 * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of
130 * the two indexers is supported.
131 *
132 * @return a replacement for the secondary Indexers, if needed
133 */
134 private Slots avoidActiveNodeConflict(final RecipeTraceInfo primarySlot, final RecipeTraceInfo secondarySlot) {
135 Slots result = new Slots() {
136 {
137 primary = (IterableIndexer) resolveIndexer((ProjectionIndexerRecipe) primarySlot.getRecipe());
138 secondary = resolveIndexer((IndexerRecipe) secondarySlot.getRecipe());
139 }
140 };
141 if (activeNodeConflict(result.primary, result.secondary))
142 if (result.secondary instanceof IterableIndexer)
143 result.secondary = resolveActiveIndexer(secondarySlot);
144 else
145 result.primary = (IterableIndexer) resolveActiveIndexer(primarySlot);
146 return result;
147 }
148
149 private Indexer resolveActiveIndexer(final RecipeTraceInfo inactiveIndexerTrace) {
150 final RecipeTraceInfo activeIndexerTrace = reteContainer.getProvisioner()
151 .accessActiveIndexer(inactiveIndexerTrace);
152 reteContainer.getProvisioner().getOrCreateNodeByRecipe(activeIndexerTrace);
153 return resolveIndexer((ProjectionIndexerRecipe) activeIndexerTrace.getRecipe());
154 }
155
156 private static class Slots {
157 IterableIndexer primary;
158 Indexer secondary;
159 }
160
161 /**
162 * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of
163 * the two indexers is supported.
164 *
165 * @return true if there is a conflict of active nodes.
166 */
167 private boolean activeNodeConflict(Indexer primarySlot, Indexer secondarySlot) {
168 return !primarySlot.equals(secondarySlot) && primarySlot.getActiveNode().equals(secondarySlot.getActiveNode());
169 }
170
171}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java
new file mode 100644
index 00000000..c22b06d8
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java
@@ -0,0 +1,31 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2017, Gabor Bergmann, IncQueryLabs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network;
10
11import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
12
13/**
14 * @author Gabor Bergmann
15 * @since 1.7
16 */
17public interface IGroupable {
18
19 /**
20 * @return the current group of the mailbox
21 * @since 1.7
22 */
23 CommunicationGroup getCurrentGroup();
24
25 /**
26 * Sets the current group of the mailbox
27 * @since 1.7
28 */
29 void setCurrentGroup(CommunicationGroup group);
30
31} \ No newline at end of file
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java
new file mode 100644
index 00000000..64f59ff3
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java
@@ -0,0 +1,408 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.ArrayList;
13import java.util.Collection;
14import java.util.Collections;
15import java.util.List;
16import java.util.Map;
17import java.util.Map.Entry;
18import java.util.Set;
19import java.util.concurrent.locks.Lock;
20import java.util.concurrent.locks.ReadWriteLock;
21import java.util.concurrent.locks.ReentrantReadWriteLock;
22
23import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
24import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
25import tools.refinery.viatra.runtime.matchers.util.Direction;
26import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
27import tools.refinery.viatra.runtime.rete.matcher.ReteEngine;
28import tools.refinery.viatra.runtime.rete.recipes.ReteNodeRecipe;
29import tools.refinery.viatra.runtime.rete.remote.Address;
30import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo;
31import tools.refinery.viatra.runtime.rete.util.Options;
32
33/**
34 * @author Gabor Bergmann
35 *
36 */
37public class Network {
38 final int threads;
39
40 protected ArrayList<ReteContainer> containers;
41 ReteContainer headContainer;
42 private int firstContainer = 0;
43 private int nextContainer = 0;
44
45 // the following fields exist only if threads > 0
46 protected Map<ReteContainer, Long> globalTerminationCriteria = null;
47 protected Map<ReteContainer, Long> reportedClocks = null;
48 protected Lock updateLock = null; // grab during normal update operations
49 protected Lock structuralChangeLock = null; // grab if the network structure
50 // is to
51 // be changed
52
53 // Knowledge of the outside world
54 private ReteEngine engine;
55 protected NodeFactory nodeFactory;
56 protected InputConnector inputConnector;
57
58 // Node and recipe administration
59 // incl. addresses for existing nodes by recipe (where available)
60 // Maintained by NodeProvisioner of each container
61 Map<ReteNodeRecipe, Address<? extends Node>> nodesByRecipe = CollectionsFactory.createMap();
62 Set<RecipeTraceInfo> recipeTraces = CollectionsFactory.createSet();
63
64 /**
65 * @throws IllegalStateException
66 * if no node has been constructed for the recipe
67 */
68 public synchronized Address<? extends Node> getExistingNodeByRecipe(ReteNodeRecipe recipe) {
69 final Address<? extends Node> node = nodesByRecipe.get(recipe);
70 if (node == null)
71 throw new IllegalStateException(String.format("Rete node for recipe %s not constructed yet.", recipe));
72 return node;
73 }
74
75 /**
76 * @return null if no node has been constructed for the recipe
77 */
78 public synchronized Address<? extends Node> getNodeByRecipeIfExists(ReteNodeRecipe recipe) {
79 final Address<? extends Node> node = nodesByRecipe.get(recipe);
80 return node;
81 }
82
83 /**
84 * @param threads
85 * the number of threads to operate the network with; 0 means single-threaded operation, 1 starts an
86 * asynchronous thread to operate the RETE net, >1 uses multiple RETE containers.
87 */
88 public Network(int threads, ReteEngine engine) {
89 super();
90 this.threads = threads;
91 this.engine = engine;
92 this.inputConnector = new InputConnector(this);
93 this.nodeFactory = new NodeFactory(engine.getLogger());
94
95 containers = new ArrayList<ReteContainer>();
96 firstContainer = (threads > 1) ? Options.firstFreeContainer : 0; // NOPMD
97 nextContainer = firstContainer;
98
99 if (threads > 0) {
100 globalTerminationCriteria = CollectionsFactory.createMap();
101 reportedClocks = CollectionsFactory.createMap();
102 ReadWriteLock rwl = new ReentrantReadWriteLock();
103 updateLock = rwl.readLock();
104 structuralChangeLock = rwl.writeLock();
105 for (int i = 0; i < threads; ++i)
106 containers.add(new ReteContainer(this, true));
107 } else
108 containers.add(new ReteContainer(this, false));
109
110 headContainer = containers.get(0);
111 }
112
113 /**
114 * Kills this Network along with all containers and message consumption cycles.
115 */
116 public void kill() {
117 for (ReteContainer container : containers) {
118 container.kill();
119 }
120 containers.clear();
121 }
122
123 /**
124 * Returns the head container, that is guaranteed to reside in the same JVM as the Network object.
125 */
126 public ReteContainer getHeadContainer() {
127 return headContainer;
128 }
129
130 /**
131 * Returns the next container in round-robin fashion. Configurable not to yield head container.
132 */
133 public ReteContainer getNextContainer() {
134 if (nextContainer >= containers.size())
135 nextContainer = firstContainer;
136 return containers.get(nextContainer++);
137 }
138
139 /**
140 * Internal message delivery method.
141 *
142 * @pre threads > 0
143 */
144 private void sendUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) {
145 ReteContainer affectedContainer = receiver.getContainer();
146 synchronized (globalTerminationCriteria) {
147 long newCriterion = affectedContainer.sendUpdateToLocalAddress(receiver, direction, updateElement);
148 terminationCriterion(affectedContainer, newCriterion);
149 }
150 }
151
152 /**
153 * Internal message delivery method for single-threaded operation
154 *
155 * @pre threads == 0
156 */
157 private void sendUpdateSingleThreaded(Address<? extends Receiver> receiver, Direction direction,
158 Tuple updateElement) {
159 ReteContainer affectedContainer = receiver.getContainer();
160 affectedContainer.sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement);
161 }
162
163 /**
164 * Internal message delivery method.
165 *
166 * @pre threads > 0
167 */
168 private void sendUpdates(Address<? extends Receiver> receiver, Direction direction,
169 Collection<Tuple> updateElements) {
170 if (updateElements.isEmpty())
171 return;
172 ReteContainer affectedContainer = receiver.getContainer();
173 synchronized (globalTerminationCriteria) {
174 long newCriterion = affectedContainer.sendUpdatesToLocalAddress(receiver, direction, updateElements);
175 terminationCriterion(affectedContainer, newCriterion);
176 }
177 }
178
179 /**
180 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may
181 * reside in any of the containers associated with this network. To be called from a user thread during normal
182 * operation, NOT during construction.
183 *
184 * @since 2.4
185 */
186 public void sendExternalUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) {
187 if (threads > 0) {
188 try {
189 updateLock.lock();
190 sendUpdate(receiver, direction, updateElement);
191 } finally {
192 updateLock.unlock();
193 }
194 } else {
195 sendUpdateSingleThreaded(receiver, direction, updateElement);
196 // getHeadContainer().
197 }
198 }
199
200 /**
201 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may
202 * reside in any of the containers associated with this network. To be called from a user thread during
203 * construction.
204 *
205 * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may
206 * span through network calls, that's why it's not enforced here )
207 *
208 * @return the value of the target container's clock at the time when the message was accepted into its message
209 * queue
210 * @since 2.4
211 */
212 public void sendConstructionUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) {
213 // structuralChangeLock.lock();
214 if (threads > 0)
215 sendUpdate(receiver, direction, updateElement);
216 else
217 receiver.getContainer().sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement);
218 // structuralChangeLock.unlock();
219 }
220
221 /**
222 * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial
223 * matching. The node may reside in any of the containers associated with this network. To be called from a user
224 * thread during construction.
225 *
226 * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may
227 * span through network calls, that's why it's not enforced here )
228 *
229 * @since 2.4
230 */
231 public void sendConstructionUpdates(Address<? extends Receiver> receiver, Direction direction,
232 Collection<Tuple> updateElements) {
233 // structuralChangeLock.lock();
234 if (threads > 0)
235 sendUpdates(receiver, direction, updateElements);
236 else
237 receiver.getContainer().sendUpdatesToLocalAddressSingleThreaded(receiver, direction, updateElements);
238 // structuralChangeLock.unlock();
239 }
240
241 /**
242 * Establishes connection between a supplier and a receiver node, regardless which container they are in. Not to be
243 * called remotely, because this method enforces the structural lock.
244 *
245 * @param supplier
246 * @param receiver
247 * @param synchronise
248 * indicates whether the receiver should be synchronised to the current contents of the supplier
249 */
250 public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
251 boolean synchronise) {
252 try {
253 if (threads > 0)
254 structuralChangeLock.lock();
255 receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise);
256 } finally {
257 if (threads > 0)
258 structuralChangeLock.unlock();
259 }
260 }
261
262 /**
263 * Severs connection between a supplier and a receiver node, regardless which container they are in. Not to be
264 * called remotely, because this method enforces the structural lock.
265 *
266 * @param supplier
267 * @param receiver
268 * @param desynchronise
269 * indicates whether the current contents of the supplier should be subtracted from the receiver
270 */
271 public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
272 boolean desynchronise) {
273 try {
274 if (threads > 0)
275 structuralChangeLock.lock();
276 receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise);
277 } finally {
278 if (threads > 0)
279 structuralChangeLock.unlock();
280 }
281 }
282
283 /**
284 * Containers use this method to report whenever they run out of messages in their queue.
285 *
286 * To be called from the thread of the reporting container.
287 *
288 * @pre threads > 0.
289 * @param reportingContainer
290 * the container reporting the emptiness of its message queue.
291 * @param clock
292 * the value of the container's clock when reporting.
293 * @param localTerminationCriteria
294 * the latest clock values this container has received from other containers since the last time it
295 * reported termination.
296 */
297 void reportLocalUpdateTermination(ReteContainer reportingContainer, long clock,
298 Map<ReteContainer, Long> localTerminationCriteria) {
299 synchronized (globalTerminationCriteria) {
300 for (Entry<ReteContainer, Long> criterion : localTerminationCriteria.entrySet()) {
301 terminationCriterion(criterion.getKey(), criterion.getValue());
302 }
303
304 reportedClocks.put(reportingContainer, clock);
305 Long criterion = globalTerminationCriteria.get(reportingContainer);
306 if (criterion != null && criterion < clock)
307 globalTerminationCriteria.remove(reportingContainer);
308
309 if (globalTerminationCriteria.isEmpty())
310 globalTerminationCriteria.notifyAll();
311 }
312 }
313
314 /**
315 * @pre threads > 0
316 */
317 private void terminationCriterion(ReteContainer affectedContainer, long newCriterion) {
318 synchronized (globalTerminationCriteria) {
319 Long oldCriterion = globalTerminationCriteria.get(affectedContainer);
320 Long oldClock = reportedClocks.get(affectedContainer);
321 long relevantClock = oldClock == null ? 0 : oldClock;
322 if ((relevantClock <= newCriterion) && (oldCriterion == null || oldCriterion < newCriterion)) {
323 globalTerminationCriteria.put(affectedContainer, newCriterion);
324 }
325 }
326 }
327
328 /**
329 * Waits until all rete update operations are settled in all containers. Returns immediately, if no updates are
330 * pending.
331 *
332 * To be called from any user thread.
333 */
334 public void waitForReteTermination() {
335 if (threads > 0) {
336 synchronized (globalTerminationCriteria) {
337 while (!globalTerminationCriteria.isEmpty()) {
338 try {
339 globalTerminationCriteria.wait();
340 } catch (InterruptedException e) {
341
342 }
343 }
344 }
345 } else
346 headContainer.deliverMessagesSingleThreaded();
347 }
348
349 /**
350 * Waits to execute action until all rete update operations are settled in all containers. Runs action and returns
351 * immediately, if no updates are pending. The given action is guaranteed to be run when the terminated state still
352 * persists.
353 *
354 * @param action
355 * the action to be run when reaching the steady-state.
356 *
357 * To be called from any user thread.
358 */
359 public void waitForReteTermination(Runnable action) {
360 if (threads > 0) {
361 synchronized (globalTerminationCriteria) {
362 while (!globalTerminationCriteria.isEmpty()) {
363 try {
364 globalTerminationCriteria.wait();
365 } catch (InterruptedException e) {
366
367 }
368 }
369 action.run();
370 }
371 } else {
372 headContainer.deliverMessagesSingleThreaded();
373 action.run();
374 }
375
376 }
377
378 /**
379 * @return an unmodifiable set of known recipe traces
380 */
381 public Set<RecipeTraceInfo> getRecipeTraces() {
382 return Collections.unmodifiableSet(recipeTraces);
383 }
384
385 /**
386 * @return an unmodifiable list of containers
387 */
388 public List<ReteContainer> getContainers() {
389 return Collections.unmodifiableList(containers);
390 }
391
392 public Lock getStructuralChangeLock() {
393 return structuralChangeLock;
394 }
395
396 public NodeFactory getNodeFactory() {
397 return nodeFactory;
398 }
399
400 public InputConnector getInputConnector() {
401 return inputConnector;
402 }
403
404 public ReteEngine getEngine() {
405 return engine;
406 }
407
408}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java
new file mode 100644
index 00000000..c6ba34c4
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java
@@ -0,0 +1,30 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network;
10
11import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
12
13/**
14 * {@link Node}s implementing this interface are sensitive to changes in the dependency graph maintained by the
15 * {@link CommunicationTracker}. The {@link CommunicationTracker} notifies these nodes whenever the SCC of this node is
16 * affected by changes to the dependency graph. Depending on whether this node is contained in a recursive group or not,
17 * it may behave differently, and the {@link NetworkStructureChangeSensitiveNode#networkStructureChanged()} method can
18 * be used to perform changes in behavior.
19 *
20 * @author Tamas Szabo
21 * @since 2.3
22 */
23public interface NetworkStructureChangeSensitiveNode extends Node {
24
25 /**
26 * At the time of the invocation, the dependency graph has already been updated.
27 */
28 public void networkStructureChanged();
29
30}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java
new file mode 100644
index 00000000..e8ab615a
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java
@@ -0,0 +1,62 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.Set;
13
14import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
15import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;
16
17/**
18 * A node of a rete network, should be uniquely identified by network and nodeId. NodeId can be requested by registering
19 * at the Network on construction.
20 *
21 * @author Gabor Bergmann
22 */
23public interface Node {
24 /**
25 * @return the network this node belongs to.
26 */
27 ReteContainer getContainer();
28
29 /**
30 * @return the identifier unique to this node within the network.
31 */
32 long getNodeId();
33
34 /**
35 * Assigns a descriptive tag to the node
36 */
37 void setTag(Object tag);
38
39 /**
40 * @return the tag of the node
41 */
42 Object getTag();
43
44 /**
45 * @return unmodifiable view of the list of traceability infos assigned to this node
46 */
47 Set<TraceInfo> getTraceInfos();
48
49 /**
50 * assigns new traceability info to this node
51 */
52 void assignTraceInfo(TraceInfo traceInfo);
53 /**
54 * accepts traceability info propagated to this node
55 */
56 void acceptPropagatedTraceInfo(TraceInfo traceInfo);
57
58 default CommunicationTracker getCommunicationTracker() {
59 return getContainer().getCommunicationTracker();
60 }
61
62}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java
new file mode 100644
index 00000000..3e4ea4e0
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java
@@ -0,0 +1,376 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2014, Bergmann Gabor, Istvan Rath and Daniel Varro
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v. 2.0 which is available at
6 * http://www.eclipse.org/legal/epl-v20.html.
7 *
8 * SPDX-License-Identifier: EPL-2.0
9 *******************************************************************************/
10package tools.refinery.viatra.runtime.rete.network;
11
12import org.apache.log4j.Logger;
13import org.eclipse.emf.common.util.EMap;
14import tools.refinery.viatra.runtime.rete.itc.alg.representative.RepresentativeElectionAlgorithm;
15import tools.refinery.viatra.runtime.rete.itc.alg.representative.StronglyConnectedComponentAlgorithm;
16import tools.refinery.viatra.runtime.rete.itc.alg.representative.WeaklyConnectedComponentAlgorithm;
17import tools.refinery.viatra.runtime.matchers.context.IPosetComparator;
18import tools.refinery.viatra.runtime.matchers.psystem.IExpressionEvaluator;
19import tools.refinery.viatra.runtime.matchers.psystem.IRelationEvaluator;
20import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
21import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
22import tools.refinery.viatra.runtime.matchers.tuple.Tuples;
23import tools.refinery.viatra.runtime.rete.aggregation.ColumnAggregatorNode;
24import tools.refinery.viatra.runtime.rete.aggregation.CountNode;
25import tools.refinery.viatra.runtime.rete.aggregation.IAggregatorNode;
26import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulParallelTimelyColumnAggregatorNode;
27import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulSequentialTimelyColumnAggregatorNode;
28import tools.refinery.viatra.runtime.rete.aggregation.timely.FirstOnlyParallelTimelyColumnAggregatorNode;
29import tools.refinery.viatra.runtime.rete.aggregation.timely.FirstOnlySequentialTimelyColumnAggregatorNode;
30import tools.refinery.viatra.runtime.rete.boundary.ExternalInputEnumeratorNode;
31import tools.refinery.viatra.runtime.rete.boundary.ExternalInputStatelessFilterNode;
32import tools.refinery.viatra.runtime.rete.eval.EvaluatorCore;
33import tools.refinery.viatra.runtime.rete.eval.MemorylessEvaluatorNode;
34import tools.refinery.viatra.runtime.rete.eval.OutputCachingEvaluatorNode;
35import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode;
36import tools.refinery.viatra.runtime.rete.index.ExistenceNode;
37import tools.refinery.viatra.runtime.rete.index.Indexer;
38import tools.refinery.viatra.runtime.rete.index.JoinNode;
39import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration;
40import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.AggregatorArchitecture;
41import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation;
42import tools.refinery.viatra.runtime.rete.misc.ConstantNode;
43import tools.refinery.viatra.runtime.rete.recipes.*;
44import tools.refinery.viatra.runtime.rete.single.*;
45import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;
46
47import java.util.HashMap;
48import java.util.List;
49import java.util.Map;
50
51/**
52 * Factory for instantiating Rete nodes. The created nodes are not connected to the network yet.
53 *
54 * @author Bergmann Gabor
55 *
56 */
57class NodeFactory {
58 Logger logger;
59
60 public NodeFactory(Logger logger) {
61 super();
62 this.logger = logger;
63 }
64
65 /**
66 * PRE: parent node must already be created
67 */
68 public Indexer createIndexer(ReteContainer reteContainer, IndexerRecipe recipe, Supplier parentNode,
69 TraceInfo... traces) {
70
71 if (recipe instanceof ProjectionIndexerRecipe) {
72 return parentNode.constructIndex(toMask(recipe.getMask()), traces);
73 // already traced
74 } else if (recipe instanceof AggregatorIndexerRecipe) {
75 int indexOfAggregateResult = recipe.getParent().getArity();
76 int resultPosition = recipe.getMask().getSourceIndices().lastIndexOf(indexOfAggregateResult);
77
78 IAggregatorNode aggregatorNode = (IAggregatorNode) parentNode;
79 final Indexer result = (resultPosition == -1) ? aggregatorNode.getAggregatorOuterIndexer()
80 : aggregatorNode.getAggregatorOuterIdentityIndexer(resultPosition);
81
82 for (TraceInfo traceInfo : traces)
83 result.assignTraceInfo(traceInfo);
84 return result;
85 } else
86 throw new IllegalArgumentException("Unkown Indexer recipe: " + recipe);
87 }
88
89 /**
90 * PRE: recipe is not an indexer recipe.
91 */
92 public Supplier createNode(ReteContainer reteContainer, ReteNodeRecipe recipe, TraceInfo... traces) {
93 if (recipe instanceof IndexerRecipe)
94 throw new IllegalArgumentException("Indexers are not created by NodeFactory: " + recipe);
95
96 Supplier result = instantiateNodeDispatch(reteContainer, recipe);
97 for (TraceInfo traceInfo : traces)
98 result.assignTraceInfo(traceInfo);
99 return result;
100 }
101
102 private Supplier instantiateNodeDispatch(ReteContainer reteContainer, ReteNodeRecipe recipe) {
103
104 // Parentless
105
106 if (recipe instanceof ConstantRecipe)
107 return instantiateNode(reteContainer, (ConstantRecipe) recipe);
108 if (recipe instanceof InputRecipe)
109 return instantiateNode(reteContainer, (InputRecipe) recipe);
110
111 // SingleParentNodeRecipe
112
113 // if (recipe instanceof ProjectionIndexer)
114 // return instantiateNode((ProjectionIndexer)recipe);
115 if (recipe instanceof InputFilterRecipe)
116 return instantiateNode(reteContainer, (InputFilterRecipe) recipe);
117 if (recipe instanceof InequalityFilterRecipe)
118 return instantiateNode(reteContainer, (InequalityFilterRecipe) recipe);
119 if (recipe instanceof EqualityFilterRecipe)
120 return instantiateNode(reteContainer, (EqualityFilterRecipe) recipe);
121 if (recipe instanceof TransparentRecipe)
122 return instantiateNode(reteContainer, (TransparentRecipe) recipe);
123 if (recipe instanceof TrimmerRecipe)
124 return instantiateNode(reteContainer, (TrimmerRecipe) recipe);
125 if (recipe instanceof TransitiveClosureRecipe)
126 return instantiateNode(reteContainer, (TransitiveClosureRecipe) recipe);
127 if (recipe instanceof RepresentativeElectionRecipe)
128 return instantiateNode(reteContainer, (RepresentativeElectionRecipe) recipe);
129 if (recipe instanceof RelationEvaluationRecipe)
130 return instantiateNode(reteContainer, (RelationEvaluationRecipe) recipe);
131 if (recipe instanceof ExpressionEnforcerRecipe)
132 return instantiateNode(reteContainer, (ExpressionEnforcerRecipe) recipe);
133 if (recipe instanceof CountAggregatorRecipe)
134 return instantiateNode(reteContainer, (CountAggregatorRecipe) recipe);
135 if (recipe instanceof SingleColumnAggregatorRecipe)
136 return instantiateNode(reteContainer, (SingleColumnAggregatorRecipe) recipe);
137 if (recipe instanceof DiscriminatorDispatcherRecipe)
138 return instantiateNode(reteContainer, (DiscriminatorDispatcherRecipe) recipe);
139 if (recipe instanceof DiscriminatorBucketRecipe)
140 return instantiateNode(reteContainer, (DiscriminatorBucketRecipe) recipe);
141
142 // MultiParentNodeRecipe
143 if (recipe instanceof UniquenessEnforcerRecipe)
144 return instantiateNode(reteContainer, (UniquenessEnforcerRecipe) recipe);
145 if (recipe instanceof ProductionRecipe)
146 return instantiateNode(reteContainer, (ProductionRecipe) recipe);
147
148 // BetaNodeRecipe
149 if (recipe instanceof JoinRecipe)
150 return instantiateNode(reteContainer, (JoinRecipe) recipe);
151 if (recipe instanceof SemiJoinRecipe)
152 return instantiateNode(reteContainer, (SemiJoinRecipe) recipe);
153 if (recipe instanceof AntiJoinRecipe)
154 return instantiateNode(reteContainer, (AntiJoinRecipe) recipe);
155
156 // ... else
157 throw new IllegalArgumentException("Unsupported recipe type: " + recipe);
158 }
159
160 // INSTANTIATION for recipe types
161
162 private Supplier instantiateNode(ReteContainer reteContainer, InputRecipe recipe) {
163 return new ExternalInputEnumeratorNode(reteContainer);
164 }
165
166 private Supplier instantiateNode(ReteContainer reteContainer, InputFilterRecipe recipe) {
167 return new ExternalInputStatelessFilterNode(reteContainer, toMaskOrNull(recipe.getMask()));
168 }
169
170 private Supplier instantiateNode(ReteContainer reteContainer, CountAggregatorRecipe recipe) {
171 return new CountNode(reteContainer);
172 }
173
174 private Supplier instantiateNode(ReteContainer reteContainer, TransparentRecipe recipe) {
175 return new TransparentNode(reteContainer);
176 }
177
178 private Supplier instantiateNode(ReteContainer reteContainer, ExpressionEnforcerRecipe recipe) {
179 final IExpressionEvaluator evaluator = toIExpressionEvaluator(recipe.getExpression());
180 final Map<String, Integer> posMapping = toStringIndexMap(recipe.getMappedIndices());
181 final int sourceTupleWidth = recipe.getParent().getArity();
182 EvaluatorCore core = null;
183 if (recipe instanceof CheckRecipe) {
184 core = new EvaluatorCore.PredicateEvaluatorCore(logger, evaluator, posMapping, sourceTupleWidth);
185 } else if (recipe instanceof EvalRecipe) {
186 final boolean isUnwinding = ((EvalRecipe) recipe).isUnwinding();
187 core = new EvaluatorCore.FunctionEvaluatorCore(logger, evaluator, posMapping, sourceTupleWidth, isUnwinding);
188 } else {
189 throw new IllegalArgumentException("Unhandled expression enforcer recipe: " + recipe.getClass() + "!");
190 }
191 if (recipe.isCacheOutput()) {
192 return new OutputCachingEvaluatorNode(reteContainer, core);
193 } else {
194 return new MemorylessEvaluatorNode(reteContainer, core);
195 }
196 }
197
198 @SuppressWarnings({ "rawtypes", "unchecked" })
199 private Supplier instantiateNode(ReteContainer reteContainer, SingleColumnAggregatorRecipe recipe) {
200 final IMultisetAggregationOperator operator = recipe.getMultisetAggregationOperator();
201 TupleMask coreMask = null;
202 if (recipe.getOptionalMonotonicityInfo() != null) {
203 coreMask = toMask(recipe.getOptionalMonotonicityInfo().getCoreMask());
204 } else {
205 coreMask = toMask(recipe.getGroupByMask());
206 }
207
208 if (reteContainer.isTimelyEvaluation()) {
209 final TimelyConfiguration timelyConfiguration = reteContainer.getTimelyConfiguration();
210 final AggregatorArchitecture aggregatorArchitecture = timelyConfiguration.getAggregatorArchitecture();
211 final TimelineRepresentation timelineRepresentation = timelyConfiguration.getTimelineRepresentation();
212
213 TupleMask posetMask = null;
214
215 if (recipe.getOptionalMonotonicityInfo() != null) {
216 posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask());
217 } else {
218 final int aggregatedColumn = recipe.getAggregableIndex();
219 posetMask = TupleMask.selectSingle(aggregatedColumn, coreMask.sourceWidth);
220 }
221
222 if (timelineRepresentation == TimelineRepresentation.FIRST_ONLY
223 && aggregatorArchitecture == AggregatorArchitecture.SEQUENTIAL) {
224 return new FirstOnlySequentialTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask);
225 } else if (timelineRepresentation == TimelineRepresentation.FIRST_ONLY
226 && aggregatorArchitecture == AggregatorArchitecture.PARALLEL) {
227 return new FirstOnlyParallelTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask);
228 } else if (timelineRepresentation == TimelineRepresentation.FAITHFUL
229 && aggregatorArchitecture == AggregatorArchitecture.SEQUENTIAL) {
230 return new FaithfulSequentialTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask);
231 } else if (timelineRepresentation == TimelineRepresentation.FAITHFUL
232 && aggregatorArchitecture == AggregatorArchitecture.PARALLEL) {
233 return new FaithfulParallelTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask);
234 } else {
235 throw new IllegalArgumentException("Unsupported timely configuration!");
236 }
237 } else if (recipe.isDeleteRederiveEvaluation() && recipe.getOptionalMonotonicityInfo() != null) {
238 final TupleMask posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask());
239 final IPosetComparator posetComparator = (IPosetComparator) recipe.getOptionalMonotonicityInfo()
240 .getPosetComparator();
241 return new ColumnAggregatorNode(reteContainer, operator, recipe.isDeleteRederiveEvaluation(), coreMask,
242 posetMask, posetComparator);
243 } else {
244 final int aggregatedColumn = recipe.getAggregableIndex();
245 return new ColumnAggregatorNode(reteContainer, operator, coreMask, aggregatedColumn);
246 }
247 }
248
249 private Supplier instantiateNode(ReteContainer reteContainer, TransitiveClosureRecipe recipe) {
250 return new TransitiveClosureNode(reteContainer);
251 }
252
253 private Supplier instantiateNode(ReteContainer reteContainer, RepresentativeElectionRecipe recipe) {
254 RepresentativeElectionAlgorithm.Factory algorithmFactory = switch (recipe.getConnectivity()) {
255 case STRONG -> StronglyConnectedComponentAlgorithm::new;
256 case WEAK -> WeaklyConnectedComponentAlgorithm::new;
257 };
258 return new RepresentativeElectionNode(reteContainer, algorithmFactory);
259 }
260
261 private Supplier instantiateNode(ReteContainer reteContainer, RelationEvaluationRecipe recipe) {
262 return new RelationEvaluatorNode(reteContainer, toIRelationEvaluator(recipe.getEvaluator()));
263 }
264
265 private Supplier instantiateNode(ReteContainer reteContainer, ProductionRecipe recipe) {
266 if (reteContainer.isTimelyEvaluation()) {
267 return new TimelyProductionNode(reteContainer, toStringIndexMap(recipe.getMappedIndices()));
268 } else if (recipe.isDeleteRederiveEvaluation() && recipe.getOptionalMonotonicityInfo() != null) {
269 TupleMask coreMask = toMask(recipe.getOptionalMonotonicityInfo().getCoreMask());
270 TupleMask posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask());
271 IPosetComparator posetComparator = (IPosetComparator) recipe.getOptionalMonotonicityInfo()
272 .getPosetComparator();
273 return new DefaultProductionNode(reteContainer, toStringIndexMap(recipe.getMappedIndices()),
274 recipe.isDeleteRederiveEvaluation(), coreMask, posetMask, posetComparator);
275 } else {
276 return new DefaultProductionNode(reteContainer, toStringIndexMap(recipe.getMappedIndices()),
277 recipe.isDeleteRederiveEvaluation());
278 }
279 }
280
281 private Supplier instantiateNode(ReteContainer reteContainer, UniquenessEnforcerRecipe recipe) {
282 if (reteContainer.isTimelyEvaluation()) {
283 return new TimelyUniquenessEnforcerNode(reteContainer, recipe.getArity());
284 } else if (recipe.isDeleteRederiveEvaluation() && recipe.getOptionalMonotonicityInfo() != null) {
285 TupleMask coreMask = toMask(recipe.getOptionalMonotonicityInfo().getCoreMask());
286 TupleMask posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask());
287 IPosetComparator posetComparator = (IPosetComparator) recipe.getOptionalMonotonicityInfo()
288 .getPosetComparator();
289 return new UniquenessEnforcerNode(reteContainer, recipe.getArity(), recipe.isDeleteRederiveEvaluation(),
290 coreMask, posetMask, posetComparator);
291 } else {
292 return new UniquenessEnforcerNode(reteContainer, recipe.getArity(), recipe.isDeleteRederiveEvaluation());
293 }
294 }
295
296 private Supplier instantiateNode(ReteContainer reteContainer, ConstantRecipe recipe) {
297 final List<Object> constantValues = recipe.getConstantValues();
298 final Object[] constantArray = constantValues.toArray(new Object[constantValues.size()]);
299 return new ConstantNode(reteContainer, Tuples.flatTupleOf(constantArray));
300 }
301
302 private Supplier instantiateNode(ReteContainer reteContainer, DiscriminatorBucketRecipe recipe) {
303 return new DiscriminatorBucketNode(reteContainer, recipe.getBucketKey());
304 }
305
306 private Supplier instantiateNode(ReteContainer reteContainer, DiscriminatorDispatcherRecipe recipe) {
307 return new DiscriminatorDispatcherNode(reteContainer, recipe.getDiscriminationColumnIndex());
308 }
309
310 private Supplier instantiateNode(ReteContainer reteContainer, TrimmerRecipe recipe) {
311 return new TrimmerNode(reteContainer, toMask(recipe.getMask()));
312 }
313
314 private Supplier instantiateNode(ReteContainer reteContainer, InequalityFilterRecipe recipe) {
315 Tunnel result = new InequalityFilterNode(reteContainer, recipe.getSubject(),
316 TupleMask.fromSelectedIndices(recipe.getParent().getArity(), recipe.getInequals()));
317 return result;
318 }
319
320 private Supplier instantiateNode(ReteContainer reteContainer, EqualityFilterRecipe recipe) {
321 final int[] equalIndices = TupleMask.integersToIntArray(recipe.getIndices());
322 return new EqualityFilterNode(reteContainer, equalIndices);
323 }
324
325 private Supplier instantiateNode(ReteContainer reteContainer, AntiJoinRecipe recipe) {
326 return new ExistenceNode(reteContainer, true);
327 }
328
329 private Supplier instantiateNode(ReteContainer reteContainer, SemiJoinRecipe recipe) {
330 return new ExistenceNode(reteContainer, false);
331 }
332
333 private Supplier instantiateNode(ReteContainer reteContainer, JoinRecipe recipe) {
334 return new JoinNode(reteContainer, toMask(recipe.getRightParentComplementaryMask()));
335 }
336
337 // HELPERS
338
339 private IExpressionEvaluator toIExpressionEvaluator(ExpressionDefinition expressionDefinition) {
340 final Object evaluator = expressionDefinition.getEvaluator();
341 if (evaluator instanceof IExpressionEvaluator) {
342 return (IExpressionEvaluator) evaluator;
343 }
344 throw new IllegalArgumentException("No runtime support for expression evaluator: " + evaluator);
345 }
346
347 private IRelationEvaluator toIRelationEvaluator(ExpressionDefinition expressionDefinition) {
348 final Object evaluator = expressionDefinition.getEvaluator();
349 if (evaluator instanceof IRelationEvaluator) {
350 return (IRelationEvaluator) evaluator;
351 }
352 throw new IllegalArgumentException("No runtime support for relation evaluator: " + evaluator);
353 }
354
355 private Map<String, Integer> toStringIndexMap(final EMap<String, Integer> mappedIndices) {
356 final HashMap<String, Integer> result = new HashMap<String, Integer>();
357 for (java.util.Map.Entry<String, Integer> entry : mappedIndices) {
358 result.put(entry.getKey(), entry.getValue());
359 }
360 return result;
361 }
362
363 /** Mask can be null */
364 private TupleMask toMaskOrNull(Mask mask) {
365 if (mask == null)
366 return null;
367 else
368 return toMask(mask);
369 }
370
371 /** Mask is non-null. */
372 private TupleMask toMask(Mask mask) {
373 return TupleMask.fromSelectedIndices(mask.getSourceArity(), mask.getSourceIndices());
374 }
375
376}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java
new file mode 100644
index 00000000..9121fc44
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java
@@ -0,0 +1,346 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext;
13import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
14import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
15import tools.refinery.viatra.runtime.matchers.tuple.Tuples;
16import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
17import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
18import tools.refinery.viatra.runtime.rete.construction.plancompiler.CompilerHelper;
19import tools.refinery.viatra.runtime.rete.index.Indexer;
20import tools.refinery.viatra.runtime.rete.index.OnetimeIndexer;
21import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
22import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand;
23import tools.refinery.viatra.runtime.rete.recipes.*;
24import tools.refinery.viatra.runtime.rete.recipes.helper.RecipeRecognizer;
25import tools.refinery.viatra.runtime.rete.recipes.helper.RecipesHelper;
26import tools.refinery.viatra.runtime.rete.remote.Address;
27import tools.refinery.viatra.runtime.rete.remote.RemoteReceiver;
28import tools.refinery.viatra.runtime.rete.remote.RemoteSupplier;
29import tools.refinery.viatra.runtime.rete.traceability.ActiveNodeConflictTrace;
30import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo;
31import tools.refinery.viatra.runtime.rete.traceability.UserRequestTrace;
32import tools.refinery.viatra.runtime.rete.util.Options;
33
34import java.util.Map;
35import java.util.Set;
36
37/**
38 * Stores the internal parts of a rete network. Nodes are stored according to type and parameters.
39 *
40 * @author Gabor Bergmann
41 */
42public class NodeProvisioner {
43
44 // boolean activeStorage = true;
45
46 ReteContainer reteContainer;
47 NodeFactory nodeFactory;
48 ConnectionFactory connectionFactory;
49 InputConnector inputConnector;
50 IQueryRuntimeContext runtimeContext;
51
52 // TODO as recipe?
53 Map<Supplier, RemoteReceiver> remoteReceivers = CollectionsFactory.createMap();
54 Map<Address<? extends Supplier>, RemoteSupplier> remoteSuppliers = CollectionsFactory.createMap();
55
56 private RecipeRecognizer recognizer;
57
58 /**
59 * PRE: NodeFactory, ConnectionFactory must exist
60 *
61 * @param reteContainer
62 * the ReteNet whose interior is to be mapped.
63 */
64 public NodeProvisioner(ReteContainer reteContainer) {
65 super();
66 this.reteContainer = reteContainer;
67 this.nodeFactory = reteContainer.getNodeFactory();
68 this.connectionFactory = reteContainer.getConnectionFactory();
69 this.inputConnector = reteContainer.getInputConnectionFactory();
70 runtimeContext = reteContainer.getNetwork().getEngine().getRuntimeContext();
71 recognizer = new RecipeRecognizer(runtimeContext);
72 }
73
74 public synchronized Address<? extends Node> getOrCreateNodeByRecipe(RecipeTraceInfo recipeTrace) {
75 ReteNodeRecipe recipe = recipeTrace.getRecipe();
76 Address<? extends Node> result = getNodesByRecipe().get(recipe);
77 if (result != null) {
78 // NODE ALREADY CONSTRUCTED FOR RECIPE, only needs to add trace
79 if (getRecipeTraces().add(recipeTrace))
80 result.getNodeCache().assignTraceInfo(recipeTrace);
81 } else {
82 // No node for this recipe object - but equivalent recipes still
83 // reusable
84 ReteNodeRecipe canonicalRecipe = recognizer.canonicalizeRecipe(recipe);
85 if (canonicalRecipe != recipe) {
86 // FOUND EQUIVALENT RECIPE
87 result = getNodesByRecipe().get(canonicalRecipe);
88 if (result != null) {
89 // NODE ALREADY CONSTRUCTED FOR EQUIVALENT RECIPE
90 recipeTrace.shadowWithEquivalentRecipe(canonicalRecipe);
91 getNodesByRecipe().put(recipe, result);
92 if (getRecipeTraces().add(recipeTrace))
93 result.getNodeCache().assignTraceInfo(recipeTrace);
94 // Bug 491922: ensure that recipe shadowing propagates to
95 // parent traces
96 // note that if equivalentRecipes() becomes more
97 // sophisticated
98 // and considers recipes with different parents, this might
99 // have to be changed
100 ensureParents(recipeTrace);
101 } else {
102 // CONSTRUCTION IN PROGRESS FOR EQUIVALENT RECIPE
103 if (recipe instanceof IndexerRecipe) {
104 // this is allowed for indexers;
105 // go on with the construction, as the same indexer node
106 // will be obtained anyways
107 } else {
108 throw new IllegalStateException(
109 "This should not happen: " + "non-indexer nodes are are supposed to be constructed "
110 + "as soon as they are designated as canonical recipes");
111 }
112 }
113 }
114 if (result == null) {
115 // MUST INSTANTIATE NEW NODE FOR RECIPE
116 final Node freshNode = instantiateNodeForRecipe(recipeTrace, recipe);
117 result = reteContainer.makeAddress(freshNode);
118 }
119 }
120 return result;
121 }
122
123 private Set<RecipeTraceInfo> getRecipeTraces() {
124 return reteContainer.network.recipeTraces;
125 }
126
127 private Node instantiateNodeForRecipe(RecipeTraceInfo recipeTrace, final ReteNodeRecipe recipe) {
128 this.getRecipeTraces().add(recipeTrace);
129 if (recipe instanceof IndexerRecipe) {
130
131 // INSTANTIATE AND HOOK UP
132 // (cannot delay hooking up, because parent determines indexer
133 // implementation)
134 ensureParents(recipeTrace);
135 final ReteNodeRecipe parentRecipe = recipeTrace.getParentRecipeTraces().iterator().next().getRecipe();
136 final Indexer result = nodeFactory.createIndexer(reteContainer, (IndexerRecipe) recipe,
137 asSupplier(
138 (Address<? extends Supplier>) reteContainer.network.getExistingNodeByRecipe(parentRecipe)),
139 recipeTrace);
140
141 // REMEMBER
142 if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) {
143 getNodesByRecipe().put(recipe, reteContainer.makeAddress(result));
144 }
145
146 return result;
147 } else {
148
149 // INSTANTIATE
150 Node result = nodeFactory.createNode(reteContainer, recipe, recipeTrace);
151
152 // REMEMBER
153 if (Options.nodeSharingOption == Options.NodeSharingOption.ALL) {
154 getNodesByRecipe().put(recipe, reteContainer.makeAddress(result));
155 }
156
157 // HOOK UP
158 // (recursion-tolerant due to this delayed order of initialization)
159 if (recipe instanceof InputRecipe) {
160 inputConnector.connectInput((InputRecipe) recipe, result);
161 } else {
162 if (recipe instanceof InputFilterRecipe)
163 inputConnector.connectInputFilter((InputFilterRecipe) recipe, result);
164 ensureParents(recipeTrace);
165 connectionFactory.connectToParents(recipeTrace, result);
166 }
167 return result;
168 }
169 }
170
171 private Map<ReteNodeRecipe, Address<? extends Node>> getNodesByRecipe() {
172 return reteContainer.network.nodesByRecipe;
173 }
174
175 private void ensureParents(RecipeTraceInfo recipeTrace) {
176 for (RecipeTraceInfo parentTrace : recipeTrace.getParentRecipeTraces()) {
177 getOrCreateNodeByRecipe(parentTrace);
178 }
179 }
180
181 //// Remoting - TODO eliminate?
182
183 synchronized RemoteReceiver accessRemoteReceiver(Address<? extends Supplier> address) {
184 throw new UnsupportedOperationException("Multi-container Rete not supported yet");
185 // if (!reteContainer.isLocal(address))
186 // return
187 // address.getContainer().getProvisioner().accessRemoteReceiver(address);
188 // Supplier localSupplier = reteContainer.resolveLocal(address);
189 // RemoteReceiver result = remoteReceivers.get(localSupplier);
190 // if (result == null) {
191 // result = new RemoteReceiver(reteContainer);
192 // reteContainer.connect(localSupplier, result); // stateless node, no
193 // // synch required
194 //
195 // if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER)
196 // remoteReceivers.put(localSupplier, result);
197 // }
198 // return result;
199 }
200
201 /**
202 * @pre: address is NOT local
203 */
204 synchronized RemoteSupplier accessRemoteSupplier(Address<? extends Supplier> address) {
205 throw new UnsupportedOperationException("Multi-container Rete not supported yet");
206 // RemoteSupplier result = remoteSuppliers.get(address);
207 // if (result == null) {
208 // result = new RemoteSupplier(reteContainer,
209 // address.getContainer().getProvisioner()
210 // .accessRemoteReceiver(address));
211 // // network.connectAndSynchronize(supplier, result);
212 //
213 // if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER)
214 // remoteSuppliers.put(address, result);
215 // }
216 // return result;
217 }
218
219 /**
220 * The powerful method for accessing any (supplier) Address as a local supplier.
221 */
222 public Supplier asSupplier(Address<? extends Supplier> address) {
223 if (!reteContainer.isLocal(address))
224 return accessRemoteSupplier(address);
225 else
226 return reteContainer.resolveLocal(address);
227 }
228
229 /** the composite key tuple is formed as (RecipeTraceInfo, TupleMask) */
230 private Map<Tuple, UserRequestTrace> projectionIndexerUserRequests = CollectionsFactory.createMap();
231
232 // local version
233 // TODO remove?
234 public synchronized ProjectionIndexer accessProjectionIndexer(RecipeTraceInfo productionTrace, TupleMask mask) {
235 Tuple tableKey = Tuples.staticArityFlatTupleOf(productionTrace, mask);
236 UserRequestTrace indexerTrace = projectionIndexerUserRequests.computeIfAbsent(tableKey, k -> {
237 final ProjectionIndexerRecipe projectionIndexerRecipe = projectionIndexerRecipe(
238 productionTrace, mask);
239 return new UserRequestTrace(projectionIndexerRecipe, productionTrace);
240 });
241 final Address<? extends Node> address = getOrCreateNodeByRecipe(indexerTrace);
242 return (ProjectionIndexer) reteContainer.resolveLocal(address);
243 }
244
245 // local version
246 public synchronized ProjectionIndexer accessProjectionIndexerOnetime(RecipeTraceInfo supplierTrace,
247 TupleMask mask) {
248 if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER)
249 return accessProjectionIndexer(supplierTrace, mask);
250
251 final Address<? extends Node> supplierAddress = getOrCreateNodeByRecipe(supplierTrace);
252 Supplier supplier = (Supplier) reteContainer.resolveLocal(supplierAddress);
253
254 OnetimeIndexer result = new OnetimeIndexer(reteContainer, mask);
255 reteContainer.getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, result, reteContainer));
256
257 return result;
258 }
259
260 // local, read-only version
261 public synchronized ProjectionIndexer peekProjectionIndexer(RecipeTraceInfo supplierTrace, TupleMask mask) {
262 final Address<? extends Node> address = getNodesByRecipe().get(projectionIndexerRecipe(supplierTrace, mask));
263 return address == null ? null : (ProjectionIndexer) reteContainer.resolveLocal(address);
264 }
265
266 private ProjectionIndexerRecipe projectionIndexerRecipe(
267 RecipeTraceInfo parentTrace, TupleMask mask) {
268 final ReteNodeRecipe parentRecipe = parentTrace.getRecipe();
269 Tuple tableKey = Tuples.staticArityFlatTupleOf(parentRecipe, mask);
270 ProjectionIndexerRecipe projectionIndexerRecipe = resultSeedRecipes.computeIfAbsent(tableKey, k ->
271 RecipesHelper.projectionIndexerRecipe(parentRecipe, CompilerHelper.toRecipeMask(mask))
272 );
273 return projectionIndexerRecipe;
274 }
275
276 /** the composite key tuple is formed as (ReteNodeRecipe, TupleMask) */
277 private Map<Tuple, ProjectionIndexerRecipe> resultSeedRecipes = CollectionsFactory.createMap();
278
279 // public synchronized Address<? extends Supplier>
280 // accessValueBinderFilterNode(
281 // Address<? extends Supplier> supplierAddress, int bindingIndex, Object
282 // bindingValue) {
283 // Supplier supplier = asSupplier(supplierAddress);
284 // Object[] paramsArray = { supplier.getNodeId(), bindingIndex, bindingValue
285 // };
286 // Tuple params = new FlatTuple(paramsArray);
287 // ValueBinderFilterNode result = valueBinderFilters.get(params);
288 // if (result == null) {
289 // result = new ValueBinderFilterNode(reteContainer, bindingIndex,
290 // bindingValue);
291 // reteContainer.connect(supplier, result); // stateless node, no synch
292 // // required
293 //
294 // if (Options.nodeSharingOption == Options.NodeSharingOption.ALL)
295 // valueBinderFilters.put(params, result);
296 // }
297 // return reteContainer.makeAddress(result);
298 // }
299
300 /**
301 * Returns a copy of the given indexer that is an active node by itself (created if does not exist). (Convention:
302 * attached with same mask to a transparent node that is attached to parent node.) Node is created if it does not
303 * exist yet.
304 *
305 * @return an identical but active indexer
306 */
307 // TODO rethink traceability
308 RecipeTraceInfo accessActiveIndexer(RecipeTraceInfo inactiveIndexerRecipeTrace) {
309 final RecipeTraceInfo parentRecipeTrace = inactiveIndexerRecipeTrace.getParentRecipeTraces().iterator().next();
310 final ProjectionIndexerRecipe inactiveIndexerRecipe = (ProjectionIndexerRecipe) inactiveIndexerRecipeTrace
311 .getRecipe();
312
313 final TransparentRecipe transparentRecipe = RecipesFactory.eINSTANCE.createTransparentRecipe();
314 transparentRecipe.setParent(parentRecipeTrace.getRecipe());
315 final ActiveNodeConflictTrace transparentRecipeTrace = new ActiveNodeConflictTrace(transparentRecipe,
316 parentRecipeTrace, inactiveIndexerRecipeTrace);
317
318 final ProjectionIndexerRecipe activeIndexerRecipe = RecipesFactory.eINSTANCE
319 .createProjectionIndexerRecipe();
320 activeIndexerRecipe.setParent(transparentRecipe);
321 activeIndexerRecipe.setMask(inactiveIndexerRecipe.getMask());
322 final ActiveNodeConflictTrace activeIndexerRecipeTrace = new ActiveNodeConflictTrace(activeIndexerRecipe,
323 transparentRecipeTrace, inactiveIndexerRecipeTrace);
324
325 return activeIndexerRecipeTrace;
326 }
327
328 // /**
329 // * @param parent
330 // * @return
331 // */
332 // private TransparentNode accessTransparentNodeInternal(Supplier parent) {
333 // nodeFactory.
334 // return null;
335 // }
336
337 // public synchronized void registerSpecializedProjectionIndexer(Node node,
338 // ProjectionIndexer indexer) {
339 // if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) {
340 // Object[] paramsArray = { node.getNodeId(), indexer.getMask() };
341 // Tuple params = new FlatTuple(paramsArray);
342 // projectionIndexers.put(params, indexer);
343 // }
344 // }
345
346}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java
new file mode 100644
index 00000000..1eaa18e7
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java
@@ -0,0 +1,39 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network;
10
11import tools.refinery.viatra.runtime.matchers.context.IPosetComparator;
12import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
13import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
14import tools.refinery.viatra.runtime.matchers.util.Direction;
15
16/**
17 * @author Tamas Szabo
18 * @since 2.0
19 */
20public interface PosetAwareReceiver extends Receiver {
21
22 public TupleMask getCoreMask();
23
24 public TupleMask getPosetMask();
25
26 public IPosetComparator getPosetComparator();
27
28 /**
29 * Updates the receiver with a newly found or lost partial matching also providing information
30 * whether the update is a monotone change or not.
31 *
32 * @param direction the direction of the update
33 * @param update the update tuple
34 * @param monotone true if the update is monotone, false otherwise
35 * @since 2.4
36 */
37 public void updateWithPosetInfo(Direction direction, Tuple update, boolean monotone);
38
39}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java
new file mode 100644
index 00000000..211194c0
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java
@@ -0,0 +1,28 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.Map;
13
14import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
15
16/**
17 * Interface intended for nodes containing complete matches.
18 *
19 * @author Gabor Bergmann
20 */
21public interface ProductionNode extends Tunnel, Iterable<Tuple> {
22
23 /**
24 * @return the position mapping of this particular pattern that maps members of the tuple type to their positions
25 */
26 Map<String, Integer> getPosMapping();
27
28}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java
new file mode 100644
index 00000000..3dc9aad7
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java
@@ -0,0 +1,85 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.Collection;
13import java.util.Map;
14import java.util.Map.Entry;
15
16import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
17import tools.refinery.viatra.runtime.matchers.util.Direction;
18import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
19import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
20
21/**
22 * ALL METHODS: FOR INTERNAL USE ONLY; ONLY INVOKE FROM {@link ReteContainer}
23 *
24 * @author Gabor Bergmann
25 * @noimplement This interface is not intended to be implemented by external clients.
26 */
27public interface Receiver extends Node {
28
29 /**
30 * Updates the receiver with a newly found or lost partial matching.
31 *
32 * @since 2.4
33 */
34 public void update(final Direction direction, final Tuple updateElement, final Timestamp timestamp);
35
36 /**
37 * Updates the receiver in batch style with a collection of updates. The input collection consists of pairs in the
38 * form (t, c) where t is an update tuple and c is the count. The count can also be negative, and it specifies how
39 * many times the tuple t gets deleted or inserted. The default implementation of this method simply calls
40 * {@link #update(Direction, Tuple, Timestamp)} individually for all updates.
41 *
42 * @since 2.8
43 */
44 public default void batchUpdate(final Collection<Map.Entry<Tuple, Integer>> updates, final Timestamp timestamp) {
45 for (final Entry<Tuple, Integer> entry : updates) {
46 int count = entry.getValue();
47
48 Direction direction;
49 if (count < 0) {
50 direction = Direction.DELETE;
51 count = -count;
52 } else {
53 direction = Direction.INSERT;
54 }
55
56 for (int i = 0; i < count; i++) {
57 update(direction, entry.getKey(), timestamp);
58 }
59 }
60 }
61
62 /**
63 * Returns the {@link Mailbox} of this receiver.
64 *
65 * @return the mailbox
66 * @since 2.0
67 */
68 public Mailbox getMailbox();
69
70 /**
71 * appends a parent that will continuously send insert and revoke updates to this supplier
72 */
73 void appendParent(final Supplier supplier);
74
75 /**
76 * removes a parent
77 */
78 void removeParent(final Supplier supplier);
79
80 /**
81 * access active parent
82 */
83 Collection<Supplier> getParents();
84
85}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java
new file mode 100644
index 00000000..cae78d37
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java
@@ -0,0 +1,34 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network;
10
11/**
12 * A rederivable node can potentially re-derive tuples after the Rete network has finished the delivery of messages.
13 *
14 * @author Tamas Szabo
15 * @since 1.6
16 */
17public interface RederivableNode extends Node, IGroupable {
18
19 /**
20 * The method is called by the {@link ReteContainer} to re-derive tuples after the normal messages have been
21 * delivered and consumed. The re-derivation process may trigger the creation and delivery of further messages
22 * and further re-derivation rounds.
23 */
24 public void rederiveOne();
25
26 /**
27 * Returns true if this node actually runs in DRed mode (not necessarily).
28 *
29 * @return true if the node is operating in DRed mode
30 * @since 2.0
31 */
32 public boolean isInDRedMode();
33
34}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java
new file mode 100644
index 00000000..09bff29e
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java
@@ -0,0 +1,14 @@
1/*
2 * SPDX-FileCopyrightText: 2023 The Refinery Authors <https://refinery.tools/>
3 *
4 * SPDX-License-Identifier: EPL-2.0
5 */
6package tools.refinery.viatra.runtime.rete.network;
7
8import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
9
10import java.util.Collection;
11
12public interface ReinitializedNode {
13 void reinitializeWith(Collection<Tuple> tuples);
14}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
new file mode 100644
index 00000000..79e0526d
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
@@ -0,0 +1,729 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v. 2.0 which is available at
6 * http://www.eclipse.org/legal/epl-v20.html.
7 *
8 * SPDX-License-Identifier: EPL-2.0
9 *******************************************************************************/
10
11package tools.refinery.viatra.runtime.rete.network;
12
13import org.apache.log4j.Logger;
14import tools.refinery.viatra.runtime.CancellationToken;
15import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext;
16import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
17import tools.refinery.viatra.runtime.matchers.util.Clearable;
18import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
19import tools.refinery.viatra.runtime.matchers.util.Direction;
20import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
21import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
22import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration;
23import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
25import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
26import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker;
27import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyCommunicationTracker;
28import tools.refinery.viatra.runtime.rete.network.delayed.DelayedCommand;
29import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand;
30import tools.refinery.viatra.runtime.rete.network.delayed.DelayedDisconnectCommand;
31import tools.refinery.viatra.runtime.rete.remote.Address;
32import tools.refinery.viatra.runtime.rete.single.SingleInputNode;
33import tools.refinery.viatra.runtime.rete.single.TrimmerNode;
34import tools.refinery.viatra.runtime.rete.util.Options;
35
36import java.util.*;
37import java.util.function.Function;
38
39/**
40 * @author Gabor Bergmann
41 *
42 * Mutexes: externalMessageLock - enlisting messages into and retrieving from the external message queue
43 * @since 2.2
44 */
45public final class ReteContainer {
46
47 protected Thread consumerThread = null;
48 protected boolean killed = false;
49
50 protected Network network;
51
52 protected LinkedList<Clearable> clearables;
53 protected Map<Long, Node> nodesById;
54 protected long nextId = 0;
55
56 protected ConnectionFactory connectionFactory;
57 protected NodeProvisioner nodeProvisioner;
58
59 protected Deque<UpdateMessage> internalMessageQueue = new ArrayDeque<UpdateMessage>();
60 protected/* volatile */Deque<UpdateMessage> externalMessageQueue = new ArrayDeque<UpdateMessage>();
61 protected Object externalMessageLock = new Object();
62 protected Long clock = 1L; // even: steady state, odd: active queue; access
63 // ONLY with messageQueue locked!
64 protected Map<ReteContainer, Long> terminationCriteria = null;
65 protected final Logger logger;
66 protected final CommunicationTracker tracker;
67
68 protected final IQueryBackendContext backendContext;
69
70 protected Set<DelayedCommand> delayedCommandQueue;
71 protected Set<DelayedCommand> delayedCommandBuffer;
72 protected boolean executingDelayedCommands;
73
74 protected final TimelyConfiguration timelyConfiguration;
75
76 private final CancellationToken cancellationToken;
77
78 /**
79 * @param threaded
80 * false if operating in a single-threaded environment
81 */
82 public ReteContainer(Network network, boolean threaded) {
83 super();
84 this.network = network;
85 this.backendContext = network.getEngine().getBackendContext();
86 this.timelyConfiguration = network.getEngine().getTimelyConfiguration();
87 cancellationToken = backendContext.getRuntimeContext().getCancellationToken();
88
89 this.delayedCommandQueue = new LinkedHashSet<DelayedCommand>();
90 this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>();
91 this.executingDelayedCommands = false;
92
93 if (this.isTimelyEvaluation()) {
94 this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration());
95 } else {
96 this.tracker = new TimelessCommunicationTracker();
97 }
98
99 this.nodesById = CollectionsFactory.createMap();
100 this.clearables = new LinkedList<Clearable>();
101 this.logger = network.getEngine().getLogger();
102
103 this.connectionFactory = new ConnectionFactory(this);
104 this.nodeProvisioner = new NodeProvisioner(this);
105
106 if (threaded) {
107 this.terminationCriteria = CollectionsFactory.createMap();
108 this.consumerThread = new Thread("Rete thread of " + ReteContainer.super.toString()) {
109 @Override
110 public void run() {
111 messageConsumptionCycle();
112 }
113 };
114 this.consumerThread.start();
115 }
116 }
117
118 /**
119 * @since 2.4
120 */
121 public boolean isTimelyEvaluation() {
122 return this.timelyConfiguration != null;
123 }
124
125 /**
126 * @since 2.4
127 */
128 public TimelyConfiguration getTimelyConfiguration() {
129 return this.timelyConfiguration;
130 }
131
132 /**
133 * @since 1.6
134 * @return the communication graph of the nodes, incl. message scheduling
135 */
136 public CommunicationTracker getCommunicationTracker() {
137 return tracker;
138 }
139
140 /**
141 * Stops this container. To be called by Network.kill()
142 */
143 public void kill() {
144 killed = true;
145 if (consumerThread != null)
146 consumerThread.interrupt();
147 }
148
149 /**
150 * Establishes connection between a supplier and a receiver node, regardless which container they are in. Assumption
151 * is that this container is the home of the receiver, but it is not strictly necessary.
152 *
153 * @param synchronise
154 * indicates whether the receiver should be synchronised to the current contents of the supplier
155 */
156 public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
157 boolean synchronise) {
158 if (!isLocal(receiver))
159 receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise);
160 else {
161 Receiver child = resolveLocal(receiver);
162 connectRemoteSupplier(supplier, child, synchronise);
163 }
164 }
165
166 /**
167 * Severs connection between a supplier and a receiver node, regardless which container they are in. Assumption is
168 * that this container is the home of the receiver, but it is not strictly necessary.
169 *
170 * @param desynchronise
171 * indicates whether the current contents of the supplier should be subtracted from the receiver
172 */
173 public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
174 boolean desynchronise) {
175 if (!isLocal(receiver))
176 receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise);
177 else {
178 Receiver child = resolveLocal(receiver);
179 disconnectRemoteSupplier(supplier, child, desynchronise);
180 }
181 }
182
183 /**
184 * Establishes connection between a remote supplier and a local receiver node.
185 *
186 * @param synchronise
187 * indicates whether the receiver should be synchronised to the current contents of the supplier
188 */
189 public void connectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, boolean synchronise) {
190 Supplier parent = nodeProvisioner.asSupplier(supplier);
191 if (synchronise)
192 connectAndSynchronize(parent, receiver);
193 else
194 connect(parent, receiver);
195 }
196
197 /**
198 * Severs connection between a remote supplier and a local receiver node.
199 *
200 * @param desynchronise
201 * indicates whether the current contents of the supplier should be subtracted from the receiver
202 */
203 public void disconnectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver,
204 boolean desynchronise) {
205 Supplier parent = nodeProvisioner.asSupplier(supplier);
206 if (desynchronise)
207 disconnectAndDesynchronize(parent, receiver);
208 else
209 disconnect(parent, receiver);
210 }
211
212 /**
213 * Connects a receiver to a supplier
214 */
215 public void connect(Supplier supplier, Receiver receiver) {
216 supplier.appendChild(receiver);
217 receiver.appendParent(supplier);
218 tracker.registerDependency(supplier, receiver);
219 }
220
221 /**
222 * Disconnects a receiver from a supplier
223 */
224 public void disconnect(Supplier supplier, Receiver receiver) {
225 supplier.removeChild(receiver);
226 receiver.removeParent(supplier);
227 tracker.unregisterDependency(supplier, receiver);
228 }
229
230 /**
231 * @since 2.3
232 */
233 public boolean isExecutingDelayedCommands() {
234 return this.executingDelayedCommands;
235 }
236
237 /**
238 * @since 2.3
239 */
240 public Set<DelayedCommand> getDelayedCommandQueue() {
241 if (this.executingDelayedCommands) {
242 return this.delayedCommandBuffer;
243 } else {
244 return this.delayedCommandQueue;
245 }
246 }
247
248 /**
249 * Connects a receiver to a remote supplier, and synchronizes it to the current contents of the supplier
250 */
251 public void connectAndSynchronize(Supplier supplier, Receiver receiver) {
252 supplier.appendChild(receiver);
253 receiver.appendParent(supplier);
254 tracker.registerDependency(supplier, receiver);
255 getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, receiver, this));
256 }
257
258 /**
259 * Disconnects a receiver from a supplier
260 */
261 public void disconnectAndDesynchronize(Supplier supplier, Receiver receiver) {
262 final boolean wasInSameSCC = this.isTimelyEvaluation() && this.tracker.areInSameGroup(supplier, receiver);
263 supplier.removeChild(receiver);
264 receiver.removeParent(supplier);
265 tracker.unregisterDependency(supplier, receiver);
266 getDelayedCommandQueue().add(new DelayedDisconnectCommand(supplier, receiver, this, wasInSameSCC));
267 }
268
269 /**
270 * @since 2.3
271 */
272 public void executeDelayedCommands() {
273 if (!this.delayedCommandQueue.isEmpty()) {
274 flushUpdates();
275 this.executingDelayedCommands = true;
276 for (final DelayedCommand command : this.delayedCommandQueue) {
277 command.run();
278 }
279 this.delayedCommandQueue = this.delayedCommandBuffer;
280 this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>();
281 flushUpdates();
282 this.executingDelayedCommands = false;
283 }
284 }
285
286 /**
287 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is
288 * indicated by the Address. Designed to be called by the Network, DO NOT use in any other way. @pre:
289 * address.container == this, e.g. address MUST be local
290 *
291 * @return the value of the container's clock at the time when the message was accepted into the local message queue
292 */
293 long sendUpdateToLocalAddress(Address<? extends Receiver> address, Direction direction, Tuple updateElement) {
294 long timestamp;
295 Receiver receiver = resolveLocal(address);
296 UpdateMessage message = new UpdateMessage(receiver, direction, updateElement);
297 synchronized (externalMessageLock) {
298 externalMessageQueue.add(message);
299 timestamp = clock;
300 externalMessageLock.notifyAll();
301 }
302
303 return timestamp;
304
305 }
306
307 /**
308 * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial
309 * matching. The receiver is indicated by the Address. Designed to be called by the Network, DO NOT use in any other
310 * way. @pre: address.container == this, e.g. address MUST be local @pre: updateElements is nonempty!
311 *
312 * @return the value of the container's clock at the time when the message was accepted into the local message queue
313 */
314 long sendUpdatesToLocalAddress(Address<? extends Receiver> address, Direction direction,
315 Collection<Tuple> updateElements) {
316
317 long timestamp;
318 Receiver receiver = resolveLocal(address);
319 // UpdateMessage message = new UpdateMessage(receiver, direction,
320 // updateElement);
321 synchronized (externalMessageLock) {
322 for (Tuple ps : updateElements)
323 externalMessageQueue.add(new UpdateMessage(receiver, direction, ps));
324 // messageQueue.add(new UpdateMessage(resolveLocal(address),
325 // direction, updateElement));
326 // this.sendUpdateInternal(resolveLocal(address), direction,
327 // updateElement);
328 timestamp = clock;
329 externalMessageLock.notifyAll();
330 }
331
332 return timestamp;
333 }
334
335 /**
336 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is
337 * indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT use in any
338 * other way.
339 */
340 void sendUpdateToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction,
341 Tuple updateElement) {
342 Receiver receiver = resolveLocal(address);
343 UpdateMessage message = new UpdateMessage(receiver, direction, updateElement);
344 internalMessageQueue.add(message);
345 }
346
347 /**
348 * Sends multiple update messages to the receiver node, indicating a newly found or lost partial matching. The
349 * receiver is indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT
350 * use in any other way.
351 *
352 * @pre: address.container == this, e.g. address MUST be local
353 */
354 void sendUpdatesToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction,
355 Collection<Tuple> updateElements) {
356 Receiver receiver = resolveLocal(address);
357 for (Tuple ps : updateElements)
358 internalMessageQueue.add(new UpdateMessage(receiver, direction, ps));
359 }
360
361 /**
362 * Sends an update message to a node in a different container. The receiver is indicated by the Address. Designed to
363 * be called by RemoteReceivers, DO NOT use in any other way.
364 *
365 * @since 2.4
366 */
367 public void sendUpdateToRemoteAddress(Address<? extends Receiver> address, Direction direction,
368 Tuple updateElement) {
369 ReteContainer otherContainer = address.getContainer();
370 long otherClock = otherContainer.sendUpdateToLocalAddress(address, direction, updateElement);
371 // Long criterion = terminationCriteria.get(otherContainer);
372 // if (criterion==null || otherClock > criterion)
373 terminationCriteria.put(otherContainer, otherClock);
374 }
375
376 /**
377 * Finalises all update sequences and returns. To be called from user threads (e.g. network construction).
378 */
379 public void flushUpdates() {
380 network.waitForReteTermination();
381 // synchronized (messageQueue)
382 // {
383 // while (!messageQueue.isEmpty())
384 // {
385 // try {
386 // UpdateMessage message = messageQueue.take();
387 // message.receiver.update(message.direction, message.updateElement);
388 // } catch (InterruptedException e) {}
389 // }
390 // }
391 }
392
393 /**
394 * Retrieves a safe copy of the contents of a supplier.
395 *
396 * <p> Note that there may be multiple copies of a Tuple in case of a {@link TrimmerNode}, so the result is not always a set.
397 *
398 * @param flush if true, a flush is performed before pulling the contents
399 * @since 2.3
400 */
401 public Collection<Tuple> pullContents(final Supplier supplier, final boolean flush) {
402 if (flush) {
403 flushUpdates();
404 }
405 final Collection<Tuple> collector = new ArrayList<Tuple>();
406 supplier.pullInto(collector, flush);
407 return collector;
408 }
409
410 /**
411 * @since 2.4
412 */
413 public Map<Tuple, Timeline<Timestamp>> pullContentsWithTimeline(final Supplier supplier, final boolean flush) {
414 if (flush) {
415 flushUpdates();
416 }
417 final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap();
418 supplier.pullIntoWithTimeline(collector, flush);
419 return collector;
420 }
421
422 /**
423 * Retrieves the contents of a SingleInputNode's parentage.
424 *
425 * @since 2.3
426 */
427 public Collection<Tuple> pullPropagatedContents(final SingleInputNode supplier, final boolean flush) {
428 if (flush) {
429 flushUpdates();
430 }
431 final Collection<Tuple> collector = new LinkedList<Tuple>();
432 supplier.propagatePullInto(collector, flush);
433 return collector;
434 }
435
436 /**
437 * Retrieves the timestamp-aware contents of a SingleInputNode's parentage.
438 *
439 * @since 2.3
440 */
441 public Map<Tuple, Timeline<Timestamp>> pullPropagatedContentsWithTimestamp(final SingleInputNode supplier,
442 final boolean flush) {
443 if (flush) {
444 flushUpdates();
445 }
446 final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap();
447 supplier.propagatePullIntoWithTimestamp(collector, flush);
448 return collector;
449 }
450
451 /**
452 * Retrieves the contents of a supplier for a remote caller. Assumption is that this container is the home of the
453 * supplier, but it is not strictly necessary.
454 *
455 * @param supplier
456 * the address of the supplier to be pulled.
457 * @since 2.3
458 */
459 public Collection<Tuple> remotePull(Address<? extends Supplier> supplier, boolean flush) {
460 if (!isLocal(supplier))
461 return supplier.getContainer().remotePull(supplier, flush);
462 return pullContents(resolveLocal(supplier), flush);
463 }
464
465 /**
466 * Proxies for the getPosMapping() of Production nodes. Retrieves the posmapping of a remote or local Production to
467 * a remote or local caller.
468 */
469 public Map<String, Integer> remotePosMapping(Address<? extends ProductionNode> production) {
470 if (!isLocal(production))
471 return production.getContainer().remotePosMapping(production);
472 return resolveLocal(production).getPosMapping();
473 }
474
475 /**
476 * Continually consumes update messages. Should be run on a dedicated thread.
477 */
478 void messageConsumptionCycle() {
479 while (!killed) // deliver messages on and on and on....
480 {
481 long incrementedClock = 0;
482 UpdateMessage message = null;
483
484 if (!internalMessageQueue.isEmpty()) // take internal messages first
485 message = internalMessageQueue.removeFirst();
486 else
487 // no internal message, take an incoming message
488 synchronized (externalMessageLock) { // no sleeping allowed,
489 // because external
490 // queue is locked for
491 // precise clocking of
492 // termination point!
493 if (!externalMessageQueue.isEmpty()) { // if external queue
494 // is non-empty,
495 // retrieve the next
496 // message instantly
497 message = takeExternalMessage();
498 } else { // if external queue is found empty (and this is
499 // the first time in a row)
500 incrementedClock = ++clock; // local termination point
501 // synchronized(clock){incrementedClock = ++clock;}
502 }
503 }
504
505 if (message == null) // both queues were empty
506 {
507 localUpdateTermination(incrementedClock); // report local
508 // termination point
509 while (message == null) // wait for a message while external
510 // queue is still empty
511 {
512 synchronized (externalMessageLock) {
513 while (externalMessageQueue.isEmpty()) {
514 try {
515 externalMessageLock.wait();
516 } catch (InterruptedException e) {
517 if (killed)
518 return;
519 }
520 }
521 message = takeExternalMessage();
522 }
523
524 }
525 }
526
527 // now we have a message to deliver
528 // NOTE: this method is not compatible with differential dataflow
529 message.receiver.update(message.direction, message.updateElement, Timestamp.ZERO);
530 }
531 }
532
533 /**
534 * @since 1.6
535 */
536 public static final Function<Node, String> NAME_MAPPER = input -> input.toString().substring(0,
537 Math.min(30, input.toString().length()));
538
539 /**
540 * Sends out all pending messages to their receivers. The delivery is governed by the communication tracker.
541 *
542 * @since 1.6
543 */
544 public void deliverMessagesSingleThreaded() {
545 if (!backendContext.areUpdatesDelayed()) {
546 if (Options.MONITOR_VIOLATION_OF_RETE_NODEGROUP_TOPOLOGICAL_SORTING) {
547 // known unreachable; enable for debugging only
548
549 CommunicationGroup lastGroup = null;
550 Set<CommunicationGroup> seenInThisCycle = new HashSet<>();
551
552 while (!tracker.isEmpty()) {
553 final CommunicationGroup group = tracker.getAndRemoveFirstGroup();
554
555 /**
556 * The current group does not violate the communication schema iff (1) it was not seen before OR (2)
557 * the last one that was seen is exactly the same as the current one this can happen if the group
558 * was added back because of in-group message passing
559 */
560 boolean okGroup = (group == lastGroup) || seenInThisCycle.add(group);
561
562 if (!okGroup) {
563 logger.error(
564 "[INTERNAL ERROR] Violation of communication schema! The communication component with representative "
565 + group.getRepresentative() + " has already been processed!");
566 }
567
568 group.deliverMessages();
569
570 lastGroup = group;
571 }
572
573 } else {
574 while (!tracker.isEmpty()) {
575 final CommunicationGroup group = tracker.getAndRemoveFirstGroup();
576 group.deliverMessages();
577 }
578 }
579 }
580 }
581
582 private void localUpdateTermination(long incrementedClock) {
583 network.reportLocalUpdateTermination(this, incrementedClock, terminationCriteria);
584 terminationCriteria.clear();
585
586 // synchronized(clock){++clock;} // +1 incrementing for parity and easy
587 // comparison
588 }
589
590 // @pre: externalMessageQueue synchronized && nonempty
591 private UpdateMessage takeExternalMessage() {
592 UpdateMessage message = externalMessageQueue.removeFirst();
593 if (!externalMessageQueue.isEmpty()) { // copy the whole queue over
594 // for speedup
595 Deque<UpdateMessage> temp = externalMessageQueue;
596 externalMessageQueue = internalMessageQueue;
597 internalMessageQueue = temp;
598 }
599 return message;
600 }
601
602 /**
603 * Provides an external address for the selected node.
604 *
605 * @pre node belongs to this container.
606 */
607 public <N extends Node> Address<N> makeAddress(N node) {
608 return new Address<N>(node);
609 }
610
611 /**
612 * Checks whether a certain address points to a node at this container.
613 */
614 public boolean isLocal(Address<? extends Node> address) {
615 return address.getContainer() == this;
616 }
617
618 /**
619 * Returns an addressed node at this container.
620 *
621 * @pre: address.container == this, e.g. address MUST be local
622 * @throws IllegalArgumentException
623 * if address is non-local
624 */
625 @SuppressWarnings("unchecked")
626 public <N extends Node> N resolveLocal(Address<N> address) {
627 if (this != address.getContainer())
628 throw new IllegalArgumentException(String.format("Address %s non-local at container %s", address, this));
629
630 N cached = address.getNodeCache();
631 if (cached != null)
632 return cached;
633 else {
634 N node = (N) nodesById.get(address.getNodeId());
635 address.setNodeCache(node);
636 return node;
637 }
638 }
639
640 /**
641 * Registers a node into the rete network (should be called by constructor). Every node MUST be registered by its
642 * constructor.
643 *
644 * @return the unique nodeId issued to the node.
645 */
646 public long registerNode(Node n) {
647 long id = nextId++;
648 nodesById.put(id, n);
649 return id;
650 }
651
652 /**
653 * Unregisters a node from the rete network. Do NOT call if node is still connected to other Nodes, or Adressed or
654 * otherwise referenced.
655 */
656 public void unregisterNode(Node n) {
657 nodesById.remove(n.getNodeId());
658 }
659
660 /**
661 * Registers a pattern memory into the rete network. Every memory MUST be registered by its owner node.
662 */
663 public void registerClearable(Clearable c) {
664 clearables.addFirst(c);
665 }
666
667 /**
668 * Unregisters a pattern memory from the rete network.
669 */
670 public void unregisterClearable(Clearable c) {
671 clearables.remove(c);
672 }
673
674 /**
675 * Clears all memory contents in the network. Reverts to initial state.
676 */
677 public void clearAll() {
678 for (Clearable c : clearables) {
679 c.clear();
680 }
681 }
682
683 public NodeFactory getNodeFactory() {
684 return network.getNodeFactory();
685 }
686
687 public ConnectionFactory getConnectionFactory() {
688 return connectionFactory;
689 }
690
691 public NodeProvisioner getProvisioner() {
692 return nodeProvisioner;
693 }
694
695 public Network getNetwork() {
696 return network;
697 }
698
699 @Override
700 public String toString() {
701 StringBuilder sb = new StringBuilder();
702 String separator = System.getProperty("line.separator");
703 sb.append(super.toString() + "[[[" + separator);
704 java.util.List<Long> keys = new java.util.ArrayList<Long>(nodesById.keySet());
705 java.util.Collections.sort(keys);
706 for (Long key : keys) {
707 sb.append(key + " -> " + nodesById.get(key) + separator);
708 }
709 sb.append("]]] of " + network);
710 return sb.toString();
711 }
712
713 /**
714 * Access all the Rete nodes inside this container.
715 *
716 * @return the collection of {@link Node} instances
717 */
718 public Collection<Node> getAllNodes() {
719 return nodesById.values();
720 }
721
722 public InputConnector getInputConnectionFactory() {
723 return network.getInputConnector();
724 }
725
726 public void checkCancelled() {
727 cancellationToken.checkCancelled();
728 }
729}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java
new file mode 100644
index 00000000..7dc7c4bc
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java
@@ -0,0 +1,123 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v. 2.0 which is available at
6 * http://www.eclipse.org/legal/epl-v20.html.
7 *
8 * SPDX-License-Identifier: EPL-2.0
9 *******************************************************************************/
10
11package tools.refinery.viatra.runtime.rete.network;
12
13import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
14import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
15import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
16import tools.refinery.viatra.runtime.matchers.util.Direction;
17import tools.refinery.viatra.runtime.rete.index.GenericProjectionIndexer;
18import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
19import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
20import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
21import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;
22
23import java.util.Collection;
24import java.util.HashSet;
25import java.util.List;
26import java.util.Set;
27
28/**
29 * Base implementation for a supplier node.
30 *
31 * @author Gabor Bergmann
32 *
33 */
34public abstract class StandardNode extends BaseNode implements Supplier, NetworkStructureChangeSensitiveNode {
35 protected final List<Receiver> children = CollectionsFactory.createObserverList();
36 /**
37 * @since 2.2
38 */
39 protected final List<Mailbox> childMailboxes = CollectionsFactory.createObserverList();
40
41 public StandardNode(final ReteContainer reteContainer) {
42 super(reteContainer);
43 }
44
45 /**
46 * @since 2.4
47 */
48 protected void propagateUpdate(final Direction direction, final Tuple updateElement, final Timestamp timestamp) {
49 reteContainer.checkCancelled();
50 for (final Mailbox childMailbox : childMailboxes) {
51 childMailbox.postMessage(direction, updateElement, timestamp);
52 }
53 }
54
55 @Override
56 public void appendChild(final Receiver receiver) {
57 children.add(receiver);
58 childMailboxes.add(this.getCommunicationTracker().proxifyMailbox(this, receiver.getMailbox()));
59 }
60
61 @Override
62 public void removeChild(final Receiver receiver) {
63 children.remove(receiver);
64 Mailbox mailboxToRemove = null;
65 for (final Mailbox mailbox : childMailboxes) {
66 if (mailbox.getReceiver() == receiver) {
67 mailboxToRemove = mailbox;
68 break;
69 }
70 }
71 assert mailboxToRemove != null;
72 childMailboxes.remove(mailboxToRemove);
73 }
74
75 @Override
76 public void networkStructureChanged() {
77 childMailboxes.clear();
78 for (final Receiver receiver : children) {
79 childMailboxes.add(this.getCommunicationTracker().proxifyMailbox(this, receiver.getMailbox()));
80 }
81 }
82
83 @Override
84 public Collection<Receiver> getReceivers() {
85 return children;
86 }
87
88 /**
89 * @since 2.2
90 */
91 public Collection<Mailbox> getChildMailboxes() {
92 return this.childMailboxes;
93 }
94
95 @Override
96 public Set<Tuple> getPulledContents(final boolean flush) {
97 final HashSet<Tuple> results = new HashSet<Tuple>();
98 pullInto(results, flush);
99 return results;
100 }
101
102 @Override
103 public ProjectionIndexer constructIndex(final TupleMask mask, final TraceInfo... traces) {
104 final GenericProjectionIndexer indexer = new GenericProjectionIndexer(reteContainer, mask);
105 for (final TraceInfo traceInfo : traces) {
106 indexer.assignTraceInfo(traceInfo);
107 }
108 reteContainer.connectAndSynchronize(this, indexer);
109 return indexer;
110 }
111
112 /**
113 * @since 1.6
114 */
115 protected void issueError(final String message, final Exception ex) {
116 if (ex == null) {
117 this.reteContainer.getNetwork().getEngine().getLogger().error(message);
118 } else {
119 this.reteContainer.getNetwork().getEngine().getLogger().error(message, ex);
120 }
121 }
122
123}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java
new file mode 100644
index 00000000..1917a7cf
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java
@@ -0,0 +1,82 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.Collection;
13import java.util.Map;
14import java.util.Set;
15
16import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
17import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
18import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
19import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
20import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
21import tools.refinery.viatra.runtime.rete.single.TrimmerNode;
22import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;
23
24/**
25 * @author Gabor Bergmann
26 *
27 * A supplier is an object that can propagate insert or revoke events towards receivers.
28 */
29public interface Supplier extends Node {
30
31 /**
32 * Pulls the contents of this object in this particular moment into a target collection.
33 *
34 * @param flush if true, flushing of messages is allowed during the pull, otherwise flushing is not allowed
35 * @since 2.3
36 */
37 public void pullInto(Collection<Tuple> collector, boolean flush);
38
39 /**
40 * @since 2.4
41 */
42 public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush);
43
44 /**
45 * Returns the contents of this object in this particular moment.
46 * For memoryless nodes, this may involve a costly recomputation of contents.
47 *
48 * The result is returned as a Set, even when it has multiplicities (at the output of {@link TrimmerNode}).
49 *
50 * <p> Intended mainly for debug purposes; therefore flushing is performed only if explicitly requested
51 * During runtime, flushing may be preferred; see {@link ReteContainer#pullContents(Supplier)}
52 * @since 2.3
53 */
54 public Set<Tuple> getPulledContents(boolean flush);
55
56 default public Set<Tuple> getPulledContents() {
57 return getPulledContents(true);
58 }
59
60 /**
61 * appends a receiver that will continously receive insert and revoke updates from this supplier
62 */
63 void appendChild(Receiver receiver);
64
65 /**
66 * removes a receiver
67 */
68 void removeChild(Receiver receiver);
69
70 /**
71 * Instantiates (or reuses, depending on implementation) an index according to the given mask.
72 *
73 * Intended for internal use; clients should invoke through Library instead to enable reusing.
74 */
75 ProjectionIndexer constructIndex(TupleMask mask, TraceInfo... traces);
76
77 /**
78 * lists receivers
79 */
80 Collection<Receiver> getReceivers();
81
82}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java
new file mode 100644
index 00000000..f238f47b
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java
@@ -0,0 +1,19 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12/**
13 * @author Gabor Bergmann
14 *
15 * A Tunnel is an interface into which elments can be instered and from which productions can be extracted.
16 */
17public interface Tunnel extends Supplier, Receiver {
18
19}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java
new file mode 100644
index 00000000..1334a3a9
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java
@@ -0,0 +1,31 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
13import tools.refinery.viatra.runtime.matchers.util.Direction;
14
15class UpdateMessage {
16 public Receiver receiver;
17 public Direction direction;
18 public Tuple updateElement;
19
20 public UpdateMessage(Receiver receiver, Direction direction, Tuple updateElement) {
21 this.receiver = receiver;
22 this.direction = direction;
23 this.updateElement = updateElement;
24 }
25
26 @Override
27 public String toString() {
28 return "M." + direction + ": " + updateElement + " -> " + receiver;
29 }
30
31}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java
new file mode 100644
index 00000000..8cedeb11
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java
@@ -0,0 +1,103 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication;
10
11import java.util.Collection;
12import java.util.Map;
13
14import tools.refinery.viatra.runtime.rete.network.Node;
15import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
16
17/**
18 * A communication group represents a set of nodes in the communication graph that form a strongly connected component.
19 *
20 * @author Tamas Szabo
21 * @since 1.6
22 */
23public abstract class CommunicationGroup implements Comparable<CommunicationGroup> {
24
25 public static final String UNSUPPORTED_MESSAGE_KIND = "Unsupported message kind ";
26
27 /**
28 * Marker for the {@link CommunicationTracker}
29 */
30 public boolean isEnqueued = false;
31
32 protected final Node representative;
33
34 /**
35 * May be changed during bumping in {@link CommunicationTracker.registerDependency}
36 */
37 protected int identifier;
38
39 /**
40 * @since 1.7
41 */
42 protected final CommunicationTracker tracker;
43
44 /**
45 * @since 1.7
46 */
47 public CommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) {
48 this.tracker = tracker;
49 this.representative = representative;
50 this.identifier = identifier;
51 }
52
53 public abstract void deliverMessages();
54
55 public Node getRepresentative() {
56 return representative;
57 }
58
59 public abstract boolean isEmpty();
60
61 /**
62 * @since 2.0
63 */
64 public abstract void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind);
65
66 /**
67 * @since 2.0
68 */
69 public abstract void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind);
70
71 public abstract Map<MessageSelector, Collection<Mailbox>> getMailboxes();
72
73 public abstract boolean isRecursive();
74
75 @Override
76 public int hashCode() {
77 return this.identifier;
78 }
79
80 @Override
81 public String toString() {
82 return this.getClass().getSimpleName() + " " + this.identifier + " - representative: " + this.representative
83 + " - isEmpty: " + isEmpty();
84 }
85
86 @Override
87 public boolean equals(final Object obj) {
88 if (obj == null || this.getClass() != obj.getClass()) {
89 return false;
90 } else if (this == obj) {
91 return true;
92 } else {
93 final CommunicationGroup that = (CommunicationGroup) obj;
94 return this.identifier == that.identifier;
95 }
96 }
97
98 @Override
99 public int compareTo(final CommunicationGroup that) {
100 return this.identifier - that.identifier;
101 }
102
103}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java
new file mode 100644
index 00000000..d244e644
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java
@@ -0,0 +1,467 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication;
10
11import java.util.HashMap;
12import java.util.HashSet;
13import java.util.List;
14import java.util.Map;
15import java.util.PriorityQueue;
16import java.util.Queue;
17import java.util.Set;
18
19import tools.refinery.viatra.runtime.rete.itc.alg.incscc.IncSCCAlg;
20import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting;
21import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph;
22import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
23import tools.refinery.viatra.runtime.rete.aggregation.IAggregatorNode;
24import tools.refinery.viatra.runtime.rete.boundary.ExternalInputEnumeratorNode;
25import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode;
26import tools.refinery.viatra.runtime.rete.index.DualInputNode;
27import tools.refinery.viatra.runtime.rete.index.ExistenceNode;
28import tools.refinery.viatra.runtime.rete.index.Indexer;
29import tools.refinery.viatra.runtime.rete.index.IndexerListener;
30import tools.refinery.viatra.runtime.rete.index.IterableIndexer;
31import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer;
32import tools.refinery.viatra.runtime.rete.network.IGroupable;
33import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode;
34import tools.refinery.viatra.runtime.rete.network.Node;
35import tools.refinery.viatra.runtime.rete.network.ProductionNode;
36import tools.refinery.viatra.runtime.rete.network.Receiver;
37import tools.refinery.viatra.runtime.rete.network.ReteContainer;
38import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyIndexerListenerProxy;
39import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyMailboxProxy;
40import tools.refinery.viatra.runtime.rete.network.mailbox.FallThroughCapableMailbox;
41import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
42import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
43import tools.refinery.viatra.runtime.rete.single.TransitiveClosureNode;
44import tools.refinery.viatra.runtime.rete.single.TrimmerNode;
45
46/**
47 * An instance of this class is associated with every {@link ReteContainer}. The tracker serves two purposes: <br>
48 * (1) It allows RETE nodes to register their communication dependencies on-the-fly. These dependencies can be
49 * registered or unregistered when nodes are disposed of. <br>
50 * (2) It allows RETE nodes to register their mailboxes as dirty, that is, they can tell the tracker that they have
51 * something to send to other nodes in the network. The tracker is then responsible for ordering these messages (more
52 * precisely, the mailboxes that contain the messages) for the associated {@link ReteContainer}. The ordering is
53 * governed by the strongly connected components in the dependency network and follows a topological sorting scheme;
54 * those mailboxes will be emptied first whose owner nodes do not depend on other undelivered messages.
55 *
56 * @author Tamas Szabo
57 * @since 1.6
58 *
59 */
60public abstract class CommunicationTracker {
61
62 /**
63 * The minimum group id assigned so far
64 */
65 protected int minGroupId;
66
67 /**
68 * The maximum group id assigned so far
69 */
70 protected int maxGroupId;
71
72 /**
73 * The dependency graph of the communications in the RETE network
74 */
75 protected final Graph<Node> dependencyGraph;
76
77 /**
78 * Incremental SCC information about the dependency graph
79 */
80 protected final IncSCCAlg<Node> sccInformationProvider;
81
82 /**
83 * Precomputed node -> communication group map
84 */
85 protected final Map<Node, CommunicationGroup> groupMap;
86
87 /**
88 * Priority queue of active communication groups
89 */
90 protected final Queue<CommunicationGroup> groupQueue;
91
92 // groups should have a simple integer flag which represents its position in a priority queue
93 // priority queue only contains the ACTIVE groups
94
95 public CommunicationTracker() {
96 this.dependencyGraph = new Graph<Node>();
97 this.sccInformationProvider = new IncSCCAlg<Node>(this.dependencyGraph);
98 this.groupQueue = new PriorityQueue<CommunicationGroup>();
99 this.groupMap = new HashMap<Node, CommunicationGroup>();
100 }
101
102 public Graph<Node> getDependencyGraph() {
103 return dependencyGraph;
104 }
105
106 public CommunicationGroup getGroup(final Node node) {
107 return this.groupMap.get(node);
108 }
109
110 private void precomputeGroups() {
111 groupMap.clear();
112
113 // reconstruct group map from dependency graph
114 final Graph<Node> reducedGraph = sccInformationProvider.getReducedGraph();
115 final List<Node> representatives = TopologicalSorting.compute(reducedGraph);
116
117 for (int i = 0; i < representatives.size(); i++) { // groups for SCC representatives
118 final Node representative = representatives.get(i);
119 createAndStoreGroup(representative, i);
120 }
121
122 minGroupId = 0;
123 maxGroupId = representatives.size() - 1;
124
125 for (final Node node : dependencyGraph.getAllNodes()) { // extend group map to the rest of nodes
126 final Node representative = sccInformationProvider.getRepresentative(node);
127 final CommunicationGroup group = groupMap.get(representative);
128 if (representative != node) {
129 addToGroup(node, group);
130 }
131 }
132
133 for (final Node node : dependencyGraph.getAllNodes()) {
134 // set fall-through flags of default mailboxes
135 precomputeFallThroughFlag(node);
136 // perform further tracker-specific post-processing
137 postProcessNode(node);
138 }
139
140 // reconstruct new queue contents based on new group map
141 if (!groupQueue.isEmpty()) {
142 final Set<CommunicationGroup> oldActiveGroups = new HashSet<CommunicationGroup>(groupQueue);
143 groupQueue.clear();
144 reconstructQueueContents(oldActiveGroups);
145 }
146
147 // post process the groups
148 for (final CommunicationGroup group : groupMap.values()) {
149 postProcessGroup(group);
150 }
151 }
152
153 /**
154 * This method is responsible for reconstructing the active queue contents after the network structure has changed.
155 * It it defined as abstract because the reconstruction logic is specific to each {@link CommunicationTracker}.
156 * @since 2.4
157 */
158 protected abstract void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups);
159
160 private void addToGroup(final Node node, final CommunicationGroup group) {
161 groupMap.put(node, group);
162 if (node instanceof Receiver) {
163 ((Receiver) node).getMailbox().setCurrentGroup(group);
164 if (node instanceof IGroupable) {
165 ((IGroupable) node).setCurrentGroup(group);
166 }
167 }
168 }
169
170 /**
171 * Depends on the groups, as well as the parent nodes of the argument, so recomputation is needed if these change
172 */
173 private void precomputeFallThroughFlag(final Node node) {
174 CommunicationGroup group = groupMap.get(node);
175 if (node instanceof Receiver) {
176 IGroupable mailbox = ((Receiver) node).getMailbox();
177 if (mailbox instanceof FallThroughCapableMailbox) {
178 Set<Node> directParents = dependencyGraph.getSourceNodes(node).distinctValues();
179 // decide between using quick&cheap fall-through, or allowing for update cancellation
180 boolean fallThrough =
181 // disallow fallthrough: updates at production nodes should cancel, if they can be trimmed or
182 // disjunctive
183 (!(node instanceof ProductionNode && ( // it is a production node...
184 // with more than one parent
185 directParents.size() > 0 ||
186 // or true trimming in its sole parent
187 directParents.size() == 1 && trueTrimming(directParents.iterator().next())))) &&
188 // disallow fallthrough: external updates should be stored (if updates are delayed)
189 (!(node instanceof ExternalInputEnumeratorNode)) &&
190 // disallow fallthrough: RelationEvaluatorNode needs to be notified in batch-style, and the batching is done by the mailbox
191 // however, it is not the RelationEvaluatorNode itself that is interesting here, as that indirectly uses the BatchingReceiver
192 // so we need to disable fall-through for the BatchingReceiver
193 (!(node instanceof RelationEvaluatorNode.BatchingReceiver));
194 // do additional checks
195 if (fallThrough) {
196 // recursive parent groups generate excess updates that should be cancelled after delete&rederive
197 // phases
198 // aggregator and transitive closure parent nodes also generate excess updates that should be
199 // cancelled
200 directParentLoop: for (Node directParent : directParents) {
201 Set<Node> parentsToCheck = new HashSet<>();
202 // check the case where a direct parent is the reason for mailbox usage
203 parentsToCheck.add(directParent);
204 // check the case where an indirect parent (join slot) is the reason for mailbox usage
205 if (directParent instanceof DualInputNode) {
206 // in case of existence join (typically antijoin), a mailbox should allow
207 // an insertion and deletion (at the secondary slot) to cancel each other out
208 if (directParent instanceof ExistenceNode) {
209 fallThrough = false;
210 break directParentLoop;
211 }
212 // in beta nodes, indexer slots (or their active nodes) are considered indirect parents
213 DualInputNode dualInput = (DualInputNode) directParent;
214 IterableIndexer primarySlot = dualInput.getPrimarySlot();
215 if (primarySlot != null)
216 parentsToCheck.add(primarySlot.getActiveNode());
217 Indexer secondarySlot = dualInput.getSecondarySlot();
218 if (secondarySlot != null)
219 parentsToCheck.add(secondarySlot.getActiveNode());
220 }
221 for (Node parent : parentsToCheck) {
222 CommunicationGroup parentGroup = groupMap.get(parent);
223 if ( // parent is in a different, recursive group
224 (group != parentGroup && parentGroup.isRecursive()) ||
225 // node and parent within the same recursive group, and...
226 (group == parentGroup && group.isRecursive() && (
227 // parent is a transitive closure or aggregator node, or a trimmer
228 // allow trimmed or disjunctive tuple updates to cancel each other
229 (parent instanceof TransitiveClosureNode) || (parent instanceof IAggregatorNode)
230 || trueTrimming(parent)))) {
231 fallThrough = false;
232 break directParentLoop;
233 }
234 }
235 }
236 }
237 // overwrite fallthrough flag with newly computed value
238 ((FallThroughCapableMailbox) mailbox).setFallThrough(fallThrough);
239 }
240 }
241 }
242
243 /**
244 * A trimmer node that actually eliminates some columns (not just reorders)
245 */
246 private boolean trueTrimming(Node node) {
247 if (node instanceof TrimmerNode) {
248 TupleMask mask = ((TrimmerNode) node).getMask();
249 return (mask.indices.length != mask.sourceWidth);
250 }
251 return false;
252 }
253
254 public void activateUnenqueued(final CommunicationGroup group) {
255 groupQueue.add(group);
256 group.isEnqueued = true;
257 }
258
259 public void deactivate(final CommunicationGroup group) {
260 groupQueue.remove(group);
261 group.isEnqueued = false;
262 }
263
264 public CommunicationGroup getAndRemoveFirstGroup() {
265 final CommunicationGroup group = groupQueue.poll();
266 group.isEnqueued = false;
267 return group;
268 }
269
270 public boolean isEmpty() {
271 return groupQueue.isEmpty();
272 }
273
274 protected abstract CommunicationGroup createGroup(final Node representative, final int index);
275
276 protected CommunicationGroup createAndStoreGroup(final Node representative, final int index) {
277 final CommunicationGroup group = createGroup(representative, index);
278 addToGroup(representative, group);
279 return group;
280 }
281
282 /**
283 * Registers the dependency that the target {@link Node} depends on the source {@link Node}. In other words, source
284 * may send messages to target in the RETE network. If the dependency edge is already present, this method call is a
285 * noop.
286 *
287 * @param source
288 * the source node
289 * @param target
290 * the target node
291 */
292 public void registerDependency(final Node source, final Node target) {
293 // nodes can be immediately inserted, if they already exist in the graph, this is a noop
294 dependencyGraph.insertNode(source);
295 dependencyGraph.insertNode(target);
296
297 if (!this.dependencyGraph.getTargetNodes(source).containsNonZero(target)) {
298
299 // query all these information before the actual edge insertion
300 // because SCCs may be unified during the process
301 final Node sourceRepresentative = sccInformationProvider.getRepresentative(source);
302 final Node targetRepresentative = sccInformationProvider.getRepresentative(target);
303 final boolean targetHadOutgoingEdges = sccInformationProvider.hasOutgoingEdges(targetRepresentative);
304
305 // insert the edge
306 dependencyGraph.insertEdge(source, target);
307
308 // create groups if they do not yet exist
309 CommunicationGroup sourceGroup = groupMap.get(sourceRepresentative);
310 if (sourceGroup == null) {
311 // create on-demand with the next smaller group id
312 sourceGroup = createAndStoreGroup(sourceRepresentative, --minGroupId);
313 }
314 final int sourceIndex = sourceGroup.identifier;
315
316 CommunicationGroup targetGroup = groupMap.get(targetRepresentative);
317 if (targetGroup == null) {
318 // create on-demand with the next larger group id
319 targetGroup = createAndStoreGroup(targetRepresentative, ++maxGroupId);
320 }
321 final int targetIndex = targetGroup.identifier;
322
323 if (sourceIndex <= targetIndex) {
324 // indices obey current topological ordering
325 refreshFallThroughFlag(target);
326 postProcessNode(source);
327 postProcessNode(target);
328 postProcessGroup(sourceGroup);
329 if (sourceGroup != targetGroup) {
330 postProcessGroup(targetGroup);
331 }
332 } else if (sourceIndex > targetIndex && !targetHadOutgoingEdges) {
333 // indices violate current topological ordering, but we can simply bump the target index
334 final boolean wasEnqueued = targetGroup.isEnqueued;
335 if (wasEnqueued) {
336 groupQueue.remove(targetGroup);
337 }
338 targetGroup.identifier = ++maxGroupId;
339 if (wasEnqueued) {
340 groupQueue.add(targetGroup);
341 }
342
343 refreshFallThroughFlag(target);
344 postProcessNode(source);
345 postProcessNode(target);
346 postProcessGroup(sourceGroup);
347 postProcessGroup(targetGroup);
348 } else {
349 // needs a full re-computation because of more complex change
350 precomputeGroups();
351 }
352 }
353 }
354
355 /**
356 * Returns true if the given {@link Node} is in a recursive {@link CommunicationGroup}, false otherwise.
357 */
358 public boolean isInRecursiveGroup(final Node node) {
359 final CommunicationGroup group = this.getGroup(node);
360 if (group == null) {
361 return false;
362 } else {
363 return group.isRecursive();
364 }
365 }
366
367 /**
368 * Returns true if the given two {@link Node}s are in the same {@link CommunicationGroup}.
369 */
370 public boolean areInSameGroup(final Node left, final Node right) {
371 final CommunicationGroup leftGroup = this.getGroup(left);
372 final CommunicationGroup rightGroup = this.getGroup(right);
373 return leftGroup != null && leftGroup == rightGroup;
374 }
375
376 /**
377 * Unregisters a dependency between source and target.
378 *
379 * @param source
380 * the source node
381 * @param target
382 * the target node
383 */
384 public void unregisterDependency(final Node source, final Node target) {
385 // delete the edge first, and then query the SCC info provider
386 this.dependencyGraph.deleteEdgeIfExists(source, target);
387
388 final Node sourceRepresentative = sccInformationProvider.getRepresentative(source);
389 final Node targetRepresentative = sccInformationProvider.getRepresentative(target);
390
391 // if they are still in the same SCC,
392 // then this deletion did not affect the SCCs,
393 // and it is sufficient to recompute affected fall-through flags;
394 // otherwise, we need a new pre-computation for the groupMap and groupQueue
395 if (sourceRepresentative.equals(targetRepresentative)) {
396 // this deletion could not have affected the split flags
397 refreshFallThroughFlag(target);
398 postProcessNode(source);
399 postProcessNode(target);
400 } else {
401 // preComputeGroups takes care of the split flag maintenance
402 precomputeGroups();
403 }
404 }
405
406 /**
407 * Refresh fall-through flags if dependencies change for given target, but no SCC change
408 */
409 private void refreshFallThroughFlag(final Node target) {
410 precomputeFallThroughFlag(target);
411 if (target instanceof DualInputNode) {
412 for (final Node indirectTarget : dependencyGraph.getTargetNodes(target).distinctValues()) {
413 precomputeFallThroughFlag(indirectTarget);
414 }
415 }
416 }
417
418 /**
419 * Returns true if the given source-target edge in the communication network acts as a recursion cut point.
420 * The current implementation considers edges leading into {@link ProductionNode}s as cut point iff
421 * both source and target belong to the same group.
422 *
423 * @param source the source node
424 * @param target the target node
425 * @return true if the edge is a cut point, false otherwise
426 * @since 2.4
427 */
428 protected boolean isRecursionCutPoint(final Node source, final Node target) {
429 final Node effectiveSource = source instanceof SpecializedProjectionIndexer
430 ? ((SpecializedProjectionIndexer) source).getActiveNode()
431 : source;
432 final CommunicationGroup sourceGroup = this.getGroup(effectiveSource);
433 final CommunicationGroup targetGroup = this.getGroup(target);
434 return sourceGroup != null && sourceGroup == targetGroup && target instanceof ProductionNode;
435 }
436
437 /**
438 * This hook allows concrete tracker implementations to perform tracker-specific post processing on nodes (cf.
439 * {@link NetworkStructureChangeSensitiveNode} and {@link BehaviorChangingMailbox}). At the time of the invocation,
440 * the network topology has already been updated.
441 */
442 protected abstract void postProcessNode(final Node node);
443
444 /**
445 * This hook allows concrete tracker implementations to perform tracker-specific post processing on groups. At the
446 * time of the invocation, the network topology has already been updated.
447 * @since 2.4
448 */
449 protected abstract void postProcessGroup(final CommunicationGroup group);
450
451 /**
452 * Creates a proxy for the given {@link Mailbox} for the given requester {@link Node}. The proxy creation is
453 * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used
454 * to create {@link TimelyMailboxProxy}s depending on the network topology. There is no guarantee that the same
455 * proxy instance is returned when this method is called multiple times with the same arguments.
456 */
457 public abstract Mailbox proxifyMailbox(final Node requester, final Mailbox original);
458
459 /**
460 * Creates a proxy for the given {@link IndexerListener} for the given requester {@link Node}. The proxy creation is
461 * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used
462 * to create {@link TimelyIndexerListenerProxy}s depending on the network topology. There is no guarantee that the
463 * same proxy instance is returned when this method is called multiple times with the same arguments.
464 */
465 public abstract IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original);
466
467}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java
new file mode 100644
index 00000000..e1a61693
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java
@@ -0,0 +1,19 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication;
10
11/**
12 * Subclasses of this interface represent meta data of update messages in Rete.
13 *
14 * @author Tamas Szabo
15 * @since 2.3
16 */
17public interface MessageSelector {
18
19}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java
new file mode 100644
index 00000000..27779352
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java
@@ -0,0 +1,32 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication;
10
11import java.util.Comparator;
12import java.util.Map;
13
14import tools.refinery.viatra.runtime.rete.network.Node;
15
16/**
17 * @since 2.4
18 */
19public class NodeComparator implements Comparator<Node> {
20
21 protected final Map<Node, Integer> nodeMap;
22
23 public NodeComparator(final Map<Node, Integer> nodeMap) {
24 this.nodeMap = nodeMap;
25 }
26
27 @Override
28 public int compare(final Node left, final Node right) {
29 return this.nodeMap.get(left) - this.nodeMap.get(right);
30 }
31
32} \ No newline at end of file
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java
new file mode 100644
index 00000000..41cd8cd3
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java
@@ -0,0 +1,34 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication;
10
11/**
12 * A default message selector that can be used to associate phases to messages.
13 *
14 * @author Tamas Szabo
15 * @since 2.3
16 */
17public enum PhasedSelector implements MessageSelector {
18
19 /**
20 * No special distinguishing feature
21 */
22 DEFAULT,
23
24 /**
25 * Inserts and delete-insert monotone change pairs
26 */
27 MONOTONE,
28
29 /**
30 * Deletes
31 */
32 ANTI_MONOTONE
33
34}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java
new file mode 100644
index 00000000..a50a63a8
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java
@@ -0,0 +1,124 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication;
10
11import java.util.AbstractMap;
12import java.util.Collection;
13import java.util.Map;
14import java.util.Set;
15
16import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
17import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines;
18
19/**
20 * A timestamp associated with update messages in timely evaluation.
21 *
22 * @author Tamas Szabo
23 * @since 2.3
24 */
25public class Timestamp implements Comparable<Timestamp>, MessageSelector {
26
27 protected final int value;
28 public static final Timestamp ZERO = new Timestamp(0);
29 /**
30 * @since 2.4
31 */
32 public static final Timeline<Timestamp> INSERT_AT_ZERO_TIMELINE = Timelines.createFrom(Timestamp.ZERO);
33
34 public Timestamp(final int value) {
35 this.value = value;
36 }
37
38 public int getValue() {
39 return value;
40 }
41
42 public Timestamp max(final Timestamp that) {
43 if (this.value >= that.value) {
44 return this;
45 } else {
46 return that;
47 }
48 }
49
50 /**
51 * @since 2.4
52 */
53 public Timestamp min(final Timestamp that) {
54 if (this.value <= that.value) {
55 return this;
56 } else {
57 return that;
58 }
59 }
60
61 @Override
62 public int compareTo(final Timestamp that) {
63 return this.value - that.value;
64 }
65
66 @Override
67 public boolean equals(final Object obj) {
68 if (obj == null || !(obj instanceof Timestamp)) {
69 return false;
70 } else {
71 return this.value == ((Timestamp) obj).value;
72 }
73 }
74
75 @Override
76 public int hashCode() {
77 return this.value;
78 }
79
80 @Override
81 public String toString() {
82 return Integer.toString(this.value);
83 }
84
85 /**
86 * A {@link Map} implementation that associates the zero timestamp with every key. There is no suppor for
87 * {@link Map#entrySet()} due to performance reasons.
88 *
89 * @author Tamas Szabo
90 */
91 public static final class AllZeroMap<T> extends AbstractMap<T, Timeline<Timestamp>> {
92
93 private final Collection<T> wrapped;
94
95 public AllZeroMap(Set<T> wrapped) {
96 this.wrapped = wrapped;
97 }
98
99 @Override
100 public Set<Entry<T, Timeline<Timestamp>>> entrySet() {
101 throw new UnsupportedOperationException("Use the combination of keySet() and get()!");
102 }
103
104 /**
105 * @since 2.4
106 */
107 @Override
108 public Timeline<Timestamp> get(final Object key) {
109 return INSERT_AT_ZERO_TIMELINE;
110 }
111
112 @Override
113 public Set<T> keySet() {
114 return (Set<T>) this.wrapped;
115 }
116
117 @Override
118 public String toString() {
119 return this.getClass().getSimpleName() + ": " + this.keySet().toString();
120 }
121
122 }
123
124}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java
new file mode 100644
index 00000000..d8260384
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java
@@ -0,0 +1,164 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timeless;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.EnumMap;
14import java.util.LinkedHashSet;
15import java.util.Map;
16import java.util.Set;
17
18import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
19import tools.refinery.viatra.runtime.rete.network.Node;
20import tools.refinery.viatra.runtime.rete.network.RederivableNode;
21import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
22import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
23import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
24import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
25import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
26
27/**
28 * A communication group representing either a single node where the
29 * node is a monotonicity aware one or a set of nodes that form an SCC.
30 *
31 * @author Tamas Szabo
32 * @since 2.4
33 */
34public class RecursiveCommunicationGroup extends CommunicationGroup {
35
36 private final Set<Mailbox> antiMonotoneMailboxes;
37 private final Set<Mailbox> monotoneMailboxes;
38 private final Set<Mailbox> defaultMailboxes;
39 private final Set<RederivableNode> rederivables;
40 private boolean currentlyDelivering;
41
42 /**
43 * @since 1.7
44 */
45 public RecursiveCommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) {
46 super(tracker, representative, identifier);
47 this.antiMonotoneMailboxes = CollectionsFactory.createSet();
48 this.monotoneMailboxes = CollectionsFactory.createSet();
49 this.defaultMailboxes = CollectionsFactory.createSet();
50 this.rederivables = new LinkedHashSet<RederivableNode>();
51 this.currentlyDelivering = false;
52 }
53
54 @Override
55 public void deliverMessages() {
56 this.currentlyDelivering = true;
57
58 // ANTI-MONOTONE PHASE
59 while (!this.antiMonotoneMailboxes.isEmpty() || !this.defaultMailboxes.isEmpty()) {
60 while (!this.antiMonotoneMailboxes.isEmpty()) {
61 final Mailbox mailbox = this.antiMonotoneMailboxes.iterator().next();
62 this.antiMonotoneMailboxes.remove(mailbox);
63 mailbox.deliverAll(PhasedSelector.ANTI_MONOTONE);
64 }
65 while (!this.defaultMailboxes.isEmpty()) {
66 final Mailbox mailbox = this.defaultMailboxes.iterator().next();
67 this.defaultMailboxes.remove(mailbox);
68 mailbox.deliverAll(PhasedSelector.DEFAULT);
69 }
70 }
71
72 // REDERIVE PHASE
73 while (!this.rederivables.isEmpty()) {
74 // re-derivable nodes take care of their unregistration!!
75 final RederivableNode node = this.rederivables.iterator().next();
76 node.rederiveOne();
77 }
78
79 // MONOTONE PHASE
80 while (!this.monotoneMailboxes.isEmpty() || !this.defaultMailboxes.isEmpty()) {
81 while (!this.monotoneMailboxes.isEmpty()) {
82 final Mailbox mailbox = this.monotoneMailboxes.iterator().next();
83 this.monotoneMailboxes.remove(mailbox);
84 mailbox.deliverAll(PhasedSelector.MONOTONE);
85 }
86 while (!this.defaultMailboxes.isEmpty()) {
87 final Mailbox mailbox = this.defaultMailboxes.iterator().next();
88 this.defaultMailboxes.remove(mailbox);
89 mailbox.deliverAll(PhasedSelector.DEFAULT);
90 }
91 }
92
93 this.currentlyDelivering = false;
94 }
95
96 @Override
97 public boolean isEmpty() {
98 return this.rederivables.isEmpty() && this.antiMonotoneMailboxes.isEmpty()
99 && this.monotoneMailboxes.isEmpty() && this.defaultMailboxes.isEmpty();
100 }
101
102 @Override
103 public void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind) {
104 final Collection<Mailbox> mailboxes = getMailboxContainer(kind);
105 mailboxes.add(mailbox);
106 if (!this.isEnqueued && !this.currentlyDelivering) {
107 this.tracker.activateUnenqueued(this);
108 }
109 }
110
111 @Override
112 public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) {
113 final Collection<Mailbox> mailboxes = getMailboxContainer(kind);
114 mailboxes.remove(mailbox);
115 if (isEmpty()) {
116 this.tracker.deactivate(this);
117 }
118 }
119
120 private Collection<Mailbox> getMailboxContainer(final MessageSelector kind) {
121 if (kind == PhasedSelector.ANTI_MONOTONE) {
122 return this.antiMonotoneMailboxes;
123 } else if (kind == PhasedSelector.MONOTONE) {
124 return this.monotoneMailboxes;
125 } else if (kind == PhasedSelector.DEFAULT) {
126 return this.defaultMailboxes;
127 } else {
128 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
129 }
130 }
131
132 public void addRederivable(final RederivableNode node) {
133 this.rederivables.add(node);
134 if (!this.isEnqueued) {
135 this.tracker.activateUnenqueued(this);
136 }
137 }
138
139 public void removeRederivable(final RederivableNode node) {
140 this.rederivables.remove(node);
141 if (isEmpty()) {
142 this.tracker.deactivate(this);
143 }
144 }
145
146 public Collection<RederivableNode> getRederivables() {
147 return this.rederivables;
148 }
149
150 @Override
151 public Map<MessageSelector, Collection<Mailbox>> getMailboxes() {
152 Map<PhasedSelector, Collection<Mailbox>> map = new EnumMap<>(PhasedSelector.class);
153 map.put(PhasedSelector.ANTI_MONOTONE, antiMonotoneMailboxes);
154 map.put(PhasedSelector.MONOTONE, monotoneMailboxes);
155 map.put(PhasedSelector.DEFAULT, defaultMailboxes);
156 return Collections.unmodifiableMap(map);
157 }
158
159 @Override
160 public boolean isRecursive() {
161 return true;
162 }
163
164}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java
new file mode 100644
index 00000000..c51c7dbf
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java
@@ -0,0 +1,86 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timeless;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.Map;
14
15import tools.refinery.viatra.runtime.rete.network.Node;
16import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
17import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
18import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
19import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
20import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
21
22/**
23 * A communication group containing only a single node with a single default
24 * mailbox.
25 *
26 * @author Tamas Szabo
27 * @since 1.6
28 */
29public class SingletonCommunicationGroup extends CommunicationGroup {
30
31 private Mailbox mailbox;
32
33 /**
34 * @since 1.7
35 */
36 public SingletonCommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) {
37 super(tracker, representative, identifier);
38 }
39
40 @Override
41 public void deliverMessages() {
42 this.mailbox.deliverAll(PhasedSelector.DEFAULT);
43 }
44
45 @Override
46 public boolean isEmpty() {
47 return this.mailbox == null;
48 }
49
50 @Override
51 public void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind) {
52 if (kind == PhasedSelector.DEFAULT) {
53 this.mailbox = mailbox;
54 if (!this.isEnqueued) {
55 this.tracker.activateUnenqueued(this);
56 }
57 } else {
58 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
59 }
60 }
61
62 @Override
63 public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) {
64 if (kind == PhasedSelector.DEFAULT) {
65 this.mailbox = null;
66 this.tracker.deactivate(this);
67 } else {
68 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
69 }
70 }
71
72 @Override
73 public Map<MessageSelector, Collection<Mailbox>> getMailboxes() {
74 if (mailbox != null) {
75 return Collections.singletonMap(PhasedSelector.DEFAULT, Collections.singleton(mailbox));
76 } else {
77 return Collections.emptyMap();
78 }
79 }
80
81 @Override
82 public boolean isRecursive() {
83 return false;
84 }
85
86}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java
new file mode 100644
index 00000000..1c18c1cd
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java
@@ -0,0 +1,149 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timeless;
10
11import java.util.Collection;
12import java.util.HashSet;
13import java.util.Set;
14import java.util.Map.Entry;
15
16import tools.refinery.viatra.runtime.rete.index.DualInputNode;
17import tools.refinery.viatra.runtime.rete.index.Indexer;
18import tools.refinery.viatra.runtime.rete.index.IndexerListener;
19import tools.refinery.viatra.runtime.rete.index.IterableIndexer;
20import tools.refinery.viatra.runtime.rete.network.Node;
21import tools.refinery.viatra.runtime.rete.network.Receiver;
22import tools.refinery.viatra.runtime.rete.network.RederivableNode;
23import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
25import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
26import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
27import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
28
29/**
30 * Timeless implementation of the communication tracker.
31 *
32 * @author Tamas Szabo
33 * @since 2.2
34 */
35public class TimelessCommunicationTracker extends CommunicationTracker {
36
37 @Override
38 protected CommunicationGroup createGroup(Node representative, int index) {
39 final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1;
40 final boolean isReceiver = representative instanceof Receiver;
41 final boolean isPosetIndifferent = isReceiver
42 && ((Receiver) representative).getMailbox() instanceof BehaviorChangingMailbox;
43 final boolean isSingletonInDRedMode = isSingleton && (representative instanceof RederivableNode)
44 && ((RederivableNode) representative).isInDRedMode();
45
46 CommunicationGroup group = null;
47 // we can only use a singleton group iff
48 // (1) the SCC has one node AND
49 // (2) either we have a poset-indifferent mailbox OR the node is not even a receiver AND
50 // (3) the node does not run in DRed mode in a singleton group
51 if (isSingleton && (isPosetIndifferent || !isReceiver) && !isSingletonInDRedMode) {
52 group = new SingletonCommunicationGroup(this, representative, index);
53 } else {
54 group = new RecursiveCommunicationGroup(this, representative, index);
55 }
56
57 return group;
58 }
59
60 @Override
61 protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) {
62 for (final CommunicationGroup oldGroup : oldActiveGroups) {
63 for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) {
64 for (final Mailbox mailbox : entry.getValue()) {
65 final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver());
66 newGroup.notifyHasMessage(mailbox, entry.getKey());
67 }
68 }
69
70 if (oldGroup instanceof RecursiveCommunicationGroup) {
71 for (final RederivableNode node : ((RecursiveCommunicationGroup) oldGroup).getRederivables()) {
72 final CommunicationGroup newGroup = this.groupMap.get(node);
73 if (!(newGroup instanceof RecursiveCommunicationGroup)) {
74 throw new IllegalStateException("The new group must also be recursive! " + newGroup);
75 }
76 ((RecursiveCommunicationGroup) newGroup).addRederivable(node);
77 }
78 }
79 }
80 }
81
82 @Override
83 public Mailbox proxifyMailbox(final Node requester, final Mailbox original) {
84 return original;
85 }
86
87 @Override
88 public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) {
89 return original;
90 }
91
92 @Override
93 protected void postProcessNode(final Node node) {
94 if (node instanceof Receiver) {
95 final Mailbox mailbox = ((Receiver) node).getMailbox();
96 if (mailbox instanceof BehaviorChangingMailbox) {
97 final CommunicationGroup group = this.groupMap.get(node);
98 final Set<Node> sccNodes = this.sccInformationProvider.sccs.getPartition(node);
99 // a default mailbox must split its messages iff
100 // (1) its receiver is in a recursive group and
101 final boolean c1 = group.isRecursive();
102 // (2) its receiver is at the SCC boundary of that group
103 final boolean c2 = isAtSCCBoundary(node);
104 // (3) its group consists of more than one node
105 final boolean c3 = sccNodes.size() > 1;
106 ((BehaviorChangingMailbox) mailbox).setSplitFlag(c1 && c2 && c3);
107 }
108 }
109 }
110
111 @Override
112 protected void postProcessGroup(final CommunicationGroup group) {
113
114 }
115
116 /**
117 * @since 2.0
118 */
119 private boolean isAtSCCBoundary(final Node node) {
120 final CommunicationGroup ownGroup = this.groupMap.get(node);
121 assert ownGroup != null;
122 for (final Node source : this.dependencyGraph.getSourceNodes(node).distinctValues()) {
123 final Set<Node> sourcesToCheck = new HashSet<Node>();
124 sourcesToCheck.add(source);
125 // DualInputNodes must be checked additionally because they do not use a mailbox directly.
126 // It can happen that their indexers actually belong to other SCCs.
127 if (source instanceof DualInputNode) {
128 final DualInputNode dualInput = (DualInputNode) source;
129 final IterableIndexer primarySlot = dualInput.getPrimarySlot();
130 if (primarySlot != null) {
131 sourcesToCheck.add(primarySlot.getActiveNode());
132 }
133 final Indexer secondarySlot = dualInput.getSecondarySlot();
134 if (secondarySlot != null) {
135 sourcesToCheck.add(secondarySlot.getActiveNode());
136 }
137 }
138 for (final Node current : sourcesToCheck) {
139 final CommunicationGroup otherGroup = this.groupMap.get(current);
140 assert otherGroup != null;
141 if (!ownGroup.equals(otherGroup)) {
142 return true;
143 }
144 }
145 }
146 return false;
147 }
148
149}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java
new file mode 100644
index 00000000..8097bd91
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java
@@ -0,0 +1,36 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import tools.refinery.viatra.runtime.rete.network.IGroupable;
12import tools.refinery.viatra.runtime.rete.network.Node;
13import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
14
15/**
16 * {@link Node}s that implement this interface can resume folding of their states when instructed during timely evaluation.
17 *
18 * @since 2.3
19 * @author Tamas Szabo
20 */
21public interface ResumableNode extends Node, IGroupable {
22
23 /**
24 * When called, the folding of the state shall be resumed at the given timestamp. The resumable is expected to
25 * do a folding step at the given timestamp only. Afterwards, folding shall be interrupted, even if there is more
26 * folding to do towards higher timestamps.
27 */
28 public void resumeAt(final Timestamp timestamp);
29
30 /**
31 * Returns the smallest timestamp where lazy folding shall be resumed, or null if there is no more folding to do in this
32 * resumable.
33 */
34 public Timestamp getResumableTimestamp();
35
36}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java
new file mode 100644
index 00000000..0394d92c
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java
@@ -0,0 +1,171 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.Comparator;
14import java.util.HashMap;
15import java.util.Map;
16import java.util.Map.Entry;
17import java.util.Set;
18import java.util.TreeMap;
19import java.util.TreeSet;
20
21import org.apache.log4j.Logger;
22import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
23import tools.refinery.viatra.runtime.rete.network.Node;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
25import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
26import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
27import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
28import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox;
29import tools.refinery.viatra.runtime.rete.util.Options;
30
31/**
32 * A timely communication group implementation. {@link TimelyMailbox}es and {@link LazyFoldingNode}s are ordered in the
33 * increasing order of timestamps.
34 *
35 * @author Tamas Szabo
36 * @since 2.3
37 */
38public class TimelyCommunicationGroup extends CommunicationGroup {
39
40 private final boolean isSingleton;
41 private final TreeMap<Timestamp, Set<Mailbox>> mailboxQueue;
42 // may be null - only used in the scattered case where we need to take care of mailboxes and resumables too
43 private Comparator<Node> nodeComparator;
44 private boolean currentlyDelivering;
45 private Timestamp currentlyDeliveredTimestamp;
46
47 public TimelyCommunicationGroup(final TimelyCommunicationTracker tracker, final Node representative,
48 final int identifier, final boolean isSingleton) {
49 super(tracker, representative, identifier);
50 this.isSingleton = isSingleton;
51 this.mailboxQueue = CollectionsFactory.createTreeMap();
52 this.currentlyDelivering = false;
53 }
54
55 /**
56 * Sets the {@link Comparator} to be used to order the {@link Mailbox}es at a given {@link Timestamp} in the mailbox
57 * queue. Additionally, reorders already queued {@link Mailbox}es to reflect the new comparator. The comparator may
58 * be null, in this case, no set ordering will be enforced among the {@link Mailbox}es.
59 */
60 public void setComparatorAndReorderMailboxes(final Comparator<Node> nodeComparator) {
61 this.nodeComparator = nodeComparator;
62 if (!this.mailboxQueue.isEmpty()) {
63 final HashMap<Timestamp, Set<Mailbox>> queueCopy = new HashMap<Timestamp, Set<Mailbox>>(this.mailboxQueue);
64 this.mailboxQueue.clear();
65 for (final Entry<Timestamp, Set<Mailbox>> entry : queueCopy.entrySet()) {
66 for (final Mailbox mailbox : entry.getValue()) {
67 this.notifyHasMessage(mailbox, entry.getKey());
68 }
69 }
70 }
71 }
72
73 @Override
74 public void deliverMessages() {
75 this.currentlyDelivering = true;
76 while (!this.mailboxQueue.isEmpty()) {
77 // care must be taken here how we iterate over the mailboxes
78 // it is not okay to loop over the mailboxes at once because a mailbox may disappear from the collection as
79 // a result of delivering messages from another mailboxes under the same timestamp
80 // because of this, it is crucial that we pick the mailboxes one by one
81 final Entry<Timestamp, Set<Mailbox>> entry = this.mailboxQueue.firstEntry();
82 final Timestamp timestamp = entry.getKey();
83 final Set<Mailbox> mailboxes = entry.getValue();
84 final Mailbox mailbox = mailboxes.iterator().next();
85 mailboxes.remove(mailbox);
86 if (mailboxes.isEmpty()) {
87 this.mailboxQueue.pollFirstEntry();
88 }
89 assert mailbox instanceof TimelyMailbox;
90 /* debug */ this.currentlyDeliveredTimestamp = timestamp;
91 mailbox.deliverAll(timestamp);
92 /* debug */ this.currentlyDeliveredTimestamp = null;
93 }
94 this.currentlyDelivering = false;
95 }
96
97 @Override
98 public boolean isEmpty() {
99 return this.mailboxQueue.isEmpty();
100 }
101
102 @Override
103 public void notifyHasMessage(final Mailbox mailbox, MessageSelector kind) {
104 if (kind instanceof Timestamp) {
105 final Timestamp timestamp = (Timestamp) kind;
106 if (Options.MONITOR_VIOLATION_OF_DIFFERENTIAL_DATAFLOW_TIMESTAMPS) {
107 if (timestamp.compareTo(this.currentlyDeliveredTimestamp) < 0) {
108 final Logger logger = this.representative.getContainer().getNetwork().getEngine().getLogger();
109 logger.error(
110 "[INTERNAL ERROR] Violation of differential dataflow communication schema! The communication component with representative "
111 + this.representative + " observed decreasing timestamp during message delivery!");
112 }
113 }
114 final Set<Mailbox> mailboxes = this.mailboxQueue.computeIfAbsent(timestamp, k -> {
115 if (this.nodeComparator == null) {
116 return CollectionsFactory.createSet();
117 } else {
118 return new TreeSet<Mailbox>(new Comparator<Mailbox>() {
119 @Override
120 public int compare(final Mailbox left, final Mailbox right) {
121 return nodeComparator.compare(left.getReceiver(), right.getReceiver());
122 }
123 });
124 }
125 });
126 mailboxes.add(mailbox);
127 if (!this.isEnqueued && !this.currentlyDelivering) {
128 this.tracker.activateUnenqueued(this);
129 }
130 } else {
131 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
132 }
133 }
134
135 @Override
136 public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) {
137 if (kind instanceof Timestamp) {
138 final Timestamp timestamp = (Timestamp) kind;
139 this.mailboxQueue.compute(timestamp, (k, v) -> {
140 if (v == null) {
141 throw new IllegalStateException("No mailboxes registered at timestamp " + timestamp + "!");
142 }
143 if (!v.remove(mailbox)) {
144 throw new IllegalStateException(
145 "The mailbox " + mailbox + " was not registered at timestamp " + timestamp + "!");
146 }
147 if (v.isEmpty()) {
148 return null;
149 } else {
150 return v;
151 }
152 });
153 if (this.mailboxQueue.isEmpty()) {
154 this.tracker.deactivate(this);
155 }
156 } else {
157 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
158 }
159 }
160
161 @Override
162 public Map<MessageSelector, Collection<Mailbox>> getMailboxes() {
163 return Collections.unmodifiableMap(this.mailboxQueue);
164 }
165
166 @Override
167 public boolean isRecursive() {
168 return !this.isSingleton;
169 }
170
171}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java
new file mode 100644
index 00000000..79179880
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java
@@ -0,0 +1,216 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import java.util.Collection;
12import java.util.List;
13import java.util.Map;
14import java.util.Map.Entry;
15import java.util.Set;
16import java.util.function.Function;
17
18import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting;
19import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph;
20import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
21import tools.refinery.viatra.runtime.rete.index.IndexerListener;
22import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer;
23import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
24import tools.refinery.viatra.runtime.rete.index.StandardIndexer;
25import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration;
26import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation;
27import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode;
28import tools.refinery.viatra.runtime.rete.network.Node;
29import tools.refinery.viatra.runtime.rete.network.ProductionNode;
30import tools.refinery.viatra.runtime.rete.network.StandardNode;
31import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
32import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
33import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
34import tools.refinery.viatra.runtime.rete.network.communication.NodeComparator;
35import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
36import tools.refinery.viatra.runtime.rete.single.DiscriminatorDispatcherNode;
37
38/**
39 * Timely (DDF) implementation of the {@link CommunicationTracker}.
40 *
41 * @author Tamas Szabo
42 * @since 2.3
43 */
44public class TimelyCommunicationTracker extends CommunicationTracker {
45
46 protected final TimelyConfiguration configuration;
47
48 public TimelyCommunicationTracker(final TimelyConfiguration configuration) {
49 this.configuration = configuration;
50 }
51
52 @Override
53 protected CommunicationGroup createGroup(final Node representative, final int index) {
54 final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1;
55 return new TimelyCommunicationGroup(this, representative, index, isSingleton);
56 }
57
58 @Override
59 protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) {
60 for (final CommunicationGroup oldGroup : oldActiveGroups) {
61 for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) {
62 for (final Mailbox mailbox : entry.getValue()) {
63 final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver());
64 newGroup.notifyHasMessage(mailbox, entry.getKey());
65 }
66 }
67 }
68 }
69
70 @Override
71 public Mailbox proxifyMailbox(final Node requester, final Mailbox original) {
72 final Mailbox mailboxToProxify = (original instanceof TimelyMailboxProxy)
73 ? ((TimelyMailboxProxy) original).getWrappedMailbox()
74 : original;
75 final TimestampTransformation preprocessor = getPreprocessor(requester, mailboxToProxify.getReceiver());
76 if (preprocessor == null) {
77 return mailboxToProxify;
78 } else {
79 return new TimelyMailboxProxy(mailboxToProxify, preprocessor);
80 }
81 }
82
83 @Override
84 public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) {
85 final IndexerListener listenerToProxify = (original instanceof TimelyIndexerListenerProxy)
86 ? ((TimelyIndexerListenerProxy) original).getWrappedIndexerListener()
87 : original;
88 final TimestampTransformation preprocessor = getPreprocessor(requester, listenerToProxify.getOwner());
89 if (preprocessor == null) {
90 return listenerToProxify;
91 } else {
92 return new TimelyIndexerListenerProxy(listenerToProxify, preprocessor);
93 }
94 }
95
96 protected TimestampTransformation getPreprocessor(final Node source, final Node target) {
97 final Node effectiveSource = source instanceof SpecializedProjectionIndexer
98 ? ((SpecializedProjectionIndexer) source).getActiveNode()
99 : source;
100 final CommunicationGroup sourceGroup = this.getGroup(effectiveSource);
101 final CommunicationGroup targetGroup = this.getGroup(target);
102
103 if (sourceGroup != null && targetGroup != null) {
104 // during RETE construction, the groups may be still null
105 if (sourceGroup != targetGroup && sourceGroup.isRecursive()) {
106 // targetGroup is a successor SCC of sourceGroup
107 // and sourceGroup is a recursive SCC
108 // then we need to zero out the timestamps
109 return TimestampTransformation.RESET;
110 }
111 if (sourceGroup == targetGroup && target instanceof ProductionNode) {
112 // if requester and receiver are in the same SCC
113 // and receiver is a production node
114 // then we need to increment the timestamps
115 return TimestampTransformation.INCREMENT;
116 }
117 }
118
119 return null;
120 }
121
122 @Override
123 protected void postProcessNode(final Node node) {
124 if (node instanceof NetworkStructureChangeSensitiveNode) {
125 ((NetworkStructureChangeSensitiveNode) node).networkStructureChanged();
126 }
127 }
128
129 @Override
130 protected void postProcessGroup(final CommunicationGroup group) {
131 if (this.configuration.getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) {
132 final Node representative = group.getRepresentative();
133 final Set<Node> groupMembers = this.sccInformationProvider.sccs.getPartition(representative);
134 if (groupMembers.size() > 1) {
135 final Graph<Node> graph = new Graph<Node>();
136
137 for (final Node node : groupMembers) {
138 graph.insertNode(node);
139 }
140
141 for (final Node source : groupMembers) {
142 for (final Node target : this.dependencyGraph.getTargetNodes(source)) {
143 // (1) the edge is not a recursion cut point
144 // (2) the edge is within this group
145 if (!this.isRecursionCutPoint(source, target) && groupMembers.contains(target)) {
146 graph.insertEdge(source, target);
147 }
148 }
149 }
150
151 final List<Node> orderedNodes = TopologicalSorting.compute(graph);
152 final Map<Node, Integer> nodeMap = CollectionsFactory.createMap();
153 int identifier = 0;
154 for (final Node orderedNode : orderedNodes) {
155 nodeMap.put(orderedNode, identifier++);
156 }
157
158 ((TimelyCommunicationGroup) group).setComparatorAndReorderMailboxes(new NodeComparator(nodeMap));
159 }
160 }
161 }
162
163 /**
164 * This static field is used for debug purposes in the DotGenerator.
165 */
166 public static final Function<Node, Function<Node, String>> EDGE_LABEL_FUNCTION = new Function<Node, Function<Node, String>>() {
167
168 @Override
169 public Function<Node, String> apply(final Node source) {
170 return new Function<Node, String>() {
171 @Override
172 public String apply(final Node target) {
173 if (source instanceof SpecializedProjectionIndexer) {
174 final Collection<ListenerSubscription> subscriptions = ((SpecializedProjectionIndexer) source)
175 .getSubscriptions();
176 for (final ListenerSubscription subscription : subscriptions) {
177 if (subscription.getListener().getOwner() == target
178 && subscription.getListener() instanceof TimelyIndexerListenerProxy) {
179 return ((TimelyIndexerListenerProxy) subscription.getListener()).preprocessor
180 .toString();
181 }
182 }
183 }
184 if (source instanceof StandardIndexer) {
185 final Collection<IndexerListener> listeners = ((StandardIndexer) source).getListeners();
186 for (final IndexerListener listener : listeners) {
187 if (listener.getOwner() == target && listener instanceof TimelyIndexerListenerProxy) {
188 return ((TimelyIndexerListenerProxy) listener).preprocessor.toString();
189 }
190 }
191 }
192 if (source instanceof StandardNode) {
193 final Collection<Mailbox> mailboxes = ((StandardNode) source).getChildMailboxes();
194 for (final Mailbox mailbox : mailboxes) {
195 if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) {
196 return ((TimelyMailboxProxy) mailbox).preprocessor.toString();
197 }
198 }
199 }
200 if (source instanceof DiscriminatorDispatcherNode) {
201 final Collection<Mailbox> mailboxes = ((DiscriminatorDispatcherNode) source)
202 .getBucketMailboxes().values();
203 for (final Mailbox mailbox : mailboxes) {
204 if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) {
205 return ((TimelyMailboxProxy) mailbox).preprocessor.toString();
206 }
207 }
208 }
209 return null;
210 }
211 };
212 }
213
214 };
215
216}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java
new file mode 100644
index 00000000..e8fbf84e
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java
@@ -0,0 +1,81 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
12import tools.refinery.viatra.runtime.matchers.util.Direction;
13import tools.refinery.viatra.runtime.matchers.util.Preconditions;
14import tools.refinery.viatra.runtime.rete.index.IndexerListener;
15import tools.refinery.viatra.runtime.rete.network.Node;
16import tools.refinery.viatra.runtime.rete.network.ProductionNode;
17import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
18
19/**
20 * A timely proxy for another {@link IndexerListener}, which performs some preprocessing
21 * on the differential timestamps before passing it on to the real recipient.
22 * <p>
23 * These proxies are used on edges leading into {@link ProductionNode}s. Because {@link ProductionNode}s
24 * never ask back the indexer for its contents, there is no need to also apply the proxy on that direction.
25 *
26 * @author Tamas Szabo
27 * @since 2.3
28 */
29public class TimelyIndexerListenerProxy implements IndexerListener {
30
31 protected final TimestampTransformation preprocessor;
32 protected final IndexerListener wrapped;
33
34 public TimelyIndexerListenerProxy(final IndexerListener wrapped,
35 final TimestampTransformation preprocessor) {
36 Preconditions.checkArgument(!(wrapped instanceof TimelyIndexerListenerProxy), "Proxy in a proxy is not allowed!");
37 this.wrapped = wrapped;
38 this.preprocessor = preprocessor;
39 }
40
41 public IndexerListener getWrappedIndexerListener() {
42 return wrapped;
43 }
44
45 @Override
46 public Node getOwner() {
47 return this.wrapped.getOwner();
48 }
49
50 @Override
51 public void notifyIndexerUpdate(final Direction direction, final Tuple updateElement, final Tuple signature,
52 final boolean change, final Timestamp timestamp) {
53 this.wrapped.notifyIndexerUpdate(direction, updateElement, signature, change, preprocessor.process(timestamp));
54 }
55
56 @Override
57 public String toString() {
58 return this.preprocessor.toString() + "_PROXY -> " + this.wrapped.toString();
59 }
60
61 @Override
62 public boolean equals(final Object obj) {
63 if (obj == null || obj.getClass() != this.getClass()) {
64 return false;
65 } else if (obj == this) {
66 return true;
67 } else {
68 final TimelyIndexerListenerProxy that = (TimelyIndexerListenerProxy) obj;
69 return this.wrapped.equals(that.wrapped) && this.preprocessor == that.preprocessor;
70 }
71 }
72
73 @Override
74 public int hashCode() {
75 int hash = 1;
76 hash = hash * 17 + this.wrapped.hashCode();
77 hash = hash * 31 + this.preprocessor.hashCode();
78 return hash;
79 }
80
81}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java
new file mode 100644
index 00000000..550bfbeb
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java
@@ -0,0 +1,102 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
12import tools.refinery.viatra.runtime.matchers.util.Direction;
13import tools.refinery.viatra.runtime.matchers.util.Preconditions;
14import tools.refinery.viatra.runtime.rete.network.Receiver;
15import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
16import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
17import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
18import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
19
20/**
21 * A timely proxy for another {@link Mailbox}, which performs some preprocessing
22 * on the differential timestamps before passing it on to the real recipient.
23 *
24 * @author Tamas Szabo
25 * @since 2.3
26 */
27public class TimelyMailboxProxy implements Mailbox {
28
29 protected final TimestampTransformation preprocessor;
30 protected final Mailbox wrapped;
31
32 public TimelyMailboxProxy(final Mailbox wrapped, final TimestampTransformation preprocessor) {
33 Preconditions.checkArgument(!(wrapped instanceof TimelyMailboxProxy), "Proxy in a proxy is not allowed!");
34 this.wrapped = wrapped;
35 this.preprocessor = preprocessor;
36 }
37
38 public Mailbox getWrappedMailbox() {
39 return wrapped;
40 }
41
42 @Override
43 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
44 this.wrapped.postMessage(direction, update, preprocessor.process(timestamp));
45 }
46
47 @Override
48 public String toString() {
49 return this.preprocessor.toString() + "_PROXY -> " + this.wrapped.toString();
50 }
51
52 @Override
53 public void clear() {
54 this.wrapped.clear();
55 }
56
57 @Override
58 public void deliverAll(final MessageSelector selector) {
59 this.wrapped.deliverAll(selector);
60 }
61
62 @Override
63 public CommunicationGroup getCurrentGroup() {
64 return this.wrapped.getCurrentGroup();
65 }
66
67 @Override
68 public void setCurrentGroup(final CommunicationGroup group) {
69 this.wrapped.setCurrentGroup(group);
70 }
71
72 @Override
73 public Receiver getReceiver() {
74 return this.wrapped.getReceiver();
75 }
76
77 @Override
78 public boolean isEmpty() {
79 return this.wrapped.isEmpty();
80 }
81
82 @Override
83 public boolean equals(final Object obj) {
84 if (obj == null || obj.getClass() != this.getClass()) {
85 return false;
86 } else if (obj == this) {
87 return true;
88 } else {
89 final TimelyMailboxProxy that = (TimelyMailboxProxy) obj;
90 return this.wrapped.equals(that.wrapped) && this.preprocessor == that.preprocessor;
91 }
92 }
93
94 @Override
95 public int hashCode() {
96 int hash = 1;
97 hash = hash * 17 + this.wrapped.hashCode();
98 hash = hash * 31 + this.preprocessor.hashCode();
99 return hash;
100 }
101
102}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java
new file mode 100644
index 00000000..8929eb5c
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java
@@ -0,0 +1,48 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import tools.refinery.viatra.runtime.rete.network.Node;
12import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
13
14/**
15 * Values of this enum perform different kind of preprocessing on {@link Timestamp}s.
16 * This is used on edges leading in and out from {@link Node}s in recursive {@link TimelyCommunicationGroup}s.
17 *
18 * @author Tamas Szabo
19 * @since 2.3
20 */
21public enum TimestampTransformation {
22
23 INCREMENT {
24 @Override
25 public Timestamp process(final Timestamp timestamp) {
26 return new Timestamp(timestamp.getValue() + 1);
27 }
28
29 @Override
30 public String toString() {
31 return "INCREMENT";
32 }
33 },
34 RESET {
35 @Override
36 public Timestamp process(final Timestamp timestamp) {
37 return Timestamp.ZERO;
38 }
39
40 @Override
41 public String toString() {
42 return "RESET";
43 }
44 };
45
46 public abstract Timestamp process(final Timestamp timestamp);
47
48}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java
new file mode 100644
index 00000000..d6312671
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java
@@ -0,0 +1,81 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.delayed;
10
11import java.util.Collection;
12import java.util.Map;
13import java.util.Map.Entry;
14
15import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
16import tools.refinery.viatra.runtime.matchers.util.Direction;
17import tools.refinery.viatra.runtime.matchers.util.Signed;
18import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
19import tools.refinery.viatra.runtime.rete.network.Network;
20import tools.refinery.viatra.runtime.rete.network.Node;
21import tools.refinery.viatra.runtime.rete.network.Receiver;
22import tools.refinery.viatra.runtime.rete.network.ReteContainer;
23import tools.refinery.viatra.runtime.rete.network.Supplier;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
25import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
26import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
27
28/**
29 * Instances of this class are responsible for initializing a {@link Receiver} with the contents of a {@link Supplier}.
30 * However, due to the dynamic nature of the Rete {@link Network} and to the fact that certain {@link Node}s in the
31 * {@link Network} are sensitive to the shape of the {@link Network}, the commands must be delayed until the
32 * construction of the {@link Network} has stabilized.
33 *
34 * @author Tamas Szabo
35 * @since 2.3
36 */
37public abstract class DelayedCommand implements Runnable {
38
39 protected final Supplier supplier;
40 protected final Receiver receiver;
41 protected final Direction direction;
42 protected final ReteContainer container;
43
44 public DelayedCommand(final Supplier supplier, final Receiver receiver, final Direction direction,
45 final ReteContainer container) {
46 this.supplier = supplier;
47 this.receiver = receiver;
48 this.direction = direction;
49 this.container = container;
50 }
51
52 @Override
53 public void run() {
54 final CommunicationTracker tracker = this.container.getCommunicationTracker();
55 final Mailbox mailbox = tracker.proxifyMailbox(this.supplier, this.receiver.getMailbox());
56
57 if (this.isTimestampAware()) {
58 final Map<Tuple, Timeline<Timestamp>> contents = this.container.pullContentsWithTimeline(this.supplier,
59 false);
60 for (final Entry<Tuple, Timeline<Timestamp>> entry : contents.entrySet()) {
61 for (final Signed<Timestamp> change : entry.getValue().asChangeSequence()) {
62 mailbox.postMessage(change.getDirection().multiply(this.direction), entry.getKey(),
63 change.getPayload());
64 }
65 }
66 } else {
67 final Collection<Tuple> contents = this.container.pullContents(this.supplier, false);
68 for (final Tuple tuple : contents) {
69 mailbox.postMessage(this.direction, tuple, Timestamp.ZERO);
70 }
71 }
72 }
73
74 @Override
75 public String toString() {
76 return this.supplier + " -> " + this.receiver.toString();
77 }
78
79 protected abstract boolean isTimestampAware();
80
81}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java
new file mode 100644
index 00000000..1bfdbec6
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java
@@ -0,0 +1,27 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.delayed;
10
11import tools.refinery.viatra.runtime.matchers.util.Direction;
12import tools.refinery.viatra.runtime.rete.network.Receiver;
13import tools.refinery.viatra.runtime.rete.network.ReteContainer;
14import tools.refinery.viatra.runtime.rete.network.Supplier;
15
16public class DelayedConnectCommand extends DelayedCommand {
17
18 public DelayedConnectCommand(final Supplier supplier, final Receiver receiver, final ReteContainer container) {
19 super(supplier, receiver, Direction.INSERT, container);
20 }
21
22 @Override
23 protected boolean isTimestampAware() {
24 return this.container.isTimelyEvaluation() && this.container.getCommunicationTracker().areInSameGroup(this.supplier, this.receiver);
25 }
26
27}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java
new file mode 100644
index 00000000..5825a971
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java
@@ -0,0 +1,30 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.delayed;
10
11import tools.refinery.viatra.runtime.matchers.util.Direction;
12import tools.refinery.viatra.runtime.rete.network.Receiver;
13import tools.refinery.viatra.runtime.rete.network.ReteContainer;
14import tools.refinery.viatra.runtime.rete.network.Supplier;
15
16public class DelayedDisconnectCommand extends DelayedCommand {
17
18 protected final boolean wasInSameSCC;
19
20 public DelayedDisconnectCommand(final Supplier supplier, final Receiver receiver, final ReteContainer container, final boolean wasInSameSCC) {
21 super(supplier, receiver, Direction.DELETE, container);
22 this.wasInSameSCC = wasInSameSCC;
23 }
24
25 @Override
26 protected boolean isTimestampAware() {
27 return this.wasInSameSCC;
28 }
29
30}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java
new file mode 100644
index 00000000..da9bc47e
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java
@@ -0,0 +1,74 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.indexer;
10
11import java.util.Collections;
12import java.util.Map;
13
14import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
15import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
16
17/**
18 * @author Tamas Szabo
19 * @since 2.0
20 */
21public class DefaultMessageIndexer implements MessageIndexer {
22
23 protected final Map<Tuple, Integer> indexer;
24
25 public DefaultMessageIndexer() {
26 this.indexer = CollectionsFactory.createMap();
27 }
28
29 public Map<Tuple, Integer> getTuples() {
30 return Collections.unmodifiableMap(this.indexer);
31 }
32
33 @Override
34 public int getCount(final Tuple update) {
35 final Integer count = getTuples().get(update);
36 if (count == null) {
37 return 0;
38 } else {
39 return count;
40 }
41 }
42
43 @Override
44 public void insert(final Tuple update) {
45 update(update, 1);
46 }
47
48 @Override
49 public void delete(final Tuple update) {
50 update(update, -1);
51 }
52
53 @Override
54 public void update(final Tuple update, final int delta) {
55 final Integer oldCount = this.indexer.get(update);
56 final int newCount = (oldCount == null ? 0 : oldCount) + delta;
57 if (newCount == 0) {
58 this.indexer.remove(update);
59 } else {
60 this.indexer.put(update, newCount);
61 }
62 }
63
64 @Override
65 public boolean isEmpty() {
66 return this.indexer.isEmpty();
67 }
68
69 @Override
70 public void clear() {
71 this.indexer.clear();
72 }
73
74}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java
new file mode 100644
index 00000000..80271252
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java
@@ -0,0 +1,95 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.indexer;
10
11import java.util.Collections;
12import java.util.Map;
13import java.util.Set;
14
15import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
16import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
17import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
18
19/**
20 * @author Tamas Szabo
21 * @since 2.0
22 */
23public class GroupBasedMessageIndexer implements MessageIndexer {
24
25 protected final Map<Tuple, DefaultMessageIndexer> indexer;
26 protected final TupleMask groupMask;
27
28 public GroupBasedMessageIndexer(final TupleMask groupMask) {
29 this.indexer = CollectionsFactory.createMap();
30 this.groupMask = groupMask;
31 }
32
33 public Map<Tuple, Integer> getTuplesByGroup(final Tuple group) {
34 final DefaultMessageIndexer values = this.indexer.get(group);
35 if (values == null) {
36 return Collections.emptyMap();
37 } else {
38 return Collections.unmodifiableMap(values.getTuples());
39 }
40 }
41
42 @Override
43 public int getCount(final Tuple update) {
44 final Tuple group = this.groupMask.transform(update);
45 final Integer count = getTuplesByGroup(group).get(update);
46 if (count == null) {
47 return 0;
48 } else {
49 return count;
50 }
51 }
52
53 public Set<Tuple> getGroups() {
54 return Collections.unmodifiableSet(this.indexer.keySet());
55 }
56
57 @Override
58 public void insert(final Tuple update) {
59 update(update, 1);
60 }
61
62 @Override
63 public void delete(final Tuple update) {
64 update(update, -1);
65 }
66
67 @Override
68 public void update(final Tuple update, final int delta) {
69 final Tuple group = this.groupMask.transform(update);
70 DefaultMessageIndexer valueIndexer = this.indexer.get(group);
71
72 if (valueIndexer == null) {
73 valueIndexer = new DefaultMessageIndexer();
74 this.indexer.put(group, valueIndexer);
75 }
76
77 valueIndexer.update(update, delta);
78
79 // it may happen that the indexer becomes empty as a result of the update
80 if (valueIndexer.isEmpty()) {
81 this.indexer.remove(group);
82 }
83 }
84
85 @Override
86 public boolean isEmpty() {
87 return this.indexer.isEmpty();
88 }
89
90 @Override
91 public void clear() {
92 this.indexer.clear();
93 }
94
95}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java
new file mode 100644
index 00000000..271aaa44
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java
@@ -0,0 +1,33 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.indexer;
10
11import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
12import tools.refinery.viatra.runtime.matchers.util.Clearable;
13import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
14
15/**
16 * A message indexer is used by {@link Mailbox}es to index their contents.
17 *
18 * @author Tamas Szabo
19 * @since 2.0
20 */
21public interface MessageIndexer extends Clearable {
22
23 public void insert(final Tuple update);
24
25 public void delete(final Tuple update);
26
27 public void update(final Tuple update, final int delta);
28
29 public boolean isEmpty();
30
31 public int getCount(final Tuple update);
32
33}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java
new file mode 100644
index 00000000..99097f56
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java
@@ -0,0 +1,32 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox;
10
11import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
12import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyMailboxProxy;
13import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
14
15/**
16 * An adaptable mailbox can be wrapped by another mailbox to act in behalf of that. The significance of the adaptation
17 * is that the adaptee will notify the {@link CommunicationTracker} about updates by promoting the adapter itself.
18 * Adaptable mailboxes are used by the {@link BehaviorChangingMailbox}.
19 *
20 * Compare this with {@link TimelyMailboxProxy}. That one also wraps another mailbox in order to
21 * perform preprocessing on the messages sent to the original recipient.
22 *
23 * @author Tamas Szabo
24 * @since 2.0
25 */
26public interface AdaptableMailbox extends Mailbox {
27
28 public Mailbox getAdapter();
29
30 public void setAdapter(final Mailbox adapter);
31
32}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java
new file mode 100644
index 00000000..8797e254
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java
@@ -0,0 +1,30 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox;
10
11import tools.refinery.viatra.runtime.rete.network.Receiver;
12import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
13
14/**
15 * A fall through capable mailbox can directly call the update method of its {@link Receiver} instead of using the
16 * standard post-deliver mailbox semantics. If the fall through flag is set to true, the mailbox uses direct delivery,
17 * otherwise it operates in the original behavior. The fall through operation is preferable whenever applicable because
18 * it improves performance. The fall through flag is controlled by the {@link CommunicationTracker} based on the
19 * receiver node type and network topology.
20 *
21 * @author Tamas Szabo
22 * @since 2.2
23 */
24public interface FallThroughCapableMailbox extends Mailbox {
25
26 public boolean isFallThrough();
27
28 public void setFallThrough(final boolean fallThrough);
29
30}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java
new file mode 100644
index 00000000..05005974
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java
@@ -0,0 +1,78 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox;
10
11import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
12import tools.refinery.viatra.runtime.matchers.util.Clearable;
13import tools.refinery.viatra.runtime.matchers.util.Direction;
14import tools.refinery.viatra.runtime.rete.network.IGroupable;
15import tools.refinery.viatra.runtime.rete.network.Receiver;
16import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
17import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
18import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
19
20/**
21 * A mailbox is associated with every {@link Receiver}. Messages can be sent to a {@link Receiver} by posting them into
22 * the mailbox. Different mailbox implementations may differ in the way how they deliver the posted messages.
23 *
24 * @author Tamas Szabo
25 * @since 2.0
26 *
27 */
28public interface Mailbox extends Clearable, IGroupable {
29
30 /**
31 * Posts a new message to this mailbox.
32 *
33 * @param direction
34 * the direction of the update
35 * @param update
36 * the update element
37 * @since 2.4
38 */
39 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp);
40
41 /**
42 * Delivers all messages according to the given selector from this mailbox. The selector can also be null. In this case, no
43 * special separation is expected between the messages.
44 *
45 * @param selector the message selector
46 */
47 public void deliverAll(final MessageSelector selector);
48
49 /**
50 * Returns the {@link Receiver} of this mailbox.
51 *
52 * @return the receiver
53 */
54 public Receiver getReceiver();
55
56 /**
57 * Returns the {@link CommunicationGroup} of the receiver of this mailbox.
58 *
59 * @return the communication group
60 */
61 public CommunicationGroup getCurrentGroup();
62
63 /**
64 * Sets the {@link CommunicationGroup} that the receiver of this mailbox is associated with.
65 *
66 * @param group
67 * the communication group
68 */
69 public void setCurrentGroup(final CommunicationGroup group);
70
71 /**
72 * Returns true if this mailbox is empty.
73 *
74 * @return
75 */
76 public boolean isEmpty();
77
78}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java
new file mode 100644
index 00000000..2c5255fb
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java
@@ -0,0 +1,23 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox;
10
11import tools.refinery.viatra.runtime.rete.network.indexer.MessageIndexer;
12
13/**
14 * A factory used to create message indexers for {@link Mailbox}es.
15 *
16 * @author Tamas Szabo
17 * @since 2.0
18 */
19public interface MessageIndexerFactory<I extends MessageIndexer> {
20
21 public I create();
22
23}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java
new file mode 100644
index 00000000..1e1ada71
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java
@@ -0,0 +1,109 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;
10
11import tools.refinery.viatra.runtime.rete.network.Receiver;
12import tools.refinery.viatra.runtime.rete.network.ReteContainer;
13import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
14import tools.refinery.viatra.runtime.rete.network.indexer.MessageIndexer;
15import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
16import tools.refinery.viatra.runtime.rete.network.mailbox.MessageIndexerFactory;
17
18/**
19 * An abstract mailbox implementation that is capable of splitting update messages based on some form of monotonicity
20 * (anti-monotone and monotone). The monotonicity is either defined by the less or equal operator of a poset or, it can
21 * be the standard subset ordering among sets of tuples.
22 *
23 * @author Tamas Szabo
24 * @since 2.0
25 *
26 */
27public abstract class AbstractUpdateSplittingMailbox<IndexerType extends MessageIndexer, ReceiverType extends Receiver> implements Mailbox {
28
29 protected IndexerType monotoneQueue;
30 protected IndexerType antiMonotoneQueue;
31 protected IndexerType monotoneBuffer;
32 protected IndexerType antiMonotoneBuffer;
33 protected boolean deliveringMonotone;
34 protected boolean deliveringAntiMonotone;
35 protected final ReceiverType receiver;
36 protected final ReteContainer container;
37 protected CommunicationGroup group;
38
39 public AbstractUpdateSplittingMailbox(final ReceiverType receiver, final ReteContainer container,
40 final MessageIndexerFactory<IndexerType> factory) {
41 this.receiver = receiver;
42 this.container = container;
43 this.monotoneQueue = factory.create();
44 this.antiMonotoneQueue = factory.create();
45 this.monotoneBuffer = factory.create();
46 this.antiMonotoneBuffer = factory.create();
47 this.deliveringMonotone = false;
48 this.deliveringAntiMonotone = false;
49 }
50
51 protected void swapAndClearMonotone() {
52 final IndexerType tmp = this.monotoneQueue;
53 this.monotoneQueue = this.monotoneBuffer;
54 this.monotoneBuffer = tmp;
55 this.monotoneBuffer.clear();
56 }
57
58 protected void swapAndClearAntiMonotone() {
59 final IndexerType tmp = this.antiMonotoneQueue;
60 this.antiMonotoneQueue = this.antiMonotoneBuffer;
61 this.antiMonotoneBuffer = tmp;
62 this.antiMonotoneBuffer.clear();
63 }
64
65 protected IndexerType getActiveMonotoneQueue() {
66 if (this.deliveringMonotone) {
67 return this.monotoneBuffer;
68 } else {
69 return this.monotoneQueue;
70 }
71 }
72
73 protected IndexerType getActiveAntiMonotoneQueue() {
74 if (this.deliveringAntiMonotone) {
75 return this.antiMonotoneBuffer;
76 } else {
77 return this.antiMonotoneQueue;
78 }
79 }
80
81 @Override
82 public ReceiverType getReceiver() {
83 return this.receiver;
84 }
85
86 @Override
87 public void clear() {
88 this.monotoneQueue.clear();
89 this.antiMonotoneQueue.clear();
90 this.monotoneBuffer.clear();
91 this.antiMonotoneBuffer.clear();
92 }
93
94 @Override
95 public boolean isEmpty() {
96 return this.getActiveMonotoneQueue().isEmpty() && this.getActiveAntiMonotoneQueue().isEmpty();
97 }
98
99 @Override
100 public CommunicationGroup getCurrentGroup() {
101 return this.group;
102 }
103
104 @Override
105 public void setCurrentGroup(final CommunicationGroup group) {
106 this.group = group;
107 }
108
109}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java
new file mode 100644
index 00000000..fe822d7c
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java
@@ -0,0 +1,117 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;
10
11import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
12import tools.refinery.viatra.runtime.matchers.util.Direction;
13import tools.refinery.viatra.runtime.rete.network.Node;
14import tools.refinery.viatra.runtime.rete.network.Receiver;
15import tools.refinery.viatra.runtime.rete.network.ReteContainer;
16import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
17import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
18import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
19import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
20import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker;
21import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox;
22import tools.refinery.viatra.runtime.rete.network.mailbox.FallThroughCapableMailbox;
23
24/**
25 * This mailbox changes its behavior based on the position of its {@link Receiver} in the network topology.
26 * It either behaves as a {@link DefaultMailbox} or as an {@link UpdateSplittingMailbox}. The decision is made by the
27 * {@link CommunicationTracker}, see {@link TimelessCommunicationTracker#postProcessNode(Node)} for more details.
28 *
29 * @author Tamas Szabo
30 */
31public class BehaviorChangingMailbox implements FallThroughCapableMailbox {
32
33 protected boolean fallThrough;
34 protected boolean split;
35 protected AdaptableMailbox wrapped;
36 protected final Receiver receiver;
37 protected final ReteContainer container;
38 protected CommunicationGroup group;
39
40 public BehaviorChangingMailbox(final Receiver receiver, final ReteContainer container) {
41 this.fallThrough = false;
42 this.split = false;
43 this.receiver = receiver;
44 this.container = container;
45 this.wrapped = new DefaultMailbox(receiver, container);
46 this.wrapped.setAdapter(this);
47 }
48
49 @Override
50 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
51 if (this.fallThrough && !this.container.isExecutingDelayedCommands()) {
52 // disable fall through while we are in the middle of executing delayed construction commands
53 this.receiver.update(direction, update, timestamp);
54 } else {
55 this.wrapped.postMessage(direction, update, timestamp);
56 }
57 }
58
59 @Override
60 public void deliverAll(final MessageSelector kind) {
61 this.wrapped.deliverAll(kind);
62 }
63
64 @Override
65 public String toString() {
66 return "A_MBOX -> " + this.wrapped;
67 }
68
69 public void setSplitFlag(final boolean splitValue) {
70 if (this.split != splitValue) {
71 assert isEmpty();
72 if (splitValue) {
73 this.wrapped = new UpdateSplittingMailbox(this.receiver, this.container);
74 } else {
75 this.wrapped = new DefaultMailbox(this.receiver, this.container);
76 }
77 this.wrapped.setAdapter(this);
78 this.split = splitValue;
79 }
80 }
81
82 @Override
83 public boolean isEmpty() {
84 return this.wrapped.isEmpty();
85 }
86
87 @Override
88 public void clear() {
89 this.wrapped.clear();
90 }
91
92 @Override
93 public Receiver getReceiver() {
94 return this.receiver;
95 }
96
97 @Override
98 public CommunicationGroup getCurrentGroup() {
99 return this.group;
100 }
101
102 @Override
103 public void setCurrentGroup(final CommunicationGroup group) {
104 this.group = group;
105 }
106
107 @Override
108 public boolean isFallThrough() {
109 return this.fallThrough;
110 }
111
112 @Override
113 public void setFallThrough(final boolean fallThrough) {
114 this.fallThrough = fallThrough;
115 }
116
117} \ No newline at end of file
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java
new file mode 100644
index 00000000..baf7270f
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java
@@ -0,0 +1,163 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;
10
11import java.util.Map;
12
13import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
14import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
15import tools.refinery.viatra.runtime.matchers.util.Direction;
16import tools.refinery.viatra.runtime.rete.network.Receiver;
17import tools.refinery.viatra.runtime.rete.network.ReteContainer;
18import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
19import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
20import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
21import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
22import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox;
23import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
24
25/**
26 * Default mailbox implementation.
27 * <p>
28 * Usually, the mailbox performs counting of messages so that they can cancel each other out. However, if marked as a
29 * fall-through mailbox, than update messages are delivered directly to the receiver node to reduce overhead.
30 *
31 * @author Tamas Szabo
32 * @since 2.0
33 */
34public class DefaultMailbox implements AdaptableMailbox {
35
36 private static int SIZE_TRESHOLD = 127;
37
38 protected Map<Tuple, Integer> queue;
39 protected Map<Tuple, Integer> buffer;
40 protected final Receiver receiver;
41 protected final ReteContainer container;
42 protected boolean delivering;
43 protected Mailbox adapter;
44 protected CommunicationGroup group;
45
46 public DefaultMailbox(final Receiver receiver, final ReteContainer container) {
47 this.receiver = receiver;
48 this.container = container;
49 this.queue = CollectionsFactory.createMap();
50 this.buffer = CollectionsFactory.createMap();
51 this.adapter = this;
52 }
53
54 protected Map<Tuple, Integer> getActiveQueue() {
55 if (this.delivering) {
56 return this.buffer;
57 } else {
58 return this.queue;
59 }
60 }
61
62 @Override
63 public Mailbox getAdapter() {
64 return this.adapter;
65 }
66
67 @Override
68 public void setAdapter(final Mailbox adapter) {
69 this.adapter = adapter;
70 }
71
72 @Override
73 public boolean isEmpty() {
74 return getActiveQueue().isEmpty();
75 }
76
77 @Override
78 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
79 final Map<Tuple, Integer> activeQueue = getActiveQueue();
80 final boolean wasEmpty = activeQueue.isEmpty();
81
82 boolean significantChange = false;
83 Integer count = activeQueue.get(update);
84 if (count == null) {
85 count = 0;
86 significantChange = true;
87 }
88
89 if (direction == Direction.DELETE) {
90 count--;
91 } else {
92 count++;
93 }
94
95 if (count == 0) {
96 activeQueue.remove(update);
97 significantChange = true;
98 } else {
99 activeQueue.put(update, count);
100 }
101
102 if (significantChange) {
103 final Mailbox targetMailbox = this.adapter;
104 final CommunicationGroup targetGroup = this.adapter.getCurrentGroup();
105
106 if (wasEmpty) {
107 targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.DEFAULT);
108 } else if (activeQueue.isEmpty()) {
109 targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.DEFAULT);
110 }
111 }
112 }
113
114 @Override
115 public void deliverAll(final MessageSelector kind) {
116 if (kind == PhasedSelector.DEFAULT) {
117 // use the buffer during delivering so that there is a clear
118 // separation between the stages
119 this.delivering = true;
120 this.receiver.batchUpdate(this.queue.entrySet(), Timestamp.ZERO);
121 this.delivering = false;
122
123 if (queue.size() > SIZE_TRESHOLD) {
124 this.queue = this.buffer;
125 this.buffer = CollectionsFactory.createMap();
126 } else {
127 this.queue.clear();
128 final Map<Tuple, Integer> tmpQueue = this.queue;
129 this.queue = this.buffer;
130 this.buffer = tmpQueue;
131 }
132 } else {
133 throw new IllegalArgumentException("Unsupported message kind " + kind);
134 }
135 }
136
137 @Override
138 public String toString() {
139 return "D_MBOX (" + this.receiver + ") " + this.getActiveQueue();
140 }
141
142 @Override
143 public Receiver getReceiver() {
144 return this.receiver;
145 }
146
147 @Override
148 public void clear() {
149 this.queue.clear();
150 this.buffer.clear();
151 }
152
153 @Override
154 public CommunicationGroup getCurrentGroup() {
155 return this.group;
156 }
157
158 @Override
159 public void setCurrentGroup(final CommunicationGroup group) {
160 this.group = group;
161 }
162
163}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java
new file mode 100644
index 00000000..50d19882
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java
@@ -0,0 +1,218 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;
10
11import java.util.HashSet;
12import java.util.Map.Entry;
13import java.util.Set;
14
15import tools.refinery.viatra.runtime.matchers.context.IPosetComparator;
16import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
17import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
18import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
19import tools.refinery.viatra.runtime.matchers.util.Direction;
20import tools.refinery.viatra.runtime.rete.network.PosetAwareReceiver;
21import tools.refinery.viatra.runtime.rete.network.ReteContainer;
22import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
23import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
24import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
25import tools.refinery.viatra.runtime.rete.network.indexer.GroupBasedMessageIndexer;
26
27/**
28 * A monotonicity aware mailbox implementation. The mailbox uses an {@link IPosetComparator} to identify if a pair of
29 * REVOKE - INSERT updates represent a monotone change pair. The mailbox is used by {@link PosetAwareReceiver}s.
30 *
31 * @author Tamas Szabo
32 * @since 2.0
33 */
34public class PosetAwareMailbox extends AbstractUpdateSplittingMailbox<GroupBasedMessageIndexer, PosetAwareReceiver> {
35
36 protected final TupleMask groupMask;
37
38 public PosetAwareMailbox(final PosetAwareReceiver receiver, final ReteContainer container) {
39 super(receiver, container, () -> new GroupBasedMessageIndexer(receiver.getCoreMask()));
40 this.groupMask = receiver.getCoreMask();
41 }
42
43 @Override
44 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
45 final GroupBasedMessageIndexer monotoneQueue = getActiveMonotoneQueue();
46 final GroupBasedMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue();
47 final boolean wasPresentAsMonotone = monotoneQueue.getCount(update) != 0;
48 final boolean wasPresentAsAntiMonotone = antiMonotoneQueue.getCount(update) != 0;
49 final TupleMask coreMask = this.receiver.getCoreMask();
50
51 // it cannot happen that it was present in both
52 assert !(wasPresentAsMonotone && wasPresentAsAntiMonotone);
53
54 if (direction == Direction.INSERT) {
55 if (wasPresentAsAntiMonotone) {
56 // it was an anti-monotone one before
57 antiMonotoneQueue.insert(update);
58 } else {
59 // it was a monotone one before or did not exist at all
60 monotoneQueue.insert(update);
61
62 // if it was not present in the monotone queue before, then
63 // we need to check whether it makes REVOKE updates monotone
64 if (!wasPresentAsMonotone) {
65 final Set<Tuple> counterParts = tryFindCounterPart(update, false, true);
66 for (final Tuple counterPart : counterParts) {
67 final int count = antiMonotoneQueue.getCount(counterPart);
68 assert count < 0;
69 antiMonotoneQueue.update(counterPart, -count);
70 monotoneQueue.update(counterPart, count);
71 }
72 }
73 }
74 } else {
75 if (wasPresentAsAntiMonotone) {
76 // it was an anti-monotone one before
77 antiMonotoneQueue.delete(update);
78 } else if (wasPresentAsMonotone) {
79 // it was a monotone one before
80 monotoneQueue.delete(update);
81
82 // and we need to check whether the monotone REVOKE updates
83 // still have a reinforcing counterpart
84 final Set<Tuple> candidates = new HashSet<Tuple>();
85 final Tuple key = coreMask.transform(update);
86 for (final Entry<Tuple, Integer> entry : monotoneQueue.getTuplesByGroup(key).entrySet()) {
87 if (entry.getValue() < 0) {
88 final Tuple candidate = entry.getKey();
89 final Set<Tuple> counterParts = tryFindCounterPart(candidate, true, false);
90 if (counterParts.isEmpty()) {
91 // all of them are gone
92 candidates.add(candidate);
93 }
94 }
95 }
96
97 // move the candidates from the monotone queue to the
98 // anti-monotone queue because they do not have a
99 // counterpart anymore
100 for (final Tuple candidate : candidates) {
101 final int count = monotoneQueue.getCount(candidate);
102 assert count < 0;
103 monotoneQueue.update(candidate, -count);
104 antiMonotoneQueue.update(candidate, count);
105 }
106 } else {
107 // it did not exist before
108 final Set<Tuple> counterParts = tryFindCounterPart(update, true, false);
109 if (counterParts.isEmpty()) {
110 // there is no tuple that would make this update monotone
111 antiMonotoneQueue.delete(update);
112 } else {
113 // there is a reinforcing counterpart
114 monotoneQueue.delete(update);
115 }
116 }
117 }
118
119 if (antiMonotoneQueue.isEmpty()) {
120 this.group.notifyLostAllMessages(this, PhasedSelector.ANTI_MONOTONE);
121 } else {
122 this.group.notifyHasMessage(this, PhasedSelector.ANTI_MONOTONE);
123 }
124
125 if (monotoneQueue.isEmpty()) {
126 this.group.notifyLostAllMessages(this, PhasedSelector.MONOTONE);
127 } else {
128 this.group.notifyHasMessage(this, PhasedSelector.MONOTONE);
129 }
130 }
131
132 protected Set<Tuple> tryFindCounterPart(final Tuple first, final boolean findPositiveCounterPart,
133 final boolean findAllCounterParts) {
134 final GroupBasedMessageIndexer monotoneQueue = getActiveMonotoneQueue();
135 final GroupBasedMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue();
136 final TupleMask coreMask = this.receiver.getCoreMask();
137 final TupleMask posetMask = this.receiver.getPosetMask();
138 final IPosetComparator posetComparator = this.receiver.getPosetComparator();
139 final Set<Tuple> result = CollectionsFactory.createSet();
140 final Tuple firstKey = coreMask.transform(first);
141 final Tuple firstValue = posetMask.transform(first);
142
143 if (findPositiveCounterPart) {
144 for (final Entry<Tuple, Integer> entry : monotoneQueue.getTuplesByGroup(firstKey).entrySet()) {
145 final Tuple secondValue = posetMask.transform(entry.getKey());
146 if (entry.getValue() > 0 && posetComparator.isLessOrEqual(firstValue, secondValue)) {
147 result.add(entry.getKey());
148 if (!findAllCounterParts) {
149 return result;
150 }
151 }
152 }
153 } else {
154 for (final Entry<Tuple, Integer> entry : antiMonotoneQueue.getTuplesByGroup(firstKey).entrySet()) {
155 final Tuple secondValue = posetMask.transform(entry.getKey());
156 if (posetComparator.isLessOrEqual(secondValue, firstValue)) {
157 result.add(entry.getKey());
158 if (!findAllCounterParts) {
159 return result;
160 }
161 }
162 }
163 }
164
165 return result;
166 }
167
168 @Override
169 public void deliverAll(final MessageSelector kind) {
170 if (kind == PhasedSelector.ANTI_MONOTONE) {
171 // use the buffer during delivering so that there is a clear
172 // separation between the stages
173 this.deliveringAntiMonotone = true;
174
175 for (final Tuple group : this.antiMonotoneQueue.getGroups()) {
176 for (final Entry<Tuple, Integer> entry : this.antiMonotoneQueue.getTuplesByGroup(group).entrySet()) {
177 final Tuple update = entry.getKey();
178 final int count = entry.getValue();
179 assert count < 0;
180 for (int i = 0; i < Math.abs(count); i++) {
181 this.receiver.updateWithPosetInfo(Direction.DELETE, update, false);
182 }
183 }
184 }
185
186 this.deliveringAntiMonotone = false;
187 swapAndClearAntiMonotone();
188 } else if (kind == PhasedSelector.MONOTONE) {
189 // use the buffer during delivering so that there is a clear
190 // separation between the stages
191 this.deliveringMonotone = true;
192
193 for (final Tuple group : this.monotoneQueue.getGroups()) {
194 for (final Entry<Tuple, Integer> entry : this.monotoneQueue.getTuplesByGroup(group).entrySet()) {
195 final Tuple update = entry.getKey();
196 final int count = entry.getValue();
197 assert count != 0;
198 final Direction direction = count < 0 ? Direction.DELETE : Direction.INSERT;
199 for (int i = 0; i < Math.abs(count); i++) {
200 this.receiver.updateWithPosetInfo(direction, update, true);
201 }
202 }
203 }
204
205 this.deliveringMonotone = false;
206 swapAndClearMonotone();
207 } else {
208 throw new IllegalArgumentException("Unsupported message kind " + kind);
209 }
210 }
211
212 @Override
213 public String toString() {
214 return "PA_MBOX (" + this.receiver + ") " + this.getActiveMonotoneQueue() + " "
215 + this.getActiveAntiMonotoneQueue();
216 }
217
218}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java
new file mode 100644
index 00000000..afa155b2
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java
@@ -0,0 +1,135 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;
10
11import java.util.Map.Entry;
12
13import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
14import tools.refinery.viatra.runtime.matchers.util.Direction;
15import tools.refinery.viatra.runtime.rete.network.Receiver;
16import tools.refinery.viatra.runtime.rete.network.ReteContainer;
17import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
18import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
19import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
20import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
21import tools.refinery.viatra.runtime.rete.network.indexer.DefaultMessageIndexer;
22import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox;
23import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
24
25/**
26 * A mailbox implementation that splits updates messages according to the standard subset ordering into anti-monotonic
27 * (deletions) and monotonic (insertions) updates.
28 *
29 * @author Tamas Szabo
30 * @since 2.0
31 */
32public class UpdateSplittingMailbox extends AbstractUpdateSplittingMailbox<DefaultMessageIndexer, Receiver>
33 implements AdaptableMailbox {
34
35 protected Mailbox adapter;
36
37 public UpdateSplittingMailbox(final Receiver receiver, final ReteContainer container) {
38 super(receiver, container, DefaultMessageIndexer::new);
39 this.adapter = this;
40 }
41
42 @Override
43 public Mailbox getAdapter() {
44 return this.adapter;
45 }
46
47 @Override
48 public void setAdapter(final Mailbox adapter) {
49 this.adapter = adapter;
50 }
51
52 @Override
53 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
54 final DefaultMessageIndexer monotoneQueue = getActiveMonotoneQueue();
55 final DefaultMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue();
56 final boolean wasPresentAsMonotone = monotoneQueue.getCount(update) != 0;
57 final boolean wasPresentAsAntiMonotone = antiMonotoneQueue.getCount(update) != 0;
58
59 // it cannot happen that it was present in both
60 assert !(wasPresentAsMonotone && wasPresentAsAntiMonotone);
61
62 if (direction == Direction.INSERT) {
63 if (wasPresentAsAntiMonotone) {
64 // it was an anti-monotone one before
65 antiMonotoneQueue.insert(update);
66 } else {
67 // it was a monotone one before or did not exist at all
68 monotoneQueue.insert(update);
69 }
70 } else {
71 if (wasPresentAsMonotone) {
72 // it was a monotone one before
73 monotoneQueue.delete(update);
74 } else {
75 // it was an anti-monotone one before or did not exist at all
76 antiMonotoneQueue.delete(update);
77 }
78 }
79
80 final Mailbox targetMailbox = this.adapter;
81 final CommunicationGroup targetGroup = this.adapter.getCurrentGroup();
82
83 if (antiMonotoneQueue.isEmpty()) {
84 targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.ANTI_MONOTONE);
85 } else {
86 targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.ANTI_MONOTONE);
87 }
88
89 if (monotoneQueue.isEmpty()) {
90 targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.MONOTONE);
91 } else {
92 targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.MONOTONE);
93 }
94 }
95
96 @Override
97 public void deliverAll(final MessageSelector kind) {
98 if (kind == PhasedSelector.ANTI_MONOTONE) {
99 // deliver anti-monotone
100 this.deliveringAntiMonotone = true;
101 for (final Entry<Tuple, Integer> entry : this.antiMonotoneQueue.getTuples().entrySet()) {
102 final Tuple update = entry.getKey();
103 final int count = entry.getValue();
104 assert count < 0;
105 for (int i = 0; i < Math.abs(count); i++) {
106 this.receiver.update(Direction.DELETE, update, Timestamp.ZERO);
107 }
108 }
109 this.deliveringAntiMonotone = false;
110 swapAndClearAntiMonotone();
111 } else if (kind == PhasedSelector.MONOTONE) {
112 // deliver monotone
113 this.deliveringMonotone = true;
114 for (final Entry<Tuple, Integer> entry : this.monotoneQueue.getTuples().entrySet()) {
115 final Tuple update = entry.getKey();
116 final int count = entry.getValue();
117 assert count > 0;
118 for (int i = 0; i < count; i++) {
119 this.receiver.update(Direction.INSERT, update, Timestamp.ZERO);
120 }
121 }
122 this.deliveringMonotone = false;
123 swapAndClearMonotone();
124 } else {
125 throw new IllegalArgumentException("Unsupported message kind " + kind);
126 }
127 }
128
129 @Override
130 public String toString() {
131 return "US_MBOX (" + this.receiver + ") " + this.getActiveMonotoneQueue() + " "
132 + this.getActiveAntiMonotoneQueue();
133 }
134
135}
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java
new file mode 100644
index 00000000..bf3b8e14
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java
@@ -0,0 +1,150 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timely;
10
11import java.util.Map;
12import java.util.TreeMap;
13
14import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
15import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
16import tools.refinery.viatra.runtime.matchers.util.Direction;
17import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation;
18import tools.refinery.viatra.runtime.rete.network.Receiver;
19import tools.refinery.viatra.runtime.rete.network.ReteContainer;
20import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
21import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
22import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
23import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode;
24import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
25
26public class TimelyMailbox implements Mailbox {
27
28 protected TreeMap<Timestamp, Map<Tuple, Integer>> queue;
29 protected final Receiver receiver;
30 protected final ReteContainer container;
31 protected CommunicationGroup group;
32 protected boolean fallThrough;
33
34 public TimelyMailbox(final Receiver receiver, final ReteContainer container) {
35 this.receiver = receiver;
36 this.container = container;
37 this.queue = CollectionsFactory.createTreeMap();
38 }
39
40 protected TreeMap<Timestamp, Map<Tuple, Integer>> getActiveQueue() {
41 return this.queue;
42 }
43
44 @Override
45 public boolean isEmpty() {
46 return getActiveQueue().isEmpty();
47 }
48
49 @Override
50 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
51 final TreeMap<Timestamp, Map<Tuple, Integer>> activeQueue = getActiveQueue();
52
53 Map<Tuple, Integer> tupleMap = activeQueue.get(timestamp);
54 final boolean wasEmpty = tupleMap == null;
55 boolean significantChange = false;
56
57 if (tupleMap == null) {
58 tupleMap = CollectionsFactory.createMap();
59 activeQueue.put(timestamp, tupleMap);
60 significantChange = true;
61 }
62
63 Integer count = tupleMap.get(update);
64 if (count == null) {
65 count = 0;
66 significantChange = true;
67 }
68
69 if (direction == Direction.DELETE) {
70 count--;
71 } else {
72 count++;
73 }
74
75 if (count == 0) {
76 tupleMap.remove(update);
77 if (tupleMap.isEmpty()) {
78 activeQueue.remove(timestamp);
79 }
80 significantChange = true;
81 } else {
82 tupleMap.put(update, count);
83 }
84
85 if (significantChange) {
86 if (wasEmpty) {
87 this.group.notifyHasMessage(this, timestamp);
88 } else if (tupleMap.isEmpty()) {
89 final Timestamp resumableTimestamp = (this.receiver instanceof ResumableNode)
90 ? ((ResumableNode) this.receiver).getResumableTimestamp()
91 : null;
92 // check if there is folding left to do before unsubscribing just based on the message queue being empty
93 if (resumableTimestamp == null || resumableTimestamp.compareTo(timestamp) != 0) {
94 this.group.notifyLostAllMessages(this, timestamp);
95 }
96 }
97 }
98 }
99
100 @Override
101 public void deliverAll(final MessageSelector selector) {
102 if (selector instanceof Timestamp) {
103 final Timestamp timestamp = (Timestamp) selector;
104 // REMOVE the tuples associated with the selector, dont just query them
105 final Map<Tuple, Integer> tupleMap = this.queue.remove(timestamp);
106
107 // tupleMap may be empty if we only have lazy folding to do
108 if (tupleMap != null) {
109 this.receiver.batchUpdate(tupleMap.entrySet(), timestamp);
110 }
111
112 if (this.container.getTimelyConfiguration()
113 .getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) {
114 // (1) either normal delivery, which ended up being a lazy folding state
115 // (2) and/or lazy folding needs to be resumed
116 if (this.receiver instanceof ResumableNode) {
117 ((ResumableNode) this.receiver).resumeAt(timestamp);
118 }
119 }
120 } else {
121 throw new IllegalArgumentException("Unsupported message selector " + selector);
122 }
123 }
124
125 @Override
126 public String toString() {
127 return "DDF_MBOX (" + this.receiver + ") " + this.getActiveQueue();
128 }
129
130 @Override
131 public Receiver getReceiver() {
132 return this.receiver;
133 }
134
135 @Override
136 public void clear() {
137 this.queue.clear();
138 }
139
140 @Override
141 public CommunicationGroup getCurrentGroup() {
142 return this.group;
143 }
144
145 @Override
146 public void setCurrentGroup(final CommunicationGroup group) {
147 this.group = group;
148 }
149
150}