diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java | 247 |
1 files changed, 247 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java new file mode 100644 index 00000000..8fe9a4e9 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FaithfulTimelyColumnAggregatorNode.java | |||
@@ -0,0 +1,247 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd. | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.aggregation.timely; | ||
10 | |||
11 | import java.util.Collections; | ||
12 | import java.util.Map; | ||
13 | import java.util.Map.Entry; | ||
14 | import java.util.Objects; | ||
15 | import java.util.TreeMap; | ||
16 | |||
17 | import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator; | ||
18 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
19 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
21 | import tools.refinery.viatra.runtime.matchers.util.Signed; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.timeline.Diff; | ||
23 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
24 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines; | ||
25 | import tools.refinery.viatra.runtime.rete.aggregation.AbstractColumnAggregatorNode; | ||
26 | import tools.refinery.viatra.runtime.rete.aggregation.GroupedMap; | ||
27 | import tools.refinery.viatra.runtime.rete.aggregation.timely.FaithfulTimelyColumnAggregatorNode.MergeableFoldingState; | ||
28 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
29 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
30 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
31 | import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode; | ||
32 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
33 | import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox; | ||
34 | |||
35 | /** | ||
36 | * Faithful timely implementation of the column aggregator node. Complete timelines (series of appearance & | ||
37 | * disappearance) are maintained for tuples. <br> | ||
38 | * <br> | ||
39 | * Subclasses are responsible for implementing the aggregator architecture, and they must use the CumulativeAggregate | ||
40 | * type parameter for that. <br> | ||
41 | * <br> | ||
42 | * This node supports recursive aggregation. | ||
43 | * | ||
44 | * @author Tamas Szabo | ||
45 | * @since 2.4 | ||
46 | */ | ||
47 | public abstract class FaithfulTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult, CumulativeAggregate, FoldingState extends MergeableFoldingState<FoldingState>> | ||
48 | extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> implements ResumableNode { | ||
49 | |||
50 | protected final Map<Tuple, TreeMap<Timestamp, CumulativeAggregate>> aggregates; | ||
51 | protected final Map<Tuple, Map<AggregateResult, Timeline<Timestamp>>> timelines; | ||
52 | protected final TreeMap<Timestamp, Map<Tuple, FoldingState>> foldingState; | ||
53 | protected CommunicationGroup communicationGroup; | ||
54 | |||
55 | public FaithfulTimelyColumnAggregatorNode(final ReteContainer reteContainer, | ||
56 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | ||
57 | final TupleMask groupMask, final TupleMask columnMask) { | ||
58 | super(reteContainer, operator, groupMask, columnMask); | ||
59 | this.aggregates = CollectionsFactory.createMap(); | ||
60 | this.timelines = CollectionsFactory.createMap(); | ||
61 | this.foldingState = CollectionsFactory.createTreeMap(); | ||
62 | // mailbox MUST be instantiated after the fields are all set | ||
63 | this.mailbox = instantiateMailbox(); | ||
64 | } | ||
65 | |||
66 | @Override | ||
67 | protected Mailbox instantiateMailbox() { | ||
68 | return new TimelyMailbox(this, this.reteContainer); | ||
69 | } | ||
70 | |||
71 | @Override | ||
72 | public void clear() { | ||
73 | this.mailbox.clear(); | ||
74 | this.aggregates.clear(); | ||
75 | this.timelines.clear(); | ||
76 | this.children.clear(); | ||
77 | this.childMailboxes.clear(); | ||
78 | this.foldingState.clear(); | ||
79 | } | ||
80 | |||
81 | /** | ||
82 | * Registers the given folding state for the specified timestamp and tuple. If there is already a state stored, the | ||
83 | * two states will be merged together. | ||
84 | * | ||
85 | * | ||
86 | */ | ||
87 | protected void addFoldingState(final Tuple group, final FoldingState state, final Timestamp timestamp) { | ||
88 | // assert !state.delta.isEmpty(); | ||
89 | final Map<Tuple, FoldingState> tupleMap = this.foldingState.computeIfAbsent(timestamp, | ||
90 | k -> CollectionsFactory.createMap()); | ||
91 | tupleMap.compute(group, (k, v) -> { | ||
92 | return v == null ? state : v.merge(state); | ||
93 | }); | ||
94 | } | ||
95 | |||
96 | @Override | ||
97 | public Timestamp getResumableTimestamp() { | ||
98 | if (this.foldingState.isEmpty()) { | ||
99 | return null; | ||
100 | } else { | ||
101 | return this.foldingState.firstKey(); | ||
102 | } | ||
103 | } | ||
104 | |||
105 | @Override | ||
106 | public void resumeAt(final Timestamp timestamp) { | ||
107 | Timestamp current = this.getResumableTimestamp(); | ||
108 | if (current == null) { | ||
109 | throw new IllegalStateException("There is nothing to fold!"); | ||
110 | } else if (current.compareTo(timestamp) != 0) { | ||
111 | throw new IllegalStateException("Expected to continue folding at " + timestamp + "!"); | ||
112 | } | ||
113 | |||
114 | final Map<Tuple, FoldingState> tupleMap = this.foldingState.remove(timestamp); | ||
115 | for (final Entry<Tuple, FoldingState> groupEntry : tupleMap.entrySet()) { | ||
116 | final Tuple group = groupEntry.getKey(); | ||
117 | final FoldingState value = groupEntry.getValue(); | ||
118 | final Map<AggregateResult, Diff<Timestamp>> diffMap = doFoldingStep(group, value, timestamp); | ||
119 | for (final Entry<AggregateResult, Diff<Timestamp>> resultEntry : diffMap.entrySet()) { | ||
120 | for (final Signed<Timestamp> signed : resultEntry.getValue()) { | ||
121 | propagate(signed.getDirection(), group, resultEntry.getKey(), signed.getPayload()); | ||
122 | } | ||
123 | } | ||
124 | } | ||
125 | |||
126 | final Timestamp nextTimestamp = this.getResumableTimestamp(); | ||
127 | if (Objects.equals(timestamp, nextTimestamp)) { | ||
128 | throw new IllegalStateException( | ||
129 | "Folding at " + timestamp + " produced more folding work at the same timestamp!"); | ||
130 | } else if (nextTimestamp != null) { | ||
131 | this.communicationGroup.notifyHasMessage(this.mailbox, nextTimestamp); | ||
132 | } | ||
133 | } | ||
134 | |||
135 | protected abstract Map<AggregateResult, Diff<Timestamp>> doFoldingStep(final Tuple group, final FoldingState state, | ||
136 | final Timestamp timestamp); | ||
137 | |||
138 | /** | ||
139 | * Updates and garbage collects the timeline of the given tuple based on the given diffs. | ||
140 | */ | ||
141 | protected void updateTimeline(final Tuple group, final Map<AggregateResult, Diff<Timestamp>> diffs) { | ||
142 | if (!diffs.isEmpty()) { | ||
143 | this.timelines.compute(group, (k, resultTimelines) -> { | ||
144 | if (resultTimelines == null) { | ||
145 | resultTimelines = CollectionsFactory.createMap(); | ||
146 | } | ||
147 | for (final Entry<AggregateResult, Diff<Timestamp>> entry : diffs.entrySet()) { | ||
148 | final AggregateResult result = entry.getKey(); | ||
149 | resultTimelines.compute(result, (k2, oldResultTimeline) -> { | ||
150 | final Diff<Timestamp> currentResultDiffs = entry.getValue(); | ||
151 | if (oldResultTimeline == null) { | ||
152 | oldResultTimeline = getInitialTimeline(result); | ||
153 | } | ||
154 | final Timeline<Timestamp> timeline = oldResultTimeline.mergeAdditive(currentResultDiffs); | ||
155 | if (timeline.isEmpty()) { | ||
156 | return null; | ||
157 | } else { | ||
158 | return timeline; | ||
159 | } | ||
160 | }); | ||
161 | } | ||
162 | if (resultTimelines.isEmpty()) { | ||
163 | return null; | ||
164 | } else { | ||
165 | return resultTimelines; | ||
166 | } | ||
167 | }); | ||
168 | } | ||
169 | } | ||
170 | |||
171 | /** | ||
172 | * Garbage collects the counter of the given group and timestamp if the bag of aggregands is empty. | ||
173 | */ | ||
174 | protected abstract void gcAggregates(final CumulativeAggregate aggregate, final Tuple group, | ||
175 | final Timestamp timestamp); | ||
176 | |||
177 | /** | ||
178 | * On-demand initializes and returns the aggregate for the given group and timestamp. | ||
179 | */ | ||
180 | protected abstract CumulativeAggregate getAggregate(final Tuple group, final Timestamp timestamp); | ||
181 | |||
182 | protected static final Timeline<Timestamp> NEUTRAL_INITIAL_TIMELINE = Timestamp.INSERT_AT_ZERO_TIMELINE; | ||
183 | protected static final Timeline<Timestamp> NON_NEUTRAL_INITIAL_TIMELINE = Timelines.createEmpty(); | ||
184 | |||
185 | protected Timeline<Timestamp> getInitialTimeline(final AggregateResult result) { | ||
186 | if (NEUTRAL == result) { | ||
187 | return NEUTRAL_INITIAL_TIMELINE; | ||
188 | } else { | ||
189 | return NON_NEUTRAL_INITIAL_TIMELINE; | ||
190 | } | ||
191 | } | ||
192 | |||
193 | protected static <AggregateResult> void appendDiff(final AggregateResult result, final Signed<Timestamp> diff, | ||
194 | final Map<AggregateResult, Diff<Timestamp>> diffs) { | ||
195 | if (result != null) { | ||
196 | diffs.compute(result, (k, timeLineDiff) -> { | ||
197 | if (timeLineDiff == null) { | ||
198 | timeLineDiff = new Diff<>(); | ||
199 | } | ||
200 | timeLineDiff.add(diff); | ||
201 | return timeLineDiff; | ||
202 | }); | ||
203 | } | ||
204 | } | ||
205 | |||
206 | @Override | ||
207 | public Tuple getAggregateTuple(final Tuple group) { | ||
208 | return tupleFromAggregateResult(group, getAggregateResult(group)); | ||
209 | } | ||
210 | |||
211 | @Override | ||
212 | public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(final Tuple group) { | ||
213 | final Map<AggregateResult, Timeline<Timestamp>> resultTimelines = this.timelines.get(group); | ||
214 | if (resultTimelines == null) { | ||
215 | if (NEUTRAL == null) { | ||
216 | return Collections.emptyMap(); | ||
217 | } else { | ||
218 | return Collections.singletonMap(NEUTRAL, NEUTRAL_INITIAL_TIMELINE); | ||
219 | } | ||
220 | } else { | ||
221 | return resultTimelines; | ||
222 | } | ||
223 | } | ||
224 | |||
225 | @Override | ||
226 | public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(final Tuple group) { | ||
227 | final Map<AggregateResult, Timeline<Timestamp>> resultTimelines = getAggregateResultTimeline(group); | ||
228 | return new GroupedMap<AggregateResult, Timeline<Timestamp>>(group, resultTimelines, this.runtimeContext); | ||
229 | } | ||
230 | |||
231 | @Override | ||
232 | public CommunicationGroup getCurrentGroup() { | ||
233 | return communicationGroup; | ||
234 | } | ||
235 | |||
236 | @Override | ||
237 | public void setCurrentGroup(final CommunicationGroup currentGroup) { | ||
238 | this.communicationGroup = currentGroup; | ||
239 | } | ||
240 | |||
241 | protected interface MergeableFoldingState<T> { | ||
242 | |||
243 | public abstract T merge(final T that); | ||
244 | |||
245 | } | ||
246 | |||
247 | } \ No newline at end of file | ||