aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java163
1 files changed, 163 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java
new file mode 100644
index 00000000..baf7270f
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/DefaultMailbox.java
@@ -0,0 +1,163 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;
10
11import java.util.Map;
12
13import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
14import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
15import tools.refinery.viatra.runtime.matchers.util.Direction;
16import tools.refinery.viatra.runtime.rete.network.Receiver;
17import tools.refinery.viatra.runtime.rete.network.ReteContainer;
18import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
19import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
20import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
21import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
22import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox;
23import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
24
25/**
26 * Default mailbox implementation.
27 * <p>
28 * Usually, the mailbox performs counting of messages so that they can cancel each other out. However, if marked as a
29 * fall-through mailbox, than update messages are delivered directly to the receiver node to reduce overhead.
30 *
31 * @author Tamas Szabo
32 * @since 2.0
33 */
34public class DefaultMailbox implements AdaptableMailbox {
35
36 private static int SIZE_TRESHOLD = 127;
37
38 protected Map<Tuple, Integer> queue;
39 protected Map<Tuple, Integer> buffer;
40 protected final Receiver receiver;
41 protected final ReteContainer container;
42 protected boolean delivering;
43 protected Mailbox adapter;
44 protected CommunicationGroup group;
45
46 public DefaultMailbox(final Receiver receiver, final ReteContainer container) {
47 this.receiver = receiver;
48 this.container = container;
49 this.queue = CollectionsFactory.createMap();
50 this.buffer = CollectionsFactory.createMap();
51 this.adapter = this;
52 }
53
54 protected Map<Tuple, Integer> getActiveQueue() {
55 if (this.delivering) {
56 return this.buffer;
57 } else {
58 return this.queue;
59 }
60 }
61
62 @Override
63 public Mailbox getAdapter() {
64 return this.adapter;
65 }
66
67 @Override
68 public void setAdapter(final Mailbox adapter) {
69 this.adapter = adapter;
70 }
71
72 @Override
73 public boolean isEmpty() {
74 return getActiveQueue().isEmpty();
75 }
76
77 @Override
78 public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
79 final Map<Tuple, Integer> activeQueue = getActiveQueue();
80 final boolean wasEmpty = activeQueue.isEmpty();
81
82 boolean significantChange = false;
83 Integer count = activeQueue.get(update);
84 if (count == null) {
85 count = 0;
86 significantChange = true;
87 }
88
89 if (direction == Direction.DELETE) {
90 count--;
91 } else {
92 count++;
93 }
94
95 if (count == 0) {
96 activeQueue.remove(update);
97 significantChange = true;
98 } else {
99 activeQueue.put(update, count);
100 }
101
102 if (significantChange) {
103 final Mailbox targetMailbox = this.adapter;
104 final CommunicationGroup targetGroup = this.adapter.getCurrentGroup();
105
106 if (wasEmpty) {
107 targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.DEFAULT);
108 } else if (activeQueue.isEmpty()) {
109 targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.DEFAULT);
110 }
111 }
112 }
113
114 @Override
115 public void deliverAll(final MessageSelector kind) {
116 if (kind == PhasedSelector.DEFAULT) {
117 // use the buffer during delivering so that there is a clear
118 // separation between the stages
119 this.delivering = true;
120 this.receiver.batchUpdate(this.queue.entrySet(), Timestamp.ZERO);
121 this.delivering = false;
122
123 if (queue.size() > SIZE_TRESHOLD) {
124 this.queue = this.buffer;
125 this.buffer = CollectionsFactory.createMap();
126 } else {
127 this.queue.clear();
128 final Map<Tuple, Integer> tmpQueue = this.queue;
129 this.queue = this.buffer;
130 this.buffer = tmpQueue;
131 }
132 } else {
133 throw new IllegalArgumentException("Unsupported message kind " + kind);
134 }
135 }
136
137 @Override
138 public String toString() {
139 return "D_MBOX (" + this.receiver + ") " + this.getActiveQueue();
140 }
141
142 @Override
143 public Receiver getReceiver() {
144 return this.receiver;
145 }
146
147 @Override
148 public void clear() {
149 this.queue.clear();
150 this.buffer.clear();
151 }
152
153 @Override
154 public CommunicationGroup getCurrentGroup() {
155 return this.group;
156 }
157
158 @Override
159 public void setCurrentGroup(final CommunicationGroup group) {
160 this.group = group;
161 }
162
163}