aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java216
1 files changed, 216 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java
new file mode 100644
index 00000000..79179880
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationTracker.java
@@ -0,0 +1,216 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import java.util.Collection;
12import java.util.List;
13import java.util.Map;
14import java.util.Map.Entry;
15import java.util.Set;
16import java.util.function.Function;
17
18import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting;
19import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph;
20import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
21import tools.refinery.viatra.runtime.rete.index.IndexerListener;
22import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer;
23import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription;
24import tools.refinery.viatra.runtime.rete.index.StandardIndexer;
25import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration;
26import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation;
27import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode;
28import tools.refinery.viatra.runtime.rete.network.Node;
29import tools.refinery.viatra.runtime.rete.network.ProductionNode;
30import tools.refinery.viatra.runtime.rete.network.StandardNode;
31import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
32import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
33import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
34import tools.refinery.viatra.runtime.rete.network.communication.NodeComparator;
35import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
36import tools.refinery.viatra.runtime.rete.single.DiscriminatorDispatcherNode;
37
38/**
39 * Timely (DDF) implementation of the {@link CommunicationTracker}.
40 *
41 * @author Tamas Szabo
42 * @since 2.3
43 */
44public class TimelyCommunicationTracker extends CommunicationTracker {
45
46 protected final TimelyConfiguration configuration;
47
48 public TimelyCommunicationTracker(final TimelyConfiguration configuration) {
49 this.configuration = configuration;
50 }
51
52 @Override
53 protected CommunicationGroup createGroup(final Node representative, final int index) {
54 final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1;
55 return new TimelyCommunicationGroup(this, representative, index, isSingleton);
56 }
57
58 @Override
59 protected void reconstructQueueContents(final Set<CommunicationGroup> oldActiveGroups) {
60 for (final CommunicationGroup oldGroup : oldActiveGroups) {
61 for (final Entry<MessageSelector, Collection<Mailbox>> entry : oldGroup.getMailboxes().entrySet()) {
62 for (final Mailbox mailbox : entry.getValue()) {
63 final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver());
64 newGroup.notifyHasMessage(mailbox, entry.getKey());
65 }
66 }
67 }
68 }
69
70 @Override
71 public Mailbox proxifyMailbox(final Node requester, final Mailbox original) {
72 final Mailbox mailboxToProxify = (original instanceof TimelyMailboxProxy)
73 ? ((TimelyMailboxProxy) original).getWrappedMailbox()
74 : original;
75 final TimestampTransformation preprocessor = getPreprocessor(requester, mailboxToProxify.getReceiver());
76 if (preprocessor == null) {
77 return mailboxToProxify;
78 } else {
79 return new TimelyMailboxProxy(mailboxToProxify, preprocessor);
80 }
81 }
82
83 @Override
84 public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) {
85 final IndexerListener listenerToProxify = (original instanceof TimelyIndexerListenerProxy)
86 ? ((TimelyIndexerListenerProxy) original).getWrappedIndexerListener()
87 : original;
88 final TimestampTransformation preprocessor = getPreprocessor(requester, listenerToProxify.getOwner());
89 if (preprocessor == null) {
90 return listenerToProxify;
91 } else {
92 return new TimelyIndexerListenerProxy(listenerToProxify, preprocessor);
93 }
94 }
95
96 protected TimestampTransformation getPreprocessor(final Node source, final Node target) {
97 final Node effectiveSource = source instanceof SpecializedProjectionIndexer
98 ? ((SpecializedProjectionIndexer) source).getActiveNode()
99 : source;
100 final CommunicationGroup sourceGroup = this.getGroup(effectiveSource);
101 final CommunicationGroup targetGroup = this.getGroup(target);
102
103 if (sourceGroup != null && targetGroup != null) {
104 // during RETE construction, the groups may be still null
105 if (sourceGroup != targetGroup && sourceGroup.isRecursive()) {
106 // targetGroup is a successor SCC of sourceGroup
107 // and sourceGroup is a recursive SCC
108 // then we need to zero out the timestamps
109 return TimestampTransformation.RESET;
110 }
111 if (sourceGroup == targetGroup && target instanceof ProductionNode) {
112 // if requester and receiver are in the same SCC
113 // and receiver is a production node
114 // then we need to increment the timestamps
115 return TimestampTransformation.INCREMENT;
116 }
117 }
118
119 return null;
120 }
121
122 @Override
123 protected void postProcessNode(final Node node) {
124 if (node instanceof NetworkStructureChangeSensitiveNode) {
125 ((NetworkStructureChangeSensitiveNode) node).networkStructureChanged();
126 }
127 }
128
129 @Override
130 protected void postProcessGroup(final CommunicationGroup group) {
131 if (this.configuration.getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) {
132 final Node representative = group.getRepresentative();
133 final Set<Node> groupMembers = this.sccInformationProvider.sccs.getPartition(representative);
134 if (groupMembers.size() > 1) {
135 final Graph<Node> graph = new Graph<Node>();
136
137 for (final Node node : groupMembers) {
138 graph.insertNode(node);
139 }
140
141 for (final Node source : groupMembers) {
142 for (final Node target : this.dependencyGraph.getTargetNodes(source)) {
143 // (1) the edge is not a recursion cut point
144 // (2) the edge is within this group
145 if (!this.isRecursionCutPoint(source, target) && groupMembers.contains(target)) {
146 graph.insertEdge(source, target);
147 }
148 }
149 }
150
151 final List<Node> orderedNodes = TopologicalSorting.compute(graph);
152 final Map<Node, Integer> nodeMap = CollectionsFactory.createMap();
153 int identifier = 0;
154 for (final Node orderedNode : orderedNodes) {
155 nodeMap.put(orderedNode, identifier++);
156 }
157
158 ((TimelyCommunicationGroup) group).setComparatorAndReorderMailboxes(new NodeComparator(nodeMap));
159 }
160 }
161 }
162
163 /**
164 * This static field is used for debug purposes in the DotGenerator.
165 */
166 public static final Function<Node, Function<Node, String>> EDGE_LABEL_FUNCTION = new Function<Node, Function<Node, String>>() {
167
168 @Override
169 public Function<Node, String> apply(final Node source) {
170 return new Function<Node, String>() {
171 @Override
172 public String apply(final Node target) {
173 if (source instanceof SpecializedProjectionIndexer) {
174 final Collection<ListenerSubscription> subscriptions = ((SpecializedProjectionIndexer) source)
175 .getSubscriptions();
176 for (final ListenerSubscription subscription : subscriptions) {
177 if (subscription.getListener().getOwner() == target
178 && subscription.getListener() instanceof TimelyIndexerListenerProxy) {
179 return ((TimelyIndexerListenerProxy) subscription.getListener()).preprocessor
180 .toString();
181 }
182 }
183 }
184 if (source instanceof StandardIndexer) {
185 final Collection<IndexerListener> listeners = ((StandardIndexer) source).getListeners();
186 for (final IndexerListener listener : listeners) {
187 if (listener.getOwner() == target && listener instanceof TimelyIndexerListenerProxy) {
188 return ((TimelyIndexerListenerProxy) listener).preprocessor.toString();
189 }
190 }
191 }
192 if (source instanceof StandardNode) {
193 final Collection<Mailbox> mailboxes = ((StandardNode) source).getChildMailboxes();
194 for (final Mailbox mailbox : mailboxes) {
195 if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) {
196 return ((TimelyMailboxProxy) mailbox).preprocessor.toString();
197 }
198 }
199 }
200 if (source instanceof DiscriminatorDispatcherNode) {
201 final Collection<Mailbox> mailboxes = ((DiscriminatorDispatcherNode) source)
202 .getBucketMailboxes().values();
203 for (final Mailbox mailbox : mailboxes) {
204 if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) {
205 return ((TimelyMailboxProxy) mailbox).preprocessor.toString();
206 }
207 }
208 }
209 return null;
210 }
211 };
212 }
213
214 };
215
216}