diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network')
49 files changed, 5943 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java new file mode 100644 index 00000000..2469d6bd --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/BaseNode.java | |||
@@ -0,0 +1,108 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2012, Bergmann Gabor, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network; | ||
10 | |||
11 | import java.util.Collections; | ||
12 | import java.util.HashSet; | ||
13 | import java.util.Set; | ||
14 | import java.util.TreeSet; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.rete.traceability.PatternTraceInfo; | ||
17 | import tools.refinery.viatra.runtime.rete.traceability.TraceInfo; | ||
18 | |||
19 | /** | ||
20 | * Base implementation for a Rete node. | ||
21 | * | ||
22 | * @author Bergmann Gabor | ||
23 | * | ||
24 | */ | ||
25 | public abstract class BaseNode implements Node { | ||
26 | |||
27 | protected ReteContainer reteContainer; | ||
28 | protected long nodeId; | ||
29 | protected Object tag; | ||
30 | protected Set<TraceInfo> traceInfos; | ||
31 | |||
32 | /** | ||
33 | * @param reteContainer | ||
34 | * the container to create this node in | ||
35 | */ | ||
36 | public BaseNode(ReteContainer reteContainer) { | ||
37 | super(); | ||
38 | this.reteContainer = reteContainer; | ||
39 | this.nodeId = reteContainer.registerNode(this); | ||
40 | this.traceInfos = new HashSet<TraceInfo>(); | ||
41 | } | ||
42 | |||
43 | @Override | ||
44 | public String toString() { | ||
45 | if (tag != null) | ||
46 | return toStringCore() + "->" + getTraceInfoPatternsEnumerated() + "{" + tag.toString() + "}"; | ||
47 | else | ||
48 | return toStringCore() + "->" + getTraceInfoPatternsEnumerated(); | ||
49 | } | ||
50 | |||
51 | /** | ||
52 | * clients should override this to append before the tag / trace indicators | ||
53 | */ | ||
54 | protected String toStringCore() { | ||
55 | return "[" + nodeId + "]" + getClass().getSimpleName(); | ||
56 | } | ||
57 | |||
58 | @Override | ||
59 | public ReteContainer getContainer() { | ||
60 | return reteContainer; | ||
61 | } | ||
62 | |||
63 | @Override | ||
64 | public long getNodeId() { | ||
65 | return nodeId; | ||
66 | } | ||
67 | |||
68 | @Override | ||
69 | public Object getTag() { | ||
70 | return tag; | ||
71 | } | ||
72 | |||
73 | @Override | ||
74 | public void setTag(Object tag) { | ||
75 | this.tag = tag; | ||
76 | } | ||
77 | |||
78 | @Override | ||
79 | public Set<TraceInfo> getTraceInfos() { | ||
80 | return Collections.unmodifiableSet(traceInfos); | ||
81 | } | ||
82 | |||
83 | @Override | ||
84 | public void assignTraceInfo(TraceInfo traceInfo) { | ||
85 | traceInfos.add(traceInfo); | ||
86 | traceInfo.assignNode(this); | ||
87 | } | ||
88 | |||
89 | @Override | ||
90 | public void acceptPropagatedTraceInfo(TraceInfo traceInfo) { | ||
91 | assignTraceInfo(traceInfo); | ||
92 | } | ||
93 | |||
94 | /** | ||
95 | * Descendants should use this in e.g. logging | ||
96 | */ | ||
97 | protected String getTraceInfoPatternsEnumerated() { | ||
98 | TreeSet<String> patternNames = new TreeSet<String>(); | ||
99 | for (TraceInfo trInfo : traceInfos) { | ||
100 | if (trInfo instanceof PatternTraceInfo) { | ||
101 | final String pName = ((PatternTraceInfo) trInfo).getPatternName(); | ||
102 | patternNames.add(pName); | ||
103 | } | ||
104 | } | ||
105 | return patternNames.toString(); | ||
106 | } | ||
107 | |||
108 | } \ No newline at end of file | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java new file mode 100644 index 00000000..b261d19d --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java | |||
@@ -0,0 +1,171 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2014, Bergmann Gabor, Istvan Rath and Daniel Varro | ||
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * | ||
8 | * SPDX-License-Identifier: EPL-2.0 | ||
9 | *******************************************************************************/ | ||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
13 | import tools.refinery.viatra.runtime.rete.aggregation.IndexerBasedAggregatorNode; | ||
14 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
15 | import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode; | ||
16 | import tools.refinery.viatra.runtime.rete.index.DualInputNode; | ||
17 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
18 | import tools.refinery.viatra.runtime.rete.index.IterableIndexer; | ||
19 | import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer; | ||
20 | import tools.refinery.viatra.runtime.rete.recipes.*; | ||
21 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
22 | import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo; | ||
23 | |||
24 | import java.util.ArrayList; | ||
25 | import java.util.Collection; | ||
26 | import java.util.List; | ||
27 | |||
28 | /** | ||
29 | * Class responsible for connecting freshly instantiating Rete nodes to their parents. | ||
30 | * | ||
31 | * @author Bergmann Gabor | ||
32 | * | ||
33 | */ | ||
34 | class ConnectionFactory { | ||
35 | ReteContainer reteContainer; | ||
36 | |||
37 | public ConnectionFactory(ReteContainer reteContainer) { | ||
38 | super(); | ||
39 | this.reteContainer = reteContainer; | ||
40 | } | ||
41 | |||
42 | // TODO move to node implementation instead? | ||
43 | private boolean isStateful(ReteNodeRecipe recipe) { | ||
44 | return recipe instanceof ProjectionIndexerRecipe || recipe instanceof IndexerBasedAggregatorRecipe | ||
45 | || recipe instanceof SingleColumnAggregatorRecipe || recipe instanceof ExpressionEnforcerRecipe | ||
46 | || recipe instanceof TransitiveClosureRecipe || recipe instanceof ProductionRecipe | ||
47 | || recipe instanceof UniquenessEnforcerRecipe || recipe instanceof RelationEvaluationRecipe; | ||
48 | |||
49 | } | ||
50 | |||
51 | /** | ||
52 | * PRE: nodes for parent recipes must already be created and registered | ||
53 | * <p> | ||
54 | * PRE: must not be an input node (for which {@link InputConnector} is responsible) | ||
55 | */ | ||
56 | public void connectToParents(RecipeTraceInfo recipeTrace, Node freshNode) { | ||
57 | final ReteNodeRecipe recipe = recipeTrace.getRecipe(); | ||
58 | if (recipe instanceof ConstantRecipe) { | ||
59 | // NO-OP | ||
60 | } else if (recipe instanceof InputRecipe) { | ||
61 | throw new IllegalArgumentException( | ||
62 | ConnectionFactory.class.getSimpleName() + " not intended for input connection: " + recipe); | ||
63 | } else if (recipe instanceof SingleParentNodeRecipe) { | ||
64 | final Receiver receiver = (Receiver) freshNode; | ||
65 | ReteNodeRecipe parentRecipe = ((SingleParentNodeRecipe) recipe).getParent(); | ||
66 | connectToParent(recipe, receiver, parentRecipe); | ||
67 | } else if (recipe instanceof RelationEvaluationRecipe) { | ||
68 | List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents(); | ||
69 | List<Supplier> parentSuppliers = new ArrayList<Supplier>(); | ||
70 | for (final ReteNodeRecipe parentRecipe : parentRecipes) { | ||
71 | parentSuppliers.add(getSupplierForRecipe(parentRecipe)); | ||
72 | } | ||
73 | ((RelationEvaluatorNode) freshNode).connectToParents(parentSuppliers); | ||
74 | } else if (recipe instanceof BetaRecipe) { | ||
75 | final DualInputNode beta = (DualInputNode) freshNode; | ||
76 | final ArrayList<RecipeTraceInfo> parentTraces = new ArrayList<RecipeTraceInfo>( | ||
77 | recipeTrace.getParentRecipeTraces()); | ||
78 | Slots slots = avoidActiveNodeConflict(parentTraces.get(0), parentTraces.get(1)); | ||
79 | beta.connectToIndexers(slots.primary, slots.secondary); | ||
80 | } else if (recipe instanceof IndexerBasedAggregatorRecipe) { | ||
81 | final IndexerBasedAggregatorNode aggregator = (IndexerBasedAggregatorNode) freshNode; | ||
82 | final IndexerBasedAggregatorRecipe aggregatorRecipe = (IndexerBasedAggregatorRecipe) recipe; | ||
83 | aggregator.initializeWith((ProjectionIndexer) resolveIndexer(aggregatorRecipe.getParent())); | ||
84 | } else if (recipe instanceof MultiParentNodeRecipe) { | ||
85 | final Receiver receiver = (Receiver) freshNode; | ||
86 | List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents(); | ||
87 | for (ReteNodeRecipe parentRecipe : parentRecipes) { | ||
88 | connectToParent(recipe, receiver, parentRecipe); | ||
89 | } | ||
90 | } | ||
91 | } | ||
92 | |||
93 | private Indexer resolveIndexer(final IndexerRecipe indexerRecipe) { | ||
94 | final Address<? extends Node> address = reteContainer.getNetwork().getExistingNodeByRecipe(indexerRecipe); | ||
95 | return (Indexer) reteContainer.resolveLocal(address); | ||
96 | } | ||
97 | |||
98 | private void connectToParent(ReteNodeRecipe recipe, Receiver freshNode, ReteNodeRecipe parentRecipe) { | ||
99 | final Supplier parentSupplier = getSupplierForRecipe(parentRecipe); | ||
100 | |||
101 | // special synch | ||
102 | if (freshNode instanceof ReinitializedNode) { | ||
103 | Collection<Tuple> tuples = new ArrayList<Tuple>(); | ||
104 | parentSupplier.pullInto(tuples, true); | ||
105 | ((ReinitializedNode) freshNode).reinitializeWith(tuples); | ||
106 | reteContainer.connect(parentSupplier, freshNode); | ||
107 | } else { // default case | ||
108 | // stateless nodes do not have to be synced with contents UNLESS they already have children (recursive | ||
109 | // corner case) | ||
110 | if (isStateful(recipe) | ||
111 | || ((freshNode instanceof Supplier) && !((Supplier) freshNode).getReceivers().isEmpty())) { | ||
112 | reteContainer.connectAndSynchronize(parentSupplier, freshNode); | ||
113 | } else { | ||
114 | // stateless node, no synch | ||
115 | reteContainer.connect(parentSupplier, freshNode); | ||
116 | } | ||
117 | } | ||
118 | } | ||
119 | |||
120 | private Supplier getSupplierForRecipe(ReteNodeRecipe recipe) { | ||
121 | @SuppressWarnings("unchecked") | ||
122 | final Address<? extends Supplier> parentAddress = (Address<? extends Supplier>) reteContainer.getNetwork() | ||
123 | .getExistingNodeByRecipe(recipe); | ||
124 | final Supplier supplier = reteContainer.getProvisioner().asSupplier(parentAddress); | ||
125 | return supplier; | ||
126 | } | ||
127 | |||
128 | /** | ||
129 | * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of | ||
130 | * the two indexers is supported. | ||
131 | * | ||
132 | * @return a replacement for the secondary Indexers, if needed | ||
133 | */ | ||
134 | private Slots avoidActiveNodeConflict(final RecipeTraceInfo primarySlot, final RecipeTraceInfo secondarySlot) { | ||
135 | Slots result = new Slots() { | ||
136 | { | ||
137 | primary = (IterableIndexer) resolveIndexer((ProjectionIndexerRecipe) primarySlot.getRecipe()); | ||
138 | secondary = resolveIndexer((IndexerRecipe) secondarySlot.getRecipe()); | ||
139 | } | ||
140 | }; | ||
141 | if (activeNodeConflict(result.primary, result.secondary)) | ||
142 | if (result.secondary instanceof IterableIndexer) | ||
143 | result.secondary = resolveActiveIndexer(secondarySlot); | ||
144 | else | ||
145 | result.primary = (IterableIndexer) resolveActiveIndexer(primarySlot); | ||
146 | return result; | ||
147 | } | ||
148 | |||
149 | private Indexer resolveActiveIndexer(final RecipeTraceInfo inactiveIndexerTrace) { | ||
150 | final RecipeTraceInfo activeIndexerTrace = reteContainer.getProvisioner() | ||
151 | .accessActiveIndexer(inactiveIndexerTrace); | ||
152 | reteContainer.getProvisioner().getOrCreateNodeByRecipe(activeIndexerTrace); | ||
153 | return resolveIndexer((ProjectionIndexerRecipe) activeIndexerTrace.getRecipe()); | ||
154 | } | ||
155 | |||
156 | private static class Slots { | ||
157 | IterableIndexer primary; | ||
158 | Indexer secondary; | ||
159 | } | ||
160 | |||
161 | /** | ||
162 | * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of | ||
163 | * the two indexers is supported. | ||
164 | * | ||
165 | * @return true if there is a conflict of active nodes. | ||
166 | */ | ||
167 | private boolean activeNodeConflict(Indexer primarySlot, Indexer secondarySlot) { | ||
168 | return !primarySlot.equals(secondarySlot) && primarySlot.getActiveNode().equals(secondarySlot.getActiveNode()); | ||
169 | } | ||
170 | |||
171 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java new file mode 100644 index 00000000..c22b06d8 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/IGroupable.java | |||
@@ -0,0 +1,31 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Gabor Bergmann, IncQueryLabs Ltd. | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
12 | |||
13 | /** | ||
14 | * @author Gabor Bergmann | ||
15 | * @since 1.7 | ||
16 | */ | ||
17 | public interface IGroupable { | ||
18 | |||
19 | /** | ||
20 | * @return the current group of the mailbox | ||
21 | * @since 1.7 | ||
22 | */ | ||
23 | CommunicationGroup getCurrentGroup(); | ||
24 | |||
25 | /** | ||
26 | * Sets the current group of the mailbox | ||
27 | * @since 1.7 | ||
28 | */ | ||
29 | void setCurrentGroup(CommunicationGroup group); | ||
30 | |||
31 | } \ No newline at end of file | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java new file mode 100644 index 00000000..64f59ff3 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java | |||
@@ -0,0 +1,408 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.ArrayList; | ||
13 | import java.util.Collection; | ||
14 | import java.util.Collections; | ||
15 | import java.util.List; | ||
16 | import java.util.Map; | ||
17 | import java.util.Map.Entry; | ||
18 | import java.util.Set; | ||
19 | import java.util.concurrent.locks.Lock; | ||
20 | import java.util.concurrent.locks.ReadWriteLock; | ||
21 | import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
22 | |||
23 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
24 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
25 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
26 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
27 | import tools.refinery.viatra.runtime.rete.matcher.ReteEngine; | ||
28 | import tools.refinery.viatra.runtime.rete.recipes.ReteNodeRecipe; | ||
29 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
30 | import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo; | ||
31 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
32 | |||
33 | /** | ||
34 | * @author Gabor Bergmann | ||
35 | * | ||
36 | */ | ||
37 | public class Network { | ||
38 | final int threads; | ||
39 | |||
40 | protected ArrayList<ReteContainer> containers; | ||
41 | ReteContainer headContainer; | ||
42 | private int firstContainer = 0; | ||
43 | private int nextContainer = 0; | ||
44 | |||
45 | // the following fields exist only if threads > 0 | ||
46 | protected Map<ReteContainer, Long> globalTerminationCriteria = null; | ||
47 | protected Map<ReteContainer, Long> reportedClocks = null; | ||
48 | protected Lock updateLock = null; // grab during normal update operations | ||
49 | protected Lock structuralChangeLock = null; // grab if the network structure | ||
50 | // is to | ||
51 | // be changed | ||
52 | |||
53 | // Knowledge of the outside world | ||
54 | private ReteEngine engine; | ||
55 | protected NodeFactory nodeFactory; | ||
56 | protected InputConnector inputConnector; | ||
57 | |||
58 | // Node and recipe administration | ||
59 | // incl. addresses for existing nodes by recipe (where available) | ||
60 | // Maintained by NodeProvisioner of each container | ||
61 | Map<ReteNodeRecipe, Address<? extends Node>> nodesByRecipe = CollectionsFactory.createMap(); | ||
62 | Set<RecipeTraceInfo> recipeTraces = CollectionsFactory.createSet(); | ||
63 | |||
64 | /** | ||
65 | * @throws IllegalStateException | ||
66 | * if no node has been constructed for the recipe | ||
67 | */ | ||
68 | public synchronized Address<? extends Node> getExistingNodeByRecipe(ReteNodeRecipe recipe) { | ||
69 | final Address<? extends Node> node = nodesByRecipe.get(recipe); | ||
70 | if (node == null) | ||
71 | throw new IllegalStateException(String.format("Rete node for recipe %s not constructed yet.", recipe)); | ||
72 | return node; | ||
73 | } | ||
74 | |||
75 | /** | ||
76 | * @return null if no node has been constructed for the recipe | ||
77 | */ | ||
78 | public synchronized Address<? extends Node> getNodeByRecipeIfExists(ReteNodeRecipe recipe) { | ||
79 | final Address<? extends Node> node = nodesByRecipe.get(recipe); | ||
80 | return node; | ||
81 | } | ||
82 | |||
83 | /** | ||
84 | * @param threads | ||
85 | * the number of threads to operate the network with; 0 means single-threaded operation, 1 starts an | ||
86 | * asynchronous thread to operate the RETE net, >1 uses multiple RETE containers. | ||
87 | */ | ||
88 | public Network(int threads, ReteEngine engine) { | ||
89 | super(); | ||
90 | this.threads = threads; | ||
91 | this.engine = engine; | ||
92 | this.inputConnector = new InputConnector(this); | ||
93 | this.nodeFactory = new NodeFactory(engine.getLogger()); | ||
94 | |||
95 | containers = new ArrayList<ReteContainer>(); | ||
96 | firstContainer = (threads > 1) ? Options.firstFreeContainer : 0; // NOPMD | ||
97 | nextContainer = firstContainer; | ||
98 | |||
99 | if (threads > 0) { | ||
100 | globalTerminationCriteria = CollectionsFactory.createMap(); | ||
101 | reportedClocks = CollectionsFactory.createMap(); | ||
102 | ReadWriteLock rwl = new ReentrantReadWriteLock(); | ||
103 | updateLock = rwl.readLock(); | ||
104 | structuralChangeLock = rwl.writeLock(); | ||
105 | for (int i = 0; i < threads; ++i) | ||
106 | containers.add(new ReteContainer(this, true)); | ||
107 | } else | ||
108 | containers.add(new ReteContainer(this, false)); | ||
109 | |||
110 | headContainer = containers.get(0); | ||
111 | } | ||
112 | |||
113 | /** | ||
114 | * Kills this Network along with all containers and message consumption cycles. | ||
115 | */ | ||
116 | public void kill() { | ||
117 | for (ReteContainer container : containers) { | ||
118 | container.kill(); | ||
119 | } | ||
120 | containers.clear(); | ||
121 | } | ||
122 | |||
123 | /** | ||
124 | * Returns the head container, that is guaranteed to reside in the same JVM as the Network object. | ||
125 | */ | ||
126 | public ReteContainer getHeadContainer() { | ||
127 | return headContainer; | ||
128 | } | ||
129 | |||
130 | /** | ||
131 | * Returns the next container in round-robin fashion. Configurable not to yield head container. | ||
132 | */ | ||
133 | public ReteContainer getNextContainer() { | ||
134 | if (nextContainer >= containers.size()) | ||
135 | nextContainer = firstContainer; | ||
136 | return containers.get(nextContainer++); | ||
137 | } | ||
138 | |||
139 | /** | ||
140 | * Internal message delivery method. | ||
141 | * | ||
142 | * @pre threads > 0 | ||
143 | */ | ||
144 | private void sendUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) { | ||
145 | ReteContainer affectedContainer = receiver.getContainer(); | ||
146 | synchronized (globalTerminationCriteria) { | ||
147 | long newCriterion = affectedContainer.sendUpdateToLocalAddress(receiver, direction, updateElement); | ||
148 | terminationCriterion(affectedContainer, newCriterion); | ||
149 | } | ||
150 | } | ||
151 | |||
152 | /** | ||
153 | * Internal message delivery method for single-threaded operation | ||
154 | * | ||
155 | * @pre threads == 0 | ||
156 | */ | ||
157 | private void sendUpdateSingleThreaded(Address<? extends Receiver> receiver, Direction direction, | ||
158 | Tuple updateElement) { | ||
159 | ReteContainer affectedContainer = receiver.getContainer(); | ||
160 | affectedContainer.sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement); | ||
161 | } | ||
162 | |||
163 | /** | ||
164 | * Internal message delivery method. | ||
165 | * | ||
166 | * @pre threads > 0 | ||
167 | */ | ||
168 | private void sendUpdates(Address<? extends Receiver> receiver, Direction direction, | ||
169 | Collection<Tuple> updateElements) { | ||
170 | if (updateElements.isEmpty()) | ||
171 | return; | ||
172 | ReteContainer affectedContainer = receiver.getContainer(); | ||
173 | synchronized (globalTerminationCriteria) { | ||
174 | long newCriterion = affectedContainer.sendUpdatesToLocalAddress(receiver, direction, updateElements); | ||
175 | terminationCriterion(affectedContainer, newCriterion); | ||
176 | } | ||
177 | } | ||
178 | |||
179 | /** | ||
180 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may | ||
181 | * reside in any of the containers associated with this network. To be called from a user thread during normal | ||
182 | * operation, NOT during construction. | ||
183 | * | ||
184 | * @since 2.4 | ||
185 | */ | ||
186 | public void sendExternalUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) { | ||
187 | if (threads > 0) { | ||
188 | try { | ||
189 | updateLock.lock(); | ||
190 | sendUpdate(receiver, direction, updateElement); | ||
191 | } finally { | ||
192 | updateLock.unlock(); | ||
193 | } | ||
194 | } else { | ||
195 | sendUpdateSingleThreaded(receiver, direction, updateElement); | ||
196 | // getHeadContainer(). | ||
197 | } | ||
198 | } | ||
199 | |||
200 | /** | ||
201 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may | ||
202 | * reside in any of the containers associated with this network. To be called from a user thread during | ||
203 | * construction. | ||
204 | * | ||
205 | * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may | ||
206 | * span through network calls, that's why it's not enforced here ) | ||
207 | * | ||
208 | * @return the value of the target container's clock at the time when the message was accepted into its message | ||
209 | * queue | ||
210 | * @since 2.4 | ||
211 | */ | ||
212 | public void sendConstructionUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) { | ||
213 | // structuralChangeLock.lock(); | ||
214 | if (threads > 0) | ||
215 | sendUpdate(receiver, direction, updateElement); | ||
216 | else | ||
217 | receiver.getContainer().sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement); | ||
218 | // structuralChangeLock.unlock(); | ||
219 | } | ||
220 | |||
221 | /** | ||
222 | * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial | ||
223 | * matching. The node may reside in any of the containers associated with this network. To be called from a user | ||
224 | * thread during construction. | ||
225 | * | ||
226 | * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may | ||
227 | * span through network calls, that's why it's not enforced here ) | ||
228 | * | ||
229 | * @since 2.4 | ||
230 | */ | ||
231 | public void sendConstructionUpdates(Address<? extends Receiver> receiver, Direction direction, | ||
232 | Collection<Tuple> updateElements) { | ||
233 | // structuralChangeLock.lock(); | ||
234 | if (threads > 0) | ||
235 | sendUpdates(receiver, direction, updateElements); | ||
236 | else | ||
237 | receiver.getContainer().sendUpdatesToLocalAddressSingleThreaded(receiver, direction, updateElements); | ||
238 | // structuralChangeLock.unlock(); | ||
239 | } | ||
240 | |||
241 | /** | ||
242 | * Establishes connection between a supplier and a receiver node, regardless which container they are in. Not to be | ||
243 | * called remotely, because this method enforces the structural lock. | ||
244 | * | ||
245 | * @param supplier | ||
246 | * @param receiver | ||
247 | * @param synchronise | ||
248 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
249 | */ | ||
250 | public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
251 | boolean synchronise) { | ||
252 | try { | ||
253 | if (threads > 0) | ||
254 | structuralChangeLock.lock(); | ||
255 | receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise); | ||
256 | } finally { | ||
257 | if (threads > 0) | ||
258 | structuralChangeLock.unlock(); | ||
259 | } | ||
260 | } | ||
261 | |||
262 | /** | ||
263 | * Severs connection between a supplier and a receiver node, regardless which container they are in. Not to be | ||
264 | * called remotely, because this method enforces the structural lock. | ||
265 | * | ||
266 | * @param supplier | ||
267 | * @param receiver | ||
268 | * @param desynchronise | ||
269 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
270 | */ | ||
271 | public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
272 | boolean desynchronise) { | ||
273 | try { | ||
274 | if (threads > 0) | ||
275 | structuralChangeLock.lock(); | ||
276 | receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise); | ||
277 | } finally { | ||
278 | if (threads > 0) | ||
279 | structuralChangeLock.unlock(); | ||
280 | } | ||
281 | } | ||
282 | |||
283 | /** | ||
284 | * Containers use this method to report whenever they run out of messages in their queue. | ||
285 | * | ||
286 | * To be called from the thread of the reporting container. | ||
287 | * | ||
288 | * @pre threads > 0. | ||
289 | * @param reportingContainer | ||
290 | * the container reporting the emptiness of its message queue. | ||
291 | * @param clock | ||
292 | * the value of the container's clock when reporting. | ||
293 | * @param localTerminationCriteria | ||
294 | * the latest clock values this container has received from other containers since the last time it | ||
295 | * reported termination. | ||
296 | */ | ||
297 | void reportLocalUpdateTermination(ReteContainer reportingContainer, long clock, | ||
298 | Map<ReteContainer, Long> localTerminationCriteria) { | ||
299 | synchronized (globalTerminationCriteria) { | ||
300 | for (Entry<ReteContainer, Long> criterion : localTerminationCriteria.entrySet()) { | ||
301 | terminationCriterion(criterion.getKey(), criterion.getValue()); | ||
302 | } | ||
303 | |||
304 | reportedClocks.put(reportingContainer, clock); | ||
305 | Long criterion = globalTerminationCriteria.get(reportingContainer); | ||
306 | if (criterion != null && criterion < clock) | ||
307 | globalTerminationCriteria.remove(reportingContainer); | ||
308 | |||
309 | if (globalTerminationCriteria.isEmpty()) | ||
310 | globalTerminationCriteria.notifyAll(); | ||
311 | } | ||
312 | } | ||
313 | |||
314 | /** | ||
315 | * @pre threads > 0 | ||
316 | */ | ||
317 | private void terminationCriterion(ReteContainer affectedContainer, long newCriterion) { | ||
318 | synchronized (globalTerminationCriteria) { | ||
319 | Long oldCriterion = globalTerminationCriteria.get(affectedContainer); | ||
320 | Long oldClock = reportedClocks.get(affectedContainer); | ||
321 | long relevantClock = oldClock == null ? 0 : oldClock; | ||
322 | if ((relevantClock <= newCriterion) && (oldCriterion == null || oldCriterion < newCriterion)) { | ||
323 | globalTerminationCriteria.put(affectedContainer, newCriterion); | ||
324 | } | ||
325 | } | ||
326 | } | ||
327 | |||
328 | /** | ||
329 | * Waits until all rete update operations are settled in all containers. Returns immediately, if no updates are | ||
330 | * pending. | ||
331 | * | ||
332 | * To be called from any user thread. | ||
333 | */ | ||
334 | public void waitForReteTermination() { | ||
335 | if (threads > 0) { | ||
336 | synchronized (globalTerminationCriteria) { | ||
337 | while (!globalTerminationCriteria.isEmpty()) { | ||
338 | try { | ||
339 | globalTerminationCriteria.wait(); | ||
340 | } catch (InterruptedException e) { | ||
341 | |||
342 | } | ||
343 | } | ||
344 | } | ||
345 | } else | ||
346 | headContainer.deliverMessagesSingleThreaded(); | ||
347 | } | ||
348 | |||
349 | /** | ||
350 | * Waits to execute action until all rete update operations are settled in all containers. Runs action and returns | ||
351 | * immediately, if no updates are pending. The given action is guaranteed to be run when the terminated state still | ||
352 | * persists. | ||
353 | * | ||
354 | * @param action | ||
355 | * the action to be run when reaching the steady-state. | ||
356 | * | ||
357 | * To be called from any user thread. | ||
358 | */ | ||
359 | public void waitForReteTermination(Runnable action) { | ||
360 | if (threads > 0) { | ||
361 | synchronized (globalTerminationCriteria) { | ||
362 | while (!globalTerminationCriteria.isEmpty()) { | ||
363 | try { | ||
364 | globalTerminationCriteria.wait(); | ||
365 | } catch (InterruptedException e) { | ||
366 | |||
367 | } | ||
368 | } | ||
369 | action.run(); | ||
370 | } | ||
371 | } else { | ||
372 | headContainer.deliverMessagesSingleThreaded(); | ||
373 | action.run(); | ||
374 | } | ||
375 | |||
376 | } | ||
377 | |||
378 | /** | ||
379 | * @return an unmodifiable set of known recipe traces | ||
380 | */ | ||
381 | public Set<RecipeTraceInfo> getRecipeTraces() { | ||
382 | return Collections.unmodifiableSet(recipeTraces); | ||
383 | } | ||
384 | |||
385 | /** | ||
386 | * @return an unmodifiable list of containers | ||
387 | */ | ||
388 | public List<ReteContainer> getContainers() { | ||
389 | return Collections.unmodifiableList(containers); | ||
390 | } | ||
391 | |||
392 | public Lock getStructuralChangeLock() { | ||
393 | return structuralChangeLock; | ||
394 | } | ||
395 | |||
396 | public NodeFactory getNodeFactory() { | ||
397 | return nodeFactory; | ||
398 | } | ||
399 | |||
400 | public InputConnector getInputConnector() { | ||
401 | return inputConnector; | ||
402 | } | ||
403 | |||
404 | public ReteEngine getEngine() { | ||
405 | return engine; | ||
406 | } | ||
407 | |||
408 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java new file mode 100644 index 00000000..c6ba34c4 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NetworkStructureChangeSensitiveNode.java | |||
@@ -0,0 +1,30 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
12 | |||
13 | /** | ||
14 | * {@link Node}s implementing this interface are sensitive to changes in the dependency graph maintained by the | ||
15 | * {@link CommunicationTracker}. The {@link CommunicationTracker} notifies these nodes whenever the SCC of this node is | ||
16 | * affected by changes to the dependency graph. Depending on whether this node is contained in a recursive group or not, | ||
17 | * it may behave differently, and the {@link NetworkStructureChangeSensitiveNode#networkStructureChanged()} method can | ||
18 | * be used to perform changes in behavior. | ||
19 | * | ||
20 | * @author Tamas Szabo | ||
21 | * @since 2.3 | ||
22 | */ | ||
23 | public interface NetworkStructureChangeSensitiveNode extends Node { | ||
24 | |||
25 | /** | ||
26 | * At the time of the invocation, the dependency graph has already been updated. | ||
27 | */ | ||
28 | public void networkStructureChanged(); | ||
29 | |||
30 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java new file mode 100644 index 00000000..e8ab615a --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Node.java | |||
@@ -0,0 +1,62 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.Set; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
15 | import tools.refinery.viatra.runtime.rete.traceability.TraceInfo; | ||
16 | |||
17 | /** | ||
18 | * A node of a rete network, should be uniquely identified by network and nodeId. NodeId can be requested by registering | ||
19 | * at the Network on construction. | ||
20 | * | ||
21 | * @author Gabor Bergmann | ||
22 | */ | ||
23 | public interface Node { | ||
24 | /** | ||
25 | * @return the network this node belongs to. | ||
26 | */ | ||
27 | ReteContainer getContainer(); | ||
28 | |||
29 | /** | ||
30 | * @return the identifier unique to this node within the network. | ||
31 | */ | ||
32 | long getNodeId(); | ||
33 | |||
34 | /** | ||
35 | * Assigns a descriptive tag to the node | ||
36 | */ | ||
37 | void setTag(Object tag); | ||
38 | |||
39 | /** | ||
40 | * @return the tag of the node | ||
41 | */ | ||
42 | Object getTag(); | ||
43 | |||
44 | /** | ||
45 | * @return unmodifiable view of the list of traceability infos assigned to this node | ||
46 | */ | ||
47 | Set<TraceInfo> getTraceInfos(); | ||
48 | |||
49 | /** | ||
50 | * assigns new traceability info to this node | ||
51 | */ | ||
52 | void assignTraceInfo(TraceInfo traceInfo); | ||
53 | /** | ||
54 | * accepts traceability info propagated to this node | ||
55 | */ | ||
56 | void acceptPropagatedTraceInfo(TraceInfo traceInfo); | ||
57 | |||
58 | default CommunicationTracker getCommunicationTracker() { | ||
59 | return getContainer().getCommunicationTracker(); | ||
60 | } | ||
61 | |||
62 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java new file mode 100644 index 00000000..3e4ea4e0 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeFactory.java | |||
@@ -0,0 +1,376 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2014, Bergmann Gabor, Istvan Rath and Daniel Varro | ||
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * | ||
8 | * SPDX-License-Identifier: EPL-2.0 | ||
9 | *******************************************************************************/ | ||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import org.apache.log4j.Logger; | ||
13 | import org.eclipse.emf.common.util.EMap; | ||
14 | import tools.refinery.viatra.runtime.rete.itc.alg.representative.RepresentativeElectionAlgorithm; | ||
15 | import tools.refinery.viatra.runtime.rete.itc.alg.representative.StronglyConnectedComponentAlgorithm; | ||
16 | import tools.refinery.viatra.runtime.rete.itc.alg.representative.WeaklyConnectedComponentAlgorithm; | ||
17 | import tools.refinery.viatra.runtime.matchers.context.IPosetComparator; | ||
18 | import tools.refinery.viatra.runtime.matchers.psystem.IExpressionEvaluator; | ||
19 | import tools.refinery.viatra.runtime.matchers.psystem.IRelationEvaluator; | ||
20 | import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator; | ||
21 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
22 | import tools.refinery.viatra.runtime.matchers.tuple.Tuples; | ||
23 | import tools.refinery.viatra.runtime.rete.aggregation.ColumnAggregatorNode; | ||
24 | import tools.refinery.viatra.runtime.rete.aggregation.CountNode; | ||
25 | import tools.refinery.viatra.runtime.rete.aggregation.IAggregatorNode; | ||
26 | import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulParallelTimelyColumnAggregatorNode; | ||
27 | import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulSequentialTimelyColumnAggregatorNode; | ||
28 | import tools.refinery.viatra.runtime.rete.aggregation.timely.FirstOnlyParallelTimelyColumnAggregatorNode; | ||
29 | import tools.refinery.viatra.runtime.rete.aggregation.timely.FirstOnlySequentialTimelyColumnAggregatorNode; | ||
30 | import tools.refinery.viatra.runtime.rete.boundary.ExternalInputEnumeratorNode; | ||
31 | import tools.refinery.viatra.runtime.rete.boundary.ExternalInputStatelessFilterNode; | ||
32 | import tools.refinery.viatra.runtime.rete.eval.EvaluatorCore; | ||
33 | import tools.refinery.viatra.runtime.rete.eval.MemorylessEvaluatorNode; | ||
34 | import tools.refinery.viatra.runtime.rete.eval.OutputCachingEvaluatorNode; | ||
35 | import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode; | ||
36 | import tools.refinery.viatra.runtime.rete.index.ExistenceNode; | ||
37 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
38 | import tools.refinery.viatra.runtime.rete.index.JoinNode; | ||
39 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; | ||
40 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.AggregatorArchitecture; | ||
41 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; | ||
42 | import tools.refinery.viatra.runtime.rete.misc.ConstantNode; | ||
43 | import tools.refinery.viatra.runtime.rete.recipes.*; | ||
44 | import tools.refinery.viatra.runtime.rete.single.*; | ||
45 | import tools.refinery.viatra.runtime.rete.traceability.TraceInfo; | ||
46 | |||
47 | import java.util.HashMap; | ||
48 | import java.util.List; | ||
49 | import java.util.Map; | ||
50 | |||
51 | /** | ||
52 | * Factory for instantiating Rete nodes. The created nodes are not connected to the network yet. | ||
53 | * | ||
54 | * @author Bergmann Gabor | ||
55 | * | ||
56 | */ | ||
57 | class NodeFactory { | ||
58 | Logger logger; | ||
59 | |||
60 | public NodeFactory(Logger logger) { | ||
61 | super(); | ||
62 | this.logger = logger; | ||
63 | } | ||
64 | |||
65 | /** | ||
66 | * PRE: parent node must already be created | ||
67 | */ | ||
68 | public Indexer createIndexer(ReteContainer reteContainer, IndexerRecipe recipe, Supplier parentNode, | ||
69 | TraceInfo... traces) { | ||
70 | |||
71 | if (recipe instanceof ProjectionIndexerRecipe) { | ||
72 | return parentNode.constructIndex(toMask(recipe.getMask()), traces); | ||
73 | // already traced | ||
74 | } else if (recipe instanceof AggregatorIndexerRecipe) { | ||
75 | int indexOfAggregateResult = recipe.getParent().getArity(); | ||
76 | int resultPosition = recipe.getMask().getSourceIndices().lastIndexOf(indexOfAggregateResult); | ||
77 | |||
78 | IAggregatorNode aggregatorNode = (IAggregatorNode) parentNode; | ||
79 | final Indexer result = (resultPosition == -1) ? aggregatorNode.getAggregatorOuterIndexer() | ||
80 | : aggregatorNode.getAggregatorOuterIdentityIndexer(resultPosition); | ||
81 | |||
82 | for (TraceInfo traceInfo : traces) | ||
83 | result.assignTraceInfo(traceInfo); | ||
84 | return result; | ||
85 | } else | ||
86 | throw new IllegalArgumentException("Unkown Indexer recipe: " + recipe); | ||
87 | } | ||
88 | |||
89 | /** | ||
90 | * PRE: recipe is not an indexer recipe. | ||
91 | */ | ||
92 | public Supplier createNode(ReteContainer reteContainer, ReteNodeRecipe recipe, TraceInfo... traces) { | ||
93 | if (recipe instanceof IndexerRecipe) | ||
94 | throw new IllegalArgumentException("Indexers are not created by NodeFactory: " + recipe); | ||
95 | |||
96 | Supplier result = instantiateNodeDispatch(reteContainer, recipe); | ||
97 | for (TraceInfo traceInfo : traces) | ||
98 | result.assignTraceInfo(traceInfo); | ||
99 | return result; | ||
100 | } | ||
101 | |||
102 | private Supplier instantiateNodeDispatch(ReteContainer reteContainer, ReteNodeRecipe recipe) { | ||
103 | |||
104 | // Parentless | ||
105 | |||
106 | if (recipe instanceof ConstantRecipe) | ||
107 | return instantiateNode(reteContainer, (ConstantRecipe) recipe); | ||
108 | if (recipe instanceof InputRecipe) | ||
109 | return instantiateNode(reteContainer, (InputRecipe) recipe); | ||
110 | |||
111 | // SingleParentNodeRecipe | ||
112 | |||
113 | // if (recipe instanceof ProjectionIndexer) | ||
114 | // return instantiateNode((ProjectionIndexer)recipe); | ||
115 | if (recipe instanceof InputFilterRecipe) | ||
116 | return instantiateNode(reteContainer, (InputFilterRecipe) recipe); | ||
117 | if (recipe instanceof InequalityFilterRecipe) | ||
118 | return instantiateNode(reteContainer, (InequalityFilterRecipe) recipe); | ||
119 | if (recipe instanceof EqualityFilterRecipe) | ||
120 | return instantiateNode(reteContainer, (EqualityFilterRecipe) recipe); | ||
121 | if (recipe instanceof TransparentRecipe) | ||
122 | return instantiateNode(reteContainer, (TransparentRecipe) recipe); | ||
123 | if (recipe instanceof TrimmerRecipe) | ||
124 | return instantiateNode(reteContainer, (TrimmerRecipe) recipe); | ||
125 | if (recipe instanceof TransitiveClosureRecipe) | ||
126 | return instantiateNode(reteContainer, (TransitiveClosureRecipe) recipe); | ||
127 | if (recipe instanceof RepresentativeElectionRecipe) | ||
128 | return instantiateNode(reteContainer, (RepresentativeElectionRecipe) recipe); | ||
129 | if (recipe instanceof RelationEvaluationRecipe) | ||
130 | return instantiateNode(reteContainer, (RelationEvaluationRecipe) recipe); | ||
131 | if (recipe instanceof ExpressionEnforcerRecipe) | ||
132 | return instantiateNode(reteContainer, (ExpressionEnforcerRecipe) recipe); | ||
133 | if (recipe instanceof CountAggregatorRecipe) | ||
134 | return instantiateNode(reteContainer, (CountAggregatorRecipe) recipe); | ||
135 | if (recipe instanceof SingleColumnAggregatorRecipe) | ||
136 | return instantiateNode(reteContainer, (SingleColumnAggregatorRecipe) recipe); | ||
137 | if (recipe instanceof DiscriminatorDispatcherRecipe) | ||
138 | return instantiateNode(reteContainer, (DiscriminatorDispatcherRecipe) recipe); | ||
139 | if (recipe instanceof DiscriminatorBucketRecipe) | ||
140 | return instantiateNode(reteContainer, (DiscriminatorBucketRecipe) recipe); | ||
141 | |||
142 | // MultiParentNodeRecipe | ||
143 | if (recipe instanceof UniquenessEnforcerRecipe) | ||
144 | return instantiateNode(reteContainer, (UniquenessEnforcerRecipe) recipe); | ||
145 | if (recipe instanceof ProductionRecipe) | ||
146 | return instantiateNode(reteContainer, (ProductionRecipe) recipe); | ||
147 | |||
148 | // BetaNodeRecipe | ||
149 | if (recipe instanceof JoinRecipe) | ||
150 | return instantiateNode(reteContainer, (JoinRecipe) recipe); | ||
151 | if (recipe instanceof SemiJoinRecipe) | ||
152 | return instantiateNode(reteContainer, (SemiJoinRecipe) recipe); | ||
153 | if (recipe instanceof AntiJoinRecipe) | ||
154 | return instantiateNode(reteContainer, (AntiJoinRecipe) recipe); | ||
155 | |||
156 | // ... else | ||
157 | throw new IllegalArgumentException("Unsupported recipe type: " + recipe); | ||
158 | } | ||
159 | |||
160 | // INSTANTIATION for recipe types | ||
161 | |||
162 | private Supplier instantiateNode(ReteContainer reteContainer, InputRecipe recipe) { | ||
163 | return new ExternalInputEnumeratorNode(reteContainer); | ||
164 | } | ||
165 | |||
166 | private Supplier instantiateNode(ReteContainer reteContainer, InputFilterRecipe recipe) { | ||
167 | return new ExternalInputStatelessFilterNode(reteContainer, toMaskOrNull(recipe.getMask())); | ||
168 | } | ||
169 | |||
170 | private Supplier instantiateNode(ReteContainer reteContainer, CountAggregatorRecipe recipe) { | ||
171 | return new CountNode(reteContainer); | ||
172 | } | ||
173 | |||
174 | private Supplier instantiateNode(ReteContainer reteContainer, TransparentRecipe recipe) { | ||
175 | return new TransparentNode(reteContainer); | ||
176 | } | ||
177 | |||
178 | private Supplier instantiateNode(ReteContainer reteContainer, ExpressionEnforcerRecipe recipe) { | ||
179 | final IExpressionEvaluator evaluator = toIExpressionEvaluator(recipe.getExpression()); | ||
180 | final Map<String, Integer> posMapping = toStringIndexMap(recipe.getMappedIndices()); | ||
181 | final int sourceTupleWidth = recipe.getParent().getArity(); | ||
182 | EvaluatorCore core = null; | ||
183 | if (recipe instanceof CheckRecipe) { | ||
184 | core = new EvaluatorCore.PredicateEvaluatorCore(logger, evaluator, posMapping, sourceTupleWidth); | ||
185 | } else if (recipe instanceof EvalRecipe) { | ||
186 | final boolean isUnwinding = ((EvalRecipe) recipe).isUnwinding(); | ||
187 | core = new EvaluatorCore.FunctionEvaluatorCore(logger, evaluator, posMapping, sourceTupleWidth, isUnwinding); | ||
188 | } else { | ||
189 | throw new IllegalArgumentException("Unhandled expression enforcer recipe: " + recipe.getClass() + "!"); | ||
190 | } | ||
191 | if (recipe.isCacheOutput()) { | ||
192 | return new OutputCachingEvaluatorNode(reteContainer, core); | ||
193 | } else { | ||
194 | return new MemorylessEvaluatorNode(reteContainer, core); | ||
195 | } | ||
196 | } | ||
197 | |||
198 | @SuppressWarnings({ "rawtypes", "unchecked" }) | ||
199 | private Supplier instantiateNode(ReteContainer reteContainer, SingleColumnAggregatorRecipe recipe) { | ||
200 | final IMultisetAggregationOperator operator = recipe.getMultisetAggregationOperator(); | ||
201 | TupleMask coreMask = null; | ||
202 | if (recipe.getOptionalMonotonicityInfo() != null) { | ||
203 | coreMask = toMask(recipe.getOptionalMonotonicityInfo().getCoreMask()); | ||
204 | } else { | ||
205 | coreMask = toMask(recipe.getGroupByMask()); | ||
206 | } | ||
207 | |||
208 | if (reteContainer.isTimelyEvaluation()) { | ||
209 | final TimelyConfiguration timelyConfiguration = reteContainer.getTimelyConfiguration(); | ||
210 | final AggregatorArchitecture aggregatorArchitecture = timelyConfiguration.getAggregatorArchitecture(); | ||
211 | final TimelineRepresentation timelineRepresentation = timelyConfiguration.getTimelineRepresentation(); | ||
212 | |||
213 | TupleMask posetMask = null; | ||
214 | |||
215 | if (recipe.getOptionalMonotonicityInfo() != null) { | ||
216 | posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask()); | ||
217 | } else { | ||
218 | final int aggregatedColumn = recipe.getAggregableIndex(); | ||
219 | posetMask = TupleMask.selectSingle(aggregatedColumn, coreMask.sourceWidth); | ||
220 | } | ||
221 | |||
222 | if (timelineRepresentation == TimelineRepresentation.FIRST_ONLY | ||
223 | && aggregatorArchitecture == AggregatorArchitecture.SEQUENTIAL) { | ||
224 | return new FirstOnlySequentialTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask); | ||
225 | } else if (timelineRepresentation == TimelineRepresentation.FIRST_ONLY | ||
226 | && aggregatorArchitecture == AggregatorArchitecture.PARALLEL) { | ||
227 | return new FirstOnlyParallelTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask); | ||
228 | } else if (timelineRepresentation == TimelineRepresentation.FAITHFUL | ||
229 | && aggregatorArchitecture == AggregatorArchitecture.SEQUENTIAL) { | ||
230 | return new FaithfulSequentialTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask); | ||
231 | } else if (timelineRepresentation == TimelineRepresentation.FAITHFUL | ||
232 | && aggregatorArchitecture == AggregatorArchitecture.PARALLEL) { | ||
233 | return new FaithfulParallelTimelyColumnAggregatorNode(reteContainer, operator, coreMask, posetMask); | ||
234 | } else { | ||
235 | throw new IllegalArgumentException("Unsupported timely configuration!"); | ||
236 | } | ||
237 | } else if (recipe.isDeleteRederiveEvaluation() && recipe.getOptionalMonotonicityInfo() != null) { | ||
238 | final TupleMask posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask()); | ||
239 | final IPosetComparator posetComparator = (IPosetComparator) recipe.getOptionalMonotonicityInfo() | ||
240 | .getPosetComparator(); | ||
241 | return new ColumnAggregatorNode(reteContainer, operator, recipe.isDeleteRederiveEvaluation(), coreMask, | ||
242 | posetMask, posetComparator); | ||
243 | } else { | ||
244 | final int aggregatedColumn = recipe.getAggregableIndex(); | ||
245 | return new ColumnAggregatorNode(reteContainer, operator, coreMask, aggregatedColumn); | ||
246 | } | ||
247 | } | ||
248 | |||
249 | private Supplier instantiateNode(ReteContainer reteContainer, TransitiveClosureRecipe recipe) { | ||
250 | return new TransitiveClosureNode(reteContainer); | ||
251 | } | ||
252 | |||
253 | private Supplier instantiateNode(ReteContainer reteContainer, RepresentativeElectionRecipe recipe) { | ||
254 | RepresentativeElectionAlgorithm.Factory algorithmFactory = switch (recipe.getConnectivity()) { | ||
255 | case STRONG -> StronglyConnectedComponentAlgorithm::new; | ||
256 | case WEAK -> WeaklyConnectedComponentAlgorithm::new; | ||
257 | }; | ||
258 | return new RepresentativeElectionNode(reteContainer, algorithmFactory); | ||
259 | } | ||
260 | |||
261 | private Supplier instantiateNode(ReteContainer reteContainer, RelationEvaluationRecipe recipe) { | ||
262 | return new RelationEvaluatorNode(reteContainer, toIRelationEvaluator(recipe.getEvaluator())); | ||
263 | } | ||
264 | |||
265 | private Supplier instantiateNode(ReteContainer reteContainer, ProductionRecipe recipe) { | ||
266 | if (reteContainer.isTimelyEvaluation()) { | ||
267 | return new TimelyProductionNode(reteContainer, toStringIndexMap(recipe.getMappedIndices())); | ||
268 | } else if (recipe.isDeleteRederiveEvaluation() && recipe.getOptionalMonotonicityInfo() != null) { | ||
269 | TupleMask coreMask = toMask(recipe.getOptionalMonotonicityInfo().getCoreMask()); | ||
270 | TupleMask posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask()); | ||
271 | IPosetComparator posetComparator = (IPosetComparator) recipe.getOptionalMonotonicityInfo() | ||
272 | .getPosetComparator(); | ||
273 | return new DefaultProductionNode(reteContainer, toStringIndexMap(recipe.getMappedIndices()), | ||
274 | recipe.isDeleteRederiveEvaluation(), coreMask, posetMask, posetComparator); | ||
275 | } else { | ||
276 | return new DefaultProductionNode(reteContainer, toStringIndexMap(recipe.getMappedIndices()), | ||
277 | recipe.isDeleteRederiveEvaluation()); | ||
278 | } | ||
279 | } | ||
280 | |||
281 | private Supplier instantiateNode(ReteContainer reteContainer, UniquenessEnforcerRecipe recipe) { | ||
282 | if (reteContainer.isTimelyEvaluation()) { | ||
283 | return new TimelyUniquenessEnforcerNode(reteContainer, recipe.getArity()); | ||
284 | } else if (recipe.isDeleteRederiveEvaluation() && recipe.getOptionalMonotonicityInfo() != null) { | ||
285 | TupleMask coreMask = toMask(recipe.getOptionalMonotonicityInfo().getCoreMask()); | ||
286 | TupleMask posetMask = toMask(recipe.getOptionalMonotonicityInfo().getPosetMask()); | ||
287 | IPosetComparator posetComparator = (IPosetComparator) recipe.getOptionalMonotonicityInfo() | ||
288 | .getPosetComparator(); | ||
289 | return new UniquenessEnforcerNode(reteContainer, recipe.getArity(), recipe.isDeleteRederiveEvaluation(), | ||
290 | coreMask, posetMask, posetComparator); | ||
291 | } else { | ||
292 | return new UniquenessEnforcerNode(reteContainer, recipe.getArity(), recipe.isDeleteRederiveEvaluation()); | ||
293 | } | ||
294 | } | ||
295 | |||
296 | private Supplier instantiateNode(ReteContainer reteContainer, ConstantRecipe recipe) { | ||
297 | final List<Object> constantValues = recipe.getConstantValues(); | ||
298 | final Object[] constantArray = constantValues.toArray(new Object[constantValues.size()]); | ||
299 | return new ConstantNode(reteContainer, Tuples.flatTupleOf(constantArray)); | ||
300 | } | ||
301 | |||
302 | private Supplier instantiateNode(ReteContainer reteContainer, DiscriminatorBucketRecipe recipe) { | ||
303 | return new DiscriminatorBucketNode(reteContainer, recipe.getBucketKey()); | ||
304 | } | ||
305 | |||
306 | private Supplier instantiateNode(ReteContainer reteContainer, DiscriminatorDispatcherRecipe recipe) { | ||
307 | return new DiscriminatorDispatcherNode(reteContainer, recipe.getDiscriminationColumnIndex()); | ||
308 | } | ||
309 | |||
310 | private Supplier instantiateNode(ReteContainer reteContainer, TrimmerRecipe recipe) { | ||
311 | return new TrimmerNode(reteContainer, toMask(recipe.getMask())); | ||
312 | } | ||
313 | |||
314 | private Supplier instantiateNode(ReteContainer reteContainer, InequalityFilterRecipe recipe) { | ||
315 | Tunnel result = new InequalityFilterNode(reteContainer, recipe.getSubject(), | ||
316 | TupleMask.fromSelectedIndices(recipe.getParent().getArity(), recipe.getInequals())); | ||
317 | return result; | ||
318 | } | ||
319 | |||
320 | private Supplier instantiateNode(ReteContainer reteContainer, EqualityFilterRecipe recipe) { | ||
321 | final int[] equalIndices = TupleMask.integersToIntArray(recipe.getIndices()); | ||
322 | return new EqualityFilterNode(reteContainer, equalIndices); | ||
323 | } | ||
324 | |||
325 | private Supplier instantiateNode(ReteContainer reteContainer, AntiJoinRecipe recipe) { | ||
326 | return new ExistenceNode(reteContainer, true); | ||
327 | } | ||
328 | |||
329 | private Supplier instantiateNode(ReteContainer reteContainer, SemiJoinRecipe recipe) { | ||
330 | return new ExistenceNode(reteContainer, false); | ||
331 | } | ||
332 | |||
333 | private Supplier instantiateNode(ReteContainer reteContainer, JoinRecipe recipe) { | ||
334 | return new JoinNode(reteContainer, toMask(recipe.getRightParentComplementaryMask())); | ||
335 | } | ||
336 | |||
337 | // HELPERS | ||
338 | |||
339 | private IExpressionEvaluator toIExpressionEvaluator(ExpressionDefinition expressionDefinition) { | ||
340 | final Object evaluator = expressionDefinition.getEvaluator(); | ||
341 | if (evaluator instanceof IExpressionEvaluator) { | ||
342 | return (IExpressionEvaluator) evaluator; | ||
343 | } | ||
344 | throw new IllegalArgumentException("No runtime support for expression evaluator: " + evaluator); | ||
345 | } | ||
346 | |||
347 | private IRelationEvaluator toIRelationEvaluator(ExpressionDefinition expressionDefinition) { | ||
348 | final Object evaluator = expressionDefinition.getEvaluator(); | ||
349 | if (evaluator instanceof IRelationEvaluator) { | ||
350 | return (IRelationEvaluator) evaluator; | ||
351 | } | ||
352 | throw new IllegalArgumentException("No runtime support for relation evaluator: " + evaluator); | ||
353 | } | ||
354 | |||
355 | private Map<String, Integer> toStringIndexMap(final EMap<String, Integer> mappedIndices) { | ||
356 | final HashMap<String, Integer> result = new HashMap<String, Integer>(); | ||
357 | for (java.util.Map.Entry<String, Integer> entry : mappedIndices) { | ||
358 | result.put(entry.getKey(), entry.getValue()); | ||
359 | } | ||
360 | return result; | ||
361 | } | ||
362 | |||
363 | /** Mask can be null */ | ||
364 | private TupleMask toMaskOrNull(Mask mask) { | ||
365 | if (mask == null) | ||
366 | return null; | ||
367 | else | ||
368 | return toMask(mask); | ||
369 | } | ||
370 | |||
371 | /** Mask is non-null. */ | ||
372 | private TupleMask toMask(Mask mask) { | ||
373 | return TupleMask.fromSelectedIndices(mask.getSourceArity(), mask.getSourceIndices()); | ||
374 | } | ||
375 | |||
376 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java new file mode 100644 index 00000000..9121fc44 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/NodeProvisioner.java | |||
@@ -0,0 +1,346 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext; | ||
13 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
14 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
15 | import tools.refinery.viatra.runtime.matchers.tuple.Tuples; | ||
16 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
17 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
18 | import tools.refinery.viatra.runtime.rete.construction.plancompiler.CompilerHelper; | ||
19 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
20 | import tools.refinery.viatra.runtime.rete.index.OnetimeIndexer; | ||
21 | import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer; | ||
22 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand; | ||
23 | import tools.refinery.viatra.runtime.rete.recipes.*; | ||
24 | import tools.refinery.viatra.runtime.rete.recipes.helper.RecipeRecognizer; | ||
25 | import tools.refinery.viatra.runtime.rete.recipes.helper.RecipesHelper; | ||
26 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
27 | import tools.refinery.viatra.runtime.rete.remote.RemoteReceiver; | ||
28 | import tools.refinery.viatra.runtime.rete.remote.RemoteSupplier; | ||
29 | import tools.refinery.viatra.runtime.rete.traceability.ActiveNodeConflictTrace; | ||
30 | import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo; | ||
31 | import tools.refinery.viatra.runtime.rete.traceability.UserRequestTrace; | ||
32 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
33 | |||
34 | import java.util.Map; | ||
35 | import java.util.Set; | ||
36 | |||
37 | /** | ||
38 | * Stores the internal parts of a rete network. Nodes are stored according to type and parameters. | ||
39 | * | ||
40 | * @author Gabor Bergmann | ||
41 | */ | ||
42 | public class NodeProvisioner { | ||
43 | |||
44 | // boolean activeStorage = true; | ||
45 | |||
46 | ReteContainer reteContainer; | ||
47 | NodeFactory nodeFactory; | ||
48 | ConnectionFactory connectionFactory; | ||
49 | InputConnector inputConnector; | ||
50 | IQueryRuntimeContext runtimeContext; | ||
51 | |||
52 | // TODO as recipe? | ||
53 | Map<Supplier, RemoteReceiver> remoteReceivers = CollectionsFactory.createMap(); | ||
54 | Map<Address<? extends Supplier>, RemoteSupplier> remoteSuppliers = CollectionsFactory.createMap(); | ||
55 | |||
56 | private RecipeRecognizer recognizer; | ||
57 | |||
58 | /** | ||
59 | * PRE: NodeFactory, ConnectionFactory must exist | ||
60 | * | ||
61 | * @param reteContainer | ||
62 | * the ReteNet whose interior is to be mapped. | ||
63 | */ | ||
64 | public NodeProvisioner(ReteContainer reteContainer) { | ||
65 | super(); | ||
66 | this.reteContainer = reteContainer; | ||
67 | this.nodeFactory = reteContainer.getNodeFactory(); | ||
68 | this.connectionFactory = reteContainer.getConnectionFactory(); | ||
69 | this.inputConnector = reteContainer.getInputConnectionFactory(); | ||
70 | runtimeContext = reteContainer.getNetwork().getEngine().getRuntimeContext(); | ||
71 | recognizer = new RecipeRecognizer(runtimeContext); | ||
72 | } | ||
73 | |||
74 | public synchronized Address<? extends Node> getOrCreateNodeByRecipe(RecipeTraceInfo recipeTrace) { | ||
75 | ReteNodeRecipe recipe = recipeTrace.getRecipe(); | ||
76 | Address<? extends Node> result = getNodesByRecipe().get(recipe); | ||
77 | if (result != null) { | ||
78 | // NODE ALREADY CONSTRUCTED FOR RECIPE, only needs to add trace | ||
79 | if (getRecipeTraces().add(recipeTrace)) | ||
80 | result.getNodeCache().assignTraceInfo(recipeTrace); | ||
81 | } else { | ||
82 | // No node for this recipe object - but equivalent recipes still | ||
83 | // reusable | ||
84 | ReteNodeRecipe canonicalRecipe = recognizer.canonicalizeRecipe(recipe); | ||
85 | if (canonicalRecipe != recipe) { | ||
86 | // FOUND EQUIVALENT RECIPE | ||
87 | result = getNodesByRecipe().get(canonicalRecipe); | ||
88 | if (result != null) { | ||
89 | // NODE ALREADY CONSTRUCTED FOR EQUIVALENT RECIPE | ||
90 | recipeTrace.shadowWithEquivalentRecipe(canonicalRecipe); | ||
91 | getNodesByRecipe().put(recipe, result); | ||
92 | if (getRecipeTraces().add(recipeTrace)) | ||
93 | result.getNodeCache().assignTraceInfo(recipeTrace); | ||
94 | // Bug 491922: ensure that recipe shadowing propagates to | ||
95 | // parent traces | ||
96 | // note that if equivalentRecipes() becomes more | ||
97 | // sophisticated | ||
98 | // and considers recipes with different parents, this might | ||
99 | // have to be changed | ||
100 | ensureParents(recipeTrace); | ||
101 | } else { | ||
102 | // CONSTRUCTION IN PROGRESS FOR EQUIVALENT RECIPE | ||
103 | if (recipe instanceof IndexerRecipe) { | ||
104 | // this is allowed for indexers; | ||
105 | // go on with the construction, as the same indexer node | ||
106 | // will be obtained anyways | ||
107 | } else { | ||
108 | throw new IllegalStateException( | ||
109 | "This should not happen: " + "non-indexer nodes are are supposed to be constructed " | ||
110 | + "as soon as they are designated as canonical recipes"); | ||
111 | } | ||
112 | } | ||
113 | } | ||
114 | if (result == null) { | ||
115 | // MUST INSTANTIATE NEW NODE FOR RECIPE | ||
116 | final Node freshNode = instantiateNodeForRecipe(recipeTrace, recipe); | ||
117 | result = reteContainer.makeAddress(freshNode); | ||
118 | } | ||
119 | } | ||
120 | return result; | ||
121 | } | ||
122 | |||
123 | private Set<RecipeTraceInfo> getRecipeTraces() { | ||
124 | return reteContainer.network.recipeTraces; | ||
125 | } | ||
126 | |||
127 | private Node instantiateNodeForRecipe(RecipeTraceInfo recipeTrace, final ReteNodeRecipe recipe) { | ||
128 | this.getRecipeTraces().add(recipeTrace); | ||
129 | if (recipe instanceof IndexerRecipe) { | ||
130 | |||
131 | // INSTANTIATE AND HOOK UP | ||
132 | // (cannot delay hooking up, because parent determines indexer | ||
133 | // implementation) | ||
134 | ensureParents(recipeTrace); | ||
135 | final ReteNodeRecipe parentRecipe = recipeTrace.getParentRecipeTraces().iterator().next().getRecipe(); | ||
136 | final Indexer result = nodeFactory.createIndexer(reteContainer, (IndexerRecipe) recipe, | ||
137 | asSupplier( | ||
138 | (Address<? extends Supplier>) reteContainer.network.getExistingNodeByRecipe(parentRecipe)), | ||
139 | recipeTrace); | ||
140 | |||
141 | // REMEMBER | ||
142 | if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) { | ||
143 | getNodesByRecipe().put(recipe, reteContainer.makeAddress(result)); | ||
144 | } | ||
145 | |||
146 | return result; | ||
147 | } else { | ||
148 | |||
149 | // INSTANTIATE | ||
150 | Node result = nodeFactory.createNode(reteContainer, recipe, recipeTrace); | ||
151 | |||
152 | // REMEMBER | ||
153 | if (Options.nodeSharingOption == Options.NodeSharingOption.ALL) { | ||
154 | getNodesByRecipe().put(recipe, reteContainer.makeAddress(result)); | ||
155 | } | ||
156 | |||
157 | // HOOK UP | ||
158 | // (recursion-tolerant due to this delayed order of initialization) | ||
159 | if (recipe instanceof InputRecipe) { | ||
160 | inputConnector.connectInput((InputRecipe) recipe, result); | ||
161 | } else { | ||
162 | if (recipe instanceof InputFilterRecipe) | ||
163 | inputConnector.connectInputFilter((InputFilterRecipe) recipe, result); | ||
164 | ensureParents(recipeTrace); | ||
165 | connectionFactory.connectToParents(recipeTrace, result); | ||
166 | } | ||
167 | return result; | ||
168 | } | ||
169 | } | ||
170 | |||
171 | private Map<ReteNodeRecipe, Address<? extends Node>> getNodesByRecipe() { | ||
172 | return reteContainer.network.nodesByRecipe; | ||
173 | } | ||
174 | |||
175 | private void ensureParents(RecipeTraceInfo recipeTrace) { | ||
176 | for (RecipeTraceInfo parentTrace : recipeTrace.getParentRecipeTraces()) { | ||
177 | getOrCreateNodeByRecipe(parentTrace); | ||
178 | } | ||
179 | } | ||
180 | |||
181 | //// Remoting - TODO eliminate? | ||
182 | |||
183 | synchronized RemoteReceiver accessRemoteReceiver(Address<? extends Supplier> address) { | ||
184 | throw new UnsupportedOperationException("Multi-container Rete not supported yet"); | ||
185 | // if (!reteContainer.isLocal(address)) | ||
186 | // return | ||
187 | // address.getContainer().getProvisioner().accessRemoteReceiver(address); | ||
188 | // Supplier localSupplier = reteContainer.resolveLocal(address); | ||
189 | // RemoteReceiver result = remoteReceivers.get(localSupplier); | ||
190 | // if (result == null) { | ||
191 | // result = new RemoteReceiver(reteContainer); | ||
192 | // reteContainer.connect(localSupplier, result); // stateless node, no | ||
193 | // // synch required | ||
194 | // | ||
195 | // if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) | ||
196 | // remoteReceivers.put(localSupplier, result); | ||
197 | // } | ||
198 | // return result; | ||
199 | } | ||
200 | |||
201 | /** | ||
202 | * @pre: address is NOT local | ||
203 | */ | ||
204 | synchronized RemoteSupplier accessRemoteSupplier(Address<? extends Supplier> address) { | ||
205 | throw new UnsupportedOperationException("Multi-container Rete not supported yet"); | ||
206 | // RemoteSupplier result = remoteSuppliers.get(address); | ||
207 | // if (result == null) { | ||
208 | // result = new RemoteSupplier(reteContainer, | ||
209 | // address.getContainer().getProvisioner() | ||
210 | // .accessRemoteReceiver(address)); | ||
211 | // // network.connectAndSynchronize(supplier, result); | ||
212 | // | ||
213 | // if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) | ||
214 | // remoteSuppliers.put(address, result); | ||
215 | // } | ||
216 | // return result; | ||
217 | } | ||
218 | |||
219 | /** | ||
220 | * The powerful method for accessing any (supplier) Address as a local supplier. | ||
221 | */ | ||
222 | public Supplier asSupplier(Address<? extends Supplier> address) { | ||
223 | if (!reteContainer.isLocal(address)) | ||
224 | return accessRemoteSupplier(address); | ||
225 | else | ||
226 | return reteContainer.resolveLocal(address); | ||
227 | } | ||
228 | |||
229 | /** the composite key tuple is formed as (RecipeTraceInfo, TupleMask) */ | ||
230 | private Map<Tuple, UserRequestTrace> projectionIndexerUserRequests = CollectionsFactory.createMap(); | ||
231 | |||
232 | // local version | ||
233 | // TODO remove? | ||
234 | public synchronized ProjectionIndexer accessProjectionIndexer(RecipeTraceInfo productionTrace, TupleMask mask) { | ||
235 | Tuple tableKey = Tuples.staticArityFlatTupleOf(productionTrace, mask); | ||
236 | UserRequestTrace indexerTrace = projectionIndexerUserRequests.computeIfAbsent(tableKey, k -> { | ||
237 | final ProjectionIndexerRecipe projectionIndexerRecipe = projectionIndexerRecipe( | ||
238 | productionTrace, mask); | ||
239 | return new UserRequestTrace(projectionIndexerRecipe, productionTrace); | ||
240 | }); | ||
241 | final Address<? extends Node> address = getOrCreateNodeByRecipe(indexerTrace); | ||
242 | return (ProjectionIndexer) reteContainer.resolveLocal(address); | ||
243 | } | ||
244 | |||
245 | // local version | ||
246 | public synchronized ProjectionIndexer accessProjectionIndexerOnetime(RecipeTraceInfo supplierTrace, | ||
247 | TupleMask mask) { | ||
248 | if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) | ||
249 | return accessProjectionIndexer(supplierTrace, mask); | ||
250 | |||
251 | final Address<? extends Node> supplierAddress = getOrCreateNodeByRecipe(supplierTrace); | ||
252 | Supplier supplier = (Supplier) reteContainer.resolveLocal(supplierAddress); | ||
253 | |||
254 | OnetimeIndexer result = new OnetimeIndexer(reteContainer, mask); | ||
255 | reteContainer.getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, result, reteContainer)); | ||
256 | |||
257 | return result; | ||
258 | } | ||
259 | |||
260 | // local, read-only version | ||
261 | public synchronized ProjectionIndexer peekProjectionIndexer(RecipeTraceInfo supplierTrace, TupleMask mask) { | ||
262 | final Address<? extends Node> address = getNodesByRecipe().get(projectionIndexerRecipe(supplierTrace, mask)); | ||
263 | return address == null ? null : (ProjectionIndexer) reteContainer.resolveLocal(address); | ||
264 | } | ||
265 | |||
266 | private ProjectionIndexerRecipe projectionIndexerRecipe( | ||
267 | RecipeTraceInfo parentTrace, TupleMask mask) { | ||
268 | final ReteNodeRecipe parentRecipe = parentTrace.getRecipe(); | ||
269 | Tuple tableKey = Tuples.staticArityFlatTupleOf(parentRecipe, mask); | ||
270 | ProjectionIndexerRecipe projectionIndexerRecipe = resultSeedRecipes.computeIfAbsent(tableKey, k -> | ||
271 | RecipesHelper.projectionIndexerRecipe(parentRecipe, CompilerHelper.toRecipeMask(mask)) | ||
272 | ); | ||
273 | return projectionIndexerRecipe; | ||
274 | } | ||
275 | |||
276 | /** the composite key tuple is formed as (ReteNodeRecipe, TupleMask) */ | ||
277 | private Map<Tuple, ProjectionIndexerRecipe> resultSeedRecipes = CollectionsFactory.createMap(); | ||
278 | |||
279 | // public synchronized Address<? extends Supplier> | ||
280 | // accessValueBinderFilterNode( | ||
281 | // Address<? extends Supplier> supplierAddress, int bindingIndex, Object | ||
282 | // bindingValue) { | ||
283 | // Supplier supplier = asSupplier(supplierAddress); | ||
284 | // Object[] paramsArray = { supplier.getNodeId(), bindingIndex, bindingValue | ||
285 | // }; | ||
286 | // Tuple params = new FlatTuple(paramsArray); | ||
287 | // ValueBinderFilterNode result = valueBinderFilters.get(params); | ||
288 | // if (result == null) { | ||
289 | // result = new ValueBinderFilterNode(reteContainer, bindingIndex, | ||
290 | // bindingValue); | ||
291 | // reteContainer.connect(supplier, result); // stateless node, no synch | ||
292 | // // required | ||
293 | // | ||
294 | // if (Options.nodeSharingOption == Options.NodeSharingOption.ALL) | ||
295 | // valueBinderFilters.put(params, result); | ||
296 | // } | ||
297 | // return reteContainer.makeAddress(result); | ||
298 | // } | ||
299 | |||
300 | /** | ||
301 | * Returns a copy of the given indexer that is an active node by itself (created if does not exist). (Convention: | ||
302 | * attached with same mask to a transparent node that is attached to parent node.) Node is created if it does not | ||
303 | * exist yet. | ||
304 | * | ||
305 | * @return an identical but active indexer | ||
306 | */ | ||
307 | // TODO rethink traceability | ||
308 | RecipeTraceInfo accessActiveIndexer(RecipeTraceInfo inactiveIndexerRecipeTrace) { | ||
309 | final RecipeTraceInfo parentRecipeTrace = inactiveIndexerRecipeTrace.getParentRecipeTraces().iterator().next(); | ||
310 | final ProjectionIndexerRecipe inactiveIndexerRecipe = (ProjectionIndexerRecipe) inactiveIndexerRecipeTrace | ||
311 | .getRecipe(); | ||
312 | |||
313 | final TransparentRecipe transparentRecipe = RecipesFactory.eINSTANCE.createTransparentRecipe(); | ||
314 | transparentRecipe.setParent(parentRecipeTrace.getRecipe()); | ||
315 | final ActiveNodeConflictTrace transparentRecipeTrace = new ActiveNodeConflictTrace(transparentRecipe, | ||
316 | parentRecipeTrace, inactiveIndexerRecipeTrace); | ||
317 | |||
318 | final ProjectionIndexerRecipe activeIndexerRecipe = RecipesFactory.eINSTANCE | ||
319 | .createProjectionIndexerRecipe(); | ||
320 | activeIndexerRecipe.setParent(transparentRecipe); | ||
321 | activeIndexerRecipe.setMask(inactiveIndexerRecipe.getMask()); | ||
322 | final ActiveNodeConflictTrace activeIndexerRecipeTrace = new ActiveNodeConflictTrace(activeIndexerRecipe, | ||
323 | transparentRecipeTrace, inactiveIndexerRecipeTrace); | ||
324 | |||
325 | return activeIndexerRecipeTrace; | ||
326 | } | ||
327 | |||
328 | // /** | ||
329 | // * @param parent | ||
330 | // * @return | ||
331 | // */ | ||
332 | // private TransparentNode accessTransparentNodeInternal(Supplier parent) { | ||
333 | // nodeFactory. | ||
334 | // return null; | ||
335 | // } | ||
336 | |||
337 | // public synchronized void registerSpecializedProjectionIndexer(Node node, | ||
338 | // ProjectionIndexer indexer) { | ||
339 | // if (Options.nodeSharingOption != Options.NodeSharingOption.NEVER) { | ||
340 | // Object[] paramsArray = { node.getNodeId(), indexer.getMask() }; | ||
341 | // Tuple params = new FlatTuple(paramsArray); | ||
342 | // projectionIndexers.put(params, indexer); | ||
343 | // } | ||
344 | // } | ||
345 | |||
346 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java new file mode 100644 index 00000000..1eaa18e7 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/PosetAwareReceiver.java | |||
@@ -0,0 +1,39 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.context.IPosetComparator; | ||
12 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
13 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
14 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
15 | |||
16 | /** | ||
17 | * @author Tamas Szabo | ||
18 | * @since 2.0 | ||
19 | */ | ||
20 | public interface PosetAwareReceiver extends Receiver { | ||
21 | |||
22 | public TupleMask getCoreMask(); | ||
23 | |||
24 | public TupleMask getPosetMask(); | ||
25 | |||
26 | public IPosetComparator getPosetComparator(); | ||
27 | |||
28 | /** | ||
29 | * Updates the receiver with a newly found or lost partial matching also providing information | ||
30 | * whether the update is a monotone change or not. | ||
31 | * | ||
32 | * @param direction the direction of the update | ||
33 | * @param update the update tuple | ||
34 | * @param monotone true if the update is monotone, false otherwise | ||
35 | * @since 2.4 | ||
36 | */ | ||
37 | public void updateWithPosetInfo(Direction direction, Tuple update, boolean monotone); | ||
38 | |||
39 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java new file mode 100644 index 00000000..211194c0 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ProductionNode.java | |||
@@ -0,0 +1,28 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.Map; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
15 | |||
16 | /** | ||
17 | * Interface intended for nodes containing complete matches. | ||
18 | * | ||
19 | * @author Gabor Bergmann | ||
20 | */ | ||
21 | public interface ProductionNode extends Tunnel, Iterable<Tuple> { | ||
22 | |||
23 | /** | ||
24 | * @return the position mapping of this particular pattern that maps members of the tuple type to their positions | ||
25 | */ | ||
26 | Map<String, Integer> getPosMapping(); | ||
27 | |||
28 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java new file mode 100644 index 00000000..3dc9aad7 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Receiver.java | |||
@@ -0,0 +1,85 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.Collection; | ||
13 | import java.util.Map; | ||
14 | import java.util.Map.Entry; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
19 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
20 | |||
21 | /** | ||
22 | * ALL METHODS: FOR INTERNAL USE ONLY; ONLY INVOKE FROM {@link ReteContainer} | ||
23 | * | ||
24 | * @author Gabor Bergmann | ||
25 | * @noimplement This interface is not intended to be implemented by external clients. | ||
26 | */ | ||
27 | public interface Receiver extends Node { | ||
28 | |||
29 | /** | ||
30 | * Updates the receiver with a newly found or lost partial matching. | ||
31 | * | ||
32 | * @since 2.4 | ||
33 | */ | ||
34 | public void update(final Direction direction, final Tuple updateElement, final Timestamp timestamp); | ||
35 | |||
36 | /** | ||
37 | * Updates the receiver in batch style with a collection of updates. The input collection consists of pairs in the | ||
38 | * form (t, c) where t is an update tuple and c is the count. The count can also be negative, and it specifies how | ||
39 | * many times the tuple t gets deleted or inserted. The default implementation of this method simply calls | ||
40 | * {@link #update(Direction, Tuple, Timestamp)} individually for all updates. | ||
41 | * | ||
42 | * @since 2.8 | ||
43 | */ | ||
44 | public default void batchUpdate(final Collection<Map.Entry<Tuple, Integer>> updates, final Timestamp timestamp) { | ||
45 | for (final Entry<Tuple, Integer> entry : updates) { | ||
46 | int count = entry.getValue(); | ||
47 | |||
48 | Direction direction; | ||
49 | if (count < 0) { | ||
50 | direction = Direction.DELETE; | ||
51 | count = -count; | ||
52 | } else { | ||
53 | direction = Direction.INSERT; | ||
54 | } | ||
55 | |||
56 | for (int i = 0; i < count; i++) { | ||
57 | update(direction, entry.getKey(), timestamp); | ||
58 | } | ||
59 | } | ||
60 | } | ||
61 | |||
62 | /** | ||
63 | * Returns the {@link Mailbox} of this receiver. | ||
64 | * | ||
65 | * @return the mailbox | ||
66 | * @since 2.0 | ||
67 | */ | ||
68 | public Mailbox getMailbox(); | ||
69 | |||
70 | /** | ||
71 | * appends a parent that will continuously send insert and revoke updates to this supplier | ||
72 | */ | ||
73 | void appendParent(final Supplier supplier); | ||
74 | |||
75 | /** | ||
76 | * removes a parent | ||
77 | */ | ||
78 | void removeParent(final Supplier supplier); | ||
79 | |||
80 | /** | ||
81 | * access active parent | ||
82 | */ | ||
83 | Collection<Supplier> getParents(); | ||
84 | |||
85 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java new file mode 100644 index 00000000..cae78d37 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/RederivableNode.java | |||
@@ -0,0 +1,34 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network; | ||
10 | |||
11 | /** | ||
12 | * A rederivable node can potentially re-derive tuples after the Rete network has finished the delivery of messages. | ||
13 | * | ||
14 | * @author Tamas Szabo | ||
15 | * @since 1.6 | ||
16 | */ | ||
17 | public interface RederivableNode extends Node, IGroupable { | ||
18 | |||
19 | /** | ||
20 | * The method is called by the {@link ReteContainer} to re-derive tuples after the normal messages have been | ||
21 | * delivered and consumed. The re-derivation process may trigger the creation and delivery of further messages | ||
22 | * and further re-derivation rounds. | ||
23 | */ | ||
24 | public void rederiveOne(); | ||
25 | |||
26 | /** | ||
27 | * Returns true if this node actually runs in DRed mode (not necessarily). | ||
28 | * | ||
29 | * @return true if the node is operating in DRed mode | ||
30 | * @since 2.0 | ||
31 | */ | ||
32 | public boolean isInDRedMode(); | ||
33 | |||
34 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java new file mode 100644 index 00000000..09bff29e --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReinitializedNode.java | |||
@@ -0,0 +1,14 @@ | |||
1 | /* | ||
2 | * SPDX-FileCopyrightText: 2023 The Refinery Authors <https://refinery.tools/> | ||
3 | * | ||
4 | * SPDX-License-Identifier: EPL-2.0 | ||
5 | */ | ||
6 | package tools.refinery.viatra.runtime.rete.network; | ||
7 | |||
8 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
9 | |||
10 | import java.util.Collection; | ||
11 | |||
12 | public interface ReinitializedNode { | ||
13 | void reinitializeWith(Collection<Tuple> tuples); | ||
14 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java new file mode 100644 index 00000000..79e0526d --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java | |||
@@ -0,0 +1,729 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * | ||
8 | * SPDX-License-Identifier: EPL-2.0 | ||
9 | *******************************************************************************/ | ||
10 | |||
11 | package tools.refinery.viatra.runtime.rete.network; | ||
12 | |||
13 | import org.apache.log4j.Logger; | ||
14 | import tools.refinery.viatra.runtime.CancellationToken; | ||
15 | import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
21 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
22 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
26 | import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker; | ||
27 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyCommunicationTracker; | ||
28 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedCommand; | ||
29 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand; | ||
30 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedDisconnectCommand; | ||
31 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
32 | import tools.refinery.viatra.runtime.rete.single.SingleInputNode; | ||
33 | import tools.refinery.viatra.runtime.rete.single.TrimmerNode; | ||
34 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
35 | |||
36 | import java.util.*; | ||
37 | import java.util.function.Function; | ||
38 | |||
39 | /** | ||
40 | * @author Gabor Bergmann | ||
41 | * | ||
42 | * Mutexes: externalMessageLock - enlisting messages into and retrieving from the external message queue | ||
43 | * @since 2.2 | ||
44 | */ | ||
45 | public final class ReteContainer { | ||
46 | |||
47 | protected Thread consumerThread = null; | ||
48 | protected boolean killed = false; | ||
49 | |||
50 | protected Network network; | ||
51 | |||
52 | protected LinkedList<Clearable> clearables; | ||
53 | protected Map<Long, Node> nodesById; | ||
54 | protected long nextId = 0; | ||
55 | |||
56 | protected ConnectionFactory connectionFactory; | ||
57 | protected NodeProvisioner nodeProvisioner; | ||
58 | |||
59 | protected Deque<UpdateMessage> internalMessageQueue = new ArrayDeque<UpdateMessage>(); | ||
60 | protected/* volatile */Deque<UpdateMessage> externalMessageQueue = new ArrayDeque<UpdateMessage>(); | ||
61 | protected Object externalMessageLock = new Object(); | ||
62 | protected Long clock = 1L; // even: steady state, odd: active queue; access | ||
63 | // ONLY with messageQueue locked! | ||
64 | protected Map<ReteContainer, Long> terminationCriteria = null; | ||
65 | protected final Logger logger; | ||
66 | protected final CommunicationTracker tracker; | ||
67 | |||
68 | protected final IQueryBackendContext backendContext; | ||
69 | |||
70 | protected Set<DelayedCommand> delayedCommandQueue; | ||
71 | protected Set<DelayedCommand> delayedCommandBuffer; | ||
72 | protected boolean executingDelayedCommands; | ||
73 | |||
74 | protected final TimelyConfiguration timelyConfiguration; | ||
75 | |||
76 | private final CancellationToken cancellationToken; | ||
77 | |||
78 | /** | ||
79 | * @param threaded | ||
80 | * false if operating in a single-threaded environment | ||
81 | */ | ||
82 | public ReteContainer(Network network, boolean threaded) { | ||
83 | super(); | ||
84 | this.network = network; | ||
85 | this.backendContext = network.getEngine().getBackendContext(); | ||
86 | this.timelyConfiguration = network.getEngine().getTimelyConfiguration(); | ||
87 | cancellationToken = backendContext.getRuntimeContext().getCancellationToken(); | ||
88 | |||
89 | this.delayedCommandQueue = new LinkedHashSet<DelayedCommand>(); | ||
90 | this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>(); | ||
91 | this.executingDelayedCommands = false; | ||
92 | |||
93 | if (this.isTimelyEvaluation()) { | ||
94 | this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration()); | ||
95 | } else { | ||
96 | this.tracker = new TimelessCommunicationTracker(); | ||
97 | } | ||
98 | |||
99 | this.nodesById = CollectionsFactory.createMap(); | ||
100 | this.clearables = new LinkedList<Clearable>(); | ||
101 | this.logger = network.getEngine().getLogger(); | ||
102 | |||
103 | this.connectionFactory = new ConnectionFactory(this); | ||
104 | this.nodeProvisioner = new NodeProvisioner(this); | ||
105 | |||
106 | if (threaded) { | ||
107 | this.terminationCriteria = CollectionsFactory.createMap(); | ||
108 | this.consumerThread = new Thread("Rete thread of " + ReteContainer.super.toString()) { | ||
109 | @Override | ||
110 | public void run() { | ||
111 | messageConsumptionCycle(); | ||
112 | } | ||
113 | }; | ||
114 | this.consumerThread.start(); | ||
115 | } | ||
116 | } | ||
117 | |||
118 | /** | ||
119 | * @since 2.4 | ||
120 | */ | ||
121 | public boolean isTimelyEvaluation() { | ||
122 | return this.timelyConfiguration != null; | ||
123 | } | ||
124 | |||
125 | /** | ||
126 | * @since 2.4 | ||
127 | */ | ||
128 | public TimelyConfiguration getTimelyConfiguration() { | ||
129 | return this.timelyConfiguration; | ||
130 | } | ||
131 | |||
132 | /** | ||
133 | * @since 1.6 | ||
134 | * @return the communication graph of the nodes, incl. message scheduling | ||
135 | */ | ||
136 | public CommunicationTracker getCommunicationTracker() { | ||
137 | return tracker; | ||
138 | } | ||
139 | |||
140 | /** | ||
141 | * Stops this container. To be called by Network.kill() | ||
142 | */ | ||
143 | public void kill() { | ||
144 | killed = true; | ||
145 | if (consumerThread != null) | ||
146 | consumerThread.interrupt(); | ||
147 | } | ||
148 | |||
149 | /** | ||
150 | * Establishes connection between a supplier and a receiver node, regardless which container they are in. Assumption | ||
151 | * is that this container is the home of the receiver, but it is not strictly necessary. | ||
152 | * | ||
153 | * @param synchronise | ||
154 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
155 | */ | ||
156 | public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
157 | boolean synchronise) { | ||
158 | if (!isLocal(receiver)) | ||
159 | receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise); | ||
160 | else { | ||
161 | Receiver child = resolveLocal(receiver); | ||
162 | connectRemoteSupplier(supplier, child, synchronise); | ||
163 | } | ||
164 | } | ||
165 | |||
166 | /** | ||
167 | * Severs connection between a supplier and a receiver node, regardless which container they are in. Assumption is | ||
168 | * that this container is the home of the receiver, but it is not strictly necessary. | ||
169 | * | ||
170 | * @param desynchronise | ||
171 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
172 | */ | ||
173 | public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
174 | boolean desynchronise) { | ||
175 | if (!isLocal(receiver)) | ||
176 | receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise); | ||
177 | else { | ||
178 | Receiver child = resolveLocal(receiver); | ||
179 | disconnectRemoteSupplier(supplier, child, desynchronise); | ||
180 | } | ||
181 | } | ||
182 | |||
183 | /** | ||
184 | * Establishes connection between a remote supplier and a local receiver node. | ||
185 | * | ||
186 | * @param synchronise | ||
187 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
188 | */ | ||
189 | public void connectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, boolean synchronise) { | ||
190 | Supplier parent = nodeProvisioner.asSupplier(supplier); | ||
191 | if (synchronise) | ||
192 | connectAndSynchronize(parent, receiver); | ||
193 | else | ||
194 | connect(parent, receiver); | ||
195 | } | ||
196 | |||
197 | /** | ||
198 | * Severs connection between a remote supplier and a local receiver node. | ||
199 | * | ||
200 | * @param desynchronise | ||
201 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
202 | */ | ||
203 | public void disconnectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, | ||
204 | boolean desynchronise) { | ||
205 | Supplier parent = nodeProvisioner.asSupplier(supplier); | ||
206 | if (desynchronise) | ||
207 | disconnectAndDesynchronize(parent, receiver); | ||
208 | else | ||
209 | disconnect(parent, receiver); | ||
210 | } | ||
211 | |||
212 | /** | ||
213 | * Connects a receiver to a supplier | ||
214 | */ | ||
215 | public void connect(Supplier supplier, Receiver receiver) { | ||
216 | supplier.appendChild(receiver); | ||
217 | receiver.appendParent(supplier); | ||
218 | tracker.registerDependency(supplier, receiver); | ||
219 | } | ||
220 | |||
221 | /** | ||
222 | * Disconnects a receiver from a supplier | ||
223 | */ | ||
224 | public void disconnect(Supplier supplier, Receiver receiver) { | ||
225 | supplier.removeChild(receiver); | ||
226 | receiver.removeParent(supplier); | ||
227 | tracker.unregisterDependency(supplier, receiver); | ||
228 | } | ||
229 | |||
230 | /** | ||
231 | * @since 2.3 | ||
232 | */ | ||
233 | public boolean isExecutingDelayedCommands() { | ||
234 | return this.executingDelayedCommands; | ||
235 | } | ||
236 | |||
237 | /** | ||
238 | * @since 2.3 | ||
239 | */ | ||
240 | public Set<DelayedCommand> getDelayedCommandQueue() { | ||
241 | if (this.executingDelayedCommands) { | ||
242 | return this.delayedCommandBuffer; | ||
243 | } else { | ||
244 | return this.delayedCommandQueue; | ||
245 | } | ||
246 | } | ||
247 | |||
248 | /** | ||
249 | * Connects a receiver to a remote supplier, and synchronizes it to the current contents of the supplier | ||
250 | */ | ||
251 | public void connectAndSynchronize(Supplier supplier, Receiver receiver) { | ||
252 | supplier.appendChild(receiver); | ||
253 | receiver.appendParent(supplier); | ||
254 | tracker.registerDependency(supplier, receiver); | ||
255 | getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, receiver, this)); | ||
256 | } | ||
257 | |||
258 | /** | ||
259 | * Disconnects a receiver from a supplier | ||
260 | */ | ||
261 | public void disconnectAndDesynchronize(Supplier supplier, Receiver receiver) { | ||
262 | final boolean wasInSameSCC = this.isTimelyEvaluation() && this.tracker.areInSameGroup(supplier, receiver); | ||
263 | supplier.removeChild(receiver); | ||
264 | receiver.removeParent(supplier); | ||
265 | tracker.unregisterDependency(supplier, receiver); | ||
266 | getDelayedCommandQueue().add(new DelayedDisconnectCommand(supplier, receiver, this, wasInSameSCC)); | ||
267 | } | ||
268 | |||
269 | /** | ||
270 | * @since 2.3 | ||
271 | */ | ||
272 | public void executeDelayedCommands() { | ||
273 | if (!this.delayedCommandQueue.isEmpty()) { | ||
274 | flushUpdates(); | ||
275 | this.executingDelayedCommands = true; | ||
276 | for (final DelayedCommand command : this.delayedCommandQueue) { | ||
277 | command.run(); | ||
278 | } | ||
279 | this.delayedCommandQueue = this.delayedCommandBuffer; | ||
280 | this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>(); | ||
281 | flushUpdates(); | ||
282 | this.executingDelayedCommands = false; | ||
283 | } | ||
284 | } | ||
285 | |||
286 | /** | ||
287 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is | ||
288 | * indicated by the Address. Designed to be called by the Network, DO NOT use in any other way. @pre: | ||
289 | * address.container == this, e.g. address MUST be local | ||
290 | * | ||
291 | * @return the value of the container's clock at the time when the message was accepted into the local message queue | ||
292 | */ | ||
293 | long sendUpdateToLocalAddress(Address<? extends Receiver> address, Direction direction, Tuple updateElement) { | ||
294 | long timestamp; | ||
295 | Receiver receiver = resolveLocal(address); | ||
296 | UpdateMessage message = new UpdateMessage(receiver, direction, updateElement); | ||
297 | synchronized (externalMessageLock) { | ||
298 | externalMessageQueue.add(message); | ||
299 | timestamp = clock; | ||
300 | externalMessageLock.notifyAll(); | ||
301 | } | ||
302 | |||
303 | return timestamp; | ||
304 | |||
305 | } | ||
306 | |||
307 | /** | ||
308 | * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial | ||
309 | * matching. The receiver is indicated by the Address. Designed to be called by the Network, DO NOT use in any other | ||
310 | * way. @pre: address.container == this, e.g. address MUST be local @pre: updateElements is nonempty! | ||
311 | * | ||
312 | * @return the value of the container's clock at the time when the message was accepted into the local message queue | ||
313 | */ | ||
314 | long sendUpdatesToLocalAddress(Address<? extends Receiver> address, Direction direction, | ||
315 | Collection<Tuple> updateElements) { | ||
316 | |||
317 | long timestamp; | ||
318 | Receiver receiver = resolveLocal(address); | ||
319 | // UpdateMessage message = new UpdateMessage(receiver, direction, | ||
320 | // updateElement); | ||
321 | synchronized (externalMessageLock) { | ||
322 | for (Tuple ps : updateElements) | ||
323 | externalMessageQueue.add(new UpdateMessage(receiver, direction, ps)); | ||
324 | // messageQueue.add(new UpdateMessage(resolveLocal(address), | ||
325 | // direction, updateElement)); | ||
326 | // this.sendUpdateInternal(resolveLocal(address), direction, | ||
327 | // updateElement); | ||
328 | timestamp = clock; | ||
329 | externalMessageLock.notifyAll(); | ||
330 | } | ||
331 | |||
332 | return timestamp; | ||
333 | } | ||
334 | |||
335 | /** | ||
336 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is | ||
337 | * indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT use in any | ||
338 | * other way. | ||
339 | */ | ||
340 | void sendUpdateToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction, | ||
341 | Tuple updateElement) { | ||
342 | Receiver receiver = resolveLocal(address); | ||
343 | UpdateMessage message = new UpdateMessage(receiver, direction, updateElement); | ||
344 | internalMessageQueue.add(message); | ||
345 | } | ||
346 | |||
347 | /** | ||
348 | * Sends multiple update messages to the receiver node, indicating a newly found or lost partial matching. The | ||
349 | * receiver is indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT | ||
350 | * use in any other way. | ||
351 | * | ||
352 | * @pre: address.container == this, e.g. address MUST be local | ||
353 | */ | ||
354 | void sendUpdatesToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction, | ||
355 | Collection<Tuple> updateElements) { | ||
356 | Receiver receiver = resolveLocal(address); | ||
357 | for (Tuple ps : updateElements) | ||
358 | internalMessageQueue.add(new UpdateMessage(receiver, direction, ps)); | ||
359 | } | ||
360 | |||
361 | /** | ||
362 | * Sends an update message to a node in a different container. The receiver is indicated by the Address. Designed to | ||
363 | * be called by RemoteReceivers, DO NOT use in any other way. | ||
364 | * | ||
365 | * @since 2.4 | ||
366 | */ | ||
367 | public void sendUpdateToRemoteAddress(Address<? extends Receiver> address, Direction direction, | ||
368 | Tuple updateElement) { | ||
369 | ReteContainer otherContainer = address.getContainer(); | ||
370 | long otherClock = otherContainer.sendUpdateToLocalAddress(address, direction, updateElement); | ||
371 | // Long criterion = terminationCriteria.get(otherContainer); | ||
372 | // if (criterion==null || otherClock > criterion) | ||
373 | terminationCriteria.put(otherContainer, otherClock); | ||
374 | } | ||
375 | |||
376 | /** | ||
377 | * Finalises all update sequences and returns. To be called from user threads (e.g. network construction). | ||
378 | */ | ||
379 | public void flushUpdates() { | ||
380 | network.waitForReteTermination(); | ||
381 | // synchronized (messageQueue) | ||
382 | // { | ||
383 | // while (!messageQueue.isEmpty()) | ||
384 | // { | ||
385 | // try { | ||
386 | // UpdateMessage message = messageQueue.take(); | ||
387 | // message.receiver.update(message.direction, message.updateElement); | ||
388 | // } catch (InterruptedException e) {} | ||
389 | // } | ||
390 | // } | ||
391 | } | ||
392 | |||
393 | /** | ||
394 | * Retrieves a safe copy of the contents of a supplier. | ||
395 | * | ||
396 | * <p> Note that there may be multiple copies of a Tuple in case of a {@link TrimmerNode}, so the result is not always a set. | ||
397 | * | ||
398 | * @param flush if true, a flush is performed before pulling the contents | ||
399 | * @since 2.3 | ||
400 | */ | ||
401 | public Collection<Tuple> pullContents(final Supplier supplier, final boolean flush) { | ||
402 | if (flush) { | ||
403 | flushUpdates(); | ||
404 | } | ||
405 | final Collection<Tuple> collector = new ArrayList<Tuple>(); | ||
406 | supplier.pullInto(collector, flush); | ||
407 | return collector; | ||
408 | } | ||
409 | |||
410 | /** | ||
411 | * @since 2.4 | ||
412 | */ | ||
413 | public Map<Tuple, Timeline<Timestamp>> pullContentsWithTimeline(final Supplier supplier, final boolean flush) { | ||
414 | if (flush) { | ||
415 | flushUpdates(); | ||
416 | } | ||
417 | final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap(); | ||
418 | supplier.pullIntoWithTimeline(collector, flush); | ||
419 | return collector; | ||
420 | } | ||
421 | |||
422 | /** | ||
423 | * Retrieves the contents of a SingleInputNode's parentage. | ||
424 | * | ||
425 | * @since 2.3 | ||
426 | */ | ||
427 | public Collection<Tuple> pullPropagatedContents(final SingleInputNode supplier, final boolean flush) { | ||
428 | if (flush) { | ||
429 | flushUpdates(); | ||
430 | } | ||
431 | final Collection<Tuple> collector = new LinkedList<Tuple>(); | ||
432 | supplier.propagatePullInto(collector, flush); | ||
433 | return collector; | ||
434 | } | ||
435 | |||
436 | /** | ||
437 | * Retrieves the timestamp-aware contents of a SingleInputNode's parentage. | ||
438 | * | ||
439 | * @since 2.3 | ||
440 | */ | ||
441 | public Map<Tuple, Timeline<Timestamp>> pullPropagatedContentsWithTimestamp(final SingleInputNode supplier, | ||
442 | final boolean flush) { | ||
443 | if (flush) { | ||
444 | flushUpdates(); | ||
445 | } | ||
446 | final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap(); | ||
447 | supplier.propagatePullIntoWithTimestamp(collector, flush); | ||
448 | return collector; | ||
449 | } | ||
450 | |||
451 | /** | ||
452 | * Retrieves the contents of a supplier for a remote caller. Assumption is that this container is the home of the | ||
453 | * supplier, but it is not strictly necessary. | ||
454 | * | ||
455 | * @param supplier | ||
456 | * the address of the supplier to be pulled. | ||
457 | * @since 2.3 | ||
458 | */ | ||
459 | public Collection<Tuple> remotePull(Address<? extends Supplier> supplier, boolean flush) { | ||
460 | if (!isLocal(supplier)) | ||
461 | return supplier.getContainer().remotePull(supplier, flush); | ||
462 | return pullContents(resolveLocal(supplier), flush); | ||
463 | } | ||
464 | |||
465 | /** | ||
466 | * Proxies for the getPosMapping() of Production nodes. Retrieves the posmapping of a remote or local Production to | ||
467 | * a remote or local caller. | ||
468 | */ | ||
469 | public Map<String, Integer> remotePosMapping(Address<? extends ProductionNode> production) { | ||
470 | if (!isLocal(production)) | ||
471 | return production.getContainer().remotePosMapping(production); | ||
472 | return resolveLocal(production).getPosMapping(); | ||
473 | } | ||
474 | |||
475 | /** | ||
476 | * Continually consumes update messages. Should be run on a dedicated thread. | ||
477 | */ | ||
478 | void messageConsumptionCycle() { | ||
479 | while (!killed) // deliver messages on and on and on.... | ||
480 | { | ||
481 | long incrementedClock = 0; | ||
482 | UpdateMessage message = null; | ||
483 | |||
484 | if (!internalMessageQueue.isEmpty()) // take internal messages first | ||
485 | message = internalMessageQueue.removeFirst(); | ||
486 | else | ||
487 | // no internal message, take an incoming message | ||
488 | synchronized (externalMessageLock) { // no sleeping allowed, | ||
489 | // because external | ||
490 | // queue is locked for | ||
491 | // precise clocking of | ||
492 | // termination point! | ||
493 | if (!externalMessageQueue.isEmpty()) { // if external queue | ||
494 | // is non-empty, | ||
495 | // retrieve the next | ||
496 | // message instantly | ||
497 | message = takeExternalMessage(); | ||
498 | } else { // if external queue is found empty (and this is | ||
499 | // the first time in a row) | ||
500 | incrementedClock = ++clock; // local termination point | ||
501 | // synchronized(clock){incrementedClock = ++clock;} | ||
502 | } | ||
503 | } | ||
504 | |||
505 | if (message == null) // both queues were empty | ||
506 | { | ||
507 | localUpdateTermination(incrementedClock); // report local | ||
508 | // termination point | ||
509 | while (message == null) // wait for a message while external | ||
510 | // queue is still empty | ||
511 | { | ||
512 | synchronized (externalMessageLock) { | ||
513 | while (externalMessageQueue.isEmpty()) { | ||
514 | try { | ||
515 | externalMessageLock.wait(); | ||
516 | } catch (InterruptedException e) { | ||
517 | if (killed) | ||
518 | return; | ||
519 | } | ||
520 | } | ||
521 | message = takeExternalMessage(); | ||
522 | } | ||
523 | |||
524 | } | ||
525 | } | ||
526 | |||
527 | // now we have a message to deliver | ||
528 | // NOTE: this method is not compatible with differential dataflow | ||
529 | message.receiver.update(message.direction, message.updateElement, Timestamp.ZERO); | ||
530 | } | ||
531 | } | ||
532 | |||
533 | /** | ||
534 | * @since 1.6 | ||
535 | */ | ||
536 | public static final Function<Node, String> NAME_MAPPER = input -> input.toString().substring(0, | ||
537 | Math.min(30, input.toString().length())); | ||
538 | |||
539 | /** | ||
540 | * Sends out all pending messages to their receivers. The delivery is governed by the communication tracker. | ||
541 | * | ||
542 | * @since 1.6 | ||
543 | */ | ||
544 | public void deliverMessagesSingleThreaded() { | ||
545 | if (!backendContext.areUpdatesDelayed()) { | ||
546 | if (Options.MONITOR_VIOLATION_OF_RETE_NODEGROUP_TOPOLOGICAL_SORTING) { | ||
547 | // known unreachable; enable for debugging only | ||
548 | |||
549 | CommunicationGroup lastGroup = null; | ||
550 | Set<CommunicationGroup> seenInThisCycle = new HashSet<>(); | ||
551 | |||
552 | while (!tracker.isEmpty()) { | ||
553 | final CommunicationGroup group = tracker.getAndRemoveFirstGroup(); | ||
554 | |||
555 | /** | ||
556 | * The current group does not violate the communication schema iff (1) it was not seen before OR (2) | ||
557 | * the last one that was seen is exactly the same as the current one this can happen if the group | ||
558 | * was added back because of in-group message passing | ||
559 | */ | ||
560 | boolean okGroup = (group == lastGroup) || seenInThisCycle.add(group); | ||
561 | |||
562 | if (!okGroup) { | ||
563 | logger.error( | ||
564 | "[INTERNAL ERROR] Violation of communication schema! The communication component with representative " | ||
565 | + group.getRepresentative() + " has already been processed!"); | ||
566 | } | ||
567 | |||
568 | group.deliverMessages(); | ||
569 | |||
570 | lastGroup = group; | ||
571 | } | ||
572 | |||
573 | } else { | ||
574 | while (!tracker.isEmpty()) { | ||
575 | final CommunicationGroup group = tracker.getAndRemoveFirstGroup(); | ||
576 | group.deliverMessages(); | ||
577 | } | ||
578 | } | ||
579 | } | ||
580 | } | ||
581 | |||
582 | private void localUpdateTermination(long incrementedClock) { | ||
583 | network.reportLocalUpdateTermination(this, incrementedClock, terminationCriteria); | ||
584 | terminationCriteria.clear(); | ||
585 | |||
586 | // synchronized(clock){++clock;} // +1 incrementing for parity and easy | ||
587 | // comparison | ||
588 | } | ||
589 | |||
590 | // @pre: externalMessageQueue synchronized && nonempty | ||
591 | private UpdateMessage takeExternalMessage() { | ||
592 | UpdateMessage message = externalMessageQueue.removeFirst(); | ||
593 | if (!externalMessageQueue.isEmpty()) { // copy the whole queue over | ||
594 | // for speedup | ||
595 | Deque<UpdateMessage> temp = externalMessageQueue; | ||
596 | externalMessageQueue = internalMessageQueue; | ||
597 | internalMessageQueue = temp; | ||
598 | } | ||
599 | return message; | ||
600 | } | ||
601 | |||
602 | /** | ||
603 | * Provides an external address for the selected node. | ||
604 | * | ||
605 | * @pre node belongs to this container. | ||
606 | */ | ||
607 | public <N extends Node> Address<N> makeAddress(N node) { | ||
608 | return new Address<N>(node); | ||
609 | } | ||
610 | |||
611 | /** | ||
612 | * Checks whether a certain address points to a node at this container. | ||
613 | */ | ||
614 | public boolean isLocal(Address<? extends Node> address) { | ||
615 | return address.getContainer() == this; | ||
616 | } | ||
617 | |||
618 | /** | ||
619 | * Returns an addressed node at this container. | ||
620 | * | ||
621 | * @pre: address.container == this, e.g. address MUST be local | ||
622 | * @throws IllegalArgumentException | ||
623 | * if address is non-local | ||
624 | */ | ||
625 | @SuppressWarnings("unchecked") | ||
626 | public <N extends Node> N resolveLocal(Address<N> address) { | ||
627 | if (this != address.getContainer()) | ||
628 | throw new IllegalArgumentException(String.format("Address %s non-local at container %s", address, this)); | ||
629 | |||
630 | N cached = address.getNodeCache(); | ||
631 | if (cached != null) | ||
632 | return cached; | ||
633 | else { | ||
634 | N node = (N) nodesById.get(address.getNodeId()); | ||
635 | address.setNodeCache(node); | ||
636 | return node; | ||
637 | } | ||
638 | } | ||
639 | |||
640 | /** | ||
641 | * Registers a node into the rete network (should be called by constructor). Every node MUST be registered by its | ||
642 | * constructor. | ||
643 | * | ||
644 | * @return the unique nodeId issued to the node. | ||
645 | */ | ||
646 | public long registerNode(Node n) { | ||
647 | long id = nextId++; | ||
648 | nodesById.put(id, n); | ||
649 | return id; | ||
650 | } | ||
651 | |||
652 | /** | ||
653 | * Unregisters a node from the rete network. Do NOT call if node is still connected to other Nodes, or Adressed or | ||
654 | * otherwise referenced. | ||
655 | */ | ||
656 | public void unregisterNode(Node n) { | ||
657 | nodesById.remove(n.getNodeId()); | ||
658 | } | ||
659 | |||
660 | /** | ||
661 | * Registers a pattern memory into the rete network. Every memory MUST be registered by its owner node. | ||
662 | */ | ||
663 | public void registerClearable(Clearable c) { | ||
664 | clearables.addFirst(c); | ||
665 | } | ||
666 | |||
667 | /** | ||
668 | * Unregisters a pattern memory from the rete network. | ||
669 | */ | ||
670 | public void unregisterClearable(Clearable c) { | ||
671 | clearables.remove(c); | ||
672 | } | ||
673 | |||
674 | /** | ||
675 | * Clears all memory contents in the network. Reverts to initial state. | ||
676 | */ | ||
677 | public void clearAll() { | ||
678 | for (Clearable c : clearables) { | ||
679 | c.clear(); | ||
680 | } | ||
681 | } | ||
682 | |||
683 | public NodeFactory getNodeFactory() { | ||
684 | return network.getNodeFactory(); | ||
685 | } | ||
686 | |||
687 | public ConnectionFactory getConnectionFactory() { | ||
688 | return connectionFactory; | ||
689 | } | ||
690 | |||
691 | public NodeProvisioner getProvisioner() { | ||
692 | return nodeProvisioner; | ||
693 | } | ||
694 | |||
695 | public Network getNetwork() { | ||
696 | return network; | ||
697 | } | ||
698 | |||
699 | @Override | ||
700 | public String toString() { | ||
701 | StringBuilder sb = new StringBuilder(); | ||
702 | String separator = System.getProperty("line.separator"); | ||
703 | sb.append(super.toString() + "[[[" + separator); | ||
704 | java.util.List<Long> keys = new java.util.ArrayList<Long>(nodesById.keySet()); | ||
705 | java.util.Collections.sort(keys); | ||
706 | for (Long key : keys) { | ||
707 | sb.append(key + " -> " + nodesById.get(key) + separator); | ||
708 | } | ||
709 | sb.append("]]] of " + network); | ||
710 | return sb.toString(); | ||
711 | } | ||
712 | |||
713 | /** | ||
714 | * Access all the Rete nodes inside this container. | ||
715 | * | ||
716 | * @return the collection of {@link Node} instances | ||
717 | */ | ||
718 | public Collection<Node> getAllNodes() { | ||
719 | return nodesById.values(); | ||
720 | } | ||
721 | |||
722 | public InputConnector getInputConnectionFactory() { | ||
723 | return network.getInputConnector(); | ||
724 | } | ||
725 | |||
726 | public void checkCancelled() { | ||
727 | cancellationToken.checkCancelled(); | ||
728 | } | ||
729 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java new file mode 100644 index 00000000..7dc7c4bc --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/StandardNode.java | |||
@@ -0,0 +1,123 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * | ||
8 | * SPDX-License-Identifier: EPL-2.0 | ||
9 | *******************************************************************************/ | ||
10 | |||
11 | package tools.refinery.viatra.runtime.rete.network; | ||
12 | |||
13 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
14 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
15 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
16 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
17 | import tools.refinery.viatra.runtime.rete.index.GenericProjectionIndexer; | ||
18 | import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
20 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
21 | import tools.refinery.viatra.runtime.rete.traceability.TraceInfo; | ||
22 | |||
23 | import java.util.Collection; | ||
24 | import java.util.HashSet; | ||
25 | import java.util.List; | ||
26 | import java.util.Set; | ||
27 | |||
28 | /** | ||
29 | * Base implementation for a supplier node. | ||
30 | * | ||
31 | * @author Gabor Bergmann | ||
32 | * | ||
33 | */ | ||
34 | public abstract class StandardNode extends BaseNode implements Supplier, NetworkStructureChangeSensitiveNode { | ||
35 | protected final List<Receiver> children = CollectionsFactory.createObserverList(); | ||
36 | /** | ||
37 | * @since 2.2 | ||
38 | */ | ||
39 | protected final List<Mailbox> childMailboxes = CollectionsFactory.createObserverList(); | ||
40 | |||
41 | public StandardNode(final ReteContainer reteContainer) { | ||
42 | super(reteContainer); | ||
43 | } | ||
44 | |||
45 | /** | ||
46 | * @since 2.4 | ||
47 | */ | ||
48 | protected void propagateUpdate(final Direction direction, final Tuple updateElement, final Timestamp timestamp) { | ||
49 | reteContainer.checkCancelled(); | ||
50 | for (final Mailbox childMailbox : childMailboxes) { | ||
51 | childMailbox.postMessage(direction, updateElement, timestamp); | ||
52 | } | ||
53 | } | ||
54 | |||
55 | @Override | ||
56 | public void appendChild(final Receiver receiver) { | ||
57 | children.add(receiver); | ||
58 | childMailboxes.add(this.getCommunicationTracker().proxifyMailbox(this, receiver.getMailbox())); | ||
59 | } | ||
60 | |||
61 | @Override | ||
62 | public void removeChild(final Receiver receiver) { | ||
63 | children.remove(receiver); | ||
64 | Mailbox mailboxToRemove = null; | ||
65 | for (final Mailbox mailbox : childMailboxes) { | ||
66 | if (mailbox.getReceiver() == receiver) { | ||
67 | mailboxToRemove = mailbox; | ||
68 | break; | ||
69 | } | ||
70 | } | ||
71 | assert mailboxToRemove != null; | ||
72 | childMailboxes.remove(mailboxToRemove); | ||
73 | } | ||
74 | |||
75 | @Override | ||
76 | public void networkStructureChanged() { | ||
77 | childMailboxes.clear(); | ||
78 | for (final Receiver receiver : children) { | ||
79 | childMailboxes.add(this.getCommunicationTracker().proxifyMailbox(this, receiver.getMailbox())); | ||
80 | } | ||
81 | } | ||
82 | |||
83 | @Override | ||
84 | public Collection<Receiver> getReceivers() { | ||
85 | return children; | ||
86 | } | ||
87 | |||
88 | /** | ||
89 | * @since 2.2 | ||
90 | */ | ||
91 | public Collection<Mailbox> getChildMailboxes() { | ||
92 | return this.childMailboxes; | ||
93 | } | ||
94 | |||
95 | @Override | ||
96 | public Set<Tuple> getPulledContents(final boolean flush) { | ||
97 | final HashSet<Tuple> results = new HashSet<Tuple>(); | ||
98 | pullInto(results, flush); | ||
99 | return results; | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public ProjectionIndexer constructIndex(final TupleMask mask, final TraceInfo... traces) { | ||
104 | final GenericProjectionIndexer indexer = new GenericProjectionIndexer(reteContainer, mask); | ||
105 | for (final TraceInfo traceInfo : traces) { | ||
106 | indexer.assignTraceInfo(traceInfo); | ||
107 | } | ||
108 | reteContainer.connectAndSynchronize(this, indexer); | ||
109 | return indexer; | ||
110 | } | ||
111 | |||
112 | /** | ||
113 | * @since 1.6 | ||
114 | */ | ||
115 | protected void issueError(final String message, final Exception ex) { | ||
116 | if (ex == null) { | ||
117 | this.reteContainer.getNetwork().getEngine().getLogger().error(message); | ||
118 | } else { | ||
119 | this.reteContainer.getNetwork().getEngine().getLogger().error(message, ex); | ||
120 | } | ||
121 | } | ||
122 | |||
123 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java new file mode 100644 index 00000000..1917a7cf --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Supplier.java | |||
@@ -0,0 +1,82 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.Collection; | ||
13 | import java.util.Map; | ||
14 | import java.util.Set; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
19 | import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
21 | import tools.refinery.viatra.runtime.rete.single.TrimmerNode; | ||
22 | import tools.refinery.viatra.runtime.rete.traceability.TraceInfo; | ||
23 | |||
24 | /** | ||
25 | * @author Gabor Bergmann | ||
26 | * | ||
27 | * A supplier is an object that can propagate insert or revoke events towards receivers. | ||
28 | */ | ||
29 | public interface Supplier extends Node { | ||
30 | |||
31 | /** | ||
32 | * Pulls the contents of this object in this particular moment into a target collection. | ||
33 | * | ||
34 | * @param flush if true, flushing of messages is allowed during the pull, otherwise flushing is not allowed | ||
35 | * @since 2.3 | ||
36 | */ | ||
37 | public void pullInto(Collection<Tuple> collector, boolean flush); | ||
38 | |||
39 | /** | ||
40 | * @since 2.4 | ||
41 | */ | ||
42 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush); | ||
43 | |||
44 | /** | ||
45 | * Returns the contents of this object in this particular moment. | ||
46 | * For memoryless nodes, this may involve a costly recomputation of contents. | ||
47 | * | ||
48 | * The result is returned as a Set, even when it has multiplicities (at the output of {@link TrimmerNode}). | ||
49 | * | ||
50 | * <p> Intended mainly for debug purposes; therefore flushing is performed only if explicitly requested | ||
51 | * During runtime, flushing may be preferred; see {@link ReteContainer#pullContents(Supplier)} | ||
52 | * @since 2.3 | ||
53 | */ | ||
54 | public Set<Tuple> getPulledContents(boolean flush); | ||
55 | |||
56 | default public Set<Tuple> getPulledContents() { | ||
57 | return getPulledContents(true); | ||
58 | } | ||
59 | |||
60 | /** | ||
61 | * appends a receiver that will continously receive insert and revoke updates from this supplier | ||
62 | */ | ||
63 | void appendChild(Receiver receiver); | ||
64 | |||
65 | /** | ||
66 | * removes a receiver | ||
67 | */ | ||
68 | void removeChild(Receiver receiver); | ||
69 | |||
70 | /** | ||
71 | * Instantiates (or reuses, depending on implementation) an index according to the given mask. | ||
72 | * | ||
73 | * Intended for internal use; clients should invoke through Library instead to enable reusing. | ||
74 | */ | ||
75 | ProjectionIndexer constructIndex(TupleMask mask, TraceInfo... traces); | ||
76 | |||
77 | /** | ||
78 | * lists receivers | ||
79 | */ | ||
80 | Collection<Receiver> getReceivers(); | ||
81 | |||
82 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java new file mode 100644 index 00000000..f238f47b --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Tunnel.java | |||
@@ -0,0 +1,19 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | /** | ||
13 | * @author Gabor Bergmann | ||
14 | * | ||
15 | * A Tunnel is an interface into which elments can be instered and from which productions can be extracted. | ||
16 | */ | ||
17 | public interface Tunnel extends Supplier, Receiver { | ||
18 | |||
19 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java new file mode 100644 index 00000000..1334a3a9 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/UpdateMessage.java | |||
@@ -0,0 +1,31 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
13 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
14 | |||
15 | class UpdateMessage { | ||
16 | public Receiver receiver; | ||
17 | public Direction direction; | ||
18 | public Tuple updateElement; | ||
19 | |||
20 | public UpdateMessage(Receiver receiver, Direction direction, Tuple updateElement) { | ||
21 | this.receiver = receiver; | ||
22 | this.direction = direction; | ||
23 | this.updateElement = updateElement; | ||
24 | } | ||
25 | |||
26 | @Override | ||
27 | public String toString() { | ||
28 | return "M." + direction + ": " + updateElement + " -> " + receiver; | ||
29 | } | ||
30 | |||
31 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java new file mode 100644 index 00000000..8cedeb11 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java | |||
@@ -0,0 +1,103 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Map; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
15 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
16 | |||
17 | /** | ||
18 | * A communication group represents a set of nodes in the communication graph that form a strongly connected component. | ||
19 | * | ||
20 | * @author Tamas Szabo | ||
21 | * @since 1.6 | ||
22 | */ | ||
23 | public abstract class CommunicationGroup implements Comparable<CommunicationGroup> { | ||
24 | |||
25 | public static final String UNSUPPORTED_MESSAGE_KIND = "Unsupported message kind "; | ||
26 | |||
27 | /** | ||
28 | * Marker for the {@link CommunicationTracker} | ||
29 | */ | ||
30 | public boolean isEnqueued = false; | ||
31 | |||
32 | protected final Node representative; | ||
33 | |||
34 | /** | ||
35 | * May be changed during bumping in {@link CommunicationTracker.registerDependency} | ||
36 | */ | ||
37 | protected int identifier; | ||
38 | |||
39 | /** | ||
40 | * @since 1.7 | ||
41 | */ | ||
42 | protected final CommunicationTracker tracker; | ||
43 | |||
44 | /** | ||
45 | * @since 1.7 | ||
46 | */ | ||
47 | public CommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) { | ||
48 | this.tracker = tracker; | ||
49 | this.representative = representative; | ||
50 | this.identifier = identifier; | ||
51 | } | ||
52 | |||
53 | public abstract void deliverMessages(); | ||
54 | |||
55 | public Node getRepresentative() { | ||
56 | return representative; | ||
57 | } | ||
58 | |||
59 | public abstract boolean isEmpty(); | ||
60 | |||
61 | /** | ||
62 | * @since 2.0 | ||
63 | */ | ||
64 | public abstract void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind); | ||
65 | |||
66 | /** | ||
67 | * @since 2.0 | ||
68 | */ | ||
69 | public abstract void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind); | ||
70 | |||
71 | public abstract Map<MessageSelector, Collection<Mailbox>> getMailboxes(); | ||
72 | |||
73 | public abstract boolean isRecursive(); | ||
74 | |||
75 | @Override | ||
76 | public int hashCode() { | ||
77 | return this.identifier; | ||
78 | } | ||
79 | |||
80 | @Override | ||
81 | public String toString() { | ||
82 | return this.getClass().getSimpleName() + " " + this.identifier + " - representative: " + this.representative | ||
83 | + " - isEmpty: " + isEmpty(); | ||
84 | } | ||
85 | |||
86 | @Override | ||
87 | public boolean equals(final Object obj) { | ||
88 | if (obj == null || this.getClass() != obj.getClass()) { | ||
89 | return false; | ||
90 | } else if (this == obj) { | ||
91 | return true; | ||
92 | } else { | ||
93 | final CommunicationGroup that = (CommunicationGroup) obj; | ||
94 | return this.identifier == that.identifier; | ||
95 | } | ||
96 | } | ||
97 | |||
98 | @Override | ||
99 | public int compareTo(final CommunicationGroup that) { | ||
100 | return this.identifier - that.identifier; | ||
101 | } | ||
102 | |||
103 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java new file mode 100644 index 00000000..d244e644 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java | |||
@@ -0,0 +1,467 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.HashMap; | ||
12 | import java.util.HashSet; | ||
13 | import java.util.List; | ||
14 | import java.util.Map; | ||
15 | import java.util.PriorityQueue; | ||
16 | import java.util.Queue; | ||
17 | import java.util.Set; | ||
18 | |||
19 | import tools.refinery.viatra.runtime.rete.itc.alg.incscc.IncSCCAlg; | ||
20 | import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting; | ||
21 | import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph; | ||
22 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
23 | import tools.refinery.viatra.runtime.rete.aggregation.IAggregatorNode; | ||
24 | import tools.refinery.viatra.runtime.rete.boundary.ExternalInputEnumeratorNode; | ||
25 | import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode; | ||
26 | import tools.refinery.viatra.runtime.rete.index.DualInputNode; | ||
27 | import tools.refinery.viatra.runtime.rete.index.ExistenceNode; | ||
28 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
29 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
30 | import tools.refinery.viatra.runtime.rete.index.IterableIndexer; | ||
31 | import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer; | ||
32 | import tools.refinery.viatra.runtime.rete.network.IGroupable; | ||
33 | import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode; | ||
34 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
35 | import tools.refinery.viatra.runtime.rete.network.ProductionNode; | ||
36 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
37 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
38 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyIndexerListenerProxy; | ||
39 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyMailboxProxy; | ||
40 | import tools.refinery.viatra.runtime.rete.network.mailbox.FallThroughCapableMailbox; | ||
41 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
42 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
43 | import tools.refinery.viatra.runtime.rete.single.TransitiveClosureNode; | ||
44 | import tools.refinery.viatra.runtime.rete.single.TrimmerNode; | ||
45 | |||
46 | /** | ||
47 | * An instance of this class is associated with every {@link ReteContainer}. The tracker serves two purposes: <br> | ||
48 | * (1) It allows RETE nodes to register their communication dependencies on-the-fly. These dependencies can be | ||
49 | * registered or unregistered when nodes are disposed of. <br> | ||
50 | * (2) It allows RETE nodes to register their mailboxes as dirty, that is, they can tell the tracker that they have | ||
51 | * something to send to other nodes in the network. The tracker is then responsible for ordering these messages (more | ||
52 | * precisely, the mailboxes that contain the messages) for the associated {@link ReteContainer}. The ordering is | ||
53 | * governed by the strongly connected components in the dependency network and follows a topological sorting scheme; | ||
54 | * those mailboxes will be emptied first whose owner nodes do not depend on other undelivered messages. | ||
55 | * | ||
56 | * @author Tamas Szabo | ||
57 | * @since 1.6 | ||
58 | * | ||
59 | */ | ||
60 | public abstract class CommunicationTracker { | ||
61 | |||
62 | /** | ||
63 | * The minimum group id assigned so far | ||
64 | */ | ||
65 | protected int minGroupId; | ||
66 | |||
67 | /** | ||
68 | * The maximum group id assigned so far | ||
69 | */ | ||
70 | protected int maxGroupId; | ||
71 | |||
72 | /** | ||
73 | * The dependency graph of the communications in the RETE network | ||
74 | */ | ||
75 | protected final Graph<Node> dependencyGraph; | ||
76 | |||
77 | /** | ||
78 | * Incremental SCC information about the dependency graph | ||
79 | */ | ||
80 | protected final IncSCCAlg<Node> sccInformationProvider; | ||
81 | |||
82 | /** | ||
83 | * Precomputed node -> communication group map | ||
84 | */ | ||
85 | protected final Map<Node, CommunicationGroup> groupMap; | ||
86 | |||
87 | /** | ||
88 | * Priority queue of active communication groups | ||
89 | */ | ||
90 | protected final Queue<CommunicationGroup> groupQueue; | ||
91 | |||
92 | // groups should have a simple integer flag which represents its position in a priority queue | ||
93 | // priority queue only contains the ACTIVE groups | ||
94 | |||
95 | public CommunicationTracker() { | ||
96 | this.dependencyGraph = new Graph<Node>(); | ||
97 | this.sccInformationProvider = new IncSCCAlg<Node>(this.dependencyGraph); | ||
98 | this.groupQueue = new PriorityQueue<CommunicationGroup>(); | ||
99 | this.groupMap = new HashMap<Node, CommunicationGroup>(); | ||
100 | } | ||
101 | |||
102 | public Graph<Node> getDependencyGraph() { | ||
103 | return dependencyGraph; | ||
104 | } | ||
105 | |||
106 | public CommunicationGroup getGroup(final Node node) { | ||
107 | return this.groupMap.get(node); | ||
108 | } | ||
109 | |||
110 | private void precomputeGroups() { | ||
111 | groupMap.clear(); | ||
112 | |||
113 | // reconstruct group map from dependency graph | ||
114 | final Graph<Node> reducedGraph = sccInformationProvider.getReducedGraph(); | ||
115 | final List<Node> representatives = TopologicalSorting.compute(reducedGraph); | ||
116 | |||
117 | for (int i = 0; i < representatives.size(); i++) { // groups for SCC representatives | ||
118 | final Node representative = representatives.get(i); | ||
119 | createAndStoreGroup(representative, i); | ||
120 | } | ||
121 | |||
122 | minGroupId = 0; | ||
123 | maxGroupId = representatives.size() - 1; | ||
124 | |||
125 | for (final Node node : dependencyGraph.getAllNodes()) { // extend group map to the rest of nodes | ||
126 | final Node representative = sccInformationProvider.getRepresentative(node); | ||
127 | final CommunicationGroup group = groupMap.get(representative); | ||
128 | if (representative != node) { | ||
129 | addToGroup(node, group); | ||
130 | } | ||
131 | } | ||
132 | |||
133 | for (final Node node : dependencyGraph.getAllNodes()) { | ||
134 | // set fall-through flags of default mailboxes | ||
135 | precomputeFallThroughFlag(node); | ||
136 | // perform further tracker-specific post-processing | ||
137 | postProcessNode(node); | ||
138 | } | ||
139 | |||
140 | // reconstruct new queue contents based on new group map | ||
141 | if (!groupQueue.isEmpty()) { | ||
142 | final Set<CommunicationGroup> oldActiveGroups = new HashSet<CommunicationGroup>(groupQueue); | ||
143 | groupQueue.clear(); | ||
144 | reconstructQueueContents(oldActiveGroups); | ||
145 | } | ||
146 | |||
147 | // post process the groups | ||
148 | for (final CommunicationGroup group : groupMap.values()) { | ||
149 | postProcessGroup(group); | ||
150 | } | ||
151 | } | ||
152 | |||
153 | /** | ||
154 | * This method is responsible for reconstructing the active queue contents after the network structure has changed. | ||
155 | * It it defined as abstract because the reconstruction logic is specific to each {@link CommunicationTracker}. | ||
156 | * @since 2.4 | ||
157 | */ | ||
158 | protected abstract void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups); | ||
159 | |||
160 | private void addToGroup(final Node node, final CommunicationGroup group) { | ||
161 | groupMap.put(node, group); | ||
162 | if (node instanceof Receiver) { | ||
163 | ((Receiver) node).getMailbox().setCurrentGroup(group); | ||
164 | if (node instanceof IGroupable) { | ||
165 | ((IGroupable) node).setCurrentGroup(group); | ||
166 | } | ||
167 | } | ||
168 | } | ||
169 | |||
170 | /** | ||
171 | * Depends on the groups, as well as the parent nodes of the argument, so recomputation is needed if these change | ||
172 | */ | ||
173 | private void precomputeFallThroughFlag(final Node node) { | ||
174 | CommunicationGroup group = groupMap.get(node); | ||
175 | if (node instanceof Receiver) { | ||
176 | IGroupable mailbox = ((Receiver) node).getMailbox(); | ||
177 | if (mailbox instanceof FallThroughCapableMailbox) { | ||
178 | Set<Node> directParents = dependencyGraph.getSourceNodes(node).distinctValues(); | ||
179 | // decide between using quick&cheap fall-through, or allowing for update cancellation | ||
180 | boolean fallThrough = | ||
181 | // disallow fallthrough: updates at production nodes should cancel, if they can be trimmed or | ||
182 | // disjunctive | ||
183 | (!(node instanceof ProductionNode && ( // it is a production node... | ||
184 | // with more than one parent | ||
185 | directParents.size() > 0 || | ||
186 | // or true trimming in its sole parent | ||
187 | directParents.size() == 1 && trueTrimming(directParents.iterator().next())))) && | ||
188 | // disallow fallthrough: external updates should be stored (if updates are delayed) | ||
189 | (!(node instanceof ExternalInputEnumeratorNode)) && | ||
190 | // disallow fallthrough: RelationEvaluatorNode needs to be notified in batch-style, and the batching is done by the mailbox | ||
191 | // however, it is not the RelationEvaluatorNode itself that is interesting here, as that indirectly uses the BatchingReceiver | ||
192 | // so we need to disable fall-through for the BatchingReceiver | ||
193 | (!(node instanceof RelationEvaluatorNode.BatchingReceiver)); | ||
194 | // do additional checks | ||
195 | if (fallThrough) { | ||
196 | // recursive parent groups generate excess updates that should be cancelled after delete&rederive | ||
197 | // phases | ||
198 | // aggregator and transitive closure parent nodes also generate excess updates that should be | ||
199 | // cancelled | ||
200 | directParentLoop: for (Node directParent : directParents) { | ||
201 | Set<Node> parentsToCheck = new HashSet<>(); | ||
202 | // check the case where a direct parent is the reason for mailbox usage | ||
203 | parentsToCheck.add(directParent); | ||
204 | // check the case where an indirect parent (join slot) is the reason for mailbox usage | ||
205 | if (directParent instanceof DualInputNode) { | ||
206 | // in case of existence join (typically antijoin), a mailbox should allow | ||
207 | // an insertion and deletion (at the secondary slot) to cancel each other out | ||
208 | if (directParent instanceof ExistenceNode) { | ||
209 | fallThrough = false; | ||
210 | break directParentLoop; | ||
211 | } | ||
212 | // in beta nodes, indexer slots (or their active nodes) are considered indirect parents | ||
213 | DualInputNode dualInput = (DualInputNode) directParent; | ||
214 | IterableIndexer primarySlot = dualInput.getPrimarySlot(); | ||
215 | if (primarySlot != null) | ||
216 | parentsToCheck.add(primarySlot.getActiveNode()); | ||
217 | Indexer secondarySlot = dualInput.getSecondarySlot(); | ||
218 | if (secondarySlot != null) | ||
219 | parentsToCheck.add(secondarySlot.getActiveNode()); | ||
220 | } | ||
221 | for (Node parent : parentsToCheck) { | ||
222 | CommunicationGroup parentGroup = groupMap.get(parent); | ||
223 | if ( // parent is in a different, recursive group | ||
224 | (group != parentGroup && parentGroup.isRecursive()) || | ||
225 | // node and parent within the same recursive group, and... | ||
226 | (group == parentGroup && group.isRecursive() && ( | ||
227 | // parent is a transitive closure or aggregator node, or a trimmer | ||
228 | // allow trimmed or disjunctive tuple updates to cancel each other | ||
229 | (parent instanceof TransitiveClosureNode) || (parent instanceof IAggregatorNode) | ||
230 | || trueTrimming(parent)))) { | ||
231 | fallThrough = false; | ||
232 | break directParentLoop; | ||
233 | } | ||
234 | } | ||
235 | } | ||
236 | } | ||
237 | // overwrite fallthrough flag with newly computed value | ||
238 | ((FallThroughCapableMailbox) mailbox).setFallThrough(fallThrough); | ||
239 | } | ||
240 | } | ||
241 | } | ||
242 | |||
243 | /** | ||
244 | * A trimmer node that actually eliminates some columns (not just reorders) | ||
245 | */ | ||
246 | private boolean trueTrimming(Node node) { | ||
247 | if (node instanceof TrimmerNode) { | ||
248 | TupleMask mask = ((TrimmerNode) node).getMask(); | ||
249 | return (mask.indices.length != mask.sourceWidth); | ||
250 | } | ||
251 | return false; | ||
252 | } | ||
253 | |||
254 | public void activateUnenqueued(final CommunicationGroup group) { | ||
255 | groupQueue.add(group); | ||
256 | group.isEnqueued = true; | ||
257 | } | ||
258 | |||
259 | public void deactivate(final CommunicationGroup group) { | ||
260 | groupQueue.remove(group); | ||
261 | group.isEnqueued = false; | ||
262 | } | ||
263 | |||
264 | public CommunicationGroup getAndRemoveFirstGroup() { | ||
265 | final CommunicationGroup group = groupQueue.poll(); | ||
266 | group.isEnqueued = false; | ||
267 | return group; | ||
268 | } | ||
269 | |||
270 | public boolean isEmpty() { | ||
271 | return groupQueue.isEmpty(); | ||
272 | } | ||
273 | |||
274 | protected abstract CommunicationGroup createGroup(final Node representative, final int index); | ||
275 | |||
276 | protected CommunicationGroup createAndStoreGroup(final Node representative, final int index) { | ||
277 | final CommunicationGroup group = createGroup(representative, index); | ||
278 | addToGroup(representative, group); | ||
279 | return group; | ||
280 | } | ||
281 | |||
282 | /** | ||
283 | * Registers the dependency that the target {@link Node} depends on the source {@link Node}. In other words, source | ||
284 | * may send messages to target in the RETE network. If the dependency edge is already present, this method call is a | ||
285 | * noop. | ||
286 | * | ||
287 | * @param source | ||
288 | * the source node | ||
289 | * @param target | ||
290 | * the target node | ||
291 | */ | ||
292 | public void registerDependency(final Node source, final Node target) { | ||
293 | // nodes can be immediately inserted, if they already exist in the graph, this is a noop | ||
294 | dependencyGraph.insertNode(source); | ||
295 | dependencyGraph.insertNode(target); | ||
296 | |||
297 | if (!this.dependencyGraph.getTargetNodes(source).containsNonZero(target)) { | ||
298 | |||
299 | // query all these information before the actual edge insertion | ||
300 | // because SCCs may be unified during the process | ||
301 | final Node sourceRepresentative = sccInformationProvider.getRepresentative(source); | ||
302 | final Node targetRepresentative = sccInformationProvider.getRepresentative(target); | ||
303 | final boolean targetHadOutgoingEdges = sccInformationProvider.hasOutgoingEdges(targetRepresentative); | ||
304 | |||
305 | // insert the edge | ||
306 | dependencyGraph.insertEdge(source, target); | ||
307 | |||
308 | // create groups if they do not yet exist | ||
309 | CommunicationGroup sourceGroup = groupMap.get(sourceRepresentative); | ||
310 | if (sourceGroup == null) { | ||
311 | // create on-demand with the next smaller group id | ||
312 | sourceGroup = createAndStoreGroup(sourceRepresentative, --minGroupId); | ||
313 | } | ||
314 | final int sourceIndex = sourceGroup.identifier; | ||
315 | |||
316 | CommunicationGroup targetGroup = groupMap.get(targetRepresentative); | ||
317 | if (targetGroup == null) { | ||
318 | // create on-demand with the next larger group id | ||
319 | targetGroup = createAndStoreGroup(targetRepresentative, ++maxGroupId); | ||
320 | } | ||
321 | final int targetIndex = targetGroup.identifier; | ||
322 | |||
323 | if (sourceIndex <= targetIndex) { | ||
324 | // indices obey current topological ordering | ||
325 | refreshFallThroughFlag(target); | ||
326 | postProcessNode(source); | ||
327 | postProcessNode(target); | ||
328 | postProcessGroup(sourceGroup); | ||
329 | if (sourceGroup != targetGroup) { | ||
330 | postProcessGroup(targetGroup); | ||
331 | } | ||
332 | } else if (sourceIndex > targetIndex && !targetHadOutgoingEdges) { | ||
333 | // indices violate current topological ordering, but we can simply bump the target index | ||
334 | final boolean wasEnqueued = targetGroup.isEnqueued; | ||
335 | if (wasEnqueued) { | ||
336 | groupQueue.remove(targetGroup); | ||
337 | } | ||
338 | targetGroup.identifier = ++maxGroupId; | ||
339 | if (wasEnqueued) { | ||
340 | groupQueue.add(targetGroup); | ||
341 | } | ||
342 | |||
343 | refreshFallThroughFlag(target); | ||
344 | postProcessNode(source); | ||
345 | postProcessNode(target); | ||
346 | postProcessGroup(sourceGroup); | ||
347 | postProcessGroup(targetGroup); | ||
348 | } else { | ||
349 | // needs a full re-computation because of more complex change | ||
350 | precomputeGroups(); | ||
351 | } | ||
352 | } | ||
353 | } | ||
354 | |||
355 | /** | ||
356 | * Returns true if the given {@link Node} is in a recursive {@link CommunicationGroup}, false otherwise. | ||
357 | */ | ||
358 | public boolean isInRecursiveGroup(final Node node) { | ||
359 | final CommunicationGroup group = this.getGroup(node); | ||
360 | if (group == null) { | ||
361 | return false; | ||
362 | } else { | ||
363 | return group.isRecursive(); | ||
364 | } | ||
365 | } | ||
366 | |||
367 | /** | ||
368 | * Returns true if the given two {@link Node}s are in the same {@link CommunicationGroup}. | ||
369 | */ | ||
370 | public boolean areInSameGroup(final Node left, final Node right) { | ||
371 | final CommunicationGroup leftGroup = this.getGroup(left); | ||
372 | final CommunicationGroup rightGroup = this.getGroup(right); | ||
373 | return leftGroup != null && leftGroup == rightGroup; | ||
374 | } | ||
375 | |||
376 | /** | ||
377 | * Unregisters a dependency between source and target. | ||
378 | * | ||
379 | * @param source | ||
380 | * the source node | ||
381 | * @param target | ||
382 | * the target node | ||
383 | */ | ||
384 | public void unregisterDependency(final Node source, final Node target) { | ||
385 | // delete the edge first, and then query the SCC info provider | ||
386 | this.dependencyGraph.deleteEdgeIfExists(source, target); | ||
387 | |||
388 | final Node sourceRepresentative = sccInformationProvider.getRepresentative(source); | ||
389 | final Node targetRepresentative = sccInformationProvider.getRepresentative(target); | ||
390 | |||
391 | // if they are still in the same SCC, | ||
392 | // then this deletion did not affect the SCCs, | ||
393 | // and it is sufficient to recompute affected fall-through flags; | ||
394 | // otherwise, we need a new pre-computation for the groupMap and groupQueue | ||
395 | if (sourceRepresentative.equals(targetRepresentative)) { | ||
396 | // this deletion could not have affected the split flags | ||
397 | refreshFallThroughFlag(target); | ||
398 | postProcessNode(source); | ||
399 | postProcessNode(target); | ||
400 | } else { | ||
401 | // preComputeGroups takes care of the split flag maintenance | ||
402 | precomputeGroups(); | ||
403 | } | ||
404 | } | ||
405 | |||
406 | /** | ||
407 | * Refresh fall-through flags if dependencies change for given target, but no SCC change | ||
408 | */ | ||
409 | private void refreshFallThroughFlag(final Node target) { | ||
410 | precomputeFallThroughFlag(target); | ||
411 | if (target instanceof DualInputNode) { | ||
412 | for (final Node indirectTarget : dependencyGraph.getTargetNodes(target).distinctValues()) { | ||
413 | precomputeFallThroughFlag(indirectTarget); | ||
414 | } | ||
415 | } | ||
416 | } | ||
417 | |||
418 | /** | ||
419 | * Returns true if the given source-target edge in the communication network acts as a recursion cut point. | ||
420 | * The current implementation considers edges leading into {@link ProductionNode}s as cut point iff | ||
421 | * both source and target belong to the same group. | ||
422 | * | ||
423 | * @param source the source node | ||
424 | * @param target the target node | ||
425 | * @return true if the edge is a cut point, false otherwise | ||
426 | * @since 2.4 | ||
427 | */ | ||
428 | protected boolean isRecursionCutPoint(final Node source, final Node target) { | ||
429 | final Node effectiveSource = source instanceof SpecializedProjectionIndexer | ||
430 | ? ((SpecializedProjectionIndexer) source).getActiveNode() | ||
431 | : source; | ||
432 | final CommunicationGroup sourceGroup = this.getGroup(effectiveSource); | ||
433 | final CommunicationGroup targetGroup = this.getGroup(target); | ||
434 | return sourceGroup != null && sourceGroup == targetGroup && target instanceof ProductionNode; | ||
435 | } | ||
436 | |||
437 | /** | ||
438 | * This hook allows concrete tracker implementations to perform tracker-specific post processing on nodes (cf. | ||
439 | * {@link NetworkStructureChangeSensitiveNode} and {@link BehaviorChangingMailbox}). At the time of the invocation, | ||
440 | * the network topology has already been updated. | ||
441 | */ | ||
442 | protected abstract void postProcessNode(final Node node); | ||
443 | |||
444 | /** | ||
445 | * This hook allows concrete tracker implementations to perform tracker-specific post processing on groups. At the | ||
446 | * time of the invocation, the network topology has already been updated. | ||
447 | * @since 2.4 | ||
448 | */ | ||
449 | protected abstract void postProcessGroup(final CommunicationGroup group); | ||
450 | |||
451 | /** | ||
452 | * Creates a proxy for the given {@link Mailbox} for the given requester {@link Node}. The proxy creation is | ||
453 | * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used | ||
454 | * to create {@link TimelyMailboxProxy}s depending on the network topology. There is no guarantee that the same | ||
455 | * proxy instance is returned when this method is called multiple times with the same arguments. | ||
456 | */ | ||
457 | public abstract Mailbox proxifyMailbox(final Node requester, final Mailbox original); | ||
458 | |||
459 | /** | ||
460 | * Creates a proxy for the given {@link IndexerListener} for the given requester {@link Node}. The proxy creation is | ||
461 | * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used | ||
462 | * to create {@link TimelyIndexerListenerProxy}s depending on the network topology. There is no guarantee that the | ||
463 | * same proxy instance is returned when this method is called multiple times with the same arguments. | ||
464 | */ | ||
465 | public abstract IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original); | ||
466 | |||
467 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java new file mode 100644 index 00000000..e1a61693 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java | |||
@@ -0,0 +1,19 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | /** | ||
12 | * Subclasses of this interface represent meta data of update messages in Rete. | ||
13 | * | ||
14 | * @author Tamas Szabo | ||
15 | * @since 2.3 | ||
16 | */ | ||
17 | public interface MessageSelector { | ||
18 | |||
19 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java new file mode 100644 index 00000000..27779352 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java | |||
@@ -0,0 +1,32 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.Comparator; | ||
12 | import java.util.Map; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
15 | |||
16 | /** | ||
17 | * @since 2.4 | ||
18 | */ | ||
19 | public class NodeComparator implements Comparator<Node> { | ||
20 | |||
21 | protected final Map<Node, Integer> nodeMap; | ||
22 | |||
23 | public NodeComparator(final Map<Node, Integer> nodeMap) { | ||
24 | this.nodeMap = nodeMap; | ||
25 | } | ||
26 | |||
27 | @Override | ||
28 | public int compare(final Node left, final Node right) { | ||
29 | return this.nodeMap.get(left) - this.nodeMap.get(right); | ||
30 | } | ||
31 | |||
32 | } \ No newline at end of file | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java new file mode 100644 index 00000000..41cd8cd3 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java | |||
@@ -0,0 +1,34 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | /** | ||
12 | * A default message selector that can be used to associate phases to messages. | ||
13 | * | ||
14 | * @author Tamas Szabo | ||
15 | * @since 2.3 | ||
16 | */ | ||
17 | public enum PhasedSelector implements MessageSelector { | ||
18 | |||
19 | /** | ||
20 | * No special distinguishing feature | ||
21 | */ | ||
22 | DEFAULT, | ||
23 | |||
24 | /** | ||
25 | * Inserts and delete-insert monotone change pairs | ||
26 | */ | ||
27 | MONOTONE, | ||
28 | |||
29 | /** | ||
30 | * Deletes | ||
31 | */ | ||
32 | ANTI_MONOTONE | ||
33 | |||
34 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java new file mode 100644 index 00000000..a50a63a8 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java | |||
@@ -0,0 +1,124 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.AbstractMap; | ||
12 | import java.util.Collection; | ||
13 | import java.util.Map; | ||
14 | import java.util.Set; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines; | ||
18 | |||
19 | /** | ||
20 | * A timestamp associated with update messages in timely evaluation. | ||
21 | * | ||
22 | * @author Tamas Szabo | ||
23 | * @since 2.3 | ||
24 | */ | ||
25 | public class Timestamp implements Comparable<Timestamp>, MessageSelector { | ||
26 | |||
27 | protected final int value; | ||
28 | public static final Timestamp ZERO = new Timestamp(0); | ||
29 | /** | ||
30 | * @since 2.4 | ||
31 | */ | ||
32 | public static final Timeline<Timestamp> INSERT_AT_ZERO_TIMELINE = Timelines.createFrom(Timestamp.ZERO); | ||
33 | |||
34 | public Timestamp(final int value) { | ||
35 | this.value = value; | ||
36 | } | ||
37 | |||
38 | public int getValue() { | ||
39 | return value; | ||
40 | } | ||
41 | |||
42 | public Timestamp max(final Timestamp that) { | ||
43 | if (this.value >= that.value) { | ||
44 | return this; | ||
45 | } else { | ||
46 | return that; | ||
47 | } | ||
48 | } | ||
49 | |||
50 | /** | ||
51 | * @since 2.4 | ||
52 | */ | ||
53 | public Timestamp min(final Timestamp that) { | ||
54 | if (this.value <= that.value) { | ||
55 | return this; | ||
56 | } else { | ||
57 | return that; | ||
58 | } | ||
59 | } | ||
60 | |||
61 | @Override | ||
62 | public int compareTo(final Timestamp that) { | ||
63 | return this.value - that.value; | ||
64 | } | ||
65 | |||
66 | @Override | ||
67 | public boolean equals(final Object obj) { | ||
68 | if (obj == null || !(obj instanceof Timestamp)) { | ||
69 | return false; | ||
70 | } else { | ||
71 | return this.value == ((Timestamp) obj).value; | ||
72 | } | ||
73 | } | ||
74 | |||
75 | @Override | ||
76 | public int hashCode() { | ||
77 | return this.value; | ||
78 | } | ||
79 | |||
80 | @Override | ||
81 | public String toString() { | ||
82 | return Integer.toString(this.value); | ||
83 | } | ||
84 | |||
85 | /** | ||
86 | * A {@link Map} implementation that associates the zero timestamp with every key. There is no suppor for | ||
87 | * {@link Map#entrySet()} due to performance reasons. | ||
88 | * | ||
89 | * @author Tamas Szabo | ||
90 | */ | ||
91 | public static final class AllZeroMap<T> extends AbstractMap<T, Timeline<Timestamp>> { | ||
92 | |||
93 | private final Collection<T> wrapped; | ||
94 | |||
95 | public AllZeroMap(Set<T> wrapped) { | ||
96 | this.wrapped = wrapped; | ||
97 | } | ||
98 | |||
99 | @Override | ||
100 | public Set<Entry<T, Timeline<Timestamp>>> entrySet() { | ||
101 | throw new UnsupportedOperationException("Use the combination of keySet() and get()!"); | ||
102 | } | ||
103 | |||
104 | /** | ||
105 | * @since 2.4 | ||
106 | */ | ||
107 | @Override | ||
108 | public Timeline<Timestamp> get(final Object key) { | ||
109 | return INSERT_AT_ZERO_TIMELINE; | ||
110 | } | ||
111 | |||
112 | @Override | ||
113 | public Set<T> keySet() { | ||
114 | return (Set<T>) this.wrapped; | ||
115 | } | ||
116 | |||
117 | @Override | ||
118 | public String toString() { | ||
119 | return this.getClass().getSimpleName() + ": " + this.keySet().toString(); | ||
120 | } | ||
121 | |||
122 | } | ||
123 | |||
124 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java new file mode 100644 index 00000000..d8260384 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java | |||
@@ -0,0 +1,164 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timeless; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.EnumMap; | ||
14 | import java.util.LinkedHashSet; | ||
15 | import java.util.Map; | ||
16 | import java.util.Set; | ||
17 | |||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
20 | import tools.refinery.viatra.runtime.rete.network.RederivableNode; | ||
21 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
22 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
25 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
26 | |||
27 | /** | ||
28 | * A communication group representing either a single node where the | ||
29 | * node is a monotonicity aware one or a set of nodes that form an SCC. | ||
30 | * | ||
31 | * @author Tamas Szabo | ||
32 | * @since 2.4 | ||
33 | */ | ||
34 | public class RecursiveCommunicationGroup extends CommunicationGroup { | ||
35 | |||
36 | private final Set<Mailbox> antiMonotoneMailboxes; | ||
37 | private final Set<Mailbox> monotoneMailboxes; | ||
38 | private final Set<Mailbox> defaultMailboxes; | ||
39 | private final Set<RederivableNode> rederivables; | ||
40 | private boolean currentlyDelivering; | ||
41 | |||
42 | /** | ||
43 | * @since 1.7 | ||
44 | */ | ||
45 | public RecursiveCommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) { | ||
46 | super(tracker, representative, identifier); | ||
47 | this.antiMonotoneMailboxes = CollectionsFactory.createSet(); | ||
48 | this.monotoneMailboxes = CollectionsFactory.createSet(); | ||
49 | this.defaultMailboxes = CollectionsFactory.createSet(); | ||
50 | this.rederivables = new LinkedHashSet<RederivableNode>(); | ||
51 | this.currentlyDelivering = false; | ||
52 | } | ||
53 | |||
54 | @Override | ||
55 | public void deliverMessages() { | ||
56 | this.currentlyDelivering = true; | ||
57 | |||
58 | // ANTI-MONOTONE PHASE | ||
59 | while (!this.antiMonotoneMailboxes.isEmpty() || !this.defaultMailboxes.isEmpty()) { | ||
60 | while (!this.antiMonotoneMailboxes.isEmpty()) { | ||
61 | final Mailbox mailbox = this.antiMonotoneMailboxes.iterator().next(); | ||
62 | this.antiMonotoneMailboxes.remove(mailbox); | ||
63 | mailbox.deliverAll(PhasedSelector.ANTI_MONOTONE); | ||
64 | } | ||
65 | while (!this.defaultMailboxes.isEmpty()) { | ||
66 | final Mailbox mailbox = this.defaultMailboxes.iterator().next(); | ||
67 | this.defaultMailboxes.remove(mailbox); | ||
68 | mailbox.deliverAll(PhasedSelector.DEFAULT); | ||
69 | } | ||
70 | } | ||
71 | |||
72 | // REDERIVE PHASE | ||
73 | while (!this.rederivables.isEmpty()) { | ||
74 | // re-derivable nodes take care of their unregistration!! | ||
75 | final RederivableNode node = this.rederivables.iterator().next(); | ||
76 | node.rederiveOne(); | ||
77 | } | ||
78 | |||
79 | // MONOTONE PHASE | ||
80 | while (!this.monotoneMailboxes.isEmpty() || !this.defaultMailboxes.isEmpty()) { | ||
81 | while (!this.monotoneMailboxes.isEmpty()) { | ||
82 | final Mailbox mailbox = this.monotoneMailboxes.iterator().next(); | ||
83 | this.monotoneMailboxes.remove(mailbox); | ||
84 | mailbox.deliverAll(PhasedSelector.MONOTONE); | ||
85 | } | ||
86 | while (!this.defaultMailboxes.isEmpty()) { | ||
87 | final Mailbox mailbox = this.defaultMailboxes.iterator().next(); | ||
88 | this.defaultMailboxes.remove(mailbox); | ||
89 | mailbox.deliverAll(PhasedSelector.DEFAULT); | ||
90 | } | ||
91 | } | ||
92 | |||
93 | this.currentlyDelivering = false; | ||
94 | } | ||
95 | |||
96 | @Override | ||
97 | public boolean isEmpty() { | ||
98 | return this.rederivables.isEmpty() && this.antiMonotoneMailboxes.isEmpty() | ||
99 | && this.monotoneMailboxes.isEmpty() && this.defaultMailboxes.isEmpty(); | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind) { | ||
104 | final Collection<Mailbox> mailboxes = getMailboxContainer(kind); | ||
105 | mailboxes.add(mailbox); | ||
106 | if (!this.isEnqueued && !this.currentlyDelivering) { | ||
107 | this.tracker.activateUnenqueued(this); | ||
108 | } | ||
109 | } | ||
110 | |||
111 | @Override | ||
112 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
113 | final Collection<Mailbox> mailboxes = getMailboxContainer(kind); | ||
114 | mailboxes.remove(mailbox); | ||
115 | if (isEmpty()) { | ||
116 | this.tracker.deactivate(this); | ||
117 | } | ||
118 | } | ||
119 | |||
120 | private Collection<Mailbox> getMailboxContainer(final MessageSelector kind) { | ||
121 | if (kind == PhasedSelector.ANTI_MONOTONE) { | ||
122 | return this.antiMonotoneMailboxes; | ||
123 | } else if (kind == PhasedSelector.MONOTONE) { | ||
124 | return this.monotoneMailboxes; | ||
125 | } else if (kind == PhasedSelector.DEFAULT) { | ||
126 | return this.defaultMailboxes; | ||
127 | } else { | ||
128 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
129 | } | ||
130 | } | ||
131 | |||
132 | public void addRederivable(final RederivableNode node) { | ||
133 | this.rederivables.add(node); | ||
134 | if (!this.isEnqueued) { | ||
135 | this.tracker.activateUnenqueued(this); | ||
136 | } | ||
137 | } | ||
138 | |||
139 | public void removeRederivable(final RederivableNode node) { | ||
140 | this.rederivables.remove(node); | ||
141 | if (isEmpty()) { | ||
142 | this.tracker.deactivate(this); | ||
143 | } | ||
144 | } | ||
145 | |||
146 | public Collection<RederivableNode> getRederivables() { | ||
147 | return this.rederivables; | ||
148 | } | ||
149 | |||
150 | @Override | ||
151 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
152 | Map<PhasedSelector, Collection<Mailbox>> map = new EnumMap<>(PhasedSelector.class); | ||
153 | map.put(PhasedSelector.ANTI_MONOTONE, antiMonotoneMailboxes); | ||
154 | map.put(PhasedSelector.MONOTONE, monotoneMailboxes); | ||
155 | map.put(PhasedSelector.DEFAULT, defaultMailboxes); | ||
156 | return Collections.unmodifiableMap(map); | ||
157 | } | ||
158 | |||
159 | @Override | ||
160 | public boolean isRecursive() { | ||
161 | return true; | ||
162 | } | ||
163 | |||
164 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java new file mode 100644 index 00000000..c51c7dbf --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java | |||
@@ -0,0 +1,86 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timeless; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Map; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
16 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
20 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
21 | |||
22 | /** | ||
23 | * A communication group containing only a single node with a single default | ||
24 | * mailbox. | ||
25 | * | ||
26 | * @author Tamas Szabo | ||
27 | * @since 1.6 | ||
28 | */ | ||
29 | public class SingletonCommunicationGroup extends CommunicationGroup { | ||
30 | |||
31 | private Mailbox mailbox; | ||
32 | |||
33 | /** | ||
34 | * @since 1.7 | ||
35 | */ | ||
36 | public SingletonCommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) { | ||
37 | super(tracker, representative, identifier); | ||
38 | } | ||
39 | |||
40 | @Override | ||
41 | public void deliverMessages() { | ||
42 | this.mailbox.deliverAll(PhasedSelector.DEFAULT); | ||
43 | } | ||
44 | |||
45 | @Override | ||
46 | public boolean isEmpty() { | ||
47 | return this.mailbox == null; | ||
48 | } | ||
49 | |||
50 | @Override | ||
51 | public void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind) { | ||
52 | if (kind == PhasedSelector.DEFAULT) { | ||
53 | this.mailbox = mailbox; | ||
54 | if (!this.isEnqueued) { | ||
55 | this.tracker.activateUnenqueued(this); | ||
56 | } | ||
57 | } else { | ||
58 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
59 | } | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
64 | if (kind == PhasedSelector.DEFAULT) { | ||
65 | this.mailbox = null; | ||
66 | this.tracker.deactivate(this); | ||
67 | } else { | ||
68 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
69 | } | ||
70 | } | ||
71 | |||
72 | @Override | ||
73 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
74 | if (mailbox != null) { | ||
75 | return Collections.singletonMap(PhasedSelector.DEFAULT, Collections.singleton(mailbox)); | ||
76 | } else { | ||
77 | return Collections.emptyMap(); | ||
78 | } | ||
79 | } | ||
80 | |||
81 | @Override | ||
82 | public boolean isRecursive() { | ||
83 | return false; | ||
84 | } | ||
85 | |||
86 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java new file mode 100644 index 00000000..1c18c1cd --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java | |||
@@ -0,0 +1,149 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timeless; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.HashSet; | ||
13 | import java.util.Set; | ||
14 | import java.util.Map.Entry; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.rete.index.DualInputNode; | ||
17 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
18 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
19 | import tools.refinery.viatra.runtime.rete.index.IterableIndexer; | ||
20 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
21 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
22 | import tools.refinery.viatra.runtime.rete.network.RederivableNode; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
26 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
27 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
28 | |||
29 | /** | ||
30 | * Timeless implementation of the communication tracker. | ||
31 | * | ||
32 | * @author Tamas Szabo | ||
33 | * @since 2.2 | ||
34 | */ | ||
35 | public class TimelessCommunicationTracker extends CommunicationTracker { | ||
36 | |||
37 | @Override | ||
38 | protected CommunicationGroup createGroup(Node representative, int index) { | ||
39 | final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; | ||
40 | final boolean isReceiver = representative instanceof Receiver; | ||
41 | final boolean isPosetIndifferent = isReceiver | ||
42 | && ((Receiver) representative).getMailbox() instanceof BehaviorChangingMailbox; | ||
43 | final boolean isSingletonInDRedMode = isSingleton && (representative instanceof RederivableNode) | ||
44 | && ((RederivableNode) representative).isInDRedMode(); | ||
45 | |||
46 | CommunicationGroup group = null; | ||
47 | // we can only use a singleton group iff | ||
48 | // (1) the SCC has one node AND | ||
49 | // (2) either we have a poset-indifferent mailbox OR the node is not even a receiver AND | ||
50 | // (3) the node does not run in DRed mode in a singleton group | ||
51 | if (isSingleton && (isPosetIndifferent || !isReceiver) && !isSingletonInDRedMode) { | ||
52 | group = new SingletonCommunicationGroup(this, representative, index); | ||
53 | } else { | ||
54 | group = new RecursiveCommunicationGroup(this, representative, index); | ||
55 | } | ||
56 | |||
57 | return group; | ||
58 | } | ||
59 | |||
60 | @Override | ||
61 | protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) { | ||
62 | for (final CommunicationGroup oldGroup : oldActiveGroups) { | ||
63 | for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) { | ||
64 | for (final Mailbox mailbox : entry.getValue()) { | ||
65 | final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver()); | ||
66 | newGroup.notifyHasMessage(mailbox, entry.getKey()); | ||
67 | } | ||
68 | } | ||
69 | |||
70 | if (oldGroup instanceof RecursiveCommunicationGroup) { | ||
71 | for (final RederivableNode node : ((RecursiveCommunicationGroup) oldGroup).getRederivables()) { | ||
72 | final CommunicationGroup newGroup = this.groupMap.get(node); | ||
73 | if (!(newGroup instanceof RecursiveCommunicationGroup)) { | ||
74 | throw new IllegalStateException("The new group must also be recursive! " + newGroup); | ||
75 | } | ||
76 | ((RecursiveCommunicationGroup) newGroup).addRederivable(node); | ||
77 | } | ||
78 | } | ||
79 | } | ||
80 | } | ||
81 | |||
82 | @Override | ||
83 | public Mailbox proxifyMailbox(final Node requester, final Mailbox original) { | ||
84 | return original; | ||
85 | } | ||
86 | |||
87 | @Override | ||
88 | public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) { | ||
89 | return original; | ||
90 | } | ||
91 | |||
92 | @Override | ||
93 | protected void postProcessNode(final Node node) { | ||
94 | if (node instanceof Receiver) { | ||
95 | final Mailbox mailbox = ((Receiver) node).getMailbox(); | ||
96 | if (mailbox instanceof BehaviorChangingMailbox) { | ||
97 | final CommunicationGroup group = this.groupMap.get(node); | ||
98 | final Set<Node> sccNodes = this.sccInformationProvider.sccs.getPartition(node); | ||
99 | // a default mailbox must split its messages iff | ||
100 | // (1) its receiver is in a recursive group and | ||
101 | final boolean c1 = group.isRecursive(); | ||
102 | // (2) its receiver is at the SCC boundary of that group | ||
103 | final boolean c2 = isAtSCCBoundary(node); | ||
104 | // (3) its group consists of more than one node | ||
105 | final boolean c3 = sccNodes.size() > 1; | ||
106 | ((BehaviorChangingMailbox) mailbox).setSplitFlag(c1 && c2 && c3); | ||
107 | } | ||
108 | } | ||
109 | } | ||
110 | |||
111 | @Override | ||
112 | protected void postProcessGroup(final CommunicationGroup group) { | ||
113 | |||
114 | } | ||
115 | |||
116 | /** | ||
117 | * @since 2.0 | ||
118 | */ | ||
119 | private boolean isAtSCCBoundary(final Node node) { | ||
120 | final CommunicationGroup ownGroup = this.groupMap.get(node); | ||
121 | assert ownGroup != null; | ||
122 | for (final Node source : this.dependencyGraph.getSourceNodes(node).distinctValues()) { | ||
123 | final Set<Node> sourcesToCheck = new HashSet<Node>(); | ||
124 | sourcesToCheck.add(source); | ||
125 | // DualInputNodes must be checked additionally because they do not use a mailbox directly. | ||
126 | // It can happen that their indexers actually belong to other SCCs. | ||
127 | if (source instanceof DualInputNode) { | ||
128 | final DualInputNode dualInput = (DualInputNode) source; | ||
129 | final IterableIndexer primarySlot = dualInput.getPrimarySlot(); | ||
130 | if (primarySlot != null) { | ||
131 | sourcesToCheck.add(primarySlot.getActiveNode()); | ||
132 | } | ||
133 | final Indexer secondarySlot = dualInput.getSecondarySlot(); | ||
134 | if (secondarySlot != null) { | ||
135 | sourcesToCheck.add(secondarySlot.getActiveNode()); | ||
136 | } | ||
137 | } | ||
138 | for (final Node current : sourcesToCheck) { | ||
139 | final CommunicationGroup otherGroup = this.groupMap.get(current); | ||
140 | assert otherGroup != null; | ||
141 | if (!ownGroup.equals(otherGroup)) { | ||
142 | return true; | ||
143 | } | ||
144 | } | ||
145 | } | ||
146 | return false; | ||
147 | } | ||
148 | |||
149 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java new file mode 100644 index 00000000..8097bd91 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java | |||
@@ -0,0 +1,36 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.IGroupable; | ||
12 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
13 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
14 | |||
15 | /** | ||
16 | * {@link Node}s that implement this interface can resume folding of their states when instructed during timely evaluation. | ||
17 | * | ||
18 | * @since 2.3 | ||
19 | * @author Tamas Szabo | ||
20 | */ | ||
21 | public interface ResumableNode extends Node, IGroupable { | ||
22 | |||
23 | /** | ||
24 | * When called, the folding of the state shall be resumed at the given timestamp. The resumable is expected to | ||
25 | * do a folding step at the given timestamp only. Afterwards, folding shall be interrupted, even if there is more | ||
26 | * folding to do towards higher timestamps. | ||
27 | */ | ||
28 | public void resumeAt(final Timestamp timestamp); | ||
29 | |||
30 | /** | ||
31 | * Returns the smallest timestamp where lazy folding shall be resumed, or null if there is no more folding to do in this | ||
32 | * resumable. | ||
33 | */ | ||
34 | public Timestamp getResumableTimestamp(); | ||
35 | |||
36 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java new file mode 100644 index 00000000..0394d92c --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java | |||
@@ -0,0 +1,171 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Comparator; | ||
14 | import java.util.HashMap; | ||
15 | import java.util.Map; | ||
16 | import java.util.Map.Entry; | ||
17 | import java.util.Set; | ||
18 | import java.util.TreeMap; | ||
19 | import java.util.TreeSet; | ||
20 | |||
21 | import org.apache.log4j.Logger; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
23 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
26 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
27 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
28 | import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox; | ||
29 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
30 | |||
31 | /** | ||
32 | * A timely communication group implementation. {@link TimelyMailbox}es and {@link LazyFoldingNode}s are ordered in the | ||
33 | * increasing order of timestamps. | ||
34 | * | ||
35 | * @author Tamas Szabo | ||
36 | * @since 2.3 | ||
37 | */ | ||
38 | public class TimelyCommunicationGroup extends CommunicationGroup { | ||
39 | |||
40 | private final boolean isSingleton; | ||
41 | private final TreeMap<Timestamp, Set<Mailbox>> mailboxQueue; | ||
42 | // may be null - only used in the scattered case where we need to take care of mailboxes and resumables too | ||
43 | private Comparator<Node> nodeComparator; | ||
44 | private boolean currentlyDelivering; | ||
45 | private Timestamp currentlyDeliveredTimestamp; | ||
46 | |||
47 | public TimelyCommunicationGroup(final TimelyCommunicationTracker tracker, final Node representative, | ||
48 | final int identifier, final boolean isSingleton) { | ||
49 | super(tracker, representative, identifier); | ||
50 | this.isSingleton = isSingleton; | ||
51 | this.mailboxQueue = CollectionsFactory.createTreeMap(); | ||
52 | this.currentlyDelivering = false; | ||
53 | } | ||
54 | |||
55 | /** | ||
56 | * Sets the {@link Comparator} to be used to order the {@link Mailbox}es at a given {@link Timestamp} in the mailbox | ||
57 | * queue. Additionally, reorders already queued {@link Mailbox}es to reflect the new comparator. The comparator may | ||
58 | * be null, in this case, no set ordering will be enforced among the {@link Mailbox}es. | ||
59 | */ | ||
60 | public void setComparatorAndReorderMailboxes(final Comparator<Node> nodeComparator) { | ||
61 | this.nodeComparator = nodeComparator; | ||
62 | if (!this.mailboxQueue.isEmpty()) { | ||
63 | final HashMap<Timestamp, Set<Mailbox>> queueCopy = new HashMap<Timestamp, Set<Mailbox>>(this.mailboxQueue); | ||
64 | this.mailboxQueue.clear(); | ||
65 | for (final Entry<Timestamp, Set<Mailbox>> entry : queueCopy.entrySet()) { | ||
66 | for (final Mailbox mailbox : entry.getValue()) { | ||
67 | this.notifyHasMessage(mailbox, entry.getKey()); | ||
68 | } | ||
69 | } | ||
70 | } | ||
71 | } | ||
72 | |||
73 | @Override | ||
74 | public void deliverMessages() { | ||
75 | this.currentlyDelivering = true; | ||
76 | while (!this.mailboxQueue.isEmpty()) { | ||
77 | // care must be taken here how we iterate over the mailboxes | ||
78 | // it is not okay to loop over the mailboxes at once because a mailbox may disappear from the collection as | ||
79 | // a result of delivering messages from another mailboxes under the same timestamp | ||
80 | // because of this, it is crucial that we pick the mailboxes one by one | ||
81 | final Entry<Timestamp, Set<Mailbox>> entry = this.mailboxQueue.firstEntry(); | ||
82 | final Timestamp timestamp = entry.getKey(); | ||
83 | final Set<Mailbox> mailboxes = entry.getValue(); | ||
84 | final Mailbox mailbox = mailboxes.iterator().next(); | ||
85 | mailboxes.remove(mailbox); | ||
86 | if (mailboxes.isEmpty()) { | ||
87 | this.mailboxQueue.pollFirstEntry(); | ||
88 | } | ||
89 | assert mailbox instanceof TimelyMailbox; | ||
90 | /* debug */ this.currentlyDeliveredTimestamp = timestamp; | ||
91 | mailbox.deliverAll(timestamp); | ||
92 | /* debug */ this.currentlyDeliveredTimestamp = null; | ||
93 | } | ||
94 | this.currentlyDelivering = false; | ||
95 | } | ||
96 | |||
97 | @Override | ||
98 | public boolean isEmpty() { | ||
99 | return this.mailboxQueue.isEmpty(); | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void notifyHasMessage(final Mailbox mailbox, MessageSelector kind) { | ||
104 | if (kind instanceof Timestamp) { | ||
105 | final Timestamp timestamp = (Timestamp) kind; | ||
106 | if (Options.MONITOR_VIOLATION_OF_DIFFERENTIAL_DATAFLOW_TIMESTAMPS) { | ||
107 | if (timestamp.compareTo(this.currentlyDeliveredTimestamp) < 0) { | ||
108 | final Logger logger = this.representative.getContainer().getNetwork().getEngine().getLogger(); | ||
109 | logger.error( | ||
110 | "[INTERNAL ERROR] Violation of differential dataflow communication schema! The communication component with representative " | ||
111 | + this.representative + " observed decreasing timestamp during message delivery!"); | ||
112 | } | ||
113 | } | ||
114 | final Set<Mailbox> mailboxes = this.mailboxQueue.computeIfAbsent(timestamp, k -> { | ||
115 | if (this.nodeComparator == null) { | ||
116 | return CollectionsFactory.createSet(); | ||
117 | } else { | ||
118 | return new TreeSet<Mailbox>(new Comparator<Mailbox>() { | ||
119 | @Override | ||
120 | public int compare(final Mailbox left, final Mailbox right) { | ||
121 | return nodeComparator.compare(left.getReceiver(), right.getReceiver()); | ||
122 | } | ||
123 | }); | ||
124 | } | ||
125 | }); | ||
126 | mailboxes.add(mailbox); | ||
127 | if (!this.isEnqueued && !this.currentlyDelivering) { | ||
128 | this.tracker.activateUnenqueued(this); | ||
129 | } | ||
130 | } else { | ||
131 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
132 | } | ||
133 | } | ||
134 | |||
135 | @Override | ||
136 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
137 | if (kind instanceof Timestamp) { | ||
138 | final Timestamp timestamp = (Timestamp) kind; | ||
139 | this.mailboxQueue.compute(timestamp, (k, v) -> { | ||
140 | if (v == null) { | ||
141 | throw new IllegalStateException("No mailboxes registered at timestamp " + timestamp + "!"); | ||
142 | } | ||
143 | if (!v.remove(mailbox)) { | ||
144 | throw new IllegalStateException( | ||
145 | "The mailbox " + mailbox + " was not registered at timestamp " + timestamp + "!"); | ||
146 | } | ||
147 | if (v.isEmpty()) { | ||
148 | return null; | ||
149 | } else { | ||
150 | return v; | ||
151 | } | ||
152 | }); | ||
153 | if (this.mailboxQueue.isEmpty()) { | ||
154 | this.tracker.deactivate(this); | ||
155 | } | ||
156 | } else { | ||
157 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
158 | } | ||
159 | } | ||
160 | |||
161 | @Override | ||
162 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
163 | return Collections.unmodifiableMap(this.mailboxQueue); | ||
164 | } | ||
165 | |||
166 | @Override | ||
167 | public boolean isRecursive() { | ||
168 | return !this.isSingleton; | ||
169 | } | ||
170 | |||
171 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java new file mode 100644 index 00000000..79179880 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java | |||
@@ -0,0 +1,216 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.List; | ||
13 | import java.util.Map; | ||
14 | import java.util.Map.Entry; | ||
15 | import java.util.Set; | ||
16 | import java.util.function.Function; | ||
17 | |||
18 | import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting; | ||
19 | import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
21 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
22 | import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer; | ||
23 | import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription; | ||
24 | import tools.refinery.viatra.runtime.rete.index.StandardIndexer; | ||
25 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; | ||
26 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; | ||
27 | import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode; | ||
28 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
29 | import tools.refinery.viatra.runtime.rete.network.ProductionNode; | ||
30 | import tools.refinery.viatra.runtime.rete.network.StandardNode; | ||
31 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
32 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
33 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
34 | import tools.refinery.viatra.runtime.rete.network.communication.NodeComparator; | ||
35 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
36 | import tools.refinery.viatra.runtime.rete.single.DiscriminatorDispatcherNode; | ||
37 | |||
38 | /** | ||
39 | * Timely (DDF) implementation of the {@link CommunicationTracker}. | ||
40 | * | ||
41 | * @author Tamas Szabo | ||
42 | * @since 2.3 | ||
43 | */ | ||
44 | public class TimelyCommunicationTracker extends CommunicationTracker { | ||
45 | |||
46 | protected final TimelyConfiguration configuration; | ||
47 | |||
48 | public TimelyCommunicationTracker(final TimelyConfiguration configuration) { | ||
49 | this.configuration = configuration; | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | protected CommunicationGroup createGroup(final Node representative, final int index) { | ||
54 | final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; | ||
55 | return new TimelyCommunicationGroup(this, representative, index, isSingleton); | ||
56 | } | ||
57 | |||
58 | @Override | ||
59 | protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) { | ||
60 | for (final CommunicationGroup oldGroup : oldActiveGroups) { | ||
61 | for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) { | ||
62 | for (final Mailbox mailbox : entry.getValue()) { | ||
63 | final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver()); | ||
64 | newGroup.notifyHasMessage(mailbox, entry.getKey()); | ||
65 | } | ||
66 | } | ||
67 | } | ||
68 | } | ||
69 | |||
70 | @Override | ||
71 | public Mailbox proxifyMailbox(final Node requester, final Mailbox original) { | ||
72 | final Mailbox mailboxToProxify = (original instanceof TimelyMailboxProxy) | ||
73 | ? ((TimelyMailboxProxy) original).getWrappedMailbox() | ||
74 | : original; | ||
75 | final TimestampTransformation preprocessor = getPreprocessor(requester, mailboxToProxify.getReceiver()); | ||
76 | if (preprocessor == null) { | ||
77 | return mailboxToProxify; | ||
78 | } else { | ||
79 | return new TimelyMailboxProxy(mailboxToProxify, preprocessor); | ||
80 | } | ||
81 | } | ||
82 | |||
83 | @Override | ||
84 | public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) { | ||
85 | final IndexerListener listenerToProxify = (original instanceof TimelyIndexerListenerProxy) | ||
86 | ? ((TimelyIndexerListenerProxy) original).getWrappedIndexerListener() | ||
87 | : original; | ||
88 | final TimestampTransformation preprocessor = getPreprocessor(requester, listenerToProxify.getOwner()); | ||
89 | if (preprocessor == null) { | ||
90 | return listenerToProxify; | ||
91 | } else { | ||
92 | return new TimelyIndexerListenerProxy(listenerToProxify, preprocessor); | ||
93 | } | ||
94 | } | ||
95 | |||
96 | protected TimestampTransformation getPreprocessor(final Node source, final Node target) { | ||
97 | final Node effectiveSource = source instanceof SpecializedProjectionIndexer | ||
98 | ? ((SpecializedProjectionIndexer) source).getActiveNode() | ||
99 | : source; | ||
100 | final CommunicationGroup sourceGroup = this.getGroup(effectiveSource); | ||
101 | final CommunicationGroup targetGroup = this.getGroup(target); | ||
102 | |||
103 | if (sourceGroup != null && targetGroup != null) { | ||
104 | // during RETE construction, the groups may be still null | ||
105 | if (sourceGroup != targetGroup && sourceGroup.isRecursive()) { | ||
106 | // targetGroup is a successor SCC of sourceGroup | ||
107 | // and sourceGroup is a recursive SCC | ||
108 | // then we need to zero out the timestamps | ||
109 | return TimestampTransformation.RESET; | ||
110 | } | ||
111 | if (sourceGroup == targetGroup && target instanceof ProductionNode) { | ||
112 | // if requester and receiver are in the same SCC | ||
113 | // and receiver is a production node | ||
114 | // then we need to increment the timestamps | ||
115 | return TimestampTransformation.INCREMENT; | ||
116 | } | ||
117 | } | ||
118 | |||
119 | return null; | ||
120 | } | ||
121 | |||
122 | @Override | ||
123 | protected void postProcessNode(final Node node) { | ||
124 | if (node instanceof NetworkStructureChangeSensitiveNode) { | ||
125 | ((NetworkStructureChangeSensitiveNode) node).networkStructureChanged(); | ||
126 | } | ||
127 | } | ||
128 | |||
129 | @Override | ||
130 | protected void postProcessGroup(final CommunicationGroup group) { | ||
131 | if (this.configuration.getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) { | ||
132 | final Node representative = group.getRepresentative(); | ||
133 | final Set<Node> groupMembers = this.sccInformationProvider.sccs.getPartition(representative); | ||
134 | if (groupMembers.size() > 1) { | ||
135 | final Graph<Node> graph = new Graph<Node>(); | ||
136 | |||
137 | for (final Node node : groupMembers) { | ||
138 | graph.insertNode(node); | ||
139 | } | ||
140 | |||
141 | for (final Node source : groupMembers) { | ||
142 | for (final Node target : this.dependencyGraph.getTargetNodes(source)) { | ||
143 | // (1) the edge is not a recursion cut point | ||
144 | // (2) the edge is within this group | ||
145 | if (!this.isRecursionCutPoint(source, target) && groupMembers.contains(target)) { | ||
146 | graph.insertEdge(source, target); | ||
147 | } | ||
148 | } | ||
149 | } | ||
150 | |||
151 | final List<Node> orderedNodes = TopologicalSorting.compute(graph); | ||
152 | final Map<Node, Integer> nodeMap = CollectionsFactory.createMap(); | ||
153 | int identifier = 0; | ||
154 | for (final Node orderedNode : orderedNodes) { | ||
155 | nodeMap.put(orderedNode, identifier++); | ||
156 | } | ||
157 | |||
158 | ((TimelyCommunicationGroup) group).setComparatorAndReorderMailboxes(new NodeComparator(nodeMap)); | ||
159 | } | ||
160 | } | ||
161 | } | ||
162 | |||
163 | /** | ||
164 | * This static field is used for debug purposes in the DotGenerator. | ||
165 | */ | ||
166 | public static final Function<Node, Function<Node, String>> EDGE_LABEL_FUNCTION = new Function<Node, Function<Node, String>>() { | ||
167 | |||
168 | @Override | ||
169 | public Function<Node, String> apply(final Node source) { | ||
170 | return new Function<Node, String>() { | ||
171 | @Override | ||
172 | public String apply(final Node target) { | ||
173 | if (source instanceof SpecializedProjectionIndexer) { | ||
174 | final Collection<ListenerSubscription> subscriptions = ((SpecializedProjectionIndexer) source) | ||
175 | .getSubscriptions(); | ||
176 | for (final ListenerSubscription subscription : subscriptions) { | ||
177 | if (subscription.getListener().getOwner() == target | ||
178 | && subscription.getListener() instanceof TimelyIndexerListenerProxy) { | ||
179 | return ((TimelyIndexerListenerProxy) subscription.getListener()).preprocessor | ||
180 | .toString(); | ||
181 | } | ||
182 | } | ||
183 | } | ||
184 | if (source instanceof StandardIndexer) { | ||
185 | final Collection<IndexerListener> listeners = ((StandardIndexer) source).getListeners(); | ||
186 | for (final IndexerListener listener : listeners) { | ||
187 | if (listener.getOwner() == target && listener instanceof TimelyIndexerListenerProxy) { | ||
188 | return ((TimelyIndexerListenerProxy) listener).preprocessor.toString(); | ||
189 | } | ||
190 | } | ||
191 | } | ||
192 | if (source instanceof StandardNode) { | ||
193 | final Collection<Mailbox> mailboxes = ((StandardNode) source).getChildMailboxes(); | ||
194 | for (final Mailbox mailbox : mailboxes) { | ||
195 | if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) { | ||
196 | return ((TimelyMailboxProxy) mailbox).preprocessor.toString(); | ||
197 | } | ||
198 | } | ||
199 | } | ||
200 | if (source instanceof DiscriminatorDispatcherNode) { | ||
201 | final Collection<Mailbox> mailboxes = ((DiscriminatorDispatcherNode) source) | ||
202 | .getBucketMailboxes().values(); | ||
203 | for (final Mailbox mailbox : mailboxes) { | ||
204 | if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) { | ||
205 | return ((TimelyMailboxProxy) mailbox).preprocessor.toString(); | ||
206 | } | ||
207 | } | ||
208 | } | ||
209 | return null; | ||
210 | } | ||
211 | }; | ||
212 | } | ||
213 | |||
214 | }; | ||
215 | |||
216 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java new file mode 100644 index 00000000..e8fbf84e --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java | |||
@@ -0,0 +1,81 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
13 | import tools.refinery.viatra.runtime.matchers.util.Preconditions; | ||
14 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
15 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
16 | import tools.refinery.viatra.runtime.rete.network.ProductionNode; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
18 | |||
19 | /** | ||
20 | * A timely proxy for another {@link IndexerListener}, which performs some preprocessing | ||
21 | * on the differential timestamps before passing it on to the real recipient. | ||
22 | * <p> | ||
23 | * These proxies are used on edges leading into {@link ProductionNode}s. Because {@link ProductionNode}s | ||
24 | * never ask back the indexer for its contents, there is no need to also apply the proxy on that direction. | ||
25 | * | ||
26 | * @author Tamas Szabo | ||
27 | * @since 2.3 | ||
28 | */ | ||
29 | public class TimelyIndexerListenerProxy implements IndexerListener { | ||
30 | |||
31 | protected final TimestampTransformation preprocessor; | ||
32 | protected final IndexerListener wrapped; | ||
33 | |||
34 | public TimelyIndexerListenerProxy(final IndexerListener wrapped, | ||
35 | final TimestampTransformation preprocessor) { | ||
36 | Preconditions.checkArgument(!(wrapped instanceof TimelyIndexerListenerProxy), "Proxy in a proxy is not allowed!"); | ||
37 | this.wrapped = wrapped; | ||
38 | this.preprocessor = preprocessor; | ||
39 | } | ||
40 | |||
41 | public IndexerListener getWrappedIndexerListener() { | ||
42 | return wrapped; | ||
43 | } | ||
44 | |||
45 | @Override | ||
46 | public Node getOwner() { | ||
47 | return this.wrapped.getOwner(); | ||
48 | } | ||
49 | |||
50 | @Override | ||
51 | public void notifyIndexerUpdate(final Direction direction, final Tuple updateElement, final Tuple signature, | ||
52 | final boolean change, final Timestamp timestamp) { | ||
53 | this.wrapped.notifyIndexerUpdate(direction, updateElement, signature, change, preprocessor.process(timestamp)); | ||
54 | } | ||
55 | |||
56 | @Override | ||
57 | public String toString() { | ||
58 | return this.preprocessor.toString() + "_PROXY -> " + this.wrapped.toString(); | ||
59 | } | ||
60 | |||
61 | @Override | ||
62 | public boolean equals(final Object obj) { | ||
63 | if (obj == null || obj.getClass() != this.getClass()) { | ||
64 | return false; | ||
65 | } else if (obj == this) { | ||
66 | return true; | ||
67 | } else { | ||
68 | final TimelyIndexerListenerProxy that = (TimelyIndexerListenerProxy) obj; | ||
69 | return this.wrapped.equals(that.wrapped) && this.preprocessor == that.preprocessor; | ||
70 | } | ||
71 | } | ||
72 | |||
73 | @Override | ||
74 | public int hashCode() { | ||
75 | int hash = 1; | ||
76 | hash = hash * 17 + this.wrapped.hashCode(); | ||
77 | hash = hash * 31 + this.preprocessor.hashCode(); | ||
78 | return hash; | ||
79 | } | ||
80 | |||
81 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java new file mode 100644 index 00000000..550bfbeb --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java | |||
@@ -0,0 +1,102 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
13 | import tools.refinery.viatra.runtime.matchers.util.Preconditions; | ||
14 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
15 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
16 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
18 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
19 | |||
20 | /** | ||
21 | * A timely proxy for another {@link Mailbox}, which performs some preprocessing | ||
22 | * on the differential timestamps before passing it on to the real recipient. | ||
23 | * | ||
24 | * @author Tamas Szabo | ||
25 | * @since 2.3 | ||
26 | */ | ||
27 | public class TimelyMailboxProxy implements Mailbox { | ||
28 | |||
29 | protected final TimestampTransformation preprocessor; | ||
30 | protected final Mailbox wrapped; | ||
31 | |||
32 | public TimelyMailboxProxy(final Mailbox wrapped, final TimestampTransformation preprocessor) { | ||
33 | Preconditions.checkArgument(!(wrapped instanceof TimelyMailboxProxy), "Proxy in a proxy is not allowed!"); | ||
34 | this.wrapped = wrapped; | ||
35 | this.preprocessor = preprocessor; | ||
36 | } | ||
37 | |||
38 | public Mailbox getWrappedMailbox() { | ||
39 | return wrapped; | ||
40 | } | ||
41 | |||
42 | @Override | ||
43 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
44 | this.wrapped.postMessage(direction, update, preprocessor.process(timestamp)); | ||
45 | } | ||
46 | |||
47 | @Override | ||
48 | public String toString() { | ||
49 | return this.preprocessor.toString() + "_PROXY -> " + this.wrapped.toString(); | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | public void clear() { | ||
54 | this.wrapped.clear(); | ||
55 | } | ||
56 | |||
57 | @Override | ||
58 | public void deliverAll(final MessageSelector selector) { | ||
59 | this.wrapped.deliverAll(selector); | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public CommunicationGroup getCurrentGroup() { | ||
64 | return this.wrapped.getCurrentGroup(); | ||
65 | } | ||
66 | |||
67 | @Override | ||
68 | public void setCurrentGroup(final CommunicationGroup group) { | ||
69 | this.wrapped.setCurrentGroup(group); | ||
70 | } | ||
71 | |||
72 | @Override | ||
73 | public Receiver getReceiver() { | ||
74 | return this.wrapped.getReceiver(); | ||
75 | } | ||
76 | |||
77 | @Override | ||
78 | public boolean isEmpty() { | ||
79 | return this.wrapped.isEmpty(); | ||
80 | } | ||
81 | |||
82 | @Override | ||
83 | public boolean equals(final Object obj) { | ||
84 | if (obj == null || obj.getClass() != this.getClass()) { | ||
85 | return false; | ||
86 | } else if (obj == this) { | ||
87 | return true; | ||
88 | } else { | ||
89 | final TimelyMailboxProxy that = (TimelyMailboxProxy) obj; | ||
90 | return this.wrapped.equals(that.wrapped) && this.preprocessor == that.preprocessor; | ||
91 | } | ||
92 | } | ||
93 | |||
94 | @Override | ||
95 | public int hashCode() { | ||
96 | int hash = 1; | ||
97 | hash = hash * 17 + this.wrapped.hashCode(); | ||
98 | hash = hash * 31 + this.preprocessor.hashCode(); | ||
99 | return hash; | ||
100 | } | ||
101 | |||
102 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java new file mode 100644 index 00000000..8929eb5c --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java | |||
@@ -0,0 +1,48 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
12 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
13 | |||
14 | /** | ||
15 | * Values of this enum perform different kind of preprocessing on {@link Timestamp}s. | ||
16 | * This is used on edges leading in and out from {@link Node}s in recursive {@link TimelyCommunicationGroup}s. | ||
17 | * | ||
18 | * @author Tamas Szabo | ||
19 | * @since 2.3 | ||
20 | */ | ||
21 | public enum TimestampTransformation { | ||
22 | |||
23 | INCREMENT { | ||
24 | @Override | ||
25 | public Timestamp process(final Timestamp timestamp) { | ||
26 | return new Timestamp(timestamp.getValue() + 1); | ||
27 | } | ||
28 | |||
29 | @Override | ||
30 | public String toString() { | ||
31 | return "INCREMENT"; | ||
32 | } | ||
33 | }, | ||
34 | RESET { | ||
35 | @Override | ||
36 | public Timestamp process(final Timestamp timestamp) { | ||
37 | return Timestamp.ZERO; | ||
38 | } | ||
39 | |||
40 | @Override | ||
41 | public String toString() { | ||
42 | return "RESET"; | ||
43 | } | ||
44 | }; | ||
45 | |||
46 | public abstract Timestamp process(final Timestamp timestamp); | ||
47 | |||
48 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java new file mode 100644 index 00000000..d6312671 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedCommand.java | |||
@@ -0,0 +1,81 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.delayed; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Map; | ||
13 | import java.util.Map.Entry; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
16 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.Signed; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
19 | import tools.refinery.viatra.runtime.rete.network.Network; | ||
20 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
21 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
22 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
23 | import tools.refinery.viatra.runtime.rete.network.Supplier; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
26 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
27 | |||
28 | /** | ||
29 | * Instances of this class are responsible for initializing a {@link Receiver} with the contents of a {@link Supplier}. | ||
30 | * However, due to the dynamic nature of the Rete {@link Network} and to the fact that certain {@link Node}s in the | ||
31 | * {@link Network} are sensitive to the shape of the {@link Network}, the commands must be delayed until the | ||
32 | * construction of the {@link Network} has stabilized. | ||
33 | * | ||
34 | * @author Tamas Szabo | ||
35 | * @since 2.3 | ||
36 | */ | ||
37 | public abstract class DelayedCommand implements Runnable { | ||
38 | |||
39 | protected final Supplier supplier; | ||
40 | protected final Receiver receiver; | ||
41 | protected final Direction direction; | ||
42 | protected final ReteContainer container; | ||
43 | |||
44 | public DelayedCommand(final Supplier supplier, final Receiver receiver, final Direction direction, | ||
45 | final ReteContainer container) { | ||
46 | this.supplier = supplier; | ||
47 | this.receiver = receiver; | ||
48 | this.direction = direction; | ||
49 | this.container = container; | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | public void run() { | ||
54 | final CommunicationTracker tracker = this.container.getCommunicationTracker(); | ||
55 | final Mailbox mailbox = tracker.proxifyMailbox(this.supplier, this.receiver.getMailbox()); | ||
56 | |||
57 | if (this.isTimestampAware()) { | ||
58 | final Map<Tuple, Timeline<Timestamp>> contents = this.container.pullContentsWithTimeline(this.supplier, | ||
59 | false); | ||
60 | for (final Entry<Tuple, Timeline<Timestamp>> entry : contents.entrySet()) { | ||
61 | for (final Signed<Timestamp> change : entry.getValue().asChangeSequence()) { | ||
62 | mailbox.postMessage(change.getDirection().multiply(this.direction), entry.getKey(), | ||
63 | change.getPayload()); | ||
64 | } | ||
65 | } | ||
66 | } else { | ||
67 | final Collection<Tuple> contents = this.container.pullContents(this.supplier, false); | ||
68 | for (final Tuple tuple : contents) { | ||
69 | mailbox.postMessage(this.direction, tuple, Timestamp.ZERO); | ||
70 | } | ||
71 | } | ||
72 | } | ||
73 | |||
74 | @Override | ||
75 | public String toString() { | ||
76 | return this.supplier + " -> " + this.receiver.toString(); | ||
77 | } | ||
78 | |||
79 | protected abstract boolean isTimestampAware(); | ||
80 | |||
81 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java new file mode 100644 index 00000000..1bfdbec6 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedConnectCommand.java | |||
@@ -0,0 +1,27 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.delayed; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
12 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
13 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
14 | import tools.refinery.viatra.runtime.rete.network.Supplier; | ||
15 | |||
16 | public class DelayedConnectCommand extends DelayedCommand { | ||
17 | |||
18 | public DelayedConnectCommand(final Supplier supplier, final Receiver receiver, final ReteContainer container) { | ||
19 | super(supplier, receiver, Direction.INSERT, container); | ||
20 | } | ||
21 | |||
22 | @Override | ||
23 | protected boolean isTimestampAware() { | ||
24 | return this.container.isTimelyEvaluation() && this.container.getCommunicationTracker().areInSameGroup(this.supplier, this.receiver); | ||
25 | } | ||
26 | |||
27 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java new file mode 100644 index 00000000..5825a971 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/delayed/DelayedDisconnectCommand.java | |||
@@ -0,0 +1,30 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.delayed; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
12 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
13 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
14 | import tools.refinery.viatra.runtime.rete.network.Supplier; | ||
15 | |||
16 | public class DelayedDisconnectCommand extends DelayedCommand { | ||
17 | |||
18 | protected final boolean wasInSameSCC; | ||
19 | |||
20 | public DelayedDisconnectCommand(final Supplier supplier, final Receiver receiver, final ReteContainer container, final boolean wasInSameSCC) { | ||
21 | super(supplier, receiver, Direction.DELETE, container); | ||
22 | this.wasInSameSCC = wasInSameSCC; | ||
23 | } | ||
24 | |||
25 | @Override | ||
26 | protected boolean isTimestampAware() { | ||
27 | return this.wasInSameSCC; | ||
28 | } | ||
29 | |||
30 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java new file mode 100644 index 00000000..da9bc47e --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/DefaultMessageIndexer.java | |||
@@ -0,0 +1,74 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.indexer; | ||
10 | |||
11 | import java.util.Collections; | ||
12 | import java.util.Map; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
15 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
16 | |||
17 | /** | ||
18 | * @author Tamas Szabo | ||
19 | * @since 2.0 | ||
20 | */ | ||
21 | public class DefaultMessageIndexer implements MessageIndexer { | ||
22 | |||
23 | protected final Map<Tuple, Integer> indexer; | ||
24 | |||
25 | public DefaultMessageIndexer() { | ||
26 | this.indexer = CollectionsFactory.createMap(); | ||
27 | } | ||
28 | |||
29 | public Map<Tuple, Integer> getTuples() { | ||
30 | return Collections.unmodifiableMap(this.indexer); | ||
31 | } | ||
32 | |||
33 | @Override | ||
34 | public int getCount(final Tuple update) { | ||
35 | final Integer count = getTuples().get(update); | ||
36 | if (count == null) { | ||
37 | return 0; | ||
38 | } else { | ||
39 | return count; | ||
40 | } | ||
41 | } | ||
42 | |||
43 | @Override | ||
44 | public void insert(final Tuple update) { | ||
45 | update(update, 1); | ||
46 | } | ||
47 | |||
48 | @Override | ||
49 | public void delete(final Tuple update) { | ||
50 | update(update, -1); | ||
51 | } | ||
52 | |||
53 | @Override | ||
54 | public void update(final Tuple update, final int delta) { | ||
55 | final Integer oldCount = this.indexer.get(update); | ||
56 | final int newCount = (oldCount == null ? 0 : oldCount) + delta; | ||
57 | if (newCount == 0) { | ||
58 | this.indexer.remove(update); | ||
59 | } else { | ||
60 | this.indexer.put(update, newCount); | ||
61 | } | ||
62 | } | ||
63 | |||
64 | @Override | ||
65 | public boolean isEmpty() { | ||
66 | return this.indexer.isEmpty(); | ||
67 | } | ||
68 | |||
69 | @Override | ||
70 | public void clear() { | ||
71 | this.indexer.clear(); | ||
72 | } | ||
73 | |||
74 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java new file mode 100644 index 00000000..80271252 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/GroupBasedMessageIndexer.java | |||
@@ -0,0 +1,95 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.indexer; | ||
10 | |||
11 | import java.util.Collections; | ||
12 | import java.util.Map; | ||
13 | import java.util.Set; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
18 | |||
19 | /** | ||
20 | * @author Tamas Szabo | ||
21 | * @since 2.0 | ||
22 | */ | ||
23 | public class GroupBasedMessageIndexer implements MessageIndexer { | ||
24 | |||
25 | protected final Map<Tuple, DefaultMessageIndexer> indexer; | ||
26 | protected final TupleMask groupMask; | ||
27 | |||
28 | public GroupBasedMessageIndexer(final TupleMask groupMask) { | ||
29 | this.indexer = CollectionsFactory.createMap(); | ||
30 | this.groupMask = groupMask; | ||
31 | } | ||
32 | |||
33 | public Map<Tuple, Integer> getTuplesByGroup(final Tuple group) { | ||
34 | final DefaultMessageIndexer values = this.indexer.get(group); | ||
35 | if (values == null) { | ||
36 | return Collections.emptyMap(); | ||
37 | } else { | ||
38 | return Collections.unmodifiableMap(values.getTuples()); | ||
39 | } | ||
40 | } | ||
41 | |||
42 | @Override | ||
43 | public int getCount(final Tuple update) { | ||
44 | final Tuple group = this.groupMask.transform(update); | ||
45 | final Integer count = getTuplesByGroup(group).get(update); | ||
46 | if (count == null) { | ||
47 | return 0; | ||
48 | } else { | ||
49 | return count; | ||
50 | } | ||
51 | } | ||
52 | |||
53 | public Set<Tuple> getGroups() { | ||
54 | return Collections.unmodifiableSet(this.indexer.keySet()); | ||
55 | } | ||
56 | |||
57 | @Override | ||
58 | public void insert(final Tuple update) { | ||
59 | update(update, 1); | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public void delete(final Tuple update) { | ||
64 | update(update, -1); | ||
65 | } | ||
66 | |||
67 | @Override | ||
68 | public void update(final Tuple update, final int delta) { | ||
69 | final Tuple group = this.groupMask.transform(update); | ||
70 | DefaultMessageIndexer valueIndexer = this.indexer.get(group); | ||
71 | |||
72 | if (valueIndexer == null) { | ||
73 | valueIndexer = new DefaultMessageIndexer(); | ||
74 | this.indexer.put(group, valueIndexer); | ||
75 | } | ||
76 | |||
77 | valueIndexer.update(update, delta); | ||
78 | |||
79 | // it may happen that the indexer becomes empty as a result of the update | ||
80 | if (valueIndexer.isEmpty()) { | ||
81 | this.indexer.remove(group); | ||
82 | } | ||
83 | } | ||
84 | |||
85 | @Override | ||
86 | public boolean isEmpty() { | ||
87 | return this.indexer.isEmpty(); | ||
88 | } | ||
89 | |||
90 | @Override | ||
91 | public void clear() { | ||
92 | this.indexer.clear(); | ||
93 | } | ||
94 | |||
95 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java new file mode 100644 index 00000000..271aaa44 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/indexer/MessageIndexer.java | |||
@@ -0,0 +1,33 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.indexer; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
13 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
14 | |||
15 | /** | ||
16 | * A message indexer is used by {@link Mailbox}es to index their contents. | ||
17 | * | ||
18 | * @author Tamas Szabo | ||
19 | * @since 2.0 | ||
20 | */ | ||
21 | public interface MessageIndexer extends Clearable { | ||
22 | |||
23 | public void insert(final Tuple update); | ||
24 | |||
25 | public void delete(final Tuple update); | ||
26 | |||
27 | public void update(final Tuple update, final int delta); | ||
28 | |||
29 | public boolean isEmpty(); | ||
30 | |||
31 | public int getCount(final Tuple update); | ||
32 | |||
33 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java new file mode 100644 index 00000000..99097f56 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/AdaptableMailbox.java | |||
@@ -0,0 +1,32 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
12 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyMailboxProxy; | ||
13 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
14 | |||
15 | /** | ||
16 | * An adaptable mailbox can be wrapped by another mailbox to act in behalf of that. The significance of the adaptation | ||
17 | * is that the adaptee will notify the {@link CommunicationTracker} about updates by promoting the adapter itself. | ||
18 | * Adaptable mailboxes are used by the {@link BehaviorChangingMailbox}. | ||
19 | * | ||
20 | * Compare this with {@link TimelyMailboxProxy}. That one also wraps another mailbox in order to | ||
21 | * perform preprocessing on the messages sent to the original recipient. | ||
22 | * | ||
23 | * @author Tamas Szabo | ||
24 | * @since 2.0 | ||
25 | */ | ||
26 | public interface AdaptableMailbox extends Mailbox { | ||
27 | |||
28 | public Mailbox getAdapter(); | ||
29 | |||
30 | public void setAdapter(final Mailbox adapter); | ||
31 | |||
32 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java new file mode 100644 index 00000000..8797e254 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/FallThroughCapableMailbox.java | |||
@@ -0,0 +1,30 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
12 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
13 | |||
14 | /** | ||
15 | * A fall through capable mailbox can directly call the update method of its {@link Receiver} instead of using the | ||
16 | * standard post-deliver mailbox semantics. If the fall through flag is set to true, the mailbox uses direct delivery, | ||
17 | * otherwise it operates in the original behavior. The fall through operation is preferable whenever applicable because | ||
18 | * it improves performance. The fall through flag is controlled by the {@link CommunicationTracker} based on the | ||
19 | * receiver node type and network topology. | ||
20 | * | ||
21 | * @author Tamas Szabo | ||
22 | * @since 2.2 | ||
23 | */ | ||
24 | public interface FallThroughCapableMailbox extends Mailbox { | ||
25 | |||
26 | public boolean isFallThrough(); | ||
27 | |||
28 | public void setFallThrough(final boolean fallThrough); | ||
29 | |||
30 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java new file mode 100644 index 00000000..05005974 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/Mailbox.java | |||
@@ -0,0 +1,78 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
13 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
14 | import tools.refinery.viatra.runtime.rete.network.IGroupable; | ||
15 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
16 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
19 | |||
20 | /** | ||
21 | * A mailbox is associated with every {@link Receiver}. Messages can be sent to a {@link Receiver} by posting them into | ||
22 | * the mailbox. Different mailbox implementations may differ in the way how they deliver the posted messages. | ||
23 | * | ||
24 | * @author Tamas Szabo | ||
25 | * @since 2.0 | ||
26 | * | ||
27 | */ | ||
28 | public interface Mailbox extends Clearable, IGroupable { | ||
29 | |||
30 | /** | ||
31 | * Posts a new message to this mailbox. | ||
32 | * | ||
33 | * @param direction | ||
34 | * the direction of the update | ||
35 | * @param update | ||
36 | * the update element | ||
37 | * @since 2.4 | ||
38 | */ | ||
39 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp); | ||
40 | |||
41 | /** | ||
42 | * Delivers all messages according to the given selector from this mailbox. The selector can also be null. In this case, no | ||
43 | * special separation is expected between the messages. | ||
44 | * | ||
45 | * @param selector the message selector | ||
46 | */ | ||
47 | public void deliverAll(final MessageSelector selector); | ||
48 | |||
49 | /** | ||
50 | * Returns the {@link Receiver} of this mailbox. | ||
51 | * | ||
52 | * @return the receiver | ||
53 | */ | ||
54 | public Receiver getReceiver(); | ||
55 | |||
56 | /** | ||
57 | * Returns the {@link CommunicationGroup} of the receiver of this mailbox. | ||
58 | * | ||
59 | * @return the communication group | ||
60 | */ | ||
61 | public CommunicationGroup getCurrentGroup(); | ||
62 | |||
63 | /** | ||
64 | * Sets the {@link CommunicationGroup} that the receiver of this mailbox is associated with. | ||
65 | * | ||
66 | * @param group | ||
67 | * the communication group | ||
68 | */ | ||
69 | public void setCurrentGroup(final CommunicationGroup group); | ||
70 | |||
71 | /** | ||
72 | * Returns true if this mailbox is empty. | ||
73 | * | ||
74 | * @return | ||
75 | */ | ||
76 | public boolean isEmpty(); | ||
77 | |||
78 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java new file mode 100644 index 00000000..2c5255fb --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/MessageIndexerFactory.java | |||
@@ -0,0 +1,23 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.indexer.MessageIndexer; | ||
12 | |||
13 | /** | ||
14 | * A factory used to create message indexers for {@link Mailbox}es. | ||
15 | * | ||
16 | * @author Tamas Szabo | ||
17 | * @since 2.0 | ||
18 | */ | ||
19 | public interface MessageIndexerFactory<I extends MessageIndexer> { | ||
20 | |||
21 | public I create(); | ||
22 | |||
23 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java new file mode 100644 index 00000000..1e1ada71 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/AbstractUpdateSplittingMailbox.java | |||
@@ -0,0 +1,109 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
12 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
13 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
14 | import tools.refinery.viatra.runtime.rete.network.indexer.MessageIndexer; | ||
15 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
16 | import tools.refinery.viatra.runtime.rete.network.mailbox.MessageIndexerFactory; | ||
17 | |||
18 | /** | ||
19 | * An abstract mailbox implementation that is capable of splitting update messages based on some form of monotonicity | ||
20 | * (anti-monotone and monotone). The monotonicity is either defined by the less or equal operator of a poset or, it can | ||
21 | * be the standard subset ordering among sets of tuples. | ||
22 | * | ||
23 | * @author Tamas Szabo | ||
24 | * @since 2.0 | ||
25 | * | ||
26 | */ | ||
27 | public abstract class AbstractUpdateSplittingMailbox<IndexerType extends MessageIndexer, ReceiverType extends Receiver> implements Mailbox { | ||
28 | |||
29 | protected IndexerType monotoneQueue; | ||
30 | protected IndexerType antiMonotoneQueue; | ||
31 | protected IndexerType monotoneBuffer; | ||
32 | protected IndexerType antiMonotoneBuffer; | ||
33 | protected boolean deliveringMonotone; | ||
34 | protected boolean deliveringAntiMonotone; | ||
35 | protected final ReceiverType receiver; | ||
36 | protected final ReteContainer container; | ||
37 | protected CommunicationGroup group; | ||
38 | |||
39 | public AbstractUpdateSplittingMailbox(final ReceiverType receiver, final ReteContainer container, | ||
40 | final MessageIndexerFactory<IndexerType> factory) { | ||
41 | this.receiver = receiver; | ||
42 | this.container = container; | ||
43 | this.monotoneQueue = factory.create(); | ||
44 | this.antiMonotoneQueue = factory.create(); | ||
45 | this.monotoneBuffer = factory.create(); | ||
46 | this.antiMonotoneBuffer = factory.create(); | ||
47 | this.deliveringMonotone = false; | ||
48 | this.deliveringAntiMonotone = false; | ||
49 | } | ||
50 | |||
51 | protected void swapAndClearMonotone() { | ||
52 | final IndexerType tmp = this.monotoneQueue; | ||
53 | this.monotoneQueue = this.monotoneBuffer; | ||
54 | this.monotoneBuffer = tmp; | ||
55 | this.monotoneBuffer.clear(); | ||
56 | } | ||
57 | |||
58 | protected void swapAndClearAntiMonotone() { | ||
59 | final IndexerType tmp = this.antiMonotoneQueue; | ||
60 | this.antiMonotoneQueue = this.antiMonotoneBuffer; | ||
61 | this.antiMonotoneBuffer = tmp; | ||
62 | this.antiMonotoneBuffer.clear(); | ||
63 | } | ||
64 | |||
65 | protected IndexerType getActiveMonotoneQueue() { | ||
66 | if (this.deliveringMonotone) { | ||
67 | return this.monotoneBuffer; | ||
68 | } else { | ||
69 | return this.monotoneQueue; | ||
70 | } | ||
71 | } | ||
72 | |||
73 | protected IndexerType getActiveAntiMonotoneQueue() { | ||
74 | if (this.deliveringAntiMonotone) { | ||
75 | return this.antiMonotoneBuffer; | ||
76 | } else { | ||
77 | return this.antiMonotoneQueue; | ||
78 | } | ||
79 | } | ||
80 | |||
81 | @Override | ||
82 | public ReceiverType getReceiver() { | ||
83 | return this.receiver; | ||
84 | } | ||
85 | |||
86 | @Override | ||
87 | public void clear() { | ||
88 | this.monotoneQueue.clear(); | ||
89 | this.antiMonotoneQueue.clear(); | ||
90 | this.monotoneBuffer.clear(); | ||
91 | this.antiMonotoneBuffer.clear(); | ||
92 | } | ||
93 | |||
94 | @Override | ||
95 | public boolean isEmpty() { | ||
96 | return this.getActiveMonotoneQueue().isEmpty() && this.getActiveAntiMonotoneQueue().isEmpty(); | ||
97 | } | ||
98 | |||
99 | @Override | ||
100 | public CommunicationGroup getCurrentGroup() { | ||
101 | return this.group; | ||
102 | } | ||
103 | |||
104 | @Override | ||
105 | public void setCurrentGroup(final CommunicationGroup group) { | ||
106 | this.group = group; | ||
107 | } | ||
108 | |||
109 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java new file mode 100644 index 00000000..fe822d7c --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/BehaviorChangingMailbox.java | |||
@@ -0,0 +1,117 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
13 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
14 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
15 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
16 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker; | ||
21 | import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox; | ||
22 | import tools.refinery.viatra.runtime.rete.network.mailbox.FallThroughCapableMailbox; | ||
23 | |||
24 | /** | ||
25 | * This mailbox changes its behavior based on the position of its {@link Receiver} in the network topology. | ||
26 | * It either behaves as a {@link DefaultMailbox} or as an {@link UpdateSplittingMailbox}. The decision is made by the | ||
27 | * {@link CommunicationTracker}, see {@link TimelessCommunicationTracker#postProcessNode(Node)} for more details. | ||
28 | * | ||
29 | * @author Tamas Szabo | ||
30 | */ | ||
31 | public class BehaviorChangingMailbox implements FallThroughCapableMailbox { | ||
32 | |||
33 | protected boolean fallThrough; | ||
34 | protected boolean split; | ||
35 | protected AdaptableMailbox wrapped; | ||
36 | protected final Receiver receiver; | ||
37 | protected final ReteContainer container; | ||
38 | protected CommunicationGroup group; | ||
39 | |||
40 | public BehaviorChangingMailbox(final Receiver receiver, final ReteContainer container) { | ||
41 | this.fallThrough = false; | ||
42 | this.split = false; | ||
43 | this.receiver = receiver; | ||
44 | this.container = container; | ||
45 | this.wrapped = new DefaultMailbox(receiver, container); | ||
46 | this.wrapped.setAdapter(this); | ||
47 | } | ||
48 | |||
49 | @Override | ||
50 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
51 | if (this.fallThrough && !this.container.isExecutingDelayedCommands()) { | ||
52 | // disable fall through while we are in the middle of executing delayed construction commands | ||
53 | this.receiver.update(direction, update, timestamp); | ||
54 | } else { | ||
55 | this.wrapped.postMessage(direction, update, timestamp); | ||
56 | } | ||
57 | } | ||
58 | |||
59 | @Override | ||
60 | public void deliverAll(final MessageSelector kind) { | ||
61 | this.wrapped.deliverAll(kind); | ||
62 | } | ||
63 | |||
64 | @Override | ||
65 | public String toString() { | ||
66 | return "A_MBOX -> " + this.wrapped; | ||
67 | } | ||
68 | |||
69 | public void setSplitFlag(final boolean splitValue) { | ||
70 | if (this.split != splitValue) { | ||
71 | assert isEmpty(); | ||
72 | if (splitValue) { | ||
73 | this.wrapped = new UpdateSplittingMailbox(this.receiver, this.container); | ||
74 | } else { | ||
75 | this.wrapped = new DefaultMailbox(this.receiver, this.container); | ||
76 | } | ||
77 | this.wrapped.setAdapter(this); | ||
78 | this.split = splitValue; | ||
79 | } | ||
80 | } | ||
81 | |||
82 | @Override | ||
83 | public boolean isEmpty() { | ||
84 | return this.wrapped.isEmpty(); | ||
85 | } | ||
86 | |||
87 | @Override | ||
88 | public void clear() { | ||
89 | this.wrapped.clear(); | ||
90 | } | ||
91 | |||
92 | @Override | ||
93 | public Receiver getReceiver() { | ||
94 | return this.receiver; | ||
95 | } | ||
96 | |||
97 | @Override | ||
98 | public CommunicationGroup getCurrentGroup() { | ||
99 | return this.group; | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void setCurrentGroup(final CommunicationGroup group) { | ||
104 | this.group = group; | ||
105 | } | ||
106 | |||
107 | @Override | ||
108 | public boolean isFallThrough() { | ||
109 | return this.fallThrough; | ||
110 | } | ||
111 | |||
112 | @Override | ||
113 | public void setFallThrough(final boolean fallThrough) { | ||
114 | this.fallThrough = fallThrough; | ||
115 | } | ||
116 | |||
117 | } \ No newline at end of file | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java new file mode 100644 index 00000000..baf7270f --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java | |||
@@ -0,0 +1,163 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import java.util.Map; | ||
12 | |||
13 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
14 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
15 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
16 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
17 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
21 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
22 | import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox; | ||
23 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
24 | |||
25 | /** | ||
26 | * Default mailbox implementation. | ||
27 | * <p> | ||
28 | * Usually, the mailbox performs counting of messages so that they can cancel each other out. However, if marked as a | ||
29 | * fall-through mailbox, than update messages are delivered directly to the receiver node to reduce overhead. | ||
30 | * | ||
31 | * @author Tamas Szabo | ||
32 | * @since 2.0 | ||
33 | */ | ||
34 | public class DefaultMailbox implements AdaptableMailbox { | ||
35 | |||
36 | private static int SIZE_TRESHOLD = 127; | ||
37 | |||
38 | protected Map<Tuple, Integer> queue; | ||
39 | protected Map<Tuple, Integer> buffer; | ||
40 | protected final Receiver receiver; | ||
41 | protected final ReteContainer container; | ||
42 | protected boolean delivering; | ||
43 | protected Mailbox adapter; | ||
44 | protected CommunicationGroup group; | ||
45 | |||
46 | public DefaultMailbox(final Receiver receiver, final ReteContainer container) { | ||
47 | this.receiver = receiver; | ||
48 | this.container = container; | ||
49 | this.queue = CollectionsFactory.createMap(); | ||
50 | this.buffer = CollectionsFactory.createMap(); | ||
51 | this.adapter = this; | ||
52 | } | ||
53 | |||
54 | protected Map<Tuple, Integer> getActiveQueue() { | ||
55 | if (this.delivering) { | ||
56 | return this.buffer; | ||
57 | } else { | ||
58 | return this.queue; | ||
59 | } | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public Mailbox getAdapter() { | ||
64 | return this.adapter; | ||
65 | } | ||
66 | |||
67 | @Override | ||
68 | public void setAdapter(final Mailbox adapter) { | ||
69 | this.adapter = adapter; | ||
70 | } | ||
71 | |||
72 | @Override | ||
73 | public boolean isEmpty() { | ||
74 | return getActiveQueue().isEmpty(); | ||
75 | } | ||
76 | |||
77 | @Override | ||
78 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
79 | final Map<Tuple, Integer> activeQueue = getActiveQueue(); | ||
80 | final boolean wasEmpty = activeQueue.isEmpty(); | ||
81 | |||
82 | boolean significantChange = false; | ||
83 | Integer count = activeQueue.get(update); | ||
84 | if (count == null) { | ||
85 | count = 0; | ||
86 | significantChange = true; | ||
87 | } | ||
88 | |||
89 | if (direction == Direction.DELETE) { | ||
90 | count--; | ||
91 | } else { | ||
92 | count++; | ||
93 | } | ||
94 | |||
95 | if (count == 0) { | ||
96 | activeQueue.remove(update); | ||
97 | significantChange = true; | ||
98 | } else { | ||
99 | activeQueue.put(update, count); | ||
100 | } | ||
101 | |||
102 | if (significantChange) { | ||
103 | final Mailbox targetMailbox = this.adapter; | ||
104 | final CommunicationGroup targetGroup = this.adapter.getCurrentGroup(); | ||
105 | |||
106 | if (wasEmpty) { | ||
107 | targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.DEFAULT); | ||
108 | } else if (activeQueue.isEmpty()) { | ||
109 | targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.DEFAULT); | ||
110 | } | ||
111 | } | ||
112 | } | ||
113 | |||
114 | @Override | ||
115 | public void deliverAll(final MessageSelector kind) { | ||
116 | if (kind == PhasedSelector.DEFAULT) { | ||
117 | // use the buffer during delivering so that there is a clear | ||
118 | // separation between the stages | ||
119 | this.delivering = true; | ||
120 | this.receiver.batchUpdate(this.queue.entrySet(), Timestamp.ZERO); | ||
121 | this.delivering = false; | ||
122 | |||
123 | if (queue.size() > SIZE_TRESHOLD) { | ||
124 | this.queue = this.buffer; | ||
125 | this.buffer = CollectionsFactory.createMap(); | ||
126 | } else { | ||
127 | this.queue.clear(); | ||
128 | final Map<Tuple, Integer> tmpQueue = this.queue; | ||
129 | this.queue = this.buffer; | ||
130 | this.buffer = tmpQueue; | ||
131 | } | ||
132 | } else { | ||
133 | throw new IllegalArgumentException("Unsupported message kind " + kind); | ||
134 | } | ||
135 | } | ||
136 | |||
137 | @Override | ||
138 | public String toString() { | ||
139 | return "D_MBOX (" + this.receiver + ") " + this.getActiveQueue(); | ||
140 | } | ||
141 | |||
142 | @Override | ||
143 | public Receiver getReceiver() { | ||
144 | return this.receiver; | ||
145 | } | ||
146 | |||
147 | @Override | ||
148 | public void clear() { | ||
149 | this.queue.clear(); | ||
150 | this.buffer.clear(); | ||
151 | } | ||
152 | |||
153 | @Override | ||
154 | public CommunicationGroup getCurrentGroup() { | ||
155 | return this.group; | ||
156 | } | ||
157 | |||
158 | @Override | ||
159 | public void setCurrentGroup(final CommunicationGroup group) { | ||
160 | this.group = group; | ||
161 | } | ||
162 | |||
163 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java new file mode 100644 index 00000000..50d19882 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java | |||
@@ -0,0 +1,218 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import java.util.HashSet; | ||
12 | import java.util.Map.Entry; | ||
13 | import java.util.Set; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.matchers.context.IPosetComparator; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
20 | import tools.refinery.viatra.runtime.rete.network.PosetAwareReceiver; | ||
21 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
22 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
25 | import tools.refinery.viatra.runtime.rete.network.indexer.GroupBasedMessageIndexer; | ||
26 | |||
27 | /** | ||
28 | * A monotonicity aware mailbox implementation. The mailbox uses an {@link IPosetComparator} to identify if a pair of | ||
29 | * REVOKE - INSERT updates represent a monotone change pair. The mailbox is used by {@link PosetAwareReceiver}s. | ||
30 | * | ||
31 | * @author Tamas Szabo | ||
32 | * @since 2.0 | ||
33 | */ | ||
34 | public class PosetAwareMailbox extends AbstractUpdateSplittingMailbox<GroupBasedMessageIndexer, PosetAwareReceiver> { | ||
35 | |||
36 | protected final TupleMask groupMask; | ||
37 | |||
38 | public PosetAwareMailbox(final PosetAwareReceiver receiver, final ReteContainer container) { | ||
39 | super(receiver, container, () -> new GroupBasedMessageIndexer(receiver.getCoreMask())); | ||
40 | this.groupMask = receiver.getCoreMask(); | ||
41 | } | ||
42 | |||
43 | @Override | ||
44 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
45 | final GroupBasedMessageIndexer monotoneQueue = getActiveMonotoneQueue(); | ||
46 | final GroupBasedMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue(); | ||
47 | final boolean wasPresentAsMonotone = monotoneQueue.getCount(update) != 0; | ||
48 | final boolean wasPresentAsAntiMonotone = antiMonotoneQueue.getCount(update) != 0; | ||
49 | final TupleMask coreMask = this.receiver.getCoreMask(); | ||
50 | |||
51 | // it cannot happen that it was present in both | ||
52 | assert !(wasPresentAsMonotone && wasPresentAsAntiMonotone); | ||
53 | |||
54 | if (direction == Direction.INSERT) { | ||
55 | if (wasPresentAsAntiMonotone) { | ||
56 | // it was an anti-monotone one before | ||
57 | antiMonotoneQueue.insert(update); | ||
58 | } else { | ||
59 | // it was a monotone one before or did not exist at all | ||
60 | monotoneQueue.insert(update); | ||
61 | |||
62 | // if it was not present in the monotone queue before, then | ||
63 | // we need to check whether it makes REVOKE updates monotone | ||
64 | if (!wasPresentAsMonotone) { | ||
65 | final Set<Tuple> counterParts = tryFindCounterPart(update, false, true); | ||
66 | for (final Tuple counterPart : counterParts) { | ||
67 | final int count = antiMonotoneQueue.getCount(counterPart); | ||
68 | assert count < 0; | ||
69 | antiMonotoneQueue.update(counterPart, -count); | ||
70 | monotoneQueue.update(counterPart, count); | ||
71 | } | ||
72 | } | ||
73 | } | ||
74 | } else { | ||
75 | if (wasPresentAsAntiMonotone) { | ||
76 | // it was an anti-monotone one before | ||
77 | antiMonotoneQueue.delete(update); | ||
78 | } else if (wasPresentAsMonotone) { | ||
79 | // it was a monotone one before | ||
80 | monotoneQueue.delete(update); | ||
81 | |||
82 | // and we need to check whether the monotone REVOKE updates | ||
83 | // still have a reinforcing counterpart | ||
84 | final Set<Tuple> candidates = new HashSet<Tuple>(); | ||
85 | final Tuple key = coreMask.transform(update); | ||
86 | for (final Entry<Tuple, Integer> entry : monotoneQueue.getTuplesByGroup(key).entrySet()) { | ||
87 | if (entry.getValue() < 0) { | ||
88 | final Tuple candidate = entry.getKey(); | ||
89 | final Set<Tuple> counterParts = tryFindCounterPart(candidate, true, false); | ||
90 | if (counterParts.isEmpty()) { | ||
91 | // all of them are gone | ||
92 | candidates.add(candidate); | ||
93 | } | ||
94 | } | ||
95 | } | ||
96 | |||
97 | // move the candidates from the monotone queue to the | ||
98 | // anti-monotone queue because they do not have a | ||
99 | // counterpart anymore | ||
100 | for (final Tuple candidate : candidates) { | ||
101 | final int count = monotoneQueue.getCount(candidate); | ||
102 | assert count < 0; | ||
103 | monotoneQueue.update(candidate, -count); | ||
104 | antiMonotoneQueue.update(candidate, count); | ||
105 | } | ||
106 | } else { | ||
107 | // it did not exist before | ||
108 | final Set<Tuple> counterParts = tryFindCounterPart(update, true, false); | ||
109 | if (counterParts.isEmpty()) { | ||
110 | // there is no tuple that would make this update monotone | ||
111 | antiMonotoneQueue.delete(update); | ||
112 | } else { | ||
113 | // there is a reinforcing counterpart | ||
114 | monotoneQueue.delete(update); | ||
115 | } | ||
116 | } | ||
117 | } | ||
118 | |||
119 | if (antiMonotoneQueue.isEmpty()) { | ||
120 | this.group.notifyLostAllMessages(this, PhasedSelector.ANTI_MONOTONE); | ||
121 | } else { | ||
122 | this.group.notifyHasMessage(this, PhasedSelector.ANTI_MONOTONE); | ||
123 | } | ||
124 | |||
125 | if (monotoneQueue.isEmpty()) { | ||
126 | this.group.notifyLostAllMessages(this, PhasedSelector.MONOTONE); | ||
127 | } else { | ||
128 | this.group.notifyHasMessage(this, PhasedSelector.MONOTONE); | ||
129 | } | ||
130 | } | ||
131 | |||
132 | protected Set<Tuple> tryFindCounterPart(final Tuple first, final boolean findPositiveCounterPart, | ||
133 | final boolean findAllCounterParts) { | ||
134 | final GroupBasedMessageIndexer monotoneQueue = getActiveMonotoneQueue(); | ||
135 | final GroupBasedMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue(); | ||
136 | final TupleMask coreMask = this.receiver.getCoreMask(); | ||
137 | final TupleMask posetMask = this.receiver.getPosetMask(); | ||
138 | final IPosetComparator posetComparator = this.receiver.getPosetComparator(); | ||
139 | final Set<Tuple> result = CollectionsFactory.createSet(); | ||
140 | final Tuple firstKey = coreMask.transform(first); | ||
141 | final Tuple firstValue = posetMask.transform(first); | ||
142 | |||
143 | if (findPositiveCounterPart) { | ||
144 | for (final Entry<Tuple, Integer> entry : monotoneQueue.getTuplesByGroup(firstKey).entrySet()) { | ||
145 | final Tuple secondValue = posetMask.transform(entry.getKey()); | ||
146 | if (entry.getValue() > 0 && posetComparator.isLessOrEqual(firstValue, secondValue)) { | ||
147 | result.add(entry.getKey()); | ||
148 | if (!findAllCounterParts) { | ||
149 | return result; | ||
150 | } | ||
151 | } | ||
152 | } | ||
153 | } else { | ||
154 | for (final Entry<Tuple, Integer> entry : antiMonotoneQueue.getTuplesByGroup(firstKey).entrySet()) { | ||
155 | final Tuple secondValue = posetMask.transform(entry.getKey()); | ||
156 | if (posetComparator.isLessOrEqual(secondValue, firstValue)) { | ||
157 | result.add(entry.getKey()); | ||
158 | if (!findAllCounterParts) { | ||
159 | return result; | ||
160 | } | ||
161 | } | ||
162 | } | ||
163 | } | ||
164 | |||
165 | return result; | ||
166 | } | ||
167 | |||
168 | @Override | ||
169 | public void deliverAll(final MessageSelector kind) { | ||
170 | if (kind == PhasedSelector.ANTI_MONOTONE) { | ||
171 | // use the buffer during delivering so that there is a clear | ||
172 | // separation between the stages | ||
173 | this.deliveringAntiMonotone = true; | ||
174 | |||
175 | for (final Tuple group : this.antiMonotoneQueue.getGroups()) { | ||
176 | for (final Entry<Tuple, Integer> entry : this.antiMonotoneQueue.getTuplesByGroup(group).entrySet()) { | ||
177 | final Tuple update = entry.getKey(); | ||
178 | final int count = entry.getValue(); | ||
179 | assert count < 0; | ||
180 | for (int i = 0; i < Math.abs(count); i++) { | ||
181 | this.receiver.updateWithPosetInfo(Direction.DELETE, update, false); | ||
182 | } | ||
183 | } | ||
184 | } | ||
185 | |||
186 | this.deliveringAntiMonotone = false; | ||
187 | swapAndClearAntiMonotone(); | ||
188 | } else if (kind == PhasedSelector.MONOTONE) { | ||
189 | // use the buffer during delivering so that there is a clear | ||
190 | // separation between the stages | ||
191 | this.deliveringMonotone = true; | ||
192 | |||
193 | for (final Tuple group : this.monotoneQueue.getGroups()) { | ||
194 | for (final Entry<Tuple, Integer> entry : this.monotoneQueue.getTuplesByGroup(group).entrySet()) { | ||
195 | final Tuple update = entry.getKey(); | ||
196 | final int count = entry.getValue(); | ||
197 | assert count != 0; | ||
198 | final Direction direction = count < 0 ? Direction.DELETE : Direction.INSERT; | ||
199 | for (int i = 0; i < Math.abs(count); i++) { | ||
200 | this.receiver.updateWithPosetInfo(direction, update, true); | ||
201 | } | ||
202 | } | ||
203 | } | ||
204 | |||
205 | this.deliveringMonotone = false; | ||
206 | swapAndClearMonotone(); | ||
207 | } else { | ||
208 | throw new IllegalArgumentException("Unsupported message kind " + kind); | ||
209 | } | ||
210 | } | ||
211 | |||
212 | @Override | ||
213 | public String toString() { | ||
214 | return "PA_MBOX (" + this.receiver + ") " + this.getActiveMonotoneQueue() + " " | ||
215 | + this.getActiveAntiMonotoneQueue(); | ||
216 | } | ||
217 | |||
218 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java new file mode 100644 index 00000000..afa155b2 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java | |||
@@ -0,0 +1,135 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import java.util.Map.Entry; | ||
12 | |||
13 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
14 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
15 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
16 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
21 | import tools.refinery.viatra.runtime.rete.network.indexer.DefaultMessageIndexer; | ||
22 | import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox; | ||
23 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
24 | |||
25 | /** | ||
26 | * A mailbox implementation that splits updates messages according to the standard subset ordering into anti-monotonic | ||
27 | * (deletions) and monotonic (insertions) updates. | ||
28 | * | ||
29 | * @author Tamas Szabo | ||
30 | * @since 2.0 | ||
31 | */ | ||
32 | public class UpdateSplittingMailbox extends AbstractUpdateSplittingMailbox<DefaultMessageIndexer, Receiver> | ||
33 | implements AdaptableMailbox { | ||
34 | |||
35 | protected Mailbox adapter; | ||
36 | |||
37 | public UpdateSplittingMailbox(final Receiver receiver, final ReteContainer container) { | ||
38 | super(receiver, container, DefaultMessageIndexer::new); | ||
39 | this.adapter = this; | ||
40 | } | ||
41 | |||
42 | @Override | ||
43 | public Mailbox getAdapter() { | ||
44 | return this.adapter; | ||
45 | } | ||
46 | |||
47 | @Override | ||
48 | public void setAdapter(final Mailbox adapter) { | ||
49 | this.adapter = adapter; | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
54 | final DefaultMessageIndexer monotoneQueue = getActiveMonotoneQueue(); | ||
55 | final DefaultMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue(); | ||
56 | final boolean wasPresentAsMonotone = monotoneQueue.getCount(update) != 0; | ||
57 | final boolean wasPresentAsAntiMonotone = antiMonotoneQueue.getCount(update) != 0; | ||
58 | |||
59 | // it cannot happen that it was present in both | ||
60 | assert !(wasPresentAsMonotone && wasPresentAsAntiMonotone); | ||
61 | |||
62 | if (direction == Direction.INSERT) { | ||
63 | if (wasPresentAsAntiMonotone) { | ||
64 | // it was an anti-monotone one before | ||
65 | antiMonotoneQueue.insert(update); | ||
66 | } else { | ||
67 | // it was a monotone one before or did not exist at all | ||
68 | monotoneQueue.insert(update); | ||
69 | } | ||
70 | } else { | ||
71 | if (wasPresentAsMonotone) { | ||
72 | // it was a monotone one before | ||
73 | monotoneQueue.delete(update); | ||
74 | } else { | ||
75 | // it was an anti-monotone one before or did not exist at all | ||
76 | antiMonotoneQueue.delete(update); | ||
77 | } | ||
78 | } | ||
79 | |||
80 | final Mailbox targetMailbox = this.adapter; | ||
81 | final CommunicationGroup targetGroup = this.adapter.getCurrentGroup(); | ||
82 | |||
83 | if (antiMonotoneQueue.isEmpty()) { | ||
84 | targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.ANTI_MONOTONE); | ||
85 | } else { | ||
86 | targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.ANTI_MONOTONE); | ||
87 | } | ||
88 | |||
89 | if (monotoneQueue.isEmpty()) { | ||
90 | targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.MONOTONE); | ||
91 | } else { | ||
92 | targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.MONOTONE); | ||
93 | } | ||
94 | } | ||
95 | |||
96 | @Override | ||
97 | public void deliverAll(final MessageSelector kind) { | ||
98 | if (kind == PhasedSelector.ANTI_MONOTONE) { | ||
99 | // deliver anti-monotone | ||
100 | this.deliveringAntiMonotone = true; | ||
101 | for (final Entry<Tuple, Integer> entry : this.antiMonotoneQueue.getTuples().entrySet()) { | ||
102 | final Tuple update = entry.getKey(); | ||
103 | final int count = entry.getValue(); | ||
104 | assert count < 0; | ||
105 | for (int i = 0; i < Math.abs(count); i++) { | ||
106 | this.receiver.update(Direction.DELETE, update, Timestamp.ZERO); | ||
107 | } | ||
108 | } | ||
109 | this.deliveringAntiMonotone = false; | ||
110 | swapAndClearAntiMonotone(); | ||
111 | } else if (kind == PhasedSelector.MONOTONE) { | ||
112 | // deliver monotone | ||
113 | this.deliveringMonotone = true; | ||
114 | for (final Entry<Tuple, Integer> entry : this.monotoneQueue.getTuples().entrySet()) { | ||
115 | final Tuple update = entry.getKey(); | ||
116 | final int count = entry.getValue(); | ||
117 | assert count > 0; | ||
118 | for (int i = 0; i < count; i++) { | ||
119 | this.receiver.update(Direction.INSERT, update, Timestamp.ZERO); | ||
120 | } | ||
121 | } | ||
122 | this.deliveringMonotone = false; | ||
123 | swapAndClearMonotone(); | ||
124 | } else { | ||
125 | throw new IllegalArgumentException("Unsupported message kind " + kind); | ||
126 | } | ||
127 | } | ||
128 | |||
129 | @Override | ||
130 | public String toString() { | ||
131 | return "US_MBOX (" + this.receiver + ") " + this.getActiveMonotoneQueue() + " " | ||
132 | + this.getActiveAntiMonotoneQueue(); | ||
133 | } | ||
134 | |||
135 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java new file mode 100644 index 00000000..bf3b8e14 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java | |||
@@ -0,0 +1,150 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timely; | ||
10 | |||
11 | import java.util.Map; | ||
12 | import java.util.TreeMap; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
15 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
16 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
17 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; | ||
18 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
19 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
21 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
22 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode; | ||
24 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
25 | |||
26 | public class TimelyMailbox implements Mailbox { | ||
27 | |||
28 | protected TreeMap<Timestamp, Map<Tuple, Integer>> queue; | ||
29 | protected final Receiver receiver; | ||
30 | protected final ReteContainer container; | ||
31 | protected CommunicationGroup group; | ||
32 | protected boolean fallThrough; | ||
33 | |||
34 | public TimelyMailbox(final Receiver receiver, final ReteContainer container) { | ||
35 | this.receiver = receiver; | ||
36 | this.container = container; | ||
37 | this.queue = CollectionsFactory.createTreeMap(); | ||
38 | } | ||
39 | |||
40 | protected TreeMap<Timestamp, Map<Tuple, Integer>> getActiveQueue() { | ||
41 | return this.queue; | ||
42 | } | ||
43 | |||
44 | @Override | ||
45 | public boolean isEmpty() { | ||
46 | return getActiveQueue().isEmpty(); | ||
47 | } | ||
48 | |||
49 | @Override | ||
50 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
51 | final TreeMap<Timestamp, Map<Tuple, Integer>> activeQueue = getActiveQueue(); | ||
52 | |||
53 | Map<Tuple, Integer> tupleMap = activeQueue.get(timestamp); | ||
54 | final boolean wasEmpty = tupleMap == null; | ||
55 | boolean significantChange = false; | ||
56 | |||
57 | if (tupleMap == null) { | ||
58 | tupleMap = CollectionsFactory.createMap(); | ||
59 | activeQueue.put(timestamp, tupleMap); | ||
60 | significantChange = true; | ||
61 | } | ||
62 | |||
63 | Integer count = tupleMap.get(update); | ||
64 | if (count == null) { | ||
65 | count = 0; | ||
66 | significantChange = true; | ||
67 | } | ||
68 | |||
69 | if (direction == Direction.DELETE) { | ||
70 | count--; | ||
71 | } else { | ||
72 | count++; | ||
73 | } | ||
74 | |||
75 | if (count == 0) { | ||
76 | tupleMap.remove(update); | ||
77 | if (tupleMap.isEmpty()) { | ||
78 | activeQueue.remove(timestamp); | ||
79 | } | ||
80 | significantChange = true; | ||
81 | } else { | ||
82 | tupleMap.put(update, count); | ||
83 | } | ||
84 | |||
85 | if (significantChange) { | ||
86 | if (wasEmpty) { | ||
87 | this.group.notifyHasMessage(this, timestamp); | ||
88 | } else if (tupleMap.isEmpty()) { | ||
89 | final Timestamp resumableTimestamp = (this.receiver instanceof ResumableNode) | ||
90 | ? ((ResumableNode) this.receiver).getResumableTimestamp() | ||
91 | : null; | ||
92 | // check if there is folding left to do before unsubscribing just based on the message queue being empty | ||
93 | if (resumableTimestamp == null || resumableTimestamp.compareTo(timestamp) != 0) { | ||
94 | this.group.notifyLostAllMessages(this, timestamp); | ||
95 | } | ||
96 | } | ||
97 | } | ||
98 | } | ||
99 | |||
100 | @Override | ||
101 | public void deliverAll(final MessageSelector selector) { | ||
102 | if (selector instanceof Timestamp) { | ||
103 | final Timestamp timestamp = (Timestamp) selector; | ||
104 | // REMOVE the tuples associated with the selector, dont just query them | ||
105 | final Map<Tuple, Integer> tupleMap = this.queue.remove(timestamp); | ||
106 | |||
107 | // tupleMap may be empty if we only have lazy folding to do | ||
108 | if (tupleMap != null) { | ||
109 | this.receiver.batchUpdate(tupleMap.entrySet(), timestamp); | ||
110 | } | ||
111 | |||
112 | if (this.container.getTimelyConfiguration() | ||
113 | .getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) { | ||
114 | // (1) either normal delivery, which ended up being a lazy folding state | ||
115 | // (2) and/or lazy folding needs to be resumed | ||
116 | if (this.receiver instanceof ResumableNode) { | ||
117 | ((ResumableNode) this.receiver).resumeAt(timestamp); | ||
118 | } | ||
119 | } | ||
120 | } else { | ||
121 | throw new IllegalArgumentException("Unsupported message selector " + selector); | ||
122 | } | ||
123 | } | ||
124 | |||
125 | @Override | ||
126 | public String toString() { | ||
127 | return "DDF_MBOX (" + this.receiver + ") " + this.getActiveQueue(); | ||
128 | } | ||
129 | |||
130 | @Override | ||
131 | public Receiver getReceiver() { | ||
132 | return this.receiver; | ||
133 | } | ||
134 | |||
135 | @Override | ||
136 | public void clear() { | ||
137 | this.queue.clear(); | ||
138 | } | ||
139 | |||
140 | @Override | ||
141 | public CommunicationGroup getCurrentGroup() { | ||
142 | return this.group; | ||
143 | } | ||
144 | |||
145 | @Override | ||
146 | public void setCurrentGroup(final CommunicationGroup group) { | ||
147 | this.group = group; | ||
148 | } | ||
149 | |||
150 | } | ||