aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java247
1 files changed, 247 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java
new file mode 100644
index 00000000..8fe9a4e9
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java
@@ -0,0 +1,247 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation.timely;
10
11import java.util.Collections;
12import java.util.Map;
13import java.util.Map.Entry;
14import java.util.Objects;
15import java.util.TreeMap;
16
17import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
18import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
19import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
20import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
21import tools.refinery.viatra.runtime.matchers.util.Signed;
22import tools.refinery.viatra.runtime.matchers.util.timeline.Diff;
23import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
24import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines;
25import tools.refinery.viatra.runtime.rete.aggregation.AbstractColumnAggregatorNode;
26import tools.refinery.viatra.runtime.rete.aggregation.GroupedMap;
27import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulTimelyColumnAggregatorNode.MergeableFoldingState;
28import tools.refinery.viatra.runtime.rete.network.ReteContainer;
29import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
30import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
31import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode;
32import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
33import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox;
34
35/**
36 * Faithful timely implementation of the column aggregator node. Complete timelines (series of appearance &
37 * disappearance) are maintained for tuples. <br>
38 * <br>
39 * Subclasses are responsible for implementing the aggregator architecture, and they must use the CumulativeAggregate
40 * type parameter for that. <br>
41 * <br>
42 * This node supports recursive aggregation.
43 *
44 * @author Tamas Szabo
45 * @since 2.4
46 */
47public abstract class FaithfulTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult, CumulativeAggregate, FoldingState extends MergeableFoldingState<FoldingState>>
48 extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> implements ResumableNode {
49
50 protected final Map<Tuple, TreeMap<Timestamp, CumulativeAggregate>> aggregates;
51 protected final Map<Tuple, Map<AggregateResult, Timeline<Timestamp>>> timelines;
52 protected final TreeMap<Timestamp, Map<Tuple, FoldingState>> foldingState;
53 protected CommunicationGroup communicationGroup;
54
55 public FaithfulTimelyColumnAggregatorNode(final ReteContainer reteContainer,
56 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
57 final TupleMask groupMask, final TupleMask columnMask) {
58 super(reteContainer, operator, groupMask, columnMask);
59 this.aggregates = CollectionsFactory.createMap();
60 this.timelines = CollectionsFactory.createMap();
61 this.foldingState = CollectionsFactory.createTreeMap();
62 // mailbox MUST be instantiated after the fields are all set
63 this.mailbox = instantiateMailbox();
64 }
65
66 @Override
67 protected Mailbox instantiateMailbox() {
68 return new TimelyMailbox(this, this.reteContainer);
69 }
70
71 @Override
72 public void clear() {
73 this.mailbox.clear();
74 this.aggregates.clear();
75 this.timelines.clear();
76 this.children.clear();
77 this.childMailboxes.clear();
78 this.foldingState.clear();
79 }
80
81 /**
82 * Registers the given folding state for the specified timestamp and tuple. If there is already a state stored, the
83 * two states will be merged together.
84 *
85 *
86 */
87 protected void addFoldingState(final Tuple group, final FoldingState state, final Timestamp timestamp) {
88 // assert !state.delta.isEmpty();
89 final Map<Tuple, FoldingState> tupleMap = this.foldingState.computeIfAbsent(timestamp,
90 k -> CollectionsFactory.createMap());
91 tupleMap.compute(group, (k, v) -> {
92 return v == null ? state : v.merge(state);
93 });
94 }
95
96 @Override
97 public Timestamp getResumableTimestamp() {
98 if (this.foldingState.isEmpty()) {
99 return null;
100 } else {
101 return this.foldingState.firstKey();
102 }
103 }
104
105 @Override
106 public void resumeAt(final Timestamp timestamp) {
107 Timestamp current = this.getResumableTimestamp();
108 if (current == null) {
109 throw new IllegalStateException("There is nothing to fold!");
110 } else if (current.compareTo(timestamp) != 0) {
111 throw new IllegalStateException("Expected to continue folding at " + timestamp + "!");
112 }
113
114 final Map<Tuple, FoldingState> tupleMap = this.foldingState.remove(timestamp);
115 for (final Entry<Tuple, FoldingState> groupEntry : tupleMap.entrySet()) {
116 final Tuple group = groupEntry.getKey();
117 final FoldingState value = groupEntry.getValue();
118 final Map<AggregateResult, Diff<Timestamp>> diffMap = doFoldingStep(group, value, timestamp);
119 for (final Entry<AggregateResult, Diff<Timestamp>> resultEntry : diffMap.entrySet()) {
120 for (final Signed<Timestamp> signed : resultEntry.getValue()) {
121 propagate(signed.getDirection(), group, resultEntry.getKey(), signed.getPayload());
122 }
123 }
124 }
125
126 final Timestamp nextTimestamp = this.getResumableTimestamp();
127 if (Objects.equals(timestamp, nextTimestamp)) {
128 throw new IllegalStateException(
129 "Folding at " + timestamp + " produced more folding work at the same timestamp!");
130 } else if (nextTimestamp != null) {
131 this.communicationGroup.notifyHasMessage(this.mailbox, nextTimestamp);
132 }
133 }
134
135 protected abstract Map<AggregateResult, Diff<Timestamp>> doFoldingStep(final Tuple group, final FoldingState state,
136 final Timestamp timestamp);
137
138 /**
139 * Updates and garbage collects the timeline of the given tuple based on the given diffs.
140 */
141 protected void updateTimeline(final Tuple group, final Map<AggregateResult, Diff<Timestamp>> diffs) {
142 if (!diffs.isEmpty()) {
143 this.timelines.compute(group, (k, resultTimelines) -> {
144 if (resultTimelines == null) {
145 resultTimelines = CollectionsFactory.createMap();
146 }
147 for (final Entry<AggregateResult, Diff<Timestamp>> entry : diffs.entrySet()) {
148 final AggregateResult result = entry.getKey();
149 resultTimelines.compute(result, (k2, oldResultTimeline) -> {
150 final Diff<Timestamp> currentResultDiffs = entry.getValue();
151 if (oldResultTimeline == null) {
152 oldResultTimeline = getInitialTimeline(result);
153 }
154 final Timeline<Timestamp> timeline = oldResultTimeline.mergeAdditive(currentResultDiffs);
155 if (timeline.isEmpty()) {
156 return null;
157 } else {
158 return timeline;
159 }
160 });
161 }
162 if (resultTimelines.isEmpty()) {
163 return null;
164 } else {
165 return resultTimelines;
166 }
167 });
168 }
169 }
170
171 /**
172 * Garbage collects the counter of the given group and timestamp if the bag of aggregands is empty.
173 */
174 protected abstract void gcAggregates(final CumulativeAggregate aggregate, final Tuple group,
175 final Timestamp timestamp);
176
177 /**
178 * On-demand initializes and returns the aggregate for the given group and timestamp.
179 */
180 protected abstract CumulativeAggregate getAggregate(final Tuple group, final Timestamp timestamp);
181
182 protected static final Timeline<Timestamp> NEUTRAL_INITIAL_TIMELINE = Timestamp.INSERT_AT_ZERO_TIMELINE;
183 protected static final Timeline<Timestamp> NON_NEUTRAL_INITIAL_TIMELINE = Timelines.createEmpty();
184
185 protected Timeline<Timestamp> getInitialTimeline(final AggregateResult result) {
186 if (NEUTRAL == result) {
187 return NEUTRAL_INITIAL_TIMELINE;
188 } else {
189 return NON_NEUTRAL_INITIAL_TIMELINE;
190 }
191 }
192
193 protected static <AggregateResult> void appendDiff(final AggregateResult result, final Signed<Timestamp> diff,
194 final Map<AggregateResult, Diff<Timestamp>> diffs) {
195 if (result != null) {
196 diffs.compute(result, (k, timeLineDiff) -> {
197 if (timeLineDiff == null) {
198 timeLineDiff = new Diff<>();
199 }
200 timeLineDiff.add(diff);
201 return timeLineDiff;
202 });
203 }
204 }
205
206 @Override
207 public Tuple getAggregateTuple(final Tuple group) {
208 return tupleFromAggregateResult(group, getAggregateResult(group));
209 }
210
211 @Override
212 public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(final Tuple group) {
213 final Map<AggregateResult, Timeline<Timestamp>> resultTimelines = this.timelines.get(group);
214 if (resultTimelines == null) {
215 if (NEUTRAL == null) {
216 return Collections.emptyMap();
217 } else {
218 return Collections.singletonMap(NEUTRAL, NEUTRAL_INITIAL_TIMELINE);
219 }
220 } else {
221 return resultTimelines;
222 }
223 }
224
225 @Override
226 public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(final Tuple group) {
227 final Map<AggregateResult, Timeline<Timestamp>> resultTimelines = getAggregateResultTimeline(group);
228 return new GroupedMap<AggregateResult, Timeline<Timestamp>>(group, resultTimelines, this.runtimeContext);
229 }
230
231 @Override
232 public CommunicationGroup getCurrentGroup() {
233 return communicationGroup;
234 }
235
236 @Override
237 public void setCurrentGroup(final CommunicationGroup currentGroup) {
238 this.communicationGroup = currentGroup;
239 }
240
241 protected interface MergeableFoldingState<T> {
242
243 public abstract T merge(final T that);
244
245 }
246
247} \ No newline at end of file