diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication')
15 files changed, 1832 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java new file mode 100644 index 00000000..8cedeb11 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationGroup.java | |||
@@ -0,0 +1,103 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Map; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
15 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
16 | |||
17 | /** | ||
18 | * A communication group represents a set of nodes in the communication graph that form a strongly connected component. | ||
19 | * | ||
20 | * @author Tamas Szabo | ||
21 | * @since 1.6 | ||
22 | */ | ||
23 | public abstract class CommunicationGroup implements Comparable<CommunicationGroup> { | ||
24 | |||
25 | public static final String UNSUPPORTED_MESSAGE_KIND = "Unsupported message kind "; | ||
26 | |||
27 | /** | ||
28 | * Marker for the {@link CommunicationTracker} | ||
29 | */ | ||
30 | public boolean isEnqueued = false; | ||
31 | |||
32 | protected final Node representative; | ||
33 | |||
34 | /** | ||
35 | * May be changed during bumping in {@link CommunicationTracker.registerDependency} | ||
36 | */ | ||
37 | protected int identifier; | ||
38 | |||
39 | /** | ||
40 | * @since 1.7 | ||
41 | */ | ||
42 | protected final CommunicationTracker tracker; | ||
43 | |||
44 | /** | ||
45 | * @since 1.7 | ||
46 | */ | ||
47 | public CommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) { | ||
48 | this.tracker = tracker; | ||
49 | this.representative = representative; | ||
50 | this.identifier = identifier; | ||
51 | } | ||
52 | |||
53 | public abstract void deliverMessages(); | ||
54 | |||
55 | public Node getRepresentative() { | ||
56 | return representative; | ||
57 | } | ||
58 | |||
59 | public abstract boolean isEmpty(); | ||
60 | |||
61 | /** | ||
62 | * @since 2.0 | ||
63 | */ | ||
64 | public abstract void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind); | ||
65 | |||
66 | /** | ||
67 | * @since 2.0 | ||
68 | */ | ||
69 | public abstract void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind); | ||
70 | |||
71 | public abstract Map<MessageSelector, Collection<Mailbox>> getMailboxes(); | ||
72 | |||
73 | public abstract boolean isRecursive(); | ||
74 | |||
75 | @Override | ||
76 | public int hashCode() { | ||
77 | return this.identifier; | ||
78 | } | ||
79 | |||
80 | @Override | ||
81 | public String toString() { | ||
82 | return this.getClass().getSimpleName() + " " + this.identifier + " - representative: " + this.representative | ||
83 | + " - isEmpty: " + isEmpty(); | ||
84 | } | ||
85 | |||
86 | @Override | ||
87 | public boolean equals(final Object obj) { | ||
88 | if (obj == null || this.getClass() != obj.getClass()) { | ||
89 | return false; | ||
90 | } else if (this == obj) { | ||
91 | return true; | ||
92 | } else { | ||
93 | final CommunicationGroup that = (CommunicationGroup) obj; | ||
94 | return this.identifier == that.identifier; | ||
95 | } | ||
96 | } | ||
97 | |||
98 | @Override | ||
99 | public int compareTo(final CommunicationGroup that) { | ||
100 | return this.identifier - that.identifier; | ||
101 | } | ||
102 | |||
103 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java new file mode 100644 index 00000000..d244e644 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java | |||
@@ -0,0 +1,467 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.HashMap; | ||
12 | import java.util.HashSet; | ||
13 | import java.util.List; | ||
14 | import java.util.Map; | ||
15 | import java.util.PriorityQueue; | ||
16 | import java.util.Queue; | ||
17 | import java.util.Set; | ||
18 | |||
19 | import tools.refinery.viatra.runtime.rete.itc.alg.incscc.IncSCCAlg; | ||
20 | import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting; | ||
21 | import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph; | ||
22 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
23 | import tools.refinery.viatra.runtime.rete.aggregation.IAggregatorNode; | ||
24 | import tools.refinery.viatra.runtime.rete.boundary.ExternalInputEnumeratorNode; | ||
25 | import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode; | ||
26 | import tools.refinery.viatra.runtime.rete.index.DualInputNode; | ||
27 | import tools.refinery.viatra.runtime.rete.index.ExistenceNode; | ||
28 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
29 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
30 | import tools.refinery.viatra.runtime.rete.index.IterableIndexer; | ||
31 | import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer; | ||
32 | import tools.refinery.viatra.runtime.rete.network.IGroupable; | ||
33 | import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode; | ||
34 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
35 | import tools.refinery.viatra.runtime.rete.network.ProductionNode; | ||
36 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
37 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
38 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyIndexerListenerProxy; | ||
39 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyMailboxProxy; | ||
40 | import tools.refinery.viatra.runtime.rete.network.mailbox.FallThroughCapableMailbox; | ||
41 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
42 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
43 | import tools.refinery.viatra.runtime.rete.single.TransitiveClosureNode; | ||
44 | import tools.refinery.viatra.runtime.rete.single.TrimmerNode; | ||
45 | |||
46 | /** | ||
47 | * An instance of this class is associated with every {@link ReteContainer}. The tracker serves two purposes: <br> | ||
48 | * (1) It allows RETE nodes to register their communication dependencies on-the-fly. These dependencies can be | ||
49 | * registered or unregistered when nodes are disposed of. <br> | ||
50 | * (2) It allows RETE nodes to register their mailboxes as dirty, that is, they can tell the tracker that they have | ||
51 | * something to send to other nodes in the network. The tracker is then responsible for ordering these messages (more | ||
52 | * precisely, the mailboxes that contain the messages) for the associated {@link ReteContainer}. The ordering is | ||
53 | * governed by the strongly connected components in the dependency network and follows a topological sorting scheme; | ||
54 | * those mailboxes will be emptied first whose owner nodes do not depend on other undelivered messages. | ||
55 | * | ||
56 | * @author Tamas Szabo | ||
57 | * @since 1.6 | ||
58 | * | ||
59 | */ | ||
60 | public abstract class CommunicationTracker { | ||
61 | |||
62 | /** | ||
63 | * The minimum group id assigned so far | ||
64 | */ | ||
65 | protected int minGroupId; | ||
66 | |||
67 | /** | ||
68 | * The maximum group id assigned so far | ||
69 | */ | ||
70 | protected int maxGroupId; | ||
71 | |||
72 | /** | ||
73 | * The dependency graph of the communications in the RETE network | ||
74 | */ | ||
75 | protected final Graph<Node> dependencyGraph; | ||
76 | |||
77 | /** | ||
78 | * Incremental SCC information about the dependency graph | ||
79 | */ | ||
80 | protected final IncSCCAlg<Node> sccInformationProvider; | ||
81 | |||
82 | /** | ||
83 | * Precomputed node -> communication group map | ||
84 | */ | ||
85 | protected final Map<Node, CommunicationGroup> groupMap; | ||
86 | |||
87 | /** | ||
88 | * Priority queue of active communication groups | ||
89 | */ | ||
90 | protected final Queue<CommunicationGroup> groupQueue; | ||
91 | |||
92 | // groups should have a simple integer flag which represents its position in a priority queue | ||
93 | // priority queue only contains the ACTIVE groups | ||
94 | |||
95 | public CommunicationTracker() { | ||
96 | this.dependencyGraph = new Graph<Node>(); | ||
97 | this.sccInformationProvider = new IncSCCAlg<Node>(this.dependencyGraph); | ||
98 | this.groupQueue = new PriorityQueue<CommunicationGroup>(); | ||
99 | this.groupMap = new HashMap<Node, CommunicationGroup>(); | ||
100 | } | ||
101 | |||
102 | public Graph<Node> getDependencyGraph() { | ||
103 | return dependencyGraph; | ||
104 | } | ||
105 | |||
106 | public CommunicationGroup getGroup(final Node node) { | ||
107 | return this.groupMap.get(node); | ||
108 | } | ||
109 | |||
110 | private void precomputeGroups() { | ||
111 | groupMap.clear(); | ||
112 | |||
113 | // reconstruct group map from dependency graph | ||
114 | final Graph<Node> reducedGraph = sccInformationProvider.getReducedGraph(); | ||
115 | final List<Node> representatives = TopologicalSorting.compute(reducedGraph); | ||
116 | |||
117 | for (int i = 0; i < representatives.size(); i++) { // groups for SCC representatives | ||
118 | final Node representative = representatives.get(i); | ||
119 | createAndStoreGroup(representative, i); | ||
120 | } | ||
121 | |||
122 | minGroupId = 0; | ||
123 | maxGroupId = representatives.size() - 1; | ||
124 | |||
125 | for (final Node node : dependencyGraph.getAllNodes()) { // extend group map to the rest of nodes | ||
126 | final Node representative = sccInformationProvider.getRepresentative(node); | ||
127 | final CommunicationGroup group = groupMap.get(representative); | ||
128 | if (representative != node) { | ||
129 | addToGroup(node, group); | ||
130 | } | ||
131 | } | ||
132 | |||
133 | for (final Node node : dependencyGraph.getAllNodes()) { | ||
134 | // set fall-through flags of default mailboxes | ||
135 | precomputeFallThroughFlag(node); | ||
136 | // perform further tracker-specific post-processing | ||
137 | postProcessNode(node); | ||
138 | } | ||
139 | |||
140 | // reconstruct new queue contents based on new group map | ||
141 | if (!groupQueue.isEmpty()) { | ||
142 | final Set<CommunicationGroup> oldActiveGroups = new HashSet<CommunicationGroup>(groupQueue); | ||
143 | groupQueue.clear(); | ||
144 | reconstructQueueContents(oldActiveGroups); | ||
145 | } | ||
146 | |||
147 | // post process the groups | ||
148 | for (final CommunicationGroup group : groupMap.values()) { | ||
149 | postProcessGroup(group); | ||
150 | } | ||
151 | } | ||
152 | |||
153 | /** | ||
154 | * This method is responsible for reconstructing the active queue contents after the network structure has changed. | ||
155 | * It it defined as abstract because the reconstruction logic is specific to each {@link CommunicationTracker}. | ||
156 | * @since 2.4 | ||
157 | */ | ||
158 | protected abstract void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups); | ||
159 | |||
160 | private void addToGroup(final Node node, final CommunicationGroup group) { | ||
161 | groupMap.put(node, group); | ||
162 | if (node instanceof Receiver) { | ||
163 | ((Receiver) node).getMailbox().setCurrentGroup(group); | ||
164 | if (node instanceof IGroupable) { | ||
165 | ((IGroupable) node).setCurrentGroup(group); | ||
166 | } | ||
167 | } | ||
168 | } | ||
169 | |||
170 | /** | ||
171 | * Depends on the groups, as well as the parent nodes of the argument, so recomputation is needed if these change | ||
172 | */ | ||
173 | private void precomputeFallThroughFlag(final Node node) { | ||
174 | CommunicationGroup group = groupMap.get(node); | ||
175 | if (node instanceof Receiver) { | ||
176 | IGroupable mailbox = ((Receiver) node).getMailbox(); | ||
177 | if (mailbox instanceof FallThroughCapableMailbox) { | ||
178 | Set<Node> directParents = dependencyGraph.getSourceNodes(node).distinctValues(); | ||
179 | // decide between using quick&cheap fall-through, or allowing for update cancellation | ||
180 | boolean fallThrough = | ||
181 | // disallow fallthrough: updates at production nodes should cancel, if they can be trimmed or | ||
182 | // disjunctive | ||
183 | (!(node instanceof ProductionNode && ( // it is a production node... | ||
184 | // with more than one parent | ||
185 | directParents.size() > 0 || | ||
186 | // or true trimming in its sole parent | ||
187 | directParents.size() == 1 && trueTrimming(directParents.iterator().next())))) && | ||
188 | // disallow fallthrough: external updates should be stored (if updates are delayed) | ||
189 | (!(node instanceof ExternalInputEnumeratorNode)) && | ||
190 | // disallow fallthrough: RelationEvaluatorNode needs to be notified in batch-style, and the batching is done by the mailbox | ||
191 | // however, it is not the RelationEvaluatorNode itself that is interesting here, as that indirectly uses the BatchingReceiver | ||
192 | // so we need to disable fall-through for the BatchingReceiver | ||
193 | (!(node instanceof RelationEvaluatorNode.BatchingReceiver)); | ||
194 | // do additional checks | ||
195 | if (fallThrough) { | ||
196 | // recursive parent groups generate excess updates that should be cancelled after delete&rederive | ||
197 | // phases | ||
198 | // aggregator and transitive closure parent nodes also generate excess updates that should be | ||
199 | // cancelled | ||
200 | directParentLoop: for (Node directParent : directParents) { | ||
201 | Set<Node> parentsToCheck = new HashSet<>(); | ||
202 | // check the case where a direct parent is the reason for mailbox usage | ||
203 | parentsToCheck.add(directParent); | ||
204 | // check the case where an indirect parent (join slot) is the reason for mailbox usage | ||
205 | if (directParent instanceof DualInputNode) { | ||
206 | // in case of existence join (typically antijoin), a mailbox should allow | ||
207 | // an insertion and deletion (at the secondary slot) to cancel each other out | ||
208 | if (directParent instanceof ExistenceNode) { | ||
209 | fallThrough = false; | ||
210 | break directParentLoop; | ||
211 | } | ||
212 | // in beta nodes, indexer slots (or their active nodes) are considered indirect parents | ||
213 | DualInputNode dualInput = (DualInputNode) directParent; | ||
214 | IterableIndexer primarySlot = dualInput.getPrimarySlot(); | ||
215 | if (primarySlot != null) | ||
216 | parentsToCheck.add(primarySlot.getActiveNode()); | ||
217 | Indexer secondarySlot = dualInput.getSecondarySlot(); | ||
218 | if (secondarySlot != null) | ||
219 | parentsToCheck.add(secondarySlot.getActiveNode()); | ||
220 | } | ||
221 | for (Node parent : parentsToCheck) { | ||
222 | CommunicationGroup parentGroup = groupMap.get(parent); | ||
223 | if ( // parent is in a different, recursive group | ||
224 | (group != parentGroup && parentGroup.isRecursive()) || | ||
225 | // node and parent within the same recursive group, and... | ||
226 | (group == parentGroup && group.isRecursive() && ( | ||
227 | // parent is a transitive closure or aggregator node, or a trimmer | ||
228 | // allow trimmed or disjunctive tuple updates to cancel each other | ||
229 | (parent instanceof TransitiveClosureNode) || (parent instanceof IAggregatorNode) | ||
230 | || trueTrimming(parent)))) { | ||
231 | fallThrough = false; | ||
232 | break directParentLoop; | ||
233 | } | ||
234 | } | ||
235 | } | ||
236 | } | ||
237 | // overwrite fallthrough flag with newly computed value | ||
238 | ((FallThroughCapableMailbox) mailbox).setFallThrough(fallThrough); | ||
239 | } | ||
240 | } | ||
241 | } | ||
242 | |||
243 | /** | ||
244 | * A trimmer node that actually eliminates some columns (not just reorders) | ||
245 | */ | ||
246 | private boolean trueTrimming(Node node) { | ||
247 | if (node instanceof TrimmerNode) { | ||
248 | TupleMask mask = ((TrimmerNode) node).getMask(); | ||
249 | return (mask.indices.length != mask.sourceWidth); | ||
250 | } | ||
251 | return false; | ||
252 | } | ||
253 | |||
254 | public void activateUnenqueued(final CommunicationGroup group) { | ||
255 | groupQueue.add(group); | ||
256 | group.isEnqueued = true; | ||
257 | } | ||
258 | |||
259 | public void deactivate(final CommunicationGroup group) { | ||
260 | groupQueue.remove(group); | ||
261 | group.isEnqueued = false; | ||
262 | } | ||
263 | |||
264 | public CommunicationGroup getAndRemoveFirstGroup() { | ||
265 | final CommunicationGroup group = groupQueue.poll(); | ||
266 | group.isEnqueued = false; | ||
267 | return group; | ||
268 | } | ||
269 | |||
270 | public boolean isEmpty() { | ||
271 | return groupQueue.isEmpty(); | ||
272 | } | ||
273 | |||
274 | protected abstract CommunicationGroup createGroup(final Node representative, final int index); | ||
275 | |||
276 | protected CommunicationGroup createAndStoreGroup(final Node representative, final int index) { | ||
277 | final CommunicationGroup group = createGroup(representative, index); | ||
278 | addToGroup(representative, group); | ||
279 | return group; | ||
280 | } | ||
281 | |||
282 | /** | ||
283 | * Registers the dependency that the target {@link Node} depends on the source {@link Node}. In other words, source | ||
284 | * may send messages to target in the RETE network. If the dependency edge is already present, this method call is a | ||
285 | * noop. | ||
286 | * | ||
287 | * @param source | ||
288 | * the source node | ||
289 | * @param target | ||
290 | * the target node | ||
291 | */ | ||
292 | public void registerDependency(final Node source, final Node target) { | ||
293 | // nodes can be immediately inserted, if they already exist in the graph, this is a noop | ||
294 | dependencyGraph.insertNode(source); | ||
295 | dependencyGraph.insertNode(target); | ||
296 | |||
297 | if (!this.dependencyGraph.getTargetNodes(source).containsNonZero(target)) { | ||
298 | |||
299 | // query all these information before the actual edge insertion | ||
300 | // because SCCs may be unified during the process | ||
301 | final Node sourceRepresentative = sccInformationProvider.getRepresentative(source); | ||
302 | final Node targetRepresentative = sccInformationProvider.getRepresentative(target); | ||
303 | final boolean targetHadOutgoingEdges = sccInformationProvider.hasOutgoingEdges(targetRepresentative); | ||
304 | |||
305 | // insert the edge | ||
306 | dependencyGraph.insertEdge(source, target); | ||
307 | |||
308 | // create groups if they do not yet exist | ||
309 | CommunicationGroup sourceGroup = groupMap.get(sourceRepresentative); | ||
310 | if (sourceGroup == null) { | ||
311 | // create on-demand with the next smaller group id | ||
312 | sourceGroup = createAndStoreGroup(sourceRepresentative, --minGroupId); | ||
313 | } | ||
314 | final int sourceIndex = sourceGroup.identifier; | ||
315 | |||
316 | CommunicationGroup targetGroup = groupMap.get(targetRepresentative); | ||
317 | if (targetGroup == null) { | ||
318 | // create on-demand with the next larger group id | ||
319 | targetGroup = createAndStoreGroup(targetRepresentative, ++maxGroupId); | ||
320 | } | ||
321 | final int targetIndex = targetGroup.identifier; | ||
322 | |||
323 | if (sourceIndex <= targetIndex) { | ||
324 | // indices obey current topological ordering | ||
325 | refreshFallThroughFlag(target); | ||
326 | postProcessNode(source); | ||
327 | postProcessNode(target); | ||
328 | postProcessGroup(sourceGroup); | ||
329 | if (sourceGroup != targetGroup) { | ||
330 | postProcessGroup(targetGroup); | ||
331 | } | ||
332 | } else if (sourceIndex > targetIndex && !targetHadOutgoingEdges) { | ||
333 | // indices violate current topological ordering, but we can simply bump the target index | ||
334 | final boolean wasEnqueued = targetGroup.isEnqueued; | ||
335 | if (wasEnqueued) { | ||
336 | groupQueue.remove(targetGroup); | ||
337 | } | ||
338 | targetGroup.identifier = ++maxGroupId; | ||
339 | if (wasEnqueued) { | ||
340 | groupQueue.add(targetGroup); | ||
341 | } | ||
342 | |||
343 | refreshFallThroughFlag(target); | ||
344 | postProcessNode(source); | ||
345 | postProcessNode(target); | ||
346 | postProcessGroup(sourceGroup); | ||
347 | postProcessGroup(targetGroup); | ||
348 | } else { | ||
349 | // needs a full re-computation because of more complex change | ||
350 | precomputeGroups(); | ||
351 | } | ||
352 | } | ||
353 | } | ||
354 | |||
355 | /** | ||
356 | * Returns true if the given {@link Node} is in a recursive {@link CommunicationGroup}, false otherwise. | ||
357 | */ | ||
358 | public boolean isInRecursiveGroup(final Node node) { | ||
359 | final CommunicationGroup group = this.getGroup(node); | ||
360 | if (group == null) { | ||
361 | return false; | ||
362 | } else { | ||
363 | return group.isRecursive(); | ||
364 | } | ||
365 | } | ||
366 | |||
367 | /** | ||
368 | * Returns true if the given two {@link Node}s are in the same {@link CommunicationGroup}. | ||
369 | */ | ||
370 | public boolean areInSameGroup(final Node left, final Node right) { | ||
371 | final CommunicationGroup leftGroup = this.getGroup(left); | ||
372 | final CommunicationGroup rightGroup = this.getGroup(right); | ||
373 | return leftGroup != null && leftGroup == rightGroup; | ||
374 | } | ||
375 | |||
376 | /** | ||
377 | * Unregisters a dependency between source and target. | ||
378 | * | ||
379 | * @param source | ||
380 | * the source node | ||
381 | * @param target | ||
382 | * the target node | ||
383 | */ | ||
384 | public void unregisterDependency(final Node source, final Node target) { | ||
385 | // delete the edge first, and then query the SCC info provider | ||
386 | this.dependencyGraph.deleteEdgeIfExists(source, target); | ||
387 | |||
388 | final Node sourceRepresentative = sccInformationProvider.getRepresentative(source); | ||
389 | final Node targetRepresentative = sccInformationProvider.getRepresentative(target); | ||
390 | |||
391 | // if they are still in the same SCC, | ||
392 | // then this deletion did not affect the SCCs, | ||
393 | // and it is sufficient to recompute affected fall-through flags; | ||
394 | // otherwise, we need a new pre-computation for the groupMap and groupQueue | ||
395 | if (sourceRepresentative.equals(targetRepresentative)) { | ||
396 | // this deletion could not have affected the split flags | ||
397 | refreshFallThroughFlag(target); | ||
398 | postProcessNode(source); | ||
399 | postProcessNode(target); | ||
400 | } else { | ||
401 | // preComputeGroups takes care of the split flag maintenance | ||
402 | precomputeGroups(); | ||
403 | } | ||
404 | } | ||
405 | |||
406 | /** | ||
407 | * Refresh fall-through flags if dependencies change for given target, but no SCC change | ||
408 | */ | ||
409 | private void refreshFallThroughFlag(final Node target) { | ||
410 | precomputeFallThroughFlag(target); | ||
411 | if (target instanceof DualInputNode) { | ||
412 | for (final Node indirectTarget : dependencyGraph.getTargetNodes(target).distinctValues()) { | ||
413 | precomputeFallThroughFlag(indirectTarget); | ||
414 | } | ||
415 | } | ||
416 | } | ||
417 | |||
418 | /** | ||
419 | * Returns true if the given source-target edge in the communication network acts as a recursion cut point. | ||
420 | * The current implementation considers edges leading into {@link ProductionNode}s as cut point iff | ||
421 | * both source and target belong to the same group. | ||
422 | * | ||
423 | * @param source the source node | ||
424 | * @param target the target node | ||
425 | * @return true if the edge is a cut point, false otherwise | ||
426 | * @since 2.4 | ||
427 | */ | ||
428 | protected boolean isRecursionCutPoint(final Node source, final Node target) { | ||
429 | final Node effectiveSource = source instanceof SpecializedProjectionIndexer | ||
430 | ? ((SpecializedProjectionIndexer) source).getActiveNode() | ||
431 | : source; | ||
432 | final CommunicationGroup sourceGroup = this.getGroup(effectiveSource); | ||
433 | final CommunicationGroup targetGroup = this.getGroup(target); | ||
434 | return sourceGroup != null && sourceGroup == targetGroup && target instanceof ProductionNode; | ||
435 | } | ||
436 | |||
437 | /** | ||
438 | * This hook allows concrete tracker implementations to perform tracker-specific post processing on nodes (cf. | ||
439 | * {@link NetworkStructureChangeSensitiveNode} and {@link BehaviorChangingMailbox}). At the time of the invocation, | ||
440 | * the network topology has already been updated. | ||
441 | */ | ||
442 | protected abstract void postProcessNode(final Node node); | ||
443 | |||
444 | /** | ||
445 | * This hook allows concrete tracker implementations to perform tracker-specific post processing on groups. At the | ||
446 | * time of the invocation, the network topology has already been updated. | ||
447 | * @since 2.4 | ||
448 | */ | ||
449 | protected abstract void postProcessGroup(final CommunicationGroup group); | ||
450 | |||
451 | /** | ||
452 | * Creates a proxy for the given {@link Mailbox} for the given requester {@link Node}. The proxy creation is | ||
453 | * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used | ||
454 | * to create {@link TimelyMailboxProxy}s depending on the network topology. There is no guarantee that the same | ||
455 | * proxy instance is returned when this method is called multiple times with the same arguments. | ||
456 | */ | ||
457 | public abstract Mailbox proxifyMailbox(final Node requester, final Mailbox original); | ||
458 | |||
459 | /** | ||
460 | * Creates a proxy for the given {@link IndexerListener} for the given requester {@link Node}. The proxy creation is | ||
461 | * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used | ||
462 | * to create {@link TimelyIndexerListenerProxy}s depending on the network topology. There is no guarantee that the | ||
463 | * same proxy instance is returned when this method is called multiple times with the same arguments. | ||
464 | */ | ||
465 | public abstract IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original); | ||
466 | |||
467 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java new file mode 100644 index 00000000..e1a61693 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/MessageSelector.java | |||
@@ -0,0 +1,19 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | /** | ||
12 | * Subclasses of this interface represent meta data of update messages in Rete. | ||
13 | * | ||
14 | * @author Tamas Szabo | ||
15 | * @since 2.3 | ||
16 | */ | ||
17 | public interface MessageSelector { | ||
18 | |||
19 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java new file mode 100644 index 00000000..27779352 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/NodeComparator.java | |||
@@ -0,0 +1,32 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.Comparator; | ||
12 | import java.util.Map; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
15 | |||
16 | /** | ||
17 | * @since 2.4 | ||
18 | */ | ||
19 | public class NodeComparator implements Comparator<Node> { | ||
20 | |||
21 | protected final Map<Node, Integer> nodeMap; | ||
22 | |||
23 | public NodeComparator(final Map<Node, Integer> nodeMap) { | ||
24 | this.nodeMap = nodeMap; | ||
25 | } | ||
26 | |||
27 | @Override | ||
28 | public int compare(final Node left, final Node right) { | ||
29 | return this.nodeMap.get(left) - this.nodeMap.get(right); | ||
30 | } | ||
31 | |||
32 | } \ No newline at end of file | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java new file mode 100644 index 00000000..41cd8cd3 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/PhasedSelector.java | |||
@@ -0,0 +1,34 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | /** | ||
12 | * A default message selector that can be used to associate phases to messages. | ||
13 | * | ||
14 | * @author Tamas Szabo | ||
15 | * @since 2.3 | ||
16 | */ | ||
17 | public enum PhasedSelector implements MessageSelector { | ||
18 | |||
19 | /** | ||
20 | * No special distinguishing feature | ||
21 | */ | ||
22 | DEFAULT, | ||
23 | |||
24 | /** | ||
25 | * Inserts and delete-insert monotone change pairs | ||
26 | */ | ||
27 | MONOTONE, | ||
28 | |||
29 | /** | ||
30 | * Deletes | ||
31 | */ | ||
32 | ANTI_MONOTONE | ||
33 | |||
34 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java new file mode 100644 index 00000000..a50a63a8 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/Timestamp.java | |||
@@ -0,0 +1,124 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2018, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication; | ||
10 | |||
11 | import java.util.AbstractMap; | ||
12 | import java.util.Collection; | ||
13 | import java.util.Map; | ||
14 | import java.util.Set; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines; | ||
18 | |||
19 | /** | ||
20 | * A timestamp associated with update messages in timely evaluation. | ||
21 | * | ||
22 | * @author Tamas Szabo | ||
23 | * @since 2.3 | ||
24 | */ | ||
25 | public class Timestamp implements Comparable<Timestamp>, MessageSelector { | ||
26 | |||
27 | protected final int value; | ||
28 | public static final Timestamp ZERO = new Timestamp(0); | ||
29 | /** | ||
30 | * @since 2.4 | ||
31 | */ | ||
32 | public static final Timeline<Timestamp> INSERT_AT_ZERO_TIMELINE = Timelines.createFrom(Timestamp.ZERO); | ||
33 | |||
34 | public Timestamp(final int value) { | ||
35 | this.value = value; | ||
36 | } | ||
37 | |||
38 | public int getValue() { | ||
39 | return value; | ||
40 | } | ||
41 | |||
42 | public Timestamp max(final Timestamp that) { | ||
43 | if (this.value >= that.value) { | ||
44 | return this; | ||
45 | } else { | ||
46 | return that; | ||
47 | } | ||
48 | } | ||
49 | |||
50 | /** | ||
51 | * @since 2.4 | ||
52 | */ | ||
53 | public Timestamp min(final Timestamp that) { | ||
54 | if (this.value <= that.value) { | ||
55 | return this; | ||
56 | } else { | ||
57 | return that; | ||
58 | } | ||
59 | } | ||
60 | |||
61 | @Override | ||
62 | public int compareTo(final Timestamp that) { | ||
63 | return this.value - that.value; | ||
64 | } | ||
65 | |||
66 | @Override | ||
67 | public boolean equals(final Object obj) { | ||
68 | if (obj == null || !(obj instanceof Timestamp)) { | ||
69 | return false; | ||
70 | } else { | ||
71 | return this.value == ((Timestamp) obj).value; | ||
72 | } | ||
73 | } | ||
74 | |||
75 | @Override | ||
76 | public int hashCode() { | ||
77 | return this.value; | ||
78 | } | ||
79 | |||
80 | @Override | ||
81 | public String toString() { | ||
82 | return Integer.toString(this.value); | ||
83 | } | ||
84 | |||
85 | /** | ||
86 | * A {@link Map} implementation that associates the zero timestamp with every key. There is no suppor for | ||
87 | * {@link Map#entrySet()} due to performance reasons. | ||
88 | * | ||
89 | * @author Tamas Szabo | ||
90 | */ | ||
91 | public static final class AllZeroMap<T> extends AbstractMap<T, Timeline<Timestamp>> { | ||
92 | |||
93 | private final Collection<T> wrapped; | ||
94 | |||
95 | public AllZeroMap(Set<T> wrapped) { | ||
96 | this.wrapped = wrapped; | ||
97 | } | ||
98 | |||
99 | @Override | ||
100 | public Set<Entry<T, Timeline<Timestamp>>> entrySet() { | ||
101 | throw new UnsupportedOperationException("Use the combination of keySet() and get()!"); | ||
102 | } | ||
103 | |||
104 | /** | ||
105 | * @since 2.4 | ||
106 | */ | ||
107 | @Override | ||
108 | public Timeline<Timestamp> get(final Object key) { | ||
109 | return INSERT_AT_ZERO_TIMELINE; | ||
110 | } | ||
111 | |||
112 | @Override | ||
113 | public Set<T> keySet() { | ||
114 | return (Set<T>) this.wrapped; | ||
115 | } | ||
116 | |||
117 | @Override | ||
118 | public String toString() { | ||
119 | return this.getClass().getSimpleName() + ": " + this.keySet().toString(); | ||
120 | } | ||
121 | |||
122 | } | ||
123 | |||
124 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java new file mode 100644 index 00000000..d8260384 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/RecursiveCommunicationGroup.java | |||
@@ -0,0 +1,164 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timeless; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.EnumMap; | ||
14 | import java.util.LinkedHashSet; | ||
15 | import java.util.Map; | ||
16 | import java.util.Set; | ||
17 | |||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
20 | import tools.refinery.viatra.runtime.rete.network.RederivableNode; | ||
21 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
22 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
25 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
26 | |||
27 | /** | ||
28 | * A communication group representing either a single node where the | ||
29 | * node is a monotonicity aware one or a set of nodes that form an SCC. | ||
30 | * | ||
31 | * @author Tamas Szabo | ||
32 | * @since 2.4 | ||
33 | */ | ||
34 | public class RecursiveCommunicationGroup extends CommunicationGroup { | ||
35 | |||
36 | private final Set<Mailbox> antiMonotoneMailboxes; | ||
37 | private final Set<Mailbox> monotoneMailboxes; | ||
38 | private final Set<Mailbox> defaultMailboxes; | ||
39 | private final Set<RederivableNode> rederivables; | ||
40 | private boolean currentlyDelivering; | ||
41 | |||
42 | /** | ||
43 | * @since 1.7 | ||
44 | */ | ||
45 | public RecursiveCommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) { | ||
46 | super(tracker, representative, identifier); | ||
47 | this.antiMonotoneMailboxes = CollectionsFactory.createSet(); | ||
48 | this.monotoneMailboxes = CollectionsFactory.createSet(); | ||
49 | this.defaultMailboxes = CollectionsFactory.createSet(); | ||
50 | this.rederivables = new LinkedHashSet<RederivableNode>(); | ||
51 | this.currentlyDelivering = false; | ||
52 | } | ||
53 | |||
54 | @Override | ||
55 | public void deliverMessages() { | ||
56 | this.currentlyDelivering = true; | ||
57 | |||
58 | // ANTI-MONOTONE PHASE | ||
59 | while (!this.antiMonotoneMailboxes.isEmpty() || !this.defaultMailboxes.isEmpty()) { | ||
60 | while (!this.antiMonotoneMailboxes.isEmpty()) { | ||
61 | final Mailbox mailbox = this.antiMonotoneMailboxes.iterator().next(); | ||
62 | this.antiMonotoneMailboxes.remove(mailbox); | ||
63 | mailbox.deliverAll(PhasedSelector.ANTI_MONOTONE); | ||
64 | } | ||
65 | while (!this.defaultMailboxes.isEmpty()) { | ||
66 | final Mailbox mailbox = this.defaultMailboxes.iterator().next(); | ||
67 | this.defaultMailboxes.remove(mailbox); | ||
68 | mailbox.deliverAll(PhasedSelector.DEFAULT); | ||
69 | } | ||
70 | } | ||
71 | |||
72 | // REDERIVE PHASE | ||
73 | while (!this.rederivables.isEmpty()) { | ||
74 | // re-derivable nodes take care of their unregistration!! | ||
75 | final RederivableNode node = this.rederivables.iterator().next(); | ||
76 | node.rederiveOne(); | ||
77 | } | ||
78 | |||
79 | // MONOTONE PHASE | ||
80 | while (!this.monotoneMailboxes.isEmpty() || !this.defaultMailboxes.isEmpty()) { | ||
81 | while (!this.monotoneMailboxes.isEmpty()) { | ||
82 | final Mailbox mailbox = this.monotoneMailboxes.iterator().next(); | ||
83 | this.monotoneMailboxes.remove(mailbox); | ||
84 | mailbox.deliverAll(PhasedSelector.MONOTONE); | ||
85 | } | ||
86 | while (!this.defaultMailboxes.isEmpty()) { | ||
87 | final Mailbox mailbox = this.defaultMailboxes.iterator().next(); | ||
88 | this.defaultMailboxes.remove(mailbox); | ||
89 | mailbox.deliverAll(PhasedSelector.DEFAULT); | ||
90 | } | ||
91 | } | ||
92 | |||
93 | this.currentlyDelivering = false; | ||
94 | } | ||
95 | |||
96 | @Override | ||
97 | public boolean isEmpty() { | ||
98 | return this.rederivables.isEmpty() && this.antiMonotoneMailboxes.isEmpty() | ||
99 | && this.monotoneMailboxes.isEmpty() && this.defaultMailboxes.isEmpty(); | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind) { | ||
104 | final Collection<Mailbox> mailboxes = getMailboxContainer(kind); | ||
105 | mailboxes.add(mailbox); | ||
106 | if (!this.isEnqueued && !this.currentlyDelivering) { | ||
107 | this.tracker.activateUnenqueued(this); | ||
108 | } | ||
109 | } | ||
110 | |||
111 | @Override | ||
112 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
113 | final Collection<Mailbox> mailboxes = getMailboxContainer(kind); | ||
114 | mailboxes.remove(mailbox); | ||
115 | if (isEmpty()) { | ||
116 | this.tracker.deactivate(this); | ||
117 | } | ||
118 | } | ||
119 | |||
120 | private Collection<Mailbox> getMailboxContainer(final MessageSelector kind) { | ||
121 | if (kind == PhasedSelector.ANTI_MONOTONE) { | ||
122 | return this.antiMonotoneMailboxes; | ||
123 | } else if (kind == PhasedSelector.MONOTONE) { | ||
124 | return this.monotoneMailboxes; | ||
125 | } else if (kind == PhasedSelector.DEFAULT) { | ||
126 | return this.defaultMailboxes; | ||
127 | } else { | ||
128 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
129 | } | ||
130 | } | ||
131 | |||
132 | public void addRederivable(final RederivableNode node) { | ||
133 | this.rederivables.add(node); | ||
134 | if (!this.isEnqueued) { | ||
135 | this.tracker.activateUnenqueued(this); | ||
136 | } | ||
137 | } | ||
138 | |||
139 | public void removeRederivable(final RederivableNode node) { | ||
140 | this.rederivables.remove(node); | ||
141 | if (isEmpty()) { | ||
142 | this.tracker.deactivate(this); | ||
143 | } | ||
144 | } | ||
145 | |||
146 | public Collection<RederivableNode> getRederivables() { | ||
147 | return this.rederivables; | ||
148 | } | ||
149 | |||
150 | @Override | ||
151 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
152 | Map<PhasedSelector, Collection<Mailbox>> map = new EnumMap<>(PhasedSelector.class); | ||
153 | map.put(PhasedSelector.ANTI_MONOTONE, antiMonotoneMailboxes); | ||
154 | map.put(PhasedSelector.MONOTONE, monotoneMailboxes); | ||
155 | map.put(PhasedSelector.DEFAULT, defaultMailboxes); | ||
156 | return Collections.unmodifiableMap(map); | ||
157 | } | ||
158 | |||
159 | @Override | ||
160 | public boolean isRecursive() { | ||
161 | return true; | ||
162 | } | ||
163 | |||
164 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java new file mode 100644 index 00000000..c51c7dbf --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/SingletonCommunicationGroup.java | |||
@@ -0,0 +1,86 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timeless; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Map; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
16 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
20 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
21 | |||
22 | /** | ||
23 | * A communication group containing only a single node with a single default | ||
24 | * mailbox. | ||
25 | * | ||
26 | * @author Tamas Szabo | ||
27 | * @since 1.6 | ||
28 | */ | ||
29 | public class SingletonCommunicationGroup extends CommunicationGroup { | ||
30 | |||
31 | private Mailbox mailbox; | ||
32 | |||
33 | /** | ||
34 | * @since 1.7 | ||
35 | */ | ||
36 | public SingletonCommunicationGroup(final CommunicationTracker tracker, final Node representative, final int identifier) { | ||
37 | super(tracker, representative, identifier); | ||
38 | } | ||
39 | |||
40 | @Override | ||
41 | public void deliverMessages() { | ||
42 | this.mailbox.deliverAll(PhasedSelector.DEFAULT); | ||
43 | } | ||
44 | |||
45 | @Override | ||
46 | public boolean isEmpty() { | ||
47 | return this.mailbox == null; | ||
48 | } | ||
49 | |||
50 | @Override | ||
51 | public void notifyHasMessage(final Mailbox mailbox, final MessageSelector kind) { | ||
52 | if (kind == PhasedSelector.DEFAULT) { | ||
53 | this.mailbox = mailbox; | ||
54 | if (!this.isEnqueued) { | ||
55 | this.tracker.activateUnenqueued(this); | ||
56 | } | ||
57 | } else { | ||
58 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
59 | } | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
64 | if (kind == PhasedSelector.DEFAULT) { | ||
65 | this.mailbox = null; | ||
66 | this.tracker.deactivate(this); | ||
67 | } else { | ||
68 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
69 | } | ||
70 | } | ||
71 | |||
72 | @Override | ||
73 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
74 | if (mailbox != null) { | ||
75 | return Collections.singletonMap(PhasedSelector.DEFAULT, Collections.singleton(mailbox)); | ||
76 | } else { | ||
77 | return Collections.emptyMap(); | ||
78 | } | ||
79 | } | ||
80 | |||
81 | @Override | ||
82 | public boolean isRecursive() { | ||
83 | return false; | ||
84 | } | ||
85 | |||
86 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java new file mode 100644 index 00000000..1c18c1cd --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timeless/TimelessCommunicationTracker.java | |||
@@ -0,0 +1,149 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timeless; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.HashSet; | ||
13 | import java.util.Set; | ||
14 | import java.util.Map.Entry; | ||
15 | |||
16 | import tools.refinery.viatra.runtime.rete.index.DualInputNode; | ||
17 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
18 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
19 | import tools.refinery.viatra.runtime.rete.index.IterableIndexer; | ||
20 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
21 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
22 | import tools.refinery.viatra.runtime.rete.network.RederivableNode; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
26 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
27 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
28 | |||
29 | /** | ||
30 | * Timeless implementation of the communication tracker. | ||
31 | * | ||
32 | * @author Tamas Szabo | ||
33 | * @since 2.2 | ||
34 | */ | ||
35 | public class TimelessCommunicationTracker extends CommunicationTracker { | ||
36 | |||
37 | @Override | ||
38 | protected CommunicationGroup createGroup(Node representative, int index) { | ||
39 | final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; | ||
40 | final boolean isReceiver = representative instanceof Receiver; | ||
41 | final boolean isPosetIndifferent = isReceiver | ||
42 | && ((Receiver) representative).getMailbox() instanceof BehaviorChangingMailbox; | ||
43 | final boolean isSingletonInDRedMode = isSingleton && (representative instanceof RederivableNode) | ||
44 | && ((RederivableNode) representative).isInDRedMode(); | ||
45 | |||
46 | CommunicationGroup group = null; | ||
47 | // we can only use a singleton group iff | ||
48 | // (1) the SCC has one node AND | ||
49 | // (2) either we have a poset-indifferent mailbox OR the node is not even a receiver AND | ||
50 | // (3) the node does not run in DRed mode in a singleton group | ||
51 | if (isSingleton && (isPosetIndifferent || !isReceiver) && !isSingletonInDRedMode) { | ||
52 | group = new SingletonCommunicationGroup(this, representative, index); | ||
53 | } else { | ||
54 | group = new RecursiveCommunicationGroup(this, representative, index); | ||
55 | } | ||
56 | |||
57 | return group; | ||
58 | } | ||
59 | |||
60 | @Override | ||
61 | protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) { | ||
62 | for (final CommunicationGroup oldGroup : oldActiveGroups) { | ||
63 | for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) { | ||
64 | for (final Mailbox mailbox : entry.getValue()) { | ||
65 | final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver()); | ||
66 | newGroup.notifyHasMessage(mailbox, entry.getKey()); | ||
67 | } | ||
68 | } | ||
69 | |||
70 | if (oldGroup instanceof RecursiveCommunicationGroup) { | ||
71 | for (final RederivableNode node : ((RecursiveCommunicationGroup) oldGroup).getRederivables()) { | ||
72 | final CommunicationGroup newGroup = this.groupMap.get(node); | ||
73 | if (!(newGroup instanceof RecursiveCommunicationGroup)) { | ||
74 | throw new IllegalStateException("The new group must also be recursive! " + newGroup); | ||
75 | } | ||
76 | ((RecursiveCommunicationGroup) newGroup).addRederivable(node); | ||
77 | } | ||
78 | } | ||
79 | } | ||
80 | } | ||
81 | |||
82 | @Override | ||
83 | public Mailbox proxifyMailbox(final Node requester, final Mailbox original) { | ||
84 | return original; | ||
85 | } | ||
86 | |||
87 | @Override | ||
88 | public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) { | ||
89 | return original; | ||
90 | } | ||
91 | |||
92 | @Override | ||
93 | protected void postProcessNode(final Node node) { | ||
94 | if (node instanceof Receiver) { | ||
95 | final Mailbox mailbox = ((Receiver) node).getMailbox(); | ||
96 | if (mailbox instanceof BehaviorChangingMailbox) { | ||
97 | final CommunicationGroup group = this.groupMap.get(node); | ||
98 | final Set<Node> sccNodes = this.sccInformationProvider.sccs.getPartition(node); | ||
99 | // a default mailbox must split its messages iff | ||
100 | // (1) its receiver is in a recursive group and | ||
101 | final boolean c1 = group.isRecursive(); | ||
102 | // (2) its receiver is at the SCC boundary of that group | ||
103 | final boolean c2 = isAtSCCBoundary(node); | ||
104 | // (3) its group consists of more than one node | ||
105 | final boolean c3 = sccNodes.size() > 1; | ||
106 | ((BehaviorChangingMailbox) mailbox).setSplitFlag(c1 && c2 && c3); | ||
107 | } | ||
108 | } | ||
109 | } | ||
110 | |||
111 | @Override | ||
112 | protected void postProcessGroup(final CommunicationGroup group) { | ||
113 | |||
114 | } | ||
115 | |||
116 | /** | ||
117 | * @since 2.0 | ||
118 | */ | ||
119 | private boolean isAtSCCBoundary(final Node node) { | ||
120 | final CommunicationGroup ownGroup = this.groupMap.get(node); | ||
121 | assert ownGroup != null; | ||
122 | for (final Node source : this.dependencyGraph.getSourceNodes(node).distinctValues()) { | ||
123 | final Set<Node> sourcesToCheck = new HashSet<Node>(); | ||
124 | sourcesToCheck.add(source); | ||
125 | // DualInputNodes must be checked additionally because they do not use a mailbox directly. | ||
126 | // It can happen that their indexers actually belong to other SCCs. | ||
127 | if (source instanceof DualInputNode) { | ||
128 | final DualInputNode dualInput = (DualInputNode) source; | ||
129 | final IterableIndexer primarySlot = dualInput.getPrimarySlot(); | ||
130 | if (primarySlot != null) { | ||
131 | sourcesToCheck.add(primarySlot.getActiveNode()); | ||
132 | } | ||
133 | final Indexer secondarySlot = dualInput.getSecondarySlot(); | ||
134 | if (secondarySlot != null) { | ||
135 | sourcesToCheck.add(secondarySlot.getActiveNode()); | ||
136 | } | ||
137 | } | ||
138 | for (final Node current : sourcesToCheck) { | ||
139 | final CommunicationGroup otherGroup = this.groupMap.get(current); | ||
140 | assert otherGroup != null; | ||
141 | if (!ownGroup.equals(otherGroup)) { | ||
142 | return true; | ||
143 | } | ||
144 | } | ||
145 | } | ||
146 | return false; | ||
147 | } | ||
148 | |||
149 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java new file mode 100644 index 00000000..8097bd91 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/ResumableNode.java | |||
@@ -0,0 +1,36 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.IGroupable; | ||
12 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
13 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
14 | |||
15 | /** | ||
16 | * {@link Node}s that implement this interface can resume folding of their states when instructed during timely evaluation. | ||
17 | * | ||
18 | * @since 2.3 | ||
19 | * @author Tamas Szabo | ||
20 | */ | ||
21 | public interface ResumableNode extends Node, IGroupable { | ||
22 | |||
23 | /** | ||
24 | * When called, the folding of the state shall be resumed at the given timestamp. The resumable is expected to | ||
25 | * do a folding step at the given timestamp only. Afterwards, folding shall be interrupted, even if there is more | ||
26 | * folding to do towards higher timestamps. | ||
27 | */ | ||
28 | public void resumeAt(final Timestamp timestamp); | ||
29 | |||
30 | /** | ||
31 | * Returns the smallest timestamp where lazy folding shall be resumed, or null if there is no more folding to do in this | ||
32 | * resumable. | ||
33 | */ | ||
34 | public Timestamp getResumableTimestamp(); | ||
35 | |||
36 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java new file mode 100644 index 00000000..0394d92c --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java | |||
@@ -0,0 +1,171 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Comparator; | ||
14 | import java.util.HashMap; | ||
15 | import java.util.Map; | ||
16 | import java.util.Map.Entry; | ||
17 | import java.util.Set; | ||
18 | import java.util.TreeMap; | ||
19 | import java.util.TreeSet; | ||
20 | |||
21 | import org.apache.log4j.Logger; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
23 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
26 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
27 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
28 | import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox; | ||
29 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
30 | |||
31 | /** | ||
32 | * A timely communication group implementation. {@link TimelyMailbox}es and {@link LazyFoldingNode}s are ordered in the | ||
33 | * increasing order of timestamps. | ||
34 | * | ||
35 | * @author Tamas Szabo | ||
36 | * @since 2.3 | ||
37 | */ | ||
38 | public class TimelyCommunicationGroup extends CommunicationGroup { | ||
39 | |||
40 | private final boolean isSingleton; | ||
41 | private final TreeMap<Timestamp, Set<Mailbox>> mailboxQueue; | ||
42 | // may be null - only used in the scattered case where we need to take care of mailboxes and resumables too | ||
43 | private Comparator<Node> nodeComparator; | ||
44 | private boolean currentlyDelivering; | ||
45 | private Timestamp currentlyDeliveredTimestamp; | ||
46 | |||
47 | public TimelyCommunicationGroup(final TimelyCommunicationTracker tracker, final Node representative, | ||
48 | final int identifier, final boolean isSingleton) { | ||
49 | super(tracker, representative, identifier); | ||
50 | this.isSingleton = isSingleton; | ||
51 | this.mailboxQueue = CollectionsFactory.createTreeMap(); | ||
52 | this.currentlyDelivering = false; | ||
53 | } | ||
54 | |||
55 | /** | ||
56 | * Sets the {@link Comparator} to be used to order the {@link Mailbox}es at a given {@link Timestamp} in the mailbox | ||
57 | * queue. Additionally, reorders already queued {@link Mailbox}es to reflect the new comparator. The comparator may | ||
58 | * be null, in this case, no set ordering will be enforced among the {@link Mailbox}es. | ||
59 | */ | ||
60 | public void setComparatorAndReorderMailboxes(final Comparator<Node> nodeComparator) { | ||
61 | this.nodeComparator = nodeComparator; | ||
62 | if (!this.mailboxQueue.isEmpty()) { | ||
63 | final HashMap<Timestamp, Set<Mailbox>> queueCopy = new HashMap<Timestamp, Set<Mailbox>>(this.mailboxQueue); | ||
64 | this.mailboxQueue.clear(); | ||
65 | for (final Entry<Timestamp, Set<Mailbox>> entry : queueCopy.entrySet()) { | ||
66 | for (final Mailbox mailbox : entry.getValue()) { | ||
67 | this.notifyHasMessage(mailbox, entry.getKey()); | ||
68 | } | ||
69 | } | ||
70 | } | ||
71 | } | ||
72 | |||
73 | @Override | ||
74 | public void deliverMessages() { | ||
75 | this.currentlyDelivering = true; | ||
76 | while (!this.mailboxQueue.isEmpty()) { | ||
77 | // care must be taken here how we iterate over the mailboxes | ||
78 | // it is not okay to loop over the mailboxes at once because a mailbox may disappear from the collection as | ||
79 | // a result of delivering messages from another mailboxes under the same timestamp | ||
80 | // because of this, it is crucial that we pick the mailboxes one by one | ||
81 | final Entry<Timestamp, Set<Mailbox>> entry = this.mailboxQueue.firstEntry(); | ||
82 | final Timestamp timestamp = entry.getKey(); | ||
83 | final Set<Mailbox> mailboxes = entry.getValue(); | ||
84 | final Mailbox mailbox = mailboxes.iterator().next(); | ||
85 | mailboxes.remove(mailbox); | ||
86 | if (mailboxes.isEmpty()) { | ||
87 | this.mailboxQueue.pollFirstEntry(); | ||
88 | } | ||
89 | assert mailbox instanceof TimelyMailbox; | ||
90 | /* debug */ this.currentlyDeliveredTimestamp = timestamp; | ||
91 | mailbox.deliverAll(timestamp); | ||
92 | /* debug */ this.currentlyDeliveredTimestamp = null; | ||
93 | } | ||
94 | this.currentlyDelivering = false; | ||
95 | } | ||
96 | |||
97 | @Override | ||
98 | public boolean isEmpty() { | ||
99 | return this.mailboxQueue.isEmpty(); | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void notifyHasMessage(final Mailbox mailbox, MessageSelector kind) { | ||
104 | if (kind instanceof Timestamp) { | ||
105 | final Timestamp timestamp = (Timestamp) kind; | ||
106 | if (Options.MONITOR_VIOLATION_OF_DIFFERENTIAL_DATAFLOW_TIMESTAMPS) { | ||
107 | if (timestamp.compareTo(this.currentlyDeliveredTimestamp) < 0) { | ||
108 | final Logger logger = this.representative.getContainer().getNetwork().getEngine().getLogger(); | ||
109 | logger.error( | ||
110 | "[INTERNAL ERROR] Violation of differential dataflow communication schema! The communication component with representative " | ||
111 | + this.representative + " observed decreasing timestamp during message delivery!"); | ||
112 | } | ||
113 | } | ||
114 | final Set<Mailbox> mailboxes = this.mailboxQueue.computeIfAbsent(timestamp, k -> { | ||
115 | if (this.nodeComparator == null) { | ||
116 | return CollectionsFactory.createSet(); | ||
117 | } else { | ||
118 | return new TreeSet<Mailbox>(new Comparator<Mailbox>() { | ||
119 | @Override | ||
120 | public int compare(final Mailbox left, final Mailbox right) { | ||
121 | return nodeComparator.compare(left.getReceiver(), right.getReceiver()); | ||
122 | } | ||
123 | }); | ||
124 | } | ||
125 | }); | ||
126 | mailboxes.add(mailbox); | ||
127 | if (!this.isEnqueued && !this.currentlyDelivering) { | ||
128 | this.tracker.activateUnenqueued(this); | ||
129 | } | ||
130 | } else { | ||
131 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
132 | } | ||
133 | } | ||
134 | |||
135 | @Override | ||
136 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
137 | if (kind instanceof Timestamp) { | ||
138 | final Timestamp timestamp = (Timestamp) kind; | ||
139 | this.mailboxQueue.compute(timestamp, (k, v) -> { | ||
140 | if (v == null) { | ||
141 | throw new IllegalStateException("No mailboxes registered at timestamp " + timestamp + "!"); | ||
142 | } | ||
143 | if (!v.remove(mailbox)) { | ||
144 | throw new IllegalStateException( | ||
145 | "The mailbox " + mailbox + " was not registered at timestamp " + timestamp + "!"); | ||
146 | } | ||
147 | if (v.isEmpty()) { | ||
148 | return null; | ||
149 | } else { | ||
150 | return v; | ||
151 | } | ||
152 | }); | ||
153 | if (this.mailboxQueue.isEmpty()) { | ||
154 | this.tracker.deactivate(this); | ||
155 | } | ||
156 | } else { | ||
157 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
158 | } | ||
159 | } | ||
160 | |||
161 | @Override | ||
162 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
163 | return Collections.unmodifiableMap(this.mailboxQueue); | ||
164 | } | ||
165 | |||
166 | @Override | ||
167 | public boolean isRecursive() { | ||
168 | return !this.isSingleton; | ||
169 | } | ||
170 | |||
171 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java new file mode 100644 index 00000000..79179880 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java | |||
@@ -0,0 +1,216 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.List; | ||
13 | import java.util.Map; | ||
14 | import java.util.Map.Entry; | ||
15 | import java.util.Set; | ||
16 | import java.util.function.Function; | ||
17 | |||
18 | import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting; | ||
19 | import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
21 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
22 | import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer; | ||
23 | import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription; | ||
24 | import tools.refinery.viatra.runtime.rete.index.StandardIndexer; | ||
25 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; | ||
26 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; | ||
27 | import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode; | ||
28 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
29 | import tools.refinery.viatra.runtime.rete.network.ProductionNode; | ||
30 | import tools.refinery.viatra.runtime.rete.network.StandardNode; | ||
31 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
32 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
33 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
34 | import tools.refinery.viatra.runtime.rete.network.communication.NodeComparator; | ||
35 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
36 | import tools.refinery.viatra.runtime.rete.single.DiscriminatorDispatcherNode; | ||
37 | |||
38 | /** | ||
39 | * Timely (DDF) implementation of the {@link CommunicationTracker}. | ||
40 | * | ||
41 | * @author Tamas Szabo | ||
42 | * @since 2.3 | ||
43 | */ | ||
44 | public class TimelyCommunicationTracker extends CommunicationTracker { | ||
45 | |||
46 | protected final TimelyConfiguration configuration; | ||
47 | |||
48 | public TimelyCommunicationTracker(final TimelyConfiguration configuration) { | ||
49 | this.configuration = configuration; | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | protected CommunicationGroup createGroup(final Node representative, final int index) { | ||
54 | final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; | ||
55 | return new TimelyCommunicationGroup(this, representative, index, isSingleton); | ||
56 | } | ||
57 | |||
58 | @Override | ||
59 | protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) { | ||
60 | for (final CommunicationGroup oldGroup : oldActiveGroups) { | ||
61 | for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) { | ||
62 | for (final Mailbox mailbox : entry.getValue()) { | ||
63 | final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver()); | ||
64 | newGroup.notifyHasMessage(mailbox, entry.getKey()); | ||
65 | } | ||
66 | } | ||
67 | } | ||
68 | } | ||
69 | |||
70 | @Override | ||
71 | public Mailbox proxifyMailbox(final Node requester, final Mailbox original) { | ||
72 | final Mailbox mailboxToProxify = (original instanceof TimelyMailboxProxy) | ||
73 | ? ((TimelyMailboxProxy) original).getWrappedMailbox() | ||
74 | : original; | ||
75 | final TimestampTransformation preprocessor = getPreprocessor(requester, mailboxToProxify.getReceiver()); | ||
76 | if (preprocessor == null) { | ||
77 | return mailboxToProxify; | ||
78 | } else { | ||
79 | return new TimelyMailboxProxy(mailboxToProxify, preprocessor); | ||
80 | } | ||
81 | } | ||
82 | |||
83 | @Override | ||
84 | public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) { | ||
85 | final IndexerListener listenerToProxify = (original instanceof TimelyIndexerListenerProxy) | ||
86 | ? ((TimelyIndexerListenerProxy) original).getWrappedIndexerListener() | ||
87 | : original; | ||
88 | final TimestampTransformation preprocessor = getPreprocessor(requester, listenerToProxify.getOwner()); | ||
89 | if (preprocessor == null) { | ||
90 | return listenerToProxify; | ||
91 | } else { | ||
92 | return new TimelyIndexerListenerProxy(listenerToProxify, preprocessor); | ||
93 | } | ||
94 | } | ||
95 | |||
96 | protected TimestampTransformation getPreprocessor(final Node source, final Node target) { | ||
97 | final Node effectiveSource = source instanceof SpecializedProjectionIndexer | ||
98 | ? ((SpecializedProjectionIndexer) source).getActiveNode() | ||
99 | : source; | ||
100 | final CommunicationGroup sourceGroup = this.getGroup(effectiveSource); | ||
101 | final CommunicationGroup targetGroup = this.getGroup(target); | ||
102 | |||
103 | if (sourceGroup != null && targetGroup != null) { | ||
104 | // during RETE construction, the groups may be still null | ||
105 | if (sourceGroup != targetGroup && sourceGroup.isRecursive()) { | ||
106 | // targetGroup is a successor SCC of sourceGroup | ||
107 | // and sourceGroup is a recursive SCC | ||
108 | // then we need to zero out the timestamps | ||
109 | return TimestampTransformation.RESET; | ||
110 | } | ||
111 | if (sourceGroup == targetGroup && target instanceof ProductionNode) { | ||
112 | // if requester and receiver are in the same SCC | ||
113 | // and receiver is a production node | ||
114 | // then we need to increment the timestamps | ||
115 | return TimestampTransformation.INCREMENT; | ||
116 | } | ||
117 | } | ||
118 | |||
119 | return null; | ||
120 | } | ||
121 | |||
122 | @Override | ||
123 | protected void postProcessNode(final Node node) { | ||
124 | if (node instanceof NetworkStructureChangeSensitiveNode) { | ||
125 | ((NetworkStructureChangeSensitiveNode) node).networkStructureChanged(); | ||
126 | } | ||
127 | } | ||
128 | |||
129 | @Override | ||
130 | protected void postProcessGroup(final CommunicationGroup group) { | ||
131 | if (this.configuration.getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) { | ||
132 | final Node representative = group.getRepresentative(); | ||
133 | final Set<Node> groupMembers = this.sccInformationProvider.sccs.getPartition(representative); | ||
134 | if (groupMembers.size() > 1) { | ||
135 | final Graph<Node> graph = new Graph<Node>(); | ||
136 | |||
137 | for (final Node node : groupMembers) { | ||
138 | graph.insertNode(node); | ||
139 | } | ||
140 | |||
141 | for (final Node source : groupMembers) { | ||
142 | for (final Node target : this.dependencyGraph.getTargetNodes(source)) { | ||
143 | // (1) the edge is not a recursion cut point | ||
144 | // (2) the edge is within this group | ||
145 | if (!this.isRecursionCutPoint(source, target) && groupMembers.contains(target)) { | ||
146 | graph.insertEdge(source, target); | ||
147 | } | ||
148 | } | ||
149 | } | ||
150 | |||
151 | final List<Node> orderedNodes = TopologicalSorting.compute(graph); | ||
152 | final Map<Node, Integer> nodeMap = CollectionsFactory.createMap(); | ||
153 | int identifier = 0; | ||
154 | for (final Node orderedNode : orderedNodes) { | ||
155 | nodeMap.put(orderedNode, identifier++); | ||
156 | } | ||
157 | |||
158 | ((TimelyCommunicationGroup) group).setComparatorAndReorderMailboxes(new NodeComparator(nodeMap)); | ||
159 | } | ||
160 | } | ||
161 | } | ||
162 | |||
163 | /** | ||
164 | * This static field is used for debug purposes in the DotGenerator. | ||
165 | */ | ||
166 | public static final Function<Node, Function<Node, String>> EDGE_LABEL_FUNCTION = new Function<Node, Function<Node, String>>() { | ||
167 | |||
168 | @Override | ||
169 | public Function<Node, String> apply(final Node source) { | ||
170 | return new Function<Node, String>() { | ||
171 | @Override | ||
172 | public String apply(final Node target) { | ||
173 | if (source instanceof SpecializedProjectionIndexer) { | ||
174 | final Collection<ListenerSubscription> subscriptions = ((SpecializedProjectionIndexer) source) | ||
175 | .getSubscriptions(); | ||
176 | for (final ListenerSubscription subscription : subscriptions) { | ||
177 | if (subscription.getListener().getOwner() == target | ||
178 | && subscription.getListener() instanceof TimelyIndexerListenerProxy) { | ||
179 | return ((TimelyIndexerListenerProxy) subscription.getListener()).preprocessor | ||
180 | .toString(); | ||
181 | } | ||
182 | } | ||
183 | } | ||
184 | if (source instanceof StandardIndexer) { | ||
185 | final Collection<IndexerListener> listeners = ((StandardIndexer) source).getListeners(); | ||
186 | for (final IndexerListener listener : listeners) { | ||
187 | if (listener.getOwner() == target && listener instanceof TimelyIndexerListenerProxy) { | ||
188 | return ((TimelyIndexerListenerProxy) listener).preprocessor.toString(); | ||
189 | } | ||
190 | } | ||
191 | } | ||
192 | if (source instanceof StandardNode) { | ||
193 | final Collection<Mailbox> mailboxes = ((StandardNode) source).getChildMailboxes(); | ||
194 | for (final Mailbox mailbox : mailboxes) { | ||
195 | if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) { | ||
196 | return ((TimelyMailboxProxy) mailbox).preprocessor.toString(); | ||
197 | } | ||
198 | } | ||
199 | } | ||
200 | if (source instanceof DiscriminatorDispatcherNode) { | ||
201 | final Collection<Mailbox> mailboxes = ((DiscriminatorDispatcherNode) source) | ||
202 | .getBucketMailboxes().values(); | ||
203 | for (final Mailbox mailbox : mailboxes) { | ||
204 | if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) { | ||
205 | return ((TimelyMailboxProxy) mailbox).preprocessor.toString(); | ||
206 | } | ||
207 | } | ||
208 | } | ||
209 | return null; | ||
210 | } | ||
211 | }; | ||
212 | } | ||
213 | |||
214 | }; | ||
215 | |||
216 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java new file mode 100644 index 00000000..e8fbf84e --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyIndexerListenerProxy.java | |||
@@ -0,0 +1,81 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
13 | import tools.refinery.viatra.runtime.matchers.util.Preconditions; | ||
14 | import tools.refinery.viatra.runtime.rete.index.IndexerListener; | ||
15 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
16 | import tools.refinery.viatra.runtime.rete.network.ProductionNode; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
18 | |||
19 | /** | ||
20 | * A timely proxy for another {@link IndexerListener}, which performs some preprocessing | ||
21 | * on the differential timestamps before passing it on to the real recipient. | ||
22 | * <p> | ||
23 | * These proxies are used on edges leading into {@link ProductionNode}s. Because {@link ProductionNode}s | ||
24 | * never ask back the indexer for its contents, there is no need to also apply the proxy on that direction. | ||
25 | * | ||
26 | * @author Tamas Szabo | ||
27 | * @since 2.3 | ||
28 | */ | ||
29 | public class TimelyIndexerListenerProxy implements IndexerListener { | ||
30 | |||
31 | protected final TimestampTransformation preprocessor; | ||
32 | protected final IndexerListener wrapped; | ||
33 | |||
34 | public TimelyIndexerListenerProxy(final IndexerListener wrapped, | ||
35 | final TimestampTransformation preprocessor) { | ||
36 | Preconditions.checkArgument(!(wrapped instanceof TimelyIndexerListenerProxy), "Proxy in a proxy is not allowed!"); | ||
37 | this.wrapped = wrapped; | ||
38 | this.preprocessor = preprocessor; | ||
39 | } | ||
40 | |||
41 | public IndexerListener getWrappedIndexerListener() { | ||
42 | return wrapped; | ||
43 | } | ||
44 | |||
45 | @Override | ||
46 | public Node getOwner() { | ||
47 | return this.wrapped.getOwner(); | ||
48 | } | ||
49 | |||
50 | @Override | ||
51 | public void notifyIndexerUpdate(final Direction direction, final Tuple updateElement, final Tuple signature, | ||
52 | final boolean change, final Timestamp timestamp) { | ||
53 | this.wrapped.notifyIndexerUpdate(direction, updateElement, signature, change, preprocessor.process(timestamp)); | ||
54 | } | ||
55 | |||
56 | @Override | ||
57 | public String toString() { | ||
58 | return this.preprocessor.toString() + "_PROXY -> " + this.wrapped.toString(); | ||
59 | } | ||
60 | |||
61 | @Override | ||
62 | public boolean equals(final Object obj) { | ||
63 | if (obj == null || obj.getClass() != this.getClass()) { | ||
64 | return false; | ||
65 | } else if (obj == this) { | ||
66 | return true; | ||
67 | } else { | ||
68 | final TimelyIndexerListenerProxy that = (TimelyIndexerListenerProxy) obj; | ||
69 | return this.wrapped.equals(that.wrapped) && this.preprocessor == that.preprocessor; | ||
70 | } | ||
71 | } | ||
72 | |||
73 | @Override | ||
74 | public int hashCode() { | ||
75 | int hash = 1; | ||
76 | hash = hash * 17 + this.wrapped.hashCode(); | ||
77 | hash = hash * 31 + this.preprocessor.hashCode(); | ||
78 | return hash; | ||
79 | } | ||
80 | |||
81 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java new file mode 100644 index 00000000..550bfbeb --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyMailboxProxy.java | |||
@@ -0,0 +1,102 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
12 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
13 | import tools.refinery.viatra.runtime.matchers.util.Preconditions; | ||
14 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
15 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
16 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
18 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
19 | |||
20 | /** | ||
21 | * A timely proxy for another {@link Mailbox}, which performs some preprocessing | ||
22 | * on the differential timestamps before passing it on to the real recipient. | ||
23 | * | ||
24 | * @author Tamas Szabo | ||
25 | * @since 2.3 | ||
26 | */ | ||
27 | public class TimelyMailboxProxy implements Mailbox { | ||
28 | |||
29 | protected final TimestampTransformation preprocessor; | ||
30 | protected final Mailbox wrapped; | ||
31 | |||
32 | public TimelyMailboxProxy(final Mailbox wrapped, final TimestampTransformation preprocessor) { | ||
33 | Preconditions.checkArgument(!(wrapped instanceof TimelyMailboxProxy), "Proxy in a proxy is not allowed!"); | ||
34 | this.wrapped = wrapped; | ||
35 | this.preprocessor = preprocessor; | ||
36 | } | ||
37 | |||
38 | public Mailbox getWrappedMailbox() { | ||
39 | return wrapped; | ||
40 | } | ||
41 | |||
42 | @Override | ||
43 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
44 | this.wrapped.postMessage(direction, update, preprocessor.process(timestamp)); | ||
45 | } | ||
46 | |||
47 | @Override | ||
48 | public String toString() { | ||
49 | return this.preprocessor.toString() + "_PROXY -> " + this.wrapped.toString(); | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | public void clear() { | ||
54 | this.wrapped.clear(); | ||
55 | } | ||
56 | |||
57 | @Override | ||
58 | public void deliverAll(final MessageSelector selector) { | ||
59 | this.wrapped.deliverAll(selector); | ||
60 | } | ||
61 | |||
62 | @Override | ||
63 | public CommunicationGroup getCurrentGroup() { | ||
64 | return this.wrapped.getCurrentGroup(); | ||
65 | } | ||
66 | |||
67 | @Override | ||
68 | public void setCurrentGroup(final CommunicationGroup group) { | ||
69 | this.wrapped.setCurrentGroup(group); | ||
70 | } | ||
71 | |||
72 | @Override | ||
73 | public Receiver getReceiver() { | ||
74 | return this.wrapped.getReceiver(); | ||
75 | } | ||
76 | |||
77 | @Override | ||
78 | public boolean isEmpty() { | ||
79 | return this.wrapped.isEmpty(); | ||
80 | } | ||
81 | |||
82 | @Override | ||
83 | public boolean equals(final Object obj) { | ||
84 | if (obj == null || obj.getClass() != this.getClass()) { | ||
85 | return false; | ||
86 | } else if (obj == this) { | ||
87 | return true; | ||
88 | } else { | ||
89 | final TimelyMailboxProxy that = (TimelyMailboxProxy) obj; | ||
90 | return this.wrapped.equals(that.wrapped) && this.preprocessor == that.preprocessor; | ||
91 | } | ||
92 | } | ||
93 | |||
94 | @Override | ||
95 | public int hashCode() { | ||
96 | int hash = 1; | ||
97 | hash = hash * 17 + this.wrapped.hashCode(); | ||
98 | hash = hash * 31 + this.preprocessor.hashCode(); | ||
99 | return hash; | ||
100 | } | ||
101 | |||
102 | } | ||
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java new file mode 100644 index 00000000..8929eb5c --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimestampTransformation.java | |||
@@ -0,0 +1,48 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
12 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
13 | |||
14 | /** | ||
15 | * Values of this enum perform different kind of preprocessing on {@link Timestamp}s. | ||
16 | * This is used on edges leading in and out from {@link Node}s in recursive {@link TimelyCommunicationGroup}s. | ||
17 | * | ||
18 | * @author Tamas Szabo | ||
19 | * @since 2.3 | ||
20 | */ | ||
21 | public enum TimestampTransformation { | ||
22 | |||
23 | INCREMENT { | ||
24 | @Override | ||
25 | public Timestamp process(final Timestamp timestamp) { | ||
26 | return new Timestamp(timestamp.getValue() + 1); | ||
27 | } | ||
28 | |||
29 | @Override | ||
30 | public String toString() { | ||
31 | return "INCREMENT"; | ||
32 | } | ||
33 | }, | ||
34 | RESET { | ||
35 | @Override | ||
36 | public Timestamp process(final Timestamp timestamp) { | ||
37 | return Timestamp.ZERO; | ||
38 | } | ||
39 | |||
40 | @Override | ||
41 | public String toString() { | ||
42 | return "RESET"; | ||
43 | } | ||
44 | }; | ||
45 | |||
46 | public abstract Timestamp process(final Timestamp timestamp); | ||
47 | |||
48 | } | ||