aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java474
1 files changed, 474 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java
new file mode 100644
index 00000000..2588bde1
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java
@@ -0,0 +1,474 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.Map;
14import java.util.Map.Entry;
15import java.util.Objects;
16
17import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext;
18import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
19import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
20import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
21import tools.refinery.viatra.runtime.matchers.tuple.Tuples;
22import tools.refinery.viatra.runtime.matchers.util.Clearable;
23import tools.refinery.viatra.runtime.matchers.util.Direction;
24import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
25import tools.refinery.viatra.runtime.rete.index.Indexer;
26import tools.refinery.viatra.runtime.rete.index.StandardIndexer;
27import tools.refinery.viatra.runtime.rete.network.Node;
28import tools.refinery.viatra.runtime.rete.network.Receiver;
29import tools.refinery.viatra.runtime.rete.network.ReteContainer;
30import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
31import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
32import tools.refinery.viatra.runtime.rete.single.SingleInputNode;
33
34/**
35 * Groups incoming tuples by the given mask, and aggregates values at a specific index in each group.
36 * <p>
37 * Direct children are not supported, use via outer join indexers instead.
38 * <p>
39 * There are both timeless and timely implementations.
40 *
41 * @author Tamas Szabo
42 * @since 2.2
43 *
44 */
45public abstract class AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> extends SingleInputNode
46 implements Clearable, IAggregatorNode {
47
48 /**
49 * @since 1.6
50 */
51 protected final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator;
52
53 /**
54 * @since 1.6
55 */
56 protected final TupleMask groupMask;
57
58 /**
59 * @since 1.6
60 */
61 protected final TupleMask columnMask;
62
63 /**
64 * @since 1.6
65 */
66 protected final int sourceWidth;
67
68 /**
69 * @since 1.6
70 */
71 protected final IQueryRuntimeContext runtimeContext;
72
73 protected final AggregateResult NEUTRAL;
74
75 protected AggregatorOuterIndexer aggregatorOuterIndexer;
76
77 @SuppressWarnings("rawtypes")
78 protected AbstractColumnAggregatorNode.AggregatorOuterIdentityIndexer[] aggregatorOuterIdentityIndexers;
79
80 /**
81 * Creates a new column aggregator node.
82 *
83 * @param reteContainer
84 * the RETE container of the node
85 * @param operator
86 * the aggregation operator
87 * @param deleteRederiveEvaluation
88 * true if the node should run in DRED mode, false otherwise
89 * @param groupMask
90 * the mask that masks a tuple to obtain the key that we are grouping-by
91 * @param columnMask
92 * the mask that masks a tuple to obtain the tuple element(s) that we are aggregating over
93 * @param posetComparator
94 * the poset comparator for the column, if known, otherwise it can be null
95 * @since 1.6
96 */
97 public AbstractColumnAggregatorNode(final ReteContainer reteContainer,
98 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
99 final TupleMask groupMask, final TupleMask columnMask) {
100 super(reteContainer);
101 this.operator = operator;
102 this.groupMask = groupMask;
103 this.columnMask = columnMask;
104 this.sourceWidth = groupMask.indices.length;
105 this.runtimeContext = reteContainer.getNetwork().getEngine().getRuntimeContext();
106 this.NEUTRAL = operator.getAggregate(operator.createNeutral());
107 reteContainer.registerClearable(this);
108 }
109
110 /**
111 * Creates a new column aggregator node.
112 *
113 * @param reteContainer
114 * the RETE container of the node
115 * @param operator
116 * the aggregation operator
117 * @param groupMask
118 * the mask that masks a tuple to obtain the key that we are grouping-by
119 * @param aggregatedColumn
120 * the index of the column that the aggregator node is aggregating over
121 */
122 public AbstractColumnAggregatorNode(final ReteContainer reteContainer,
123 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
124 final TupleMask groupMask, final int aggregatedColumn) {
125 this(reteContainer, operator, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth));
126 }
127
128 @Override
129 public CommunicationTracker getCommunicationTracker() {
130 return this.reteContainer.getCommunicationTracker();
131 }
132
133 @Override
134 public void pullInto(Collection<Tuple> collector, boolean flush) {
135 // DIRECT CHILDREN NOT SUPPORTED
136 throw new UnsupportedOperationException();
137 }
138
139 @Override
140 public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) {
141 // DIRECT CHILDREN NOT SUPPORTED
142 throw new UnsupportedOperationException();
143 }
144
145 @Override
146 public void appendChild(Receiver receiver) {
147 // DIRECT CHILDREN NOT SUPPORTED
148 throw new UnsupportedOperationException();
149 }
150
151 @Override
152 public Indexer getAggregatorOuterIndexer() {
153 if (aggregatorOuterIndexer == null) {
154 aggregatorOuterIndexer = new AggregatorOuterIndexer();
155 this.getCommunicationTracker().registerDependency(this, aggregatorOuterIndexer);
156 }
157 return aggregatorOuterIndexer;
158 }
159
160 @Override
161 public Indexer getAggregatorOuterIdentityIndexer(final int resultPositionInSignature) {
162 if (aggregatorOuterIdentityIndexers == null) {
163 aggregatorOuterIdentityIndexers = new AbstractColumnAggregatorNode.AggregatorOuterIdentityIndexer[sourceWidth
164 + 1];
165 }
166 if (aggregatorOuterIdentityIndexers[resultPositionInSignature] == null) {
167 aggregatorOuterIdentityIndexers[resultPositionInSignature] = new AggregatorOuterIdentityIndexer(
168 resultPositionInSignature);
169 this.getCommunicationTracker().registerDependency(this,
170 aggregatorOuterIdentityIndexers[resultPositionInSignature]);
171 }
172 return aggregatorOuterIdentityIndexers[resultPositionInSignature];
173 }
174
175 /**
176 * @since 2.4
177 */
178 public void propagateAggregateResultUpdate(final Tuple group, final AggregateResult oldValue,
179 final AggregateResult newValue, final Timestamp timestamp) {
180 if (!Objects.equals(oldValue, newValue)) {
181 propagate(Direction.DELETE, group, oldValue, timestamp);
182 propagate(Direction.INSERT, group, newValue, timestamp);
183 }
184 }
185
186 /**
187 * @since 2.4
188 */
189 @SuppressWarnings("unchecked")
190 public void propagate(final Direction direction, final Tuple group, final AggregateResult value,
191 final Timestamp timestamp) {
192 final Tuple tuple = tupleFromAggregateResult(group, value);
193
194 if (aggregatorOuterIndexer != null) {
195 aggregatorOuterIndexer.propagate(direction, tuple, group, timestamp);
196 }
197 if (aggregatorOuterIdentityIndexers != null) {
198 for (final AggregatorOuterIdentityIndexer aggregatorOuterIdentityIndexer : aggregatorOuterIdentityIndexers) {
199 if (aggregatorOuterIdentityIndexer != null) {
200 aggregatorOuterIdentityIndexer.propagate(direction, tuple, group, timestamp);
201 }
202 }
203 }
204 }
205
206 public abstract Tuple getAggregateTuple(final Tuple key);
207
208 /**
209 * @since 2.4
210 */
211 public abstract Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(final Tuple key);
212
213 public abstract AggregateResult getAggregateResult(final Tuple key);
214
215 /**
216 * @since 2.4
217 */
218 public abstract Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(final Tuple key);
219
220 protected Tuple tupleFromAggregateResult(final Tuple groupTuple, final AggregateResult aggregateResult) {
221 if (aggregateResult == null) {
222 return null;
223 } else {
224 return Tuples.staticArityLeftInheritanceTupleOf(groupTuple, runtimeContext.wrapElement(aggregateResult));
225 }
226 }
227
228 /**
229 * A special non-iterable index that retrieves the aggregated, packed result (signature+aggregate) for the original
230 * signature.
231 *
232 * @author Gabor Bergmann
233 * @author Tamas Szabo
234 *
235 */
236 protected class AggregatorOuterIndexer extends StandardIndexer {
237
238 /**
239 * @since 2.4
240 */
241 protected NetworkStructureChangeSensitiveLogic logic;
242
243 public AggregatorOuterIndexer() {
244 super(AbstractColumnAggregatorNode.this.reteContainer, TupleMask.omit(sourceWidth, sourceWidth + 1));
245 this.parent = AbstractColumnAggregatorNode.this;
246 this.logic = createLogic();
247 }
248
249 @Override
250 public void networkStructureChanged() {
251 super.networkStructureChanged();
252 this.logic = createLogic();
253 }
254
255 @Override
256 public Collection<Tuple> get(final Tuple signature) {
257 return this.logic.get(signature);
258 }
259
260 @Override
261 public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) {
262 return this.logic.getTimeline(signature);
263 }
264
265 /**
266 * @since 2.4
267 */
268 public void propagate(final Direction direction, final Tuple tuple, final Tuple group,
269 final Timestamp timestamp) {
270 if (tuple != null) {
271 propagate(direction, tuple, group, true, timestamp);
272 }
273 }
274
275 @Override
276 public Node getActiveNode() {
277 return AbstractColumnAggregatorNode.this;
278 }
279
280 /**
281 * @since 2.4
282 */
283 protected NetworkStructureChangeSensitiveLogic createLogic() {
284 if (this.reteContainer.isTimelyEvaluation()
285 && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) {
286 return this.TIMELY;
287 } else {
288 return this.TIMELESS;
289 }
290 }
291
292 private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() {
293
294 @Override
295 public Collection<Tuple> get(final Tuple signature) {
296 final Tuple aggregateTuple = getAggregateTuple(signature);
297 if (aggregateTuple == null) {
298 return null;
299 } else {
300 return Collections.singleton(aggregateTuple);
301 }
302 }
303
304 @Override
305 public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) {
306 throw new UnsupportedOperationException();
307 }
308
309 };
310
311 private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() {
312
313 @Override
314 public Collection<Tuple> get(final Tuple signatureWithResult) {
315 return TIMELESS.get(signatureWithResult);
316 }
317
318 @Override
319 public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) {
320 final Map<Tuple, Timeline<Timestamp>> aggregateTuples = getAggregateTupleTimeline(signature);
321 if (aggregateTuples.isEmpty()) {
322 return null;
323 } else {
324 return aggregateTuples;
325 }
326 }
327
328 };
329
330 }
331
332 /**
333 * A special non-iterable index that checks a suspected aggregate value for a given signature. The signature for
334 * this index is the original 'group by' masked tuple, with the suspected result inserted at position
335 * resultPositionInSignature.
336 *
337 * @author Gabor Bergmann
338 * @author Tamas Szabo
339 *
340 */
341 protected class AggregatorOuterIdentityIndexer extends StandardIndexer {
342
343 protected final int resultPositionInSignature;
344 protected final TupleMask pruneResult;
345 protected final TupleMask reorderMask;
346 /**
347 * @since 2.4
348 */
349 protected NetworkStructureChangeSensitiveLogic logic;
350
351 public AggregatorOuterIdentityIndexer(final int resultPositionInSignature) {
352 super(AbstractColumnAggregatorNode.this.reteContainer,
353 TupleMask.displace(sourceWidth, resultPositionInSignature, sourceWidth + 1));
354 this.resultPositionInSignature = resultPositionInSignature;
355 this.pruneResult = TupleMask.omit(resultPositionInSignature, sourceWidth + 1);
356 if (resultPositionInSignature == sourceWidth) {
357 this.reorderMask = null;
358 } else {
359 this.reorderMask = mask;
360 }
361 this.logic = createLogic();
362 }
363
364 @Override
365 public void networkStructureChanged() {
366 super.networkStructureChanged();
367 this.logic = createLogic();
368 }
369
370 @Override
371 public Collection<Tuple> get(final Tuple signatureWithResult) {
372 return this.logic.get(signatureWithResult);
373 }
374
375 /**
376 * @since 2.4
377 */
378 @Override
379 public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) {
380 return this.logic.getTimeline(signature);
381 }
382
383 /**
384 * @since 2.4
385 */
386 public void propagate(final Direction direction, final Tuple tuple, final Tuple group,
387 final Timestamp timestamp) {
388 if (tuple != null) {
389 propagate(direction, reorder(tuple), group, true, timestamp);
390 }
391 }
392
393 private Tuple reorder(final Tuple signatureWithResult) {
394 Tuple transformed;
395 if (reorderMask == null) {
396 transformed = signatureWithResult;
397 } else {
398 transformed = reorderMask.transform(signatureWithResult);
399 }
400 return transformed;
401 }
402
403 @Override
404 public Node getActiveNode() {
405 return this.parent;
406 }
407
408 /**
409 * @since 2.4
410 */
411 protected NetworkStructureChangeSensitiveLogic createLogic() {
412 if (this.reteContainer.isTimelyEvaluation()
413 && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) {
414 return this.TIMELY;
415 } else {
416 return this.TIMELESS;
417 }
418 }
419
420 private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() {
421
422 @Override
423 public Collection<Tuple> get(final Tuple signatureWithResult) {
424 final Tuple prunedSignature = pruneResult.transform(signatureWithResult);
425 final AggregateResult result = getAggregateResult(prunedSignature);
426 if (result != null && Objects.equals(signatureWithResult.get(resultPositionInSignature), result)) {
427 return Collections.singleton(signatureWithResult);
428 } else {
429 return null;
430 }
431 }
432
433 @Override
434 public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) {
435 throw new UnsupportedOperationException();
436 }
437
438 };
439
440 private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() {
441
442 @Override
443 public Collection<Tuple> get(final Tuple signatureWithResult) {
444 return TIMELESS.get(signatureWithResult);
445 }
446
447 @Override
448 public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signatureWithResult) {
449 final Tuple prunedSignature = pruneResult.transform(signatureWithResult);
450 final Map<AggregateResult, Timeline<Timestamp>> result = getAggregateResultTimeline(prunedSignature);
451 for (final Entry<AggregateResult, Timeline<Timestamp>> entry : result.entrySet()) {
452 if (Objects.equals(signatureWithResult.get(resultPositionInSignature), entry.getKey())) {
453 return Collections.singletonMap(signatureWithResult, entry.getValue());
454 }
455 }
456 return null;
457 }
458
459 };
460
461 }
462
463 /**
464 * @since 2.4
465 */
466 protected static abstract class NetworkStructureChangeSensitiveLogic {
467
468 public abstract Collection<Tuple> get(final Tuple signatureWithResult);
469
470 public abstract Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature);
471
472 }
473
474}