aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulParallelTimelyColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulParallelTimelyColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulParallelTimelyColumnAggregatorNode.java212
1 files changed, 212 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulParallelTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulParallelTimelyColumnAggregatorNode.java
new file mode 100644
index 00000000..19e02f10
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulParallelTimelyColumnAggregatorNode.java
@@ -0,0 +1,212 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation.timely;
10
11import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
12import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
13import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
14import tools.refinery.viatra.runtime.matchers.util.*;
15import tools.refinery.viatra.runtime.matchers.util.timeline.Diff;
16import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulParallelTimelyColumnAggregatorNode.CumulativeAggregate;
17import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulParallelTimelyColumnAggregatorNode.FoldingState;
18import tools.refinery.viatra.runtime.rete.network.ReteContainer;
19import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
20import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode;
21
22import java.util.Collections;
23import java.util.Map;
24import java.util.Map.Entry;
25import java.util.Objects;
26import java.util.TreeMap;
27
28/**
29 * Faithful column aggregator with parallel aggregation architecture.
30 *
31 * @author Tamas Szabo
32 * @since 2.4
33 *
34 */
35public class FaithfulParallelTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult> extends
36 FaithfulTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult, CumulativeAggregate<Domain, Accumulator>, FoldingState<Domain>>
37 implements ResumableNode {
38
39 public FaithfulParallelTimelyColumnAggregatorNode(final ReteContainer reteContainer,
40 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
41 final TupleMask groupMask, final TupleMask columnMask) {
42 super(reteContainer, operator, groupMask, columnMask);
43 }
44
45 public FaithfulParallelTimelyColumnAggregatorNode(final ReteContainer reteContainer,
46 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
47 final TupleMask groupMask, final int aggregatedColumn) {
48 this(reteContainer, operator, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth));
49 }
50
51 @Override
52 protected Map<AggregateResult, Diff<Timestamp>> doFoldingStep(final Tuple group, final FoldingState<Domain> state,
53 final Timestamp timestamp) {
54 final CumulativeAggregate<Domain, Accumulator> aggregate = getAggregate(group, timestamp);
55 if (state.delta.isEmpty()) {
56 gcAggregates(aggregate, group, timestamp);
57 return Collections.emptyMap();
58 } else {
59 final Map<AggregateResult, Diff<Timestamp>> diffMap = CollectionsFactory.createMap();
60 final Timestamp nextTimestamp = this.aggregates.get(group).higherKey(timestamp);
61
62 final AggregateResult currentOldResult = operator.getAggregate(aggregate.accumulator);
63
64 for (final Entry<Domain, Integer> entry : state.delta.entriesWithMultiplicities()) {
65 final boolean isInsertion = entry.getValue() > 0;
66 final Domain aggregand = entry.getKey();
67 for (int i = 0; i < Math.abs(entry.getValue()); i++) {
68 aggregate.accumulator = operator.update(aggregate.accumulator, aggregand, isInsertion);
69 }
70 }
71
72 final AggregateResult currentNewResult = operator.getAggregate(aggregate.accumulator);
73
74 if (!Objects.equals(currentOldResult, currentNewResult)) {
75 // current old result disappears here
76 appendDiff(currentOldResult, new Signed<>(Direction.DELETE, timestamp), diffMap);
77 if (nextTimestamp != null) {
78 appendDiff(currentOldResult, new Signed<>(Direction.INSERT, nextTimestamp), diffMap);
79 }
80
81 // current new result appears here
82 appendDiff(currentNewResult, new Signed<>(Direction.INSERT, timestamp), diffMap);
83 if (nextTimestamp != null) {
84 appendDiff(currentNewResult, new Signed<>(Direction.DELETE, nextTimestamp), diffMap);
85 }
86 }
87
88 gcAggregates(aggregate, group, timestamp);
89 updateTimeline(group, diffMap);
90
91 // prepare folding state for next timestamp
92 if (nextTimestamp != null) {
93 final FoldingState<Domain> newState = new FoldingState<>();
94 newState.delta = state.delta;
95 addFoldingState(group, newState, nextTimestamp);
96 }
97
98 return diffMap;
99 }
100 }
101
102 @Override
103 public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
104 final Tuple group = groupMask.transform(update);
105 final Tuple value = columnMask.transform(update);
106 @SuppressWarnings("unchecked")
107 final Domain aggregand = (Domain) runtimeContext.unwrapElement(value.get(0));
108 final boolean isInsertion = direction == Direction.INSERT;
109
110 final CumulativeAggregate<Domain, Accumulator> aggregate = getAggregate(group, timestamp);
111 final FoldingState<Domain> state = new FoldingState<>();
112 if (isInsertion) {
113 aggregate.aggregands.addOne(aggregand);
114 state.delta.addOne(aggregand);
115 } else {
116 aggregate.aggregands.removeOne(aggregand);
117 state.delta.removeOne(aggregand);
118 }
119
120 addFoldingState(group, state, timestamp);
121 }
122
123 /**
124 * Garbage collects the counter of the given group and timestamp if the bag of aggregands is empty.
125 */
126 @Override
127 protected void gcAggregates(final CumulativeAggregate<Domain, Accumulator> aggregate, final Tuple group,
128 final Timestamp timestamp) {
129 if (aggregate.aggregands.isEmpty()) {
130 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator>> groupAggregates = this.aggregates
131 .get(group);
132 groupAggregates.remove(timestamp);
133 if (groupAggregates.isEmpty()) {
134 this.aggregates.remove(group);
135 }
136 }
137 }
138
139 /**
140 * On-demand initializes and returns the aggregate for the given group and timestamp.
141 */
142 @Override
143 protected CumulativeAggregate<Domain, Accumulator> getAggregate(final Tuple group, final Timestamp timestamp) {
144 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator>> groupAggregates = this.aggregates
145 .computeIfAbsent(group, k -> CollectionsFactory.createTreeMap());
146 return groupAggregates.computeIfAbsent(timestamp, k -> {
147 final CumulativeAggregate<Domain, Accumulator> aggregate = new CumulativeAggregate<>();
148 final Entry<Timestamp, CumulativeAggregate<Domain, Accumulator>> lowerEntry = groupAggregates
149 .lowerEntry(timestamp);
150 if (lowerEntry == null) {
151 aggregate.accumulator = operator.createNeutral();
152 } else {
153 aggregate.accumulator = operator.clone(lowerEntry.getValue().accumulator);
154 }
155 return aggregate;
156 });
157 }
158
159 @Override
160 public AggregateResult getAggregateResult(final Tuple group) {
161 final TreeMap<Timestamp, CumulativeAggregate<Domain, Accumulator>> groupAggregates = this.aggregates.get(group);
162 if (groupAggregates != null) {
163 final Entry<Timestamp, CumulativeAggregate<Domain, Accumulator>> lastEntry = groupAggregates.lastEntry();
164 return operator.getAggregate(lastEntry.getValue().accumulator);
165 } else {
166 return NEUTRAL;
167 }
168 }
169
170 protected static class CumulativeAggregate<Domain, Accumulator> {
171 protected Accumulator accumulator;
172 protected IDeltaBag<Domain> aggregands;
173
174 protected CumulativeAggregate() {
175 this.aggregands = CollectionsFactory.createDeltaBag();
176 }
177
178 @Override
179 public String toString() {
180 return "accumulator=" + accumulator + " aggregands=" + aggregands;
181 }
182 }
183
184 protected static class FoldingState<Domain> implements MergeableFoldingState<FoldingState<Domain>> {
185 protected IDeltaBag<Domain> delta;
186
187 protected FoldingState() {
188 this.delta = CollectionsFactory.createDeltaBag();
189 }
190
191 @Override
192 public String toString() {
193 return "delta=" + delta;
194 }
195
196 /**
197 * The returned result will never be null, even if the resulting delta set is empty.
198 */
199 @Override
200 public FoldingState<Domain> merge(final FoldingState<Domain> that) {
201 Preconditions.checkArgument(that != null);
202 // 'this' was the previously registered folding state
203 // 'that' is the new folding state being pushed upwards
204 final FoldingState<Domain> result = new FoldingState<>();
205 this.delta.forEachEntryWithMultiplicities((d, m) -> result.delta.addSigned(d, m));
206 that.delta.forEachEntryWithMultiplicities((d, m) -> result.delta.addSigned(d, m));
207 return result;
208 }
209
210 }
211
212}