diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java new file mode 100644 index 00000000..50d19882 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/PosetAwareMailbox.java | |||
@@ -0,0 +1,218 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import java.util.HashSet; | ||
12 | import java.util.Map.Entry; | ||
13 | import java.util.Set; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.matchers.context.IPosetComparator; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
20 | import tools.refinery.viatra.runtime.rete.network.PosetAwareReceiver; | ||
21 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
22 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
25 | import tools.refinery.viatra.runtime.rete.network.indexer.GroupBasedMessageIndexer; | ||
26 | |||
27 | /** | ||
28 | * A monotonicity aware mailbox implementation. The mailbox uses an {@link IPosetComparator} to identify if a pair of | ||
29 | * REVOKE - INSERT updates represent a monotone change pair. The mailbox is used by {@link PosetAwareReceiver}s. | ||
30 | * | ||
31 | * @author Tamas Szabo | ||
32 | * @since 2.0 | ||
33 | */ | ||
34 | public class PosetAwareMailbox extends AbstractUpdateSplittingMailbox<GroupBasedMessageIndexer, PosetAwareReceiver> { | ||
35 | |||
36 | protected final TupleMask groupMask; | ||
37 | |||
38 | public PosetAwareMailbox(final PosetAwareReceiver receiver, final ReteContainer container) { | ||
39 | super(receiver, container, () -> new GroupBasedMessageIndexer(receiver.getCoreMask())); | ||
40 | this.groupMask = receiver.getCoreMask(); | ||
41 | } | ||
42 | |||
43 | @Override | ||
44 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
45 | final GroupBasedMessageIndexer monotoneQueue = getActiveMonotoneQueue(); | ||
46 | final GroupBasedMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue(); | ||
47 | final boolean wasPresentAsMonotone = monotoneQueue.getCount(update) != 0; | ||
48 | final boolean wasPresentAsAntiMonotone = antiMonotoneQueue.getCount(update) != 0; | ||
49 | final TupleMask coreMask = this.receiver.getCoreMask(); | ||
50 | |||
51 | // it cannot happen that it was present in both | ||
52 | assert !(wasPresentAsMonotone && wasPresentAsAntiMonotone); | ||
53 | |||
54 | if (direction == Direction.INSERT) { | ||
55 | if (wasPresentAsAntiMonotone) { | ||
56 | // it was an anti-monotone one before | ||
57 | antiMonotoneQueue.insert(update); | ||
58 | } else { | ||
59 | // it was a monotone one before or did not exist at all | ||
60 | monotoneQueue.insert(update); | ||
61 | |||
62 | // if it was not present in the monotone queue before, then | ||
63 | // we need to check whether it makes REVOKE updates monotone | ||
64 | if (!wasPresentAsMonotone) { | ||
65 | final Set<Tuple> counterParts = tryFindCounterPart(update, false, true); | ||
66 | for (final Tuple counterPart : counterParts) { | ||
67 | final int count = antiMonotoneQueue.getCount(counterPart); | ||
68 | assert count < 0; | ||
69 | antiMonotoneQueue.update(counterPart, -count); | ||
70 | monotoneQueue.update(counterPart, count); | ||
71 | } | ||
72 | } | ||
73 | } | ||
74 | } else { | ||
75 | if (wasPresentAsAntiMonotone) { | ||
76 | // it was an anti-monotone one before | ||
77 | antiMonotoneQueue.delete(update); | ||
78 | } else if (wasPresentAsMonotone) { | ||
79 | // it was a monotone one before | ||
80 | monotoneQueue.delete(update); | ||
81 | |||
82 | // and we need to check whether the monotone REVOKE updates | ||
83 | // still have a reinforcing counterpart | ||
84 | final Set<Tuple> candidates = new HashSet<Tuple>(); | ||
85 | final Tuple key = coreMask.transform(update); | ||
86 | for (final Entry<Tuple, Integer> entry : monotoneQueue.getTuplesByGroup(key).entrySet()) { | ||
87 | if (entry.getValue() < 0) { | ||
88 | final Tuple candidate = entry.getKey(); | ||
89 | final Set<Tuple> counterParts = tryFindCounterPart(candidate, true, false); | ||
90 | if (counterParts.isEmpty()) { | ||
91 | // all of them are gone | ||
92 | candidates.add(candidate); | ||
93 | } | ||
94 | } | ||
95 | } | ||
96 | |||
97 | // move the candidates from the monotone queue to the | ||
98 | // anti-monotone queue because they do not have a | ||
99 | // counterpart anymore | ||
100 | for (final Tuple candidate : candidates) { | ||
101 | final int count = monotoneQueue.getCount(candidate); | ||
102 | assert count < 0; | ||
103 | monotoneQueue.update(candidate, -count); | ||
104 | antiMonotoneQueue.update(candidate, count); | ||
105 | } | ||
106 | } else { | ||
107 | // it did not exist before | ||
108 | final Set<Tuple> counterParts = tryFindCounterPart(update, true, false); | ||
109 | if (counterParts.isEmpty()) { | ||
110 | // there is no tuple that would make this update monotone | ||
111 | antiMonotoneQueue.delete(update); | ||
112 | } else { | ||
113 | // there is a reinforcing counterpart | ||
114 | monotoneQueue.delete(update); | ||
115 | } | ||
116 | } | ||
117 | } | ||
118 | |||
119 | if (antiMonotoneQueue.isEmpty()) { | ||
120 | this.group.notifyLostAllMessages(this, PhasedSelector.ANTI_MONOTONE); | ||
121 | } else { | ||
122 | this.group.notifyHasMessage(this, PhasedSelector.ANTI_MONOTONE); | ||
123 | } | ||
124 | |||
125 | if (monotoneQueue.isEmpty()) { | ||
126 | this.group.notifyLostAllMessages(this, PhasedSelector.MONOTONE); | ||
127 | } else { | ||
128 | this.group.notifyHasMessage(this, PhasedSelector.MONOTONE); | ||
129 | } | ||
130 | } | ||
131 | |||
132 | protected Set<Tuple> tryFindCounterPart(final Tuple first, final boolean findPositiveCounterPart, | ||
133 | final boolean findAllCounterParts) { | ||
134 | final GroupBasedMessageIndexer monotoneQueue = getActiveMonotoneQueue(); | ||
135 | final GroupBasedMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue(); | ||
136 | final TupleMask coreMask = this.receiver.getCoreMask(); | ||
137 | final TupleMask posetMask = this.receiver.getPosetMask(); | ||
138 | final IPosetComparator posetComparator = this.receiver.getPosetComparator(); | ||
139 | final Set<Tuple> result = CollectionsFactory.createSet(); | ||
140 | final Tuple firstKey = coreMask.transform(first); | ||
141 | final Tuple firstValue = posetMask.transform(first); | ||
142 | |||
143 | if (findPositiveCounterPart) { | ||
144 | for (final Entry<Tuple, Integer> entry : monotoneQueue.getTuplesByGroup(firstKey).entrySet()) { | ||
145 | final Tuple secondValue = posetMask.transform(entry.getKey()); | ||
146 | if (entry.getValue() > 0 && posetComparator.isLessOrEqual(firstValue, secondValue)) { | ||
147 | result.add(entry.getKey()); | ||
148 | if (!findAllCounterParts) { | ||
149 | return result; | ||
150 | } | ||
151 | } | ||
152 | } | ||
153 | } else { | ||
154 | for (final Entry<Tuple, Integer> entry : antiMonotoneQueue.getTuplesByGroup(firstKey).entrySet()) { | ||
155 | final Tuple secondValue = posetMask.transform(entry.getKey()); | ||
156 | if (posetComparator.isLessOrEqual(secondValue, firstValue)) { | ||
157 | result.add(entry.getKey()); | ||
158 | if (!findAllCounterParts) { | ||
159 | return result; | ||
160 | } | ||
161 | } | ||
162 | } | ||
163 | } | ||
164 | |||
165 | return result; | ||
166 | } | ||
167 | |||
168 | @Override | ||
169 | public void deliverAll(final MessageSelector kind) { | ||
170 | if (kind == PhasedSelector.ANTI_MONOTONE) { | ||
171 | // use the buffer during delivering so that there is a clear | ||
172 | // separation between the stages | ||
173 | this.deliveringAntiMonotone = true; | ||
174 | |||
175 | for (final Tuple group : this.antiMonotoneQueue.getGroups()) { | ||
176 | for (final Entry<Tuple, Integer> entry : this.antiMonotoneQueue.getTuplesByGroup(group).entrySet()) { | ||
177 | final Tuple update = entry.getKey(); | ||
178 | final int count = entry.getValue(); | ||
179 | assert count < 0; | ||
180 | for (int i = 0; i < Math.abs(count); i++) { | ||
181 | this.receiver.updateWithPosetInfo(Direction.DELETE, update, false); | ||
182 | } | ||
183 | } | ||
184 | } | ||
185 | |||
186 | this.deliveringAntiMonotone = false; | ||
187 | swapAndClearAntiMonotone(); | ||
188 | } else if (kind == PhasedSelector.MONOTONE) { | ||
189 | // use the buffer during delivering so that there is a clear | ||
190 | // separation between the stages | ||
191 | this.deliveringMonotone = true; | ||
192 | |||
193 | for (final Tuple group : this.monotoneQueue.getGroups()) { | ||
194 | for (final Entry<Tuple, Integer> entry : this.monotoneQueue.getTuplesByGroup(group).entrySet()) { | ||
195 | final Tuple update = entry.getKey(); | ||
196 | final int count = entry.getValue(); | ||
197 | assert count != 0; | ||
198 | final Direction direction = count < 0 ? Direction.DELETE : Direction.INSERT; | ||
199 | for (int i = 0; i < Math.abs(count); i++) { | ||
200 | this.receiver.updateWithPosetInfo(direction, update, true); | ||
201 | } | ||
202 | } | ||
203 | } | ||
204 | |||
205 | this.deliveringMonotone = false; | ||
206 | swapAndClearMonotone(); | ||
207 | } else { | ||
208 | throw new IllegalArgumentException("Unsupported message kind " + kind); | ||
209 | } | ||
210 | } | ||
211 | |||
212 | @Override | ||
213 | public String toString() { | ||
214 | return "PA_MBOX (" + this.receiver + ") " + this.getActiveMonotoneQueue() + " " | ||
215 | + this.getActiveAntiMonotoneQueue(); | ||
216 | } | ||
217 | |||
218 | } | ||