// Source: subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/CommunicationTracker.java (blob d244e6445297a2594bd6e0a2cb73d2ac432883bf)
/*******************************************************************************
 * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-v20.html.
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
package tools.refinery.viatra.runtime.rete.network.communication;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;

import tools.refinery.viatra.runtime.rete.itc.alg.incscc.IncSCCAlg;
import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting;
import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph;
import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
import tools.refinery.viatra.runtime.rete.aggregation.IAggregatorNode;
import tools.refinery.viatra.runtime.rete.boundary.ExternalInputEnumeratorNode;
import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode;
import tools.refinery.viatra.runtime.rete.index.DualInputNode;
import tools.refinery.viatra.runtime.rete.index.ExistenceNode;
import tools.refinery.viatra.runtime.rete.index.Indexer;
import tools.refinery.viatra.runtime.rete.index.IndexerListener;
import tools.refinery.viatra.runtime.rete.index.IterableIndexer;
import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer;
import tools.refinery.viatra.runtime.rete.network.IGroupable;
import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode;
import tools.refinery.viatra.runtime.rete.network.Node;
import tools.refinery.viatra.runtime.rete.network.ProductionNode;
import tools.refinery.viatra.runtime.rete.network.Receiver;
import tools.refinery.viatra.runtime.rete.network.ReteContainer;
import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyIndexerListenerProxy;
import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyMailboxProxy;
import tools.refinery.viatra.runtime.rete.network.mailbox.FallThroughCapableMailbox;
import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
import tools.refinery.viatra.runtime.rete.single.TransitiveClosureNode;
import tools.refinery.viatra.runtime.rete.single.TrimmerNode;

/**
 * An instance of this class is associated with every {@link ReteContainer}. The tracker serves two purposes: <br>
 * (1) It allows RETE nodes to register their communication dependencies on-the-fly. These dependencies can be
 * registered or unregistered when nodes are disposed of. <br>
 * (2) It allows RETE nodes to register their mailboxes as dirty, that is, they can tell the tracker that they have
 * something to send to other nodes in the network. The tracker is then responsible for ordering these messages (more
 * precisely, the mailboxes that contain the messages) for the associated {@link ReteContainer}. The ordering is
 * governed by the strongly connected components in the dependency network and follows a topological sorting scheme;
 * those mailboxes will be emptied first whose owner nodes do not depend on other undelivered messages.
 *
 * @author Tamas Szabo
 * @since 1.6
 *
 */
public abstract class CommunicationTracker {

    /**
     * The minimum group id assigned so far. It is reset to 0 by a full group re-computation and decremented every time
     * {@link #registerDependency(Node, Node)} has to create a group on demand for a source representative.
     */
    protected int minGroupId;

    /**
     * The maximum group id assigned so far. It is reset to (number of SCC representatives - 1) by a full group
     * re-computation and incremented every time {@link #registerDependency(Node, Node)} creates a group on demand for
     * a target representative or bumps the identifier of an existing target group.
     */
    protected int maxGroupId;

    /**
     * The dependency graph of the communications in the RETE network. An edge source -> target is inserted by
     * {@link #registerDependency(Node, Node)} and deleted by {@link #unregisterDependency(Node, Node)}.
     */
    protected final Graph<Node> dependencyGraph;

    /**
     * Incremental SCC information about the dependency graph
     */
    protected final IncSCCAlg<Node> sccInformationProvider;

    /**
     * Precomputed node -> communication group map. It is rebuilt from scratch on a full group re-computation and
     * extended in place when groups are created on demand.
     */
    protected final Map<Node, CommunicationGroup> groupMap;

    /**
     * Priority queue of active communication groups. The queue is created without an explicit comparator, so the
     * groups are ordered by their natural ordering.
     */
    protected final Queue<CommunicationGroup> groupQueue;

    // design note:
    // groups should have a simple integer flag which represents its position in a priority queue
    // priority queue only contains the ACTIVE groups

    /**
     * Creates a tracker with an empty group map, an empty queue of active groups, an empty dependency graph and an SCC
     * information provider constructed over that graph.
     */
    public CommunicationTracker() {
        this.groupMap = new HashMap<>();
        this.groupQueue = new PriorityQueue<>();
        // the SCC provider takes the graph as constructor argument, so the graph has to exist first
        this.dependencyGraph = new Graph<>();
        this.sccInformationProvider = new IncSCCAlg<>(this.dependencyGraph);
    }

    /**
     * @return the dependency graph maintained by this tracker (the internal instance itself, not a copy)
     */
    public Graph<Node> getDependencyGraph() {
        return this.dependencyGraph;
    }

    /**
     * Looks up the communication group currently stored for the given node.
     *
     * @param node
     *            the node whose group is requested
     * @return the group mapped to the node, or {@code null} if the group map has no entry for it
     */
    public CommunicationGroup getGroup(final Node node) {
        final CommunicationGroup mappedGroup = groupMap.get(node);
        return mappedGroup;
    }

    /**
     * Performs a full re-computation of the group map (and, if needed, of the queue of active groups) from the
     * current state of the dependency graph.
     */
    private void precomputeGroups() {
        groupMap.clear();

        // one group per SCC representative, numbered by the topological order of the reduced graph
        final List<Node> sortedRepresentatives = TopologicalSorting.compute(sccInformationProvider.getReducedGraph());
        int nextIndex = 0;
        for (final Node sccRepresentative : sortedRepresentatives) {
            createAndStoreGroup(sccRepresentative, nextIndex);
            nextIndex++;
        }

        minGroupId = 0;
        maxGroupId = sortedRepresentatives.size() - 1;

        // every non-representative node joins the group of its representative
        for (final Node member : dependencyGraph.getAllNodes()) {
            final Node sccRepresentative = sccInformationProvider.getRepresentative(member);
            if (sccRepresentative != member) {
                addToGroup(member, groupMap.get(sccRepresentative));
            }
        }

        // node-level post-processing requires the complete group map, hence the separate pass
        for (final Node member : dependencyGraph.getAllNodes()) {
            // fall-through flags of default mailboxes
            precomputeFallThroughFlag(member);
            // tracker-specific post-processing
            postProcessNode(member);
        }

        // the queue still holds groups of the previous group map; let the concrete tracker rebuild it
        if (!groupQueue.isEmpty()) {
            final Set<CommunicationGroup> previouslyActive = new HashSet<>(groupQueue);
            groupQueue.clear();
            reconstructQueueContents(previouslyActive);
        }

        // finally, group-level post-processing
        for (final CommunicationGroup storedGroup : groupMap.values()) {
            postProcessGroup(storedGroup);
        }
    }

    /**
     * This method is responsible for reconstructing the active queue contents after the network structure has changed.
     * It is defined as abstract because the reconstruction logic is specific to each {@link CommunicationTracker}.
     * It is only invoked if the queue was non-empty, and at the time of the invocation the queue has already been
     * cleared and the group map has already been rebuilt.
     *
     * @param oldActiveGroups
     *            the groups that were contained in the queue before it was cleared
     * @since 2.4
     */
    protected abstract void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups);

    /**
     * Stores the node -> group association and, for receivers, propagates the group to the mailbox of the node (and to
     * the node itself if it is groupable).
     */
    private void addToGroup(final Node node, final CommunicationGroup group) {
        groupMap.put(node, group);
        if (!(node instanceof Receiver)) {
            // only receivers own a mailbox that needs to know its group
            return;
        }
        final Receiver receiver = (Receiver) node;
        receiver.getMailbox().setCurrentGroup(group);
        if (node instanceof IGroupable) {
            final IGroupable groupable = (IGroupable) node;
            groupable.setCurrentGroup(group);
        }
    }

    /**
     * Recomputes the fall-through flag of the mailbox of the given node. The method only has an effect if the node is
     * a {@link Receiver} whose mailbox is a {@link FallThroughCapableMailbox}; in that case the flag is always
     * overwritten with the newly computed value.
     * <p>
     * The result depends on the groups, as well as the parent nodes of the argument, so recomputation is needed if
     * these change.
     * <p>
     * Fall-through is disabled if any of the following holds:
     * <ul>
     * <li>the node is a {@link ProductionNode} matching the parent condition below (see the review note there),</li>
     * <li>the node is an {@link ExternalInputEnumeratorNode} or a {@link RelationEvaluatorNode.BatchingReceiver},</li>
     * <li>a direct parent is an {@link ExistenceNode},</li>
     * <li>a direct parent, or the active node of an indexer slot of a {@link DualInputNode} direct parent, is in a
     * different group that is recursive,</li>
     * <li>such a parent is in the same recursive group as the node and is a {@link TransitiveClosureNode}, an
     * {@link IAggregatorNode} or a truly trimming {@link TrimmerNode}.</li>
     * </ul>
     */
    private void precomputeFallThroughFlag(final Node node) {
        CommunicationGroup group = groupMap.get(node);
        if (node instanceof Receiver) {
            IGroupable mailbox = ((Receiver) node).getMailbox();
            if (mailbox instanceof FallThroughCapableMailbox) {
                Set<Node> directParents = dependencyGraph.getSourceNodes(node).distinctValues();
                // decide between using quick&cheap fall-through, or allowing for update cancellation
                boolean fallThrough =
                        // disallow fallthrough: updates at production nodes should cancel, if they can be trimmed or
                        // disjunctive
                        (!(node instanceof ProductionNode && ( // it is a production node...
                        // with more than one parent
                        // NOTE(review): the condition below is "> 0", not "> 1" as the comment above suggests. As
                        // written, fall-through is disabled for every production node that has at least one parent,
                        // and the "sole parent" disjunct that follows can never change the result. Confirm whether
                        // the code or the comment reflects the intended behavior before touching either.
                        directParents.size() > 0 ||
                        // or true trimming in its sole parent
                                directParents.size() == 1 && trueTrimming(directParents.iterator().next())))) &&
                        // disallow fallthrough: external updates should be stored (if updates are delayed)
                                (!(node instanceof ExternalInputEnumeratorNode)) &&
                        // disallow fallthrough: RelationEvaluatorNode needs to be notified in batch-style, and the batching is done by the mailbox
                        // however, it is not the RelationEvaluatorNode itself that is interesting here, as that indirectly uses the BatchingReceiver
                        // so we need to disable fall-through for the BatchingReceiver
                                (!(node instanceof RelationEvaluatorNode.BatchingReceiver));
                // do additional checks
                if (fallThrough) {
                    // recursive parent groups generate excess updates that should be cancelled after delete&rederive
                    // phases
                    // aggregator and transitive closure parent nodes also generate excess updates that should be
                    // cancelled
                    directParentLoop: for (Node directParent : directParents) {
                        Set<Node> parentsToCheck = new HashSet<>();
                        // check the case where a direct parent is the reason for mailbox usage
                        parentsToCheck.add(directParent);
                        // check the case where an indirect parent (join slot) is the reason for mailbox usage
                        if (directParent instanceof DualInputNode) {
                            // in case of existence join (typically antijoin), a mailbox should allow
                            // an insertion and deletion (at the secondary slot) to cancel each other out
                            if (directParent instanceof ExistenceNode) {
                                fallThrough = false;
                                break directParentLoop;
                            }
                            // in beta nodes, indexer slots (or their active nodes) are considered indirect parents
                            DualInputNode dualInput = (DualInputNode) directParent;
                            IterableIndexer primarySlot = dualInput.getPrimarySlot();
                            if (primarySlot != null)
                                parentsToCheck.add(primarySlot.getActiveNode());
                            Indexer secondarySlot = dualInput.getSecondarySlot();
                            if (secondarySlot != null)
                                parentsToCheck.add(secondarySlot.getActiveNode());
                        }
                        for (Node parent : parentsToCheck) {
                            // NOTE(review): parentGroup is dereferenced below without a null check; this assumes
                            // that every checked parent already has an entry in the group map - confirm
                            CommunicationGroup parentGroup = groupMap.get(parent);
                            if ( // parent is in a different, recursive group
                            (group != parentGroup && parentGroup.isRecursive()) ||
                            // node and parent within the same recursive group, and...
                                    (group == parentGroup && group.isRecursive() && (
                                    // parent is a transitive closure or aggregator node, or a trimmer
                                    // allow trimmed or disjunctive tuple updates to cancel each other
                                    (parent instanceof TransitiveClosureNode) || (parent instanceof IAggregatorNode)
                                            || trueTrimming(parent)))) {
                                fallThrough = false;
                                // a single offending parent is enough, no need to inspect the remaining ones
                                break directParentLoop;
                            }
                        }
                    }
                }
                // overwrite fallthrough flag with newly computed value
                ((FallThroughCapableMailbox) mailbox).setFallThrough(fallThrough);
            }
        }
    }

    /**
     * Tells whether the given node is a trimmer node whose mask actually eliminates some columns (as opposed to merely
     * reordering them), i.e. the number of mask indices differs from the source width.
     *
     * @return false for any node that is not a {@link TrimmerNode}
     */
    private boolean trueTrimming(Node node) {
        if (!(node instanceof TrimmerNode)) {
            return false;
        }
        final TrimmerNode trimmer = (TrimmerNode) node;
        final TupleMask trimmingMask = trimmer.getMask();
        final int keptColumns = trimmingMask.indices.length;
        return keptColumns != trimmingMask.sourceWidth;
    }

    /**
     * Adds the given group to the queue of active groups and marks it as enqueued. No check is performed on whether
     * the group is already in the queue; as the name suggests, callers are presumably expected to only pass groups
     * that are not enqueued yet.
     *
     * @param group
     *            the group to activate
     */
    public void activateUnenqueued(final CommunicationGroup group) {
        this.groupQueue.add(group);
        group.isEnqueued = true;
    }

    /**
     * Removes the given group from the queue of active groups (if it is there) and marks it as not enqueued.
     *
     * @param group
     *            the group to deactivate
     */
    public void deactivate(final CommunicationGroup group) {
        this.groupQueue.remove(group);
        group.isEnqueued = false;
    }

    /**
     * Removes the head of the queue of active groups, marks it as not enqueued and returns it.
     *
     * @return the group that was at the head of the queue, or {@code null} if the queue is empty (callers can check
     *         {@link #isEmpty()} beforehand)
     */
    public CommunicationGroup getAndRemoveFirstGroup() {
        final CommunicationGroup group = groupQueue.poll();
        // poll() yields null for an empty queue; without this guard the flag update would throw a
        // NullPointerException instead of reporting the empty queue to the caller
        if (group != null) {
            group.isEnqueued = false;
        }
        return group;
    }

    /**
     * @return true if there is no active group in the queue, false otherwise
     */
    public boolean isEmpty() {
        return this.groupQueue.isEmpty();
    }

    /**
     * Creates the tracker-specific {@link CommunicationGroup} for the given representative node. Invoked by
     * {@link #createAndStoreGroup(Node, int)}, which also registers the returned group in the group map.
     *
     * @param representative
     *            the node that represents the group
     * @param index
     *            the index chosen by the caller: the position in the topological order during a full re-computation,
     *            or the next smaller / larger group id when the group is created on demand
     * @return the group to be associated with the representative
     */
    protected abstract CommunicationGroup createGroup(final Node representative, final int index);

    /**
     * Creates a group for the given representative via {@link #createGroup(Node, int)} and associates the
     * representative with it.
     *
     * @param representative
     *            the node that represents the group
     * @param index
     *            the index forwarded to {@link #createGroup(Node, int)}
     * @return the freshly created and stored group
     */
    protected CommunicationGroup createAndStoreGroup(final Node representative, final int index) {
        final CommunicationGroup created = createGroup(representative, index);
        addToGroup(representative, created);
        return created;
    }

    /**
     * Records that {@code target} depends on {@code source}, i.e. that source may send messages to target in the RETE
     * network. Both nodes are inserted into the dependency graph if necessary. Nothing else happens if the edge is
     * already present.
     * <p>
     * Depending on how the new edge relates to the current group ordering, the method either keeps the group indices,
     * bumps the index of the target group, or triggers a full re-computation of the groups.
     *
     * @param source
     *            the source node
     * @param target
     *            the target node
     */
    public void registerDependency(final Node source, final Node target) {
        // inserting a node that is already part of the graph is a noop
        dependencyGraph.insertNode(source);
        dependencyGraph.insertNode(target);

        if (this.dependencyGraph.getTargetNodes(source).containsNonZero(target)) {
            // the edge is already known
            return;
        }

        // SCCs may get unified by the insertion, so the pre-insertion state has to be captured first
        final Node oldSourceRepresentative = sccInformationProvider.getRepresentative(source);
        final Node oldTargetRepresentative = sccInformationProvider.getRepresentative(target);
        final boolean targetHadOutgoingEdges = sccInformationProvider.hasOutgoingEdges(oldTargetRepresentative);

        dependencyGraph.insertEdge(source, target);

        // missing source groups are created on demand with the next smaller id
        CommunicationGroup sourceGroup = groupMap.get(oldSourceRepresentative);
        if (sourceGroup == null) {
            sourceGroup = createAndStoreGroup(oldSourceRepresentative, --minGroupId);
        }
        final int sourceIndex = sourceGroup.identifier;

        // missing target groups are created on demand with the next larger id
        CommunicationGroup targetGroup = groupMap.get(oldTargetRepresentative);
        if (targetGroup == null) {
            targetGroup = createAndStoreGroup(oldTargetRepresentative, ++maxGroupId);
        }
        final int targetIndex = targetGroup.identifier;

        final boolean orderingRespected = sourceIndex <= targetIndex;

        if (!orderingRespected && targetHadOutgoingEdges) {
            // the change is too complex for a local repair, a full re-computation is needed
            precomputeGroups();
            return;
        }

        if (!orderingRespected) {
            // the indices violate the topological ordering, but bumping the target index is sufficient;
            // an enqueued group must leave the queue while its identifier changes
            final boolean reEnqueue = targetGroup.isEnqueued;
            if (reEnqueue) {
                groupQueue.remove(targetGroup);
            }
            targetGroup.identifier = ++maxGroupId;
            if (reEnqueue) {
                groupQueue.add(targetGroup);
            }
        }

        // local maintenance shared by the "ordering respected" and the "index bumped" cases
        refreshFallThroughFlag(target);
        postProcessNode(source);
        postProcessNode(target);
        postProcessGroup(sourceGroup);
        // the groups are necessarily distinct after a bump, as their identifiers differed
        if (sourceGroup != targetGroup) {
            postProcessGroup(targetGroup);
        }
    }

    /**
     * Tells whether the given {@link Node} belongs to a recursive {@link CommunicationGroup}.
     *
     * @return true if a group is known for the node and that group is recursive, false otherwise
     */
    public boolean isInRecursiveGroup(final Node node) {
        final CommunicationGroup groupOfNode = this.getGroup(node);
        return groupOfNode != null && groupOfNode.isRecursive();
    }

    /**
     * Tells whether the given two {@link Node}s belong to the very same {@link CommunicationGroup}.
     *
     * @return true if a group is known for the left node and the right node is mapped to the same group instance
     */
    public boolean areInSameGroup(final Node left, final Node right) {
        final CommunicationGroup groupOfLeft = this.getGroup(left);
        final CommunicationGroup groupOfRight = this.getGroup(right);
        if (groupOfLeft == null) {
            return false;
        }
        return groupOfLeft == groupOfRight;
    }

    /**
     * Removes the dependency edge between source and target (if it exists) and updates the derived information.
     *
     * @param source
     *            the source node
     * @param target
     *            the target node
     */
    public void unregisterDependency(final Node source, final Node target) {
        // the SCC information must be queried after the deletion of the edge
        this.dependencyGraph.deleteEdgeIfExists(source, target);

        final Node representativeOfSource = sccInformationProvider.getRepresentative(source);
        final Node representativeOfTarget = sccInformationProvider.getRepresentative(target);

        if (!representativeOfSource.equals(representativeOfTarget)) {
            // the nodes are in different SCCs: groupMap and groupQueue need a new pre-computation,
            // which also takes care of the split flag maintenance
            precomputeGroups();
            return;
        }

        // still in the same SCC, so the deletion did not affect the SCCs (nor the split flags);
        // recomputing the affected fall-through flags is sufficient
        refreshFallThroughFlag(target);
        postProcessNode(source);
        postProcessNode(target);
    }

    /**
     * Recomputes the fall-through flag of the given target after its dependencies changed without any SCC change. If
     * the target is a {@link DualInputNode}, the flags of its direct targets are recomputed as well.
     */
    private void refreshFallThroughFlag(final Node target) {
        precomputeFallThroughFlag(target);
        if (!(target instanceof DualInputNode)) {
            return;
        }
        final Set<Node> childrenOfTarget = dependencyGraph.getTargetNodes(target).distinctValues();
        for (final Node child : childrenOfTarget) {
            precomputeFallThroughFlag(child);
        }
    }

    /**
     * Decides whether the given source-target edge of the communication network acts as a recursion cut point.
     * The current implementation treats an edge as a cut point iff it leads into a {@link ProductionNode} and both
     * ends belong to the same group. For a {@link SpecializedProjectionIndexer} source, the group of its active node
     * is considered.
     *
     * @param source the source node
     * @param target the target node
     * @return true if the edge is a cut point, false otherwise
     * @since 2.4
     */
    protected boolean isRecursionCutPoint(final Node source, final Node target) {
        Node effectiveSource = source;
        if (source instanceof SpecializedProjectionIndexer) {
            effectiveSource = ((SpecializedProjectionIndexer) source).getActiveNode();
        }
        final CommunicationGroup groupOfSource = this.getGroup(effectiveSource);
        final CommunicationGroup groupOfTarget = this.getGroup(target);
        if (!(target instanceof ProductionNode)) {
            return false;
        }
        return groupOfSource != null && groupOfSource == groupOfTarget;
    }

    /**
     * This hook allows concrete tracker implementations to perform tracker-specific post processing on nodes (cf.
     * {@link NetworkStructureChangeSensitiveNode} and {@link BehaviorChangingMailbox}). At the time of the invocation,
     * the network topology has already been updated.
     * <p>
     * The hook is invoked for every node of the dependency graph during a full group re-computation, and for the
     * source and target nodes of an edge when a dependency is registered or unregistered without a full
     * re-computation.
     *
     * @param node
     *            the node to post-process
     */
    protected abstract void postProcessNode(final Node node);

    /**
     * This hook allows concrete tracker implementations to perform tracker-specific post processing on groups. At the
     * time of the invocation, the network topology has already been updated.
     * <p>
     * During a full group re-computation the hook is invoked once per entry of the group map, so the same group
     * instance may be passed several times.
     *
     * @param group
     *            the group to post-process
     * @since 2.4
     */
    protected abstract void postProcessGroup(final CommunicationGroup group);

    /**
     * Creates a proxy for the given {@link Mailbox} for the given requester {@link Node}. The proxy creation is
     * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used
     * to create {@link TimelyMailboxProxy}s depending on the network topology. There is no guarantee that the same
     * proxy instance is returned when this method is called multiple times with the same arguments.
     *
     * @param requester
     *            the node that asks for the proxy
     * @param original
     *            the mailbox to be proxied
     * @return the mailbox that the requester should use in place of the original
     */
    public abstract Mailbox proxifyMailbox(final Node requester, final Mailbox original);

    /**
     * Creates a proxy for the given {@link IndexerListener} for the given requester {@link Node}. The proxy creation is
     * {@link CommunicationTracker}-specific and depends on the identity of the requester. This method is primarily used
     * to create {@link TimelyIndexerListenerProxy}s depending on the network topology. There is no guarantee that the
     * same proxy instance is returned when this method is called multiple times with the same arguments.
     *
     * @param requester
     *            the node that asks for the proxy
     * @param original
     *            the indexer listener to be proxied
     * @return the indexer listener that the requester should use in place of the original
     */
    public abstract IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original);

}