aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java171
1 files changed, 171 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java
new file mode 100644
index 00000000..0394d92c
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/communication/timely/TimelyCommunicationGroup.java
@@ -0,0 +1,171 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.communication.timely;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.Comparator;
14import java.util.HashMap;
15import java.util.Map;
16import java.util.Map.Entry;
17import java.util.Set;
18import java.util.TreeMap;
19import java.util.TreeSet;
20
21import org.apache.log4j.Logger;
22import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
23import tools.refinery.viatra.runtime.rete.network.Node;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
25import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
26import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
27import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
28import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox;
29import tools.refinery.viatra.runtime.rete.util.Options;
30
31/**
32 * A timely communication group implementation. {@link TimelyMailbox}es and {@link LazyFoldingNode}s are ordered in the
33 * increasing order of timestamps.
34 *
35 * @author Tamas Szabo
36 * @since 2.3
37 */
38public class TimelyCommunicationGroup extends CommunicationGroup {
39
40 private final boolean isSingleton;
41 private final TreeMap<Timestamp, Set<Mailbox>> mailboxQueue;
42 // may be null - only used in the scattered case where we need to take care of mailboxes and resumables too
43 private Comparator<Node> nodeComparator;
44 private boolean currentlyDelivering;
45 private Timestamp currentlyDeliveredTimestamp;
46
47 public TimelyCommunicationGroup(final TimelyCommunicationTracker tracker, final Node representative,
48 final int identifier, final boolean isSingleton) {
49 super(tracker, representative, identifier);
50 this.isSingleton = isSingleton;
51 this.mailboxQueue = CollectionsFactory.createTreeMap();
52 this.currentlyDelivering = false;
53 }
54
55 /**
56 * Sets the {@link Comparator} to be used to order the {@link Mailbox}es at a given {@link Timestamp} in the mailbox
57 * queue. Additionally, reorders already queued {@link Mailbox}es to reflect the new comparator. The comparator may
58 * be null, in this case, no set ordering will be enforced among the {@link Mailbox}es.
59 */
60 public void setComparatorAndReorderMailboxes(final Comparator<Node> nodeComparator) {
61 this.nodeComparator = nodeComparator;
62 if (!this.mailboxQueue.isEmpty()) {
63 final HashMap<Timestamp, Set<Mailbox>> queueCopy = new HashMap<Timestamp, Set<Mailbox>>(this.mailboxQueue);
64 this.mailboxQueue.clear();
65 for (final Entry<Timestamp, Set<Mailbox>> entry : queueCopy.entrySet()) {
66 for (final Mailbox mailbox : entry.getValue()) {
67 this.notifyHasMessage(mailbox, entry.getKey());
68 }
69 }
70 }
71 }
72
73 @Override
74 public void deliverMessages() {
75 this.currentlyDelivering = true;
76 while (!this.mailboxQueue.isEmpty()) {
77 // care must be taken here how we iterate over the mailboxes
78 // it is not okay to loop over the mailboxes at once because a mailbox may disappear from the collection as
79 // a result of delivering messages from another mailboxes under the same timestamp
80 // because of this, it is crucial that we pick the mailboxes one by one
81 final Entry<Timestamp, Set<Mailbox>> entry = this.mailboxQueue.firstEntry();
82 final Timestamp timestamp = entry.getKey();
83 final Set<Mailbox> mailboxes = entry.getValue();
84 final Mailbox mailbox = mailboxes.iterator().next();
85 mailboxes.remove(mailbox);
86 if (mailboxes.isEmpty()) {
87 this.mailboxQueue.pollFirstEntry();
88 }
89 assert mailbox instanceof TimelyMailbox;
90 /* debug */ this.currentlyDeliveredTimestamp = timestamp;
91 mailbox.deliverAll(timestamp);
92 /* debug */ this.currentlyDeliveredTimestamp = null;
93 }
94 this.currentlyDelivering = false;
95 }
96
97 @Override
98 public boolean isEmpty() {
99 return this.mailboxQueue.isEmpty();
100 }
101
102 @Override
103 public void notifyHasMessage(final Mailbox mailbox, MessageSelector kind) {
104 if (kind instanceof Timestamp) {
105 final Timestamp timestamp = (Timestamp) kind;
106 if (Options.MONITOR_VIOLATION_OF_DIFFERENTIAL_DATAFLOW_TIMESTAMPS) {
107 if (timestamp.compareTo(this.currentlyDeliveredTimestamp) < 0) {
108 final Logger logger = this.representative.getContainer().getNetwork().getEngine().getLogger();
109 logger.error(
110 "[INTERNAL ERROR] Violation of differential dataflow communication schema! The communication component with representative "
111 + this.representative + " observed decreasing timestamp during message delivery!");
112 }
113 }
114 final Set<Mailbox> mailboxes = this.mailboxQueue.computeIfAbsent(timestamp, k -> {
115 if (this.nodeComparator == null) {
116 return CollectionsFactory.createSet();
117 } else {
118 return new TreeSet<Mailbox>(new Comparator<Mailbox>() {
119 @Override
120 public int compare(final Mailbox left, final Mailbox right) {
121 return nodeComparator.compare(left.getReceiver(), right.getReceiver());
122 }
123 });
124 }
125 });
126 mailboxes.add(mailbox);
127 if (!this.isEnqueued && !this.currentlyDelivering) {
128 this.tracker.activateUnenqueued(this);
129 }
130 } else {
131 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
132 }
133 }
134
135 @Override
136 public void notifyLostAllMessages(final Mailbox mailbox, final MessageSelector kind) {
137 if (kind instanceof Timestamp) {
138 final Timestamp timestamp = (Timestamp) kind;
139 this.mailboxQueue.compute(timestamp, (k, v) -> {
140 if (v == null) {
141 throw new IllegalStateException("No mailboxes registered at timestamp " + timestamp + "!");
142 }
143 if (!v.remove(mailbox)) {
144 throw new IllegalStateException(
145 "The mailbox " + mailbox + " was not registered at timestamp " + timestamp + "!");
146 }
147 if (v.isEmpty()) {
148 return null;
149 } else {
150 return v;
151 }
152 });
153 if (this.mailboxQueue.isEmpty()) {
154 this.tracker.deactivate(this);
155 }
156 } else {
157 throw new IllegalArgumentException(UNSUPPORTED_MESSAGE_KIND + kind);
158 }
159 }
160
161 @Override
162 public Map<MessageSelector, Collection<Mailbox>> getMailboxes() {
163 return Collections.unmodifiableMap(this.mailboxQueue);
164 }
165
166 @Override
167 public boolean isRecursive() {
168 return !this.isSingleton;
169 }
170
171}