aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java369
1 files changed, 369 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java
new file mode 100644
index 00000000..4480aed8
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java
@@ -0,0 +1,369 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation;
10
11import java.util.Map;
12import java.util.Map.Entry;
13
14import tools.refinery.viatra.runtime.matchers.context.IPosetComparator;
15import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
16import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
17import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
18import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
19import tools.refinery.viatra.runtime.matchers.util.Direction;
20import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
21import tools.refinery.viatra.runtime.rete.network.PosetAwareReceiver;
22import tools.refinery.viatra.runtime.rete.network.RederivableNode;
23import tools.refinery.viatra.runtime.rete.network.ReteContainer;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
25import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
26import tools.refinery.viatra.runtime.rete.network.communication.timeless.RecursiveCommunicationGroup;
27import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
28import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
29import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.PosetAwareMailbox;
30
31/**
32 * Timeless implementation of the column aggregator node.
33 * <p>
34 * The node is capable of operating in the delete and re-derive mode. In this mode, it is also possible to equip the
35 * node with an {@link IPosetComparator} to identify monotone changes; thus, ensuring that a fix-point can be reached
36 * during the evaluation.
37 *
38 * @author Gabor Bergmann
39 * @author Tamas Szabo
40 * @since 1.4
41 */
42public class ColumnAggregatorNode<Domain, Accumulator, AggregateResult>
43 extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult>
44 implements RederivableNode, PosetAwareReceiver {
45
46 /**
47 * @since 1.6
48 */
49 protected final IPosetComparator posetComparator;
50
51 /**
52 * @since 1.6
53 */
54 protected final boolean deleteRederiveEvaluation;
55
56 // invariant: neutral values are not stored
57 /**
58 * @since 1.6
59 */
60 protected final Map<Tuple, Accumulator> memory;
61 /**
62 * @since 1.6
63 */
64 protected final Map<Tuple, Accumulator> rederivableMemory;
65
66 /**
67 * @since 1.7
68 */
69 protected CommunicationGroup currentGroup;
70
71 /**
72 * Creates a new column aggregator node.
73 *
74 * @param reteContainer
75 * the RETE container of the node
76 * @param operator
77 * the aggregation operator
78 * @param deleteRederiveEvaluation
79 * true if the node should run in DRED mode, false otherwise
80 * @param groupMask
81 * the mask that masks a tuple to obtain the key that we are grouping-by
82 * @param columnMask
83 * the mask that masks a tuple to obtain the tuple element(s) that we are aggregating over
84 * @param posetComparator
85 * the poset comparator for the column, if known, otherwise it can be null
86 * @since 1.6
87 */
88 public ColumnAggregatorNode(final ReteContainer reteContainer,
89 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
90 final boolean deleteRederiveEvaluation, final TupleMask groupMask, final TupleMask columnMask,
91 final IPosetComparator posetComparator) {
92 super(reteContainer, operator, groupMask, columnMask);
93 this.memory = CollectionsFactory.createMap();
94 this.rederivableMemory = CollectionsFactory.createMap();
95 this.deleteRederiveEvaluation = deleteRederiveEvaluation;
96 this.posetComparator = posetComparator;
97 // mailbox MUST be instantiated after the fields are all set
98 this.mailbox = instantiateMailbox();
99 }
100
101 /**
102 * Creates a new column aggregator node.
103 *
104 * @param reteContainer
105 * the RETE container of the node
106 * @param operator
107 * the aggregation operator
108 * @param groupMask
109 * the mask that masks a tuple to obtain the key that we are grouping-by
110 * @param aggregatedColumn
111 * the index of the column that the aggregator node is aggregating over
112 */
113 public ColumnAggregatorNode(final ReteContainer reteContainer,
114 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
115 final TupleMask groupMask, final int aggregatedColumn) {
116 this(reteContainer, operator, false, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth),
117 null);
118 }
119
120 @Override
121 public boolean isInDRedMode() {
122 return this.deleteRederiveEvaluation;
123 }
124
125 @Override
126 protected Mailbox instantiateMailbox() {
127 if (groupMask != null && columnMask != null && posetComparator != null) {
128 return new PosetAwareMailbox(this, this.reteContainer);
129 } else {
130 return new BehaviorChangingMailbox(this, this.reteContainer);
131 }
132 }
133
134 @Override
135 public TupleMask getCoreMask() {
136 return groupMask;
137 }
138
139 @Override
140 public TupleMask getPosetMask() {
141 return columnMask;
142 }
143
144 @Override
145 public IPosetComparator getPosetComparator() {
146 return posetComparator;
147 }
148
149 @Override
150 public void rederiveOne() {
151 final Entry<Tuple, Accumulator> entry = rederivableMemory.entrySet().iterator().next();
152 final Tuple group = entry.getKey();
153 final Accumulator accumulator = entry.getValue();
154 rederivableMemory.remove(group);
155 memory.put(group, accumulator);
156 // unregister the node if there is nothing left to be re-derived
157 if (this.rederivableMemory.isEmpty()) {
158 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this);
159 }
160 final AggregateResult value = operator.getAggregate(accumulator);
161 propagateAggregateResultUpdate(group, NEUTRAL, value, Timestamp.ZERO);
162 }
163
164 @Override
165 public void updateWithPosetInfo(final Direction direction, final Tuple update, final boolean monotone) {
166 if (this.deleteRederiveEvaluation) {
167 updateWithDeleteAndRederive(direction, update, monotone);
168 } else {
169 updateDefault(direction, update, Timestamp.ZERO);
170 }
171 }
172
173 @Override
174 public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
175 updateWithPosetInfo(direction, update, false);
176 }
177
178 /**
179 * @since 2.4
180 */
181 protected void updateDefault(final Direction direction, final Tuple update, final Timestamp timestamp) {
182 final Tuple key = groupMask.transform(update);
183 final Tuple value = columnMask.transform(update);
184 @SuppressWarnings("unchecked")
185 final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0));
186 final boolean isInsertion = direction == Direction.INSERT;
187
188 final Accumulator oldMainAccumulator = getMainAccumulator(key);
189 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator);
190
191 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, isInsertion);
192 storeIfNotNeutral(key, newMainAccumulator, memory);
193 final AggregateResult newValue = operator.getAggregate(newMainAccumulator);
194
195 propagateAggregateResultUpdate(key, oldValue, newValue, timestamp);
196 }
197
198 /**
199 * @since 2.4
200 */
201 protected void updateWithDeleteAndRederive(final Direction direction, final Tuple update, final boolean monotone) {
202 final Tuple group = groupMask.transform(update);
203 final Tuple value = columnMask.transform(update);
204 @SuppressWarnings("unchecked")
205 final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0));
206 final boolean isInsertion = direction == Direction.INSERT;
207
208 Accumulator oldMainAccumulator = memory.get(group);
209 Accumulator oldRederivableAccumulator = rederivableMemory.get(group);
210
211 if (direction == Direction.INSERT) {
212 // INSERT
213 if (oldRederivableAccumulator != null) {
214 // the group is in the re-derivable memory
215 final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator,
216 aggregableValue, isInsertion);
217 storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory);
218 if (rederivableMemory.isEmpty()) {
219 // there is nothing left to be re-derived
220 // this can happen if the accumulator became neutral in response to the INSERT
221 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this);
222 }
223 } else {
224 // the group is in the main memory
225 // at this point, it can happen that we need to initialize with a neutral accumulator
226 if (oldMainAccumulator == null) {
227 oldMainAccumulator = operator.createNeutral();
228 }
229
230 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator);
231 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue,
232 isInsertion);
233 storeIfNotNeutral(group, newMainAccumulator, memory);
234 final AggregateResult newValue = operator.getAggregate(newMainAccumulator);
235 propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO);
236 }
237 } else {
238 // DELETE
239 if (oldRederivableAccumulator != null) {
240 // the group is in the re-derivable memory
241 if (oldMainAccumulator != null) {
242 issueError("[INTERNAL ERROR] Inconsistent state for " + update
243 + " because it is present both in the main and re-derivable memory in the ColumnAggregatorNode "
244 + this + " for pattern(s) " + getTraceInfoPatternsEnumerated(), null);
245 }
246 try {
247 final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator,
248 aggregableValue, isInsertion);
249 storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory);
250 if (rederivableMemory.isEmpty()) {
251 // there is nothing left to be re-derived
252 // this can happen if the accumulator became neutral in response to the DELETE
253 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this);
254 }
255 } catch (final NullPointerException ex) {
256 issueError("[INTERNAL ERROR] Deleting a domain element in " + update
257 + " which did not exist before in ColumnAggregatorNode " + this + " for pattern(s) "
258 + getTraceInfoPatternsEnumerated(), ex);
259 }
260 } else {
261 // the group is in the main memory
262 // at this point, it can happen that we need to initialize with a neutral accumulator
263 if (oldMainAccumulator == null) {
264 oldMainAccumulator = operator.createNeutral();
265 }
266
267 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator);
268 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue,
269 isInsertion);
270 final AggregateResult newValue = operator.getAggregate(newMainAccumulator);
271
272 if (monotone) {
273 storeIfNotNeutral(group, newMainAccumulator, memory);
274 propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO);
275 } else {
276 final boolean wasEmpty = rederivableMemory.isEmpty();
277 if (storeIfNotNeutral(group, newMainAccumulator, rederivableMemory) && wasEmpty) {
278 ((RecursiveCommunicationGroup) currentGroup).addRederivable(this);
279 }
280 memory.remove(group);
281 propagateAggregateResultUpdate(group, oldValue, NEUTRAL, Timestamp.ZERO);
282 }
283 }
284 }
285 }
286
287 @Override
288 public void clear() {
289 this.memory.clear();
290 this.rederivableMemory.clear();
291 this.childMailboxes.clear();
292 }
293
294 /**
295 * Returns true if the accumulator was stored, false otherwise.
296 *
297 * @since 1.6
298 */
299 protected boolean storeIfNotNeutral(final Tuple key, final Accumulator accumulator,
300 final Map<Tuple, Accumulator> memory) {
301 if (operator.isNeutral(accumulator)) {
302 memory.remove(key);
303 return false;
304 } else {
305 memory.put(key, accumulator);
306 return true;
307 }
308 }
309
310 @Override
311 public Tuple getAggregateTuple(final Tuple group) {
312 final Accumulator accumulator = getMainAccumulator(group);
313 final AggregateResult result = operator.getAggregate(accumulator);
314 return tupleFromAggregateResult(group, result);
315 }
316
317 @Override
318 public AggregateResult getAggregateResult(final Tuple group) {
319 final Accumulator accumulator = getMainAccumulator(group);
320 return operator.getAggregate(accumulator);
321 }
322
323 @Override
324 public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(Tuple key) {
325 throw new UnsupportedOperationException();
326 }
327
328 @Override
329 public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(Tuple key) {
330 throw new UnsupportedOperationException();
331 }
332
333 /**
334 * @since 1.6
335 */
336 protected Accumulator getMainAccumulator(final Tuple key) {
337 return getAccumulator(key, memory);
338 }
339
340 /**
341 * @since 1.6
342 */
343 protected Accumulator getRederivableAccumulator(final Tuple key) {
344 return getAccumulator(key, rederivableMemory);
345 }
346
347 /**
348 * @since 1.6
349 */
350 protected Accumulator getAccumulator(final Tuple key, final Map<Tuple, Accumulator> memory) {
351 Accumulator accumulator = memory.get(key);
352 if (accumulator == null) {
353 return operator.createNeutral();
354 } else {
355 return accumulator;
356 }
357 }
358
359 @Override
360 public CommunicationGroup getCurrentGroup() {
361 return currentGroup;
362 }
363
364 @Override
365 public void setCurrentGroup(final CommunicationGroup currentGroup) {
366 this.currentGroup = currentGroup;
367 }
368
369}