aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulSequentialTimelyColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulSequentialTimelyColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulSequentialTimelyColumnAggregatorNode.java279
1 files changed, 279 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulSequentialTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulSequentialTimelyColumnAggregatorNode.java
new file mode 100644
index 00000000..cf2c2b2d
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulSequentialTimelyColumnAggregatorNode.java
@@ -0,0 +1,279 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation.timely;
10
11import java.util.Collections;
12import java.util.Map;
13import java.util.Map.Entry;
14import java.util.Objects;
15import java.util.TreeMap;
16
17import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
18import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
19import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
20import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
21import tools.refinery.viatra.runtime.matchers.util.Direction;
22import tools.refinery.viatra.runtime.matchers.util.IDeltaBag;
23import tools.refinery.viatra.runtime.matchers.util.Preconditions;
24import tools.refinery.viatra.runtime.matchers.util.Signed;
25import tools.refinery.viatra.runtime.matchers.util.timeline.Diff;
26import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulSequentialTimelyColumnAggregatorNode.CumulativeAggregate;
27import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulSequentialTimelyColumnAggregatorNode.FoldingState;
28import tools.refinery.viatra.runtime.rete.network.ReteContainer;
29import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
30import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode;
31
32/**
33 * Faithful column aggregator with sequential aggregation architecture.
34 *
35 * @author Tamas Szabo
36 * @since 2.4
37 *
38 */
39public class FaithfulSequentialTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult> extends
40 FaithfulTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult, CumulativeAggregate<Domain, Accumulator, AggregateResult>, FoldingState<Domain, AggregateResult>>
41 implements ResumableNode {
42
43 protected boolean isRecursiveAggregation;
44
45 public FaithfulSequentialTimelyColumnAggregatorNode(final ReteContainer reteContainer,
46 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
47 final TupleMask groupMask, final TupleMask columnMask) {
48 super(reteContainer, operator, groupMask, columnMask);
49 this.isRecursiveAggregation = false;
50 }
51
52 @Override
53 public void networkStructureChanged() {
54 super.networkStructureChanged();
55 this.isRecursiveAggregation = this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this);
56 }
57
58 @Override
59 protected Map<AggregateResult, Diff<Timestamp>> doFoldingStep(final Tuple group,
60 final FoldingState<Domain, AggregateResult> state, final Timestamp timestamp) {
61 final CumulativeAggregate<Domain, Accumulator, AggregateResult> aggregate = getAggregate(group, timestamp);
62 if (state.delta.isEmpty() && Objects.equals(state.oldResult, state.newResult)) {
63 gcAggregates(aggregate, group, timestamp);
64 return Collections.emptyMap();
65 } else {
66 final Map<AggregateResult, Diff<Timestamp>> diffMap = CollectionsFactory.createMap();
67 final Timestamp nextTimestamp = this.aggregates.get(group).higherKey(timestamp);
68
69 final AggregateResult previousOldResult = state.oldResult;
70 final AggregateResult previousNewResult = state.newResult;
71
72 final AggregateResult currentOldResult = previousOldResult == null
73 ? operator.getAggregate(aggregate.positive)
74 : operator.combine(previousOldResult, aggregate.positive);
75
76 for (final Entry<Domain, Integer> entry : state.delta.entriesWithMultiplicities()) {
77 final boolean isInsertion = entry.getValue() > 0;
78 final Domain aggregand = entry.getKey();
79 if (isInsertion) {
80 for (int i = 0; i < entry.getValue(); i++) {
81 if (isRecursiveAggregation) {
82 final boolean contains = aggregate.negative.containsNonZero(aggregand);
83 if (contains) {
84 aggregate.negative.addOne(aggregand);
85 } else {
86 aggregate.positive = operator.update(aggregate.positive, aggregand, true);
87 }
88 } else {
89 aggregate.positive = operator.update(aggregate.positive, aggregand, true);
90 }
91 }
92 } else {
93 for (int i = 0; i < -entry.getValue(); i++) {
94 if (isRecursiveAggregation) {
95 final boolean contains = operator.contains(aggregand, aggregate.positive);
96 if (contains) {
97 aggregate.positive = operator.update(aggregate.positive, aggregand, false);
98 } else {
99 aggregate.negative.removeOne(aggregand);
100 }
101 } else {
102 aggregate.positive = operator.update(aggregate.positive, aggregand, false);
103 }
104 }
105 }
106 }
107
108 final AggregateResult currentNewResult = previousNewResult == null
109 ? operator.getAggregate(aggregate.positive)
110 : operator.combine(previousNewResult, aggregate.positive);
111
112 aggregate.cachedResult = currentNewResult;
113
114 final boolean sameResult = Objects.equals(currentOldResult, currentNewResult);
115 if (!sameResult) {
116 // current old result disappears here
117 appendDiff(currentOldResult, new Signed<>(Direction.DELETE, timestamp), diffMap);
118 if (nextTimestamp != null) {
119 appendDiff(currentOldResult, new Signed<>(Direction.INSERT, nextTimestamp), diffMap);
120 }
121
122 // current new result appears here
123 appendDiff(currentNewResult, new Signed<>(Direction.INSERT, timestamp), diffMap);
124 if (nextTimestamp != null) {
125 appendDiff(currentNewResult, new Signed<>(Direction.DELETE, nextTimestamp), diffMap);
126 }
127 }
128
129 gcAggregates(aggregate, group, timestamp);
130 updateTimeline(group, diffMap);
131
132 // prepare folding state for next timestamp
133 if (nextTimestamp != null && !sameResult) {
134 final FoldingState<Domain, AggregateResult> newState = new FoldingState<>();
135 // DO NOT push forward the delta in the folding state!!! that one only affects the input timestamp
136 newState.oldResult = currentOldResult;
137 newState.newResult = currentNewResult;
138 addFoldingState(group, newState, nextTimestamp);
139 }
140
141 return diffMap;
142 }
143 }
144
145 @Override
146 public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
147 final Tuple group = groupMask.transform(update);
148 final Tuple value = columnMask.transform(update);
149 @SuppressWarnings("unchecked")
150 final Domain aggregand = (Domain) runtimeContext.unwrapElement(value.get(0));
151 final boolean isInsertion = direction == Direction.INSERT;
152
153 final AggregateResult previousResult = getResultRaw(group, timestamp, true);
154 final FoldingState<Domain, AggregateResult> state = new FoldingState<Domain, AggregateResult>();
155 if (isInsertion) {
156 state.delta.addOne(aggregand);
157 } else {
158 state.delta.removeOne(aggregand);
159 }
160 state.oldResult = previousResult;
161 state.newResult = previousResult;
162
163 // it is acceptable if both oldResult and newResult are null at this point
164 // in that case we did not have a previous entry at a lower timestamp
165
166 addFoldingState(group, state, timestamp);
167 }
168
169 protected AggregateResult getResultRaw(final Tuple group, final Timestamp timestamp, final boolean lower) {
170 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator, AggregateResult>> entryMap = this.aggregates
171 .get(group);
172 if (entryMap == null) {
173 return null;
174 } else {
175 CumulativeAggregate<Domain, Accumulator, AggregateResult> aggregate = null;
176 if (lower) {
177 final Entry<Timestamp, CumulativeAggregate<Domain, Accumulator, AggregateResult>> lowerEntry = entryMap
178 .lowerEntry(timestamp);
179 if (lowerEntry != null) {
180 aggregate = lowerEntry.getValue();
181 }
182 } else {
183 aggregate = entryMap.get(timestamp);
184 }
185 if (aggregate == null) {
186 return null;
187 } else {
188 return aggregate.cachedResult;
189 }
190 }
191 }
192
193 @Override
194 protected void gcAggregates(final CumulativeAggregate<Domain, Accumulator, AggregateResult> aggregate,
195 final Tuple group, final Timestamp timestamp) {
196 if (operator.isNeutral(aggregate.positive) && aggregate.negative.isEmpty()) {
197 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator, AggregateResult>> groupAggregates = this.aggregates
198 .get(group);
199 groupAggregates.remove(timestamp);
200 if (groupAggregates.isEmpty()) {
201 this.aggregates.remove(group);
202 }
203 }
204 }
205
206 @Override
207 protected CumulativeAggregate<Domain, Accumulator, AggregateResult> getAggregate(final Tuple group,
208 final Timestamp timestamp) {
209 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator, AggregateResult>> groupAggregates = this.aggregates
210 .computeIfAbsent(group, k -> CollectionsFactory.createTreeMap());
211 return groupAggregates.computeIfAbsent(timestamp, k -> {
212 final CumulativeAggregate<Domain, Accumulator, AggregateResult> aggregate = new CumulativeAggregate<>();
213 aggregate.positive = operator.createNeutral();
214 return aggregate;
215 });
216 }
217
218 @Override
219 public AggregateResult getAggregateResult(final Tuple group) {
220 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator, AggregateResult>> groupAggregates = this.aggregates
221 .get(group);
222 if (groupAggregates != null) {
223 final Entry<Timestamp, CumulativeAggregate<Domain, Accumulator, AggregateResult>> lastEntry = groupAggregates
224 .lastEntry();
225 return lastEntry.getValue().cachedResult;
226 } else {
227 return NEUTRAL;
228 }
229 }
230
231 protected static class CumulativeAggregate<Domain, Accumulator, AggregateResult> {
232 protected Accumulator positive;
233 protected IDeltaBag<Domain> negative;
234 protected AggregateResult cachedResult;
235
236 protected CumulativeAggregate() {
237 this.negative = CollectionsFactory.createDeltaBag();
238 }
239
240 @Override
241 public String toString() {
242 return "positive=" + positive + " negative=" + negative + " cachedResult=" + cachedResult;
243 }
244 }
245
246 protected static class FoldingState<Domain, AggregateResult>
247 implements MergeableFoldingState<FoldingState<Domain, AggregateResult>> {
248 protected IDeltaBag<Domain> delta;
249 protected AggregateResult oldResult;
250 protected AggregateResult newResult;
251
252 protected FoldingState() {
253 this.delta = CollectionsFactory.createDeltaBag();
254 }
255
256 @Override
257 public String toString() {
258 return "delta=" + delta + " oldResult=" + oldResult + " newResult=" + newResult;
259 }
260
261 /**
262 * The returned result will never be null, even if the resulting delta set is empty.
263 */
264 @Override
265 public FoldingState<Domain, AggregateResult> merge(final FoldingState<Domain, AggregateResult> that) {
266 Preconditions.checkArgument(that != null);
267 // 'this' was the previously registered folding state
268 // 'that' is the new folding state being pushed upwards
269 final FoldingState<Domain, AggregateResult> result = new FoldingState<Domain, AggregateResult>();
270 this.delta.forEachEntryWithMultiplicities((d, m) -> result.delta.addSigned(d, m));
271 that.delta.forEachEntryWithMultiplicities((d, m) -> result.delta.addSigned(d, m));
272 result.oldResult = this.oldResult;
273 result.newResult = that.newResult;
274 return result;
275 }
276
277 }
278
279}