diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java new file mode 100644 index 00000000..0394d92c --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java | |||
@@ -0,0 +1,171 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.communication.timely; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Comparator; | ||
14 | import java.util.HashMap; | ||
15 | import java.util.Map; | ||
16 | import java.util.Map.Entry; | ||
17 | import java.util.Set; | ||
18 | import java.util.TreeMap; | ||
19 | import java.util.TreeSet; | ||
20 | |||
21 | import org.apache.log4j.Logger; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
23 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
26 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
27 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
28 | import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox; | ||
29 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
30 | |||
31 | /** | ||
32 | * A timely communication group implementation. {@link TimelyMailbox}es and {@link LazyFoldingNode}s are ordered in the | ||
33 | * increasing order of timestamps. | ||
34 | * | ||
35 | * @author Tamas Szabo | ||
36 | * @since 2.3 | ||
37 | */ | ||
38 | public class TimelyCommunicationGroup extends CommunicationGroup { | ||
39 | |||
40 | private final boolean isSingleton; | ||
41 | private final TreeMap<Timestamp, Set<Mailbox>> mailboxQueue; | ||
42 | // may be null - only used in the scattered case where we need to take care of mailboxes and resumables too | ||
43 | private Comparator<Node> nodeComparator; | ||
44 | private boolean currentlyDelivering; | ||
45 | private Timestamp currentlyDeliveredTimestamp; | ||
46 | |||
47 | public TimelyCommunicationGroup(final TimelyCommunicationTracker tracker, final Node representative, | ||
48 | final int identifier, final boolean isSingleton) { | ||
49 | super(tracker, representative, identifier); | ||
50 | this.isSingleton = isSingleton; | ||
51 | this.mailboxQueue = CollectionsFactory.createTreeMap(); | ||
52 | this.currentlyDelivering = false; | ||
53 | } | ||
54 | |||
55 | /** | ||
56 | * Sets the {@link Comparator} to be used to order the {@link Mailbox}es at a given {@link Timestamp} in the mailbox | ||
57 | * queue. Additionally, reorders already queued {@link Mailbox}es to reflect the new comparator. The comparator may | ||
58 | * be null, in this case, no set ordering will be enforced among the {@link Mailbox}es. | ||
59 | */ | ||
60 | public void setComparatorAndReorderMailboxes(final Comparator<Node> nodeComparator) { | ||
61 | this.nodeComparator = nodeComparator; | ||
62 | if (!this.mailboxQueue.isEmpty()) { | ||
63 | final HashMap<Timestamp, Set<Mailbox>> queueCopy = new HashMap<Timestamp, Set<Mailbox>>(this.mailboxQueue); | ||
64 | this.mailboxQueue.clear(); | ||
65 | for (final Entry<Timestamp, Set<Mailbox>> entry : queueCopy.entrySet()) { | ||
66 | for (final Mailbox mailbox : entry.getValue()) { | ||
67 | this.notifyHasMessage(mailbox, entry.getKey()); | ||
68 | } | ||
69 | } | ||
70 | } | ||
71 | } | ||
72 | |||
73 | @Override | ||
74 | public void deliverMessages() { | ||
75 | this.currentlyDelivering = true; | ||
76 | while (!this.mailboxQueue.isEmpty()) { | ||
77 | // care must be taken here how we iterate over the mailboxes | ||
78 | // it is not okay to loop over the mailboxes at once because a mailbox may disappear from the collection as | ||
79 | // a result of delivering messages from another mailboxes under the same timestamp | ||
80 | // because of this, it is crucial that we pick the mailboxes one by one | ||
81 | final Entry<Timestamp, Set<Mailbox>> entry = this.mailboxQueue.firstEntry(); | ||
82 | final Timestamp timestamp = entry.getKey(); | ||
83 | final Set<Mailbox> mailboxes = entry.getValue(); | ||
84 | final Mailbox mailbox = mailboxes.iterator().next(); | ||
85 | mailboxes.remove(mailbox); | ||
86 | if (mailboxes.isEmpty()) { | ||
87 | this.mailboxQueue.pollFirstEntry(); | ||
88 | } | ||
89 | assert mailbox instanceof TimelyMailbox; | ||
90 | /* debug */ this.currentlyDeliveredTimestamp = timestamp; | ||
91 | mailbox.deliverAll(timestamp); | ||
92 | /* debug */ this.currentlyDeliveredTimestamp = null; | ||
93 | } | ||
94 | this.currentlyDelivering = false; | ||
95 | } | ||
96 | |||
97 | @Override | ||
98 | public boolean isEmpty() { | ||
99 | return this.mailboxQueue.isEmpty(); | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void notifyHasMessage(final Mailbox mailbox, MessageSelector kind) { | ||
104 | if (kind instanceof Timestamp) { | ||
105 | final Timestamp timestamp = (Timestamp) kind; | ||
106 | if (Options.MONITOR_VIOLATION_OF_DIFFERENTIAL_DATAFLOW_TIMESTAMPS) { | ||
107 | if (timestamp.compareTo(this.currentlyDeliveredTimestamp) < 0) { | ||
108 | final Logger logger = this.representative.getContainer().getNetwork().getEngine().getLogger(); | ||
109 | logger.error( | ||
110 | "[INTERNAL ERROR] Violation of differential dataflow communication schema! The communication component with representative " | ||
111 | + this.representative + " observed decreasing timestamp during message delivery!"); | ||
112 | } | ||
113 | } | ||
114 | final Set<Mailbox> mailboxes = this.mailboxQueue.computeIfAbsent(timestamp, k -> { | ||
115 | if (this.nodeComparator == null) { | ||
116 | return CollectionsFactory.createSet(); | ||
117 | } else { | ||
118 | return new TreeSet<Mailbox>(new Comparator<Mailbox>() { | ||
119 | @Override | ||
120 | public int compare(final Mailbox left, final Mailbox right) { | ||
121 | return nodeComparator.compare(left.getReceiver(), right.getReceiver()); | ||
122 | } | ||
123 | }); | ||
124 | } | ||
125 | }); | ||
126 | mailboxes.add(mailbox); | ||
127 | if (!this.isEnqueued && !this.currentlyDelivering) { | ||
128 | this.tracker.activateUnenqueued(this); | ||
129 | } | ||
130 | } else { | ||
131 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
132 | } | ||
133 | } | ||
134 | |||
135 | @Override | ||
136 | public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) { | ||
137 | if (kind instanceof Timestamp) { | ||
138 | final Timestamp timestamp = (Timestamp) kind; | ||
139 | this.mailboxQueue.compute(timestamp, (k, v) -> { | ||
140 | if (v == null) { | ||
141 | throw new IllegalStateException("No mailboxes registered at timestamp " + timestamp + "!"); | ||
142 | } | ||
143 | if (!v.remove(mailbox)) { | ||
144 | throw new IllegalStateException( | ||
145 | "The mailbox " + mailbox + " was not registered at timestamp " + timestamp + "!"); | ||
146 | } | ||
147 | if (v.isEmpty()) { | ||
148 | return null; | ||
149 | } else { | ||
150 | return v; | ||
151 | } | ||
152 | }); | ||
153 | if (this.mailboxQueue.isEmpty()) { | ||
154 | this.tracker.deactivate(this); | ||
155 | } | ||
156 | } else { | ||
157 | throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind); | ||
158 | } | ||
159 | } | ||
160 | |||
161 | @Override | ||
162 | public Map<MessageSelector, Collection<Mailbox>> getMailboxes() { | ||
163 | return Collections.unmodifiableMap(this.mailboxQueue); | ||
164 | } | ||
165 | |||
166 | @Override | ||
167 | public boolean isRecursive() { | ||
168 | return !this.isSingleton; | ||
169 | } | ||
170 | |||
171 | } | ||