aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyParallelTimelyColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyParallelTimelyColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyParallelTimelyColumnAggregatorNode.java106
1 files changed, 106 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyParallelTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyParallelTimelyColumnAggregatorNode.java
new file mode 100644
index 00000000..733d2585
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyParallelTimelyColumnAggregatorNode.java
@@ -0,0 +1,106 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation.timely;
10
11import java.util.Map.Entry;
12import java.util.TreeMap;
13
14import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
15import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
16import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
17import tools.refinery.viatra.runtime.matchers.util.Direction;
18import tools.refinery.viatra.runtime.rete.network.ReteContainer;
19import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
20
21/**
22 * First-only column aggregator with parallel aggregation architecture.
23 *
24 * @author Tamas Szabo
25 * @since 2.4
26 */
27public class FirstOnlyParallelTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult>
28 extends FirstOnlyTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult> {
29
30 public FirstOnlyParallelTimelyColumnAggregatorNode(final ReteContainer reteContainer,
31 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
32 final TupleMask groupMask, final TupleMask columnMask) {
33 super(reteContainer, operator, groupMask, columnMask);
34 }
35
36 /**
37 * Accumulator gets modified at the input timestamp and at all higher timestamps. Folding cannot be interrupted if
38 * the new aggregate result is the same as the old at an intermediate timestamp because aggregands need to be copied
39 * over to all accumulators at the higher timestamps.
40 */
41 @Override
42 public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
43 final Tuple group = groupMask.transform(update);
44 final Tuple value = columnMask.transform(update);
45 @SuppressWarnings("unchecked")
46 final Domain aggregand = (Domain) runtimeContext.unwrapElement(value.get(0));
47 final boolean isInsertion = direction == Direction.INSERT;
48
49 final AggregateResult previousResult = getResultRaw(group, timestamp, true);
50
51 Accumulator oldAccumulator = getAccumulator(group, timestamp);
52 AggregateResult oldResult = operator.getAggregate(oldAccumulator);
53
54 Accumulator newAccumulator = operator.update(oldAccumulator, aggregand, isInsertion);
55 AggregateResult newResult = operator.getAggregate(newAccumulator);
56
57 storeIfNotNeutral(group, newAccumulator, newResult, timestamp);
58
59 propagateWithChecks(group, timestamp, previousResult, previousResult, oldResult, newResult);
60
61 AggregateResult previousOldResult = oldResult;
62 AggregateResult previousNewResult = newResult;
63 final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> groupEntries = this.memory
64 .get(group);
65
66 Timestamp currentTimestamp = groupEntries == null ? null : groupEntries.higherKey(timestamp);
67
68 while (currentTimestamp != null) {
69 final CumulativeAggregate<Accumulator, AggregateResult> groupEntry = groupEntries.get(currentTimestamp);
70 oldResult = groupEntry.result;
71 oldAccumulator = groupEntry.accumulator;
72 newAccumulator = operator.update(oldAccumulator, aggregand, isInsertion);
73 newResult = operator.getAggregate(newAccumulator);
74
75 storeIfNotNeutral(group, newAccumulator, newResult, currentTimestamp);
76
77 propagateWithChecks(group, currentTimestamp, previousOldResult, previousNewResult, oldResult, newResult);
78
79 previousOldResult = oldResult;
80 previousNewResult = newResult;
81 currentTimestamp = groupEntries.higherKey(currentTimestamp);
82 }
83 }
84
85 @Override
86 protected Accumulator getAccumulator(final Tuple group, final Timestamp timestamp) {
87 final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group);
88 if (entryMap == null) {
89 return operator.createNeutral();
90 } else {
91 final CumulativeAggregate<Accumulator, AggregateResult> entry = entryMap.get(timestamp);
92 if (entry == null) {
93 final Entry<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> lowerEntry = entryMap
94 .lowerEntry(timestamp);
95 if (lowerEntry == null) {
96 return operator.createNeutral();
97 } else {
98 return operator.clone(lowerEntry.getValue().accumulator);
99 }
100 } else {
101 return entry.accumulator;
102 }
103 }
104 }
105
106}