diff options
author | Kristóf Marussy <marussy@mit.bme.hu> | 2023-09-14 19:29:36 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-14 19:29:36 +0200 |
commit | 98ed3b6db5f4e51961a161050cc31c66015116e8 (patch) | |
tree | 8bfd6d9bc8d6ed23b9eb0f889dd40b6c24fe8f92 /subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java | |
parent | Merge pull request #38 from nagilooh/design-space-exploration (diff) | |
parent | Merge remote-tracking branch 'upstream/main' into partial-interpretation (diff) | |
download | refinery-98ed3b6db5f4e51961a161050cc31c66015116e8.tar.gz refinery-98ed3b6db5f4e51961a161050cc31c66015116e8.tar.zst refinery-98ed3b6db5f4e51961a161050cc31c66015116e8.zip |
Merge pull request #39 from kris7t/partial-interpretation
Implement partial interpretation based model generation
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java | 369 |
1 files changed, 369 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java new file mode 100644 index 00000000..4480aed8 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/ColumnAggregatorNode.java | |||
@@ -0,0 +1,369 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd. | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.aggregation; | ||
10 | |||
11 | import java.util.Map; | ||
12 | import java.util.Map.Entry; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.matchers.context.IPosetComparator; | ||
15 | import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
21 | import tools.refinery.viatra.runtime.rete.network.PosetAwareReceiver; | ||
22 | import tools.refinery.viatra.runtime.rete.network.RederivableNode; | ||
23 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
26 | import tools.refinery.viatra.runtime.rete.network.communication.timeless.RecursiveCommunicationGroup; | ||
27 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
28 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
29 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.PosetAwareMailbox; | ||
30 | |||
31 | /** | ||
32 | * Timeless implementation of the column aggregator node. | ||
33 | * <p> | ||
34 | * The node is capable of operating in the delete and re-derive mode. In this mode, it is also possible to equip the | ||
35 | * node with an {@link IPosetComparator} to identify monotone changes; thus, ensuring that a fix-point can be reached | ||
36 | * during the evaluation. | ||
37 | * | ||
38 | * @author Gabor Bergmann | ||
39 | * @author Tamas Szabo | ||
40 | * @since 1.4 | ||
41 | */ | ||
42 | public class ColumnAggregatorNode<Domain, Accumulator, AggregateResult> | ||
43 | extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> | ||
44 | implements RederivableNode, PosetAwareReceiver { | ||
45 | |||
46 | /** | ||
47 | * @since 1.6 | ||
48 | */ | ||
49 | protected final IPosetComparator posetComparator; | ||
50 | |||
51 | /** | ||
52 | * @since 1.6 | ||
53 | */ | ||
54 | protected final boolean deleteRederiveEvaluation; | ||
55 | |||
56 | // invariant: neutral values are not stored | ||
57 | /** | ||
58 | * @since 1.6 | ||
59 | */ | ||
60 | protected final Map<Tuple, Accumulator> memory; | ||
61 | /** | ||
62 | * @since 1.6 | ||
63 | */ | ||
64 | protected final Map<Tuple, Accumulator> rederivableMemory; | ||
65 | |||
66 | /** | ||
67 | * @since 1.7 | ||
68 | */ | ||
69 | protected CommunicationGroup currentGroup; | ||
70 | |||
71 | /** | ||
72 | * Creates a new column aggregator node. | ||
73 | * | ||
74 | * @param reteContainer | ||
75 | * the RETE container of the node | ||
76 | * @param operator | ||
77 | * the aggregation operator | ||
78 | * @param deleteRederiveEvaluation | ||
79 | * true if the node should run in DRED mode, false otherwise | ||
80 | * @param groupMask | ||
81 | * the mask that masks a tuple to obtain the key that we are grouping-by | ||
82 | * @param columnMask | ||
83 | * the mask that masks a tuple to obtain the tuple element(s) that we are aggregating over | ||
84 | * @param posetComparator | ||
85 | * the poset comparator for the column, if known, otherwise it can be null | ||
86 | * @since 1.6 | ||
87 | */ | ||
88 | public ColumnAggregatorNode(final ReteContainer reteContainer, | ||
89 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | ||
90 | final boolean deleteRederiveEvaluation, final TupleMask groupMask, final TupleMask columnMask, | ||
91 | final IPosetComparator posetComparator) { | ||
92 | super(reteContainer, operator, groupMask, columnMask); | ||
93 | this.memory = CollectionsFactory.createMap(); | ||
94 | this.rederivableMemory = CollectionsFactory.createMap(); | ||
95 | this.deleteRederiveEvaluation = deleteRederiveEvaluation; | ||
96 | this.posetComparator = posetComparator; | ||
97 | // mailbox MUST be instantiated after the fields are all set | ||
98 | this.mailbox = instantiateMailbox(); | ||
99 | } | ||
100 | |||
101 | /** | ||
102 | * Creates a new column aggregator node. | ||
103 | * | ||
104 | * @param reteContainer | ||
105 | * the RETE container of the node | ||
106 | * @param operator | ||
107 | * the aggregation operator | ||
108 | * @param groupMask | ||
109 | * the mask that masks a tuple to obtain the key that we are grouping-by | ||
110 | * @param aggregatedColumn | ||
111 | * the index of the column that the aggregator node is aggregating over | ||
112 | */ | ||
113 | public ColumnAggregatorNode(final ReteContainer reteContainer, | ||
114 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | ||
115 | final TupleMask groupMask, final int aggregatedColumn) { | ||
116 | this(reteContainer, operator, false, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth), | ||
117 | null); | ||
118 | } | ||
119 | |||
120 | @Override | ||
121 | public boolean isInDRedMode() { | ||
122 | return this.deleteRederiveEvaluation; | ||
123 | } | ||
124 | |||
125 | @Override | ||
126 | protected Mailbox instantiateMailbox() { | ||
127 | if (groupMask != null && columnMask != null && posetComparator != null) { | ||
128 | return new PosetAwareMailbox(this, this.reteContainer); | ||
129 | } else { | ||
130 | return new BehaviorChangingMailbox(this, this.reteContainer); | ||
131 | } | ||
132 | } | ||
133 | |||
134 | @Override | ||
135 | public TupleMask getCoreMask() { | ||
136 | return groupMask; | ||
137 | } | ||
138 | |||
139 | @Override | ||
140 | public TupleMask getPosetMask() { | ||
141 | return columnMask; | ||
142 | } | ||
143 | |||
144 | @Override | ||
145 | public IPosetComparator getPosetComparator() { | ||
146 | return posetComparator; | ||
147 | } | ||
148 | |||
149 | @Override | ||
150 | public void rederiveOne() { | ||
151 | final Entry<Tuple, Accumulator> entry = rederivableMemory.entrySet().iterator().next(); | ||
152 | final Tuple group = entry.getKey(); | ||
153 | final Accumulator accumulator = entry.getValue(); | ||
154 | rederivableMemory.remove(group); | ||
155 | memory.put(group, accumulator); | ||
156 | // unregister the node if there is nothing left to be re-derived | ||
157 | if (this.rederivableMemory.isEmpty()) { | ||
158 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); | ||
159 | } | ||
160 | final AggregateResult value = operator.getAggregate(accumulator); | ||
161 | propagateAggregateResultUpdate(group, NEUTRAL, value, Timestamp.ZERO); | ||
162 | } | ||
163 | |||
164 | @Override | ||
165 | public void updateWithPosetInfo(final Direction direction, final Tuple update, final boolean monotone) { | ||
166 | if (this.deleteRederiveEvaluation) { | ||
167 | updateWithDeleteAndRederive(direction, update, monotone); | ||
168 | } else { | ||
169 | updateDefault(direction, update, Timestamp.ZERO); | ||
170 | } | ||
171 | } | ||
172 | |||
173 | @Override | ||
174 | public void update(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
175 | updateWithPosetInfo(direction, update, false); | ||
176 | } | ||
177 | |||
178 | /** | ||
179 | * @since 2.4 | ||
180 | */ | ||
181 | protected void updateDefault(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
182 | final Tuple key = groupMask.transform(update); | ||
183 | final Tuple value = columnMask.transform(update); | ||
184 | @SuppressWarnings("unchecked") | ||
185 | final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0)); | ||
186 | final boolean isInsertion = direction == Direction.INSERT; | ||
187 | |||
188 | final Accumulator oldMainAccumulator = getMainAccumulator(key); | ||
189 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); | ||
190 | |||
191 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, isInsertion); | ||
192 | storeIfNotNeutral(key, newMainAccumulator, memory); | ||
193 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); | ||
194 | |||
195 | propagateAggregateResultUpdate(key, oldValue, newValue, timestamp); | ||
196 | } | ||
197 | |||
198 | /** | ||
199 | * @since 2.4 | ||
200 | */ | ||
201 | protected void updateWithDeleteAndRederive(final Direction direction, final Tuple update, final boolean monotone) { | ||
202 | final Tuple group = groupMask.transform(update); | ||
203 | final Tuple value = columnMask.transform(update); | ||
204 | @SuppressWarnings("unchecked") | ||
205 | final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0)); | ||
206 | final boolean isInsertion = direction == Direction.INSERT; | ||
207 | |||
208 | Accumulator oldMainAccumulator = memory.get(group); | ||
209 | Accumulator oldRederivableAccumulator = rederivableMemory.get(group); | ||
210 | |||
211 | if (direction == Direction.INSERT) { | ||
212 | // INSERT | ||
213 | if (oldRederivableAccumulator != null) { | ||
214 | // the group is in the re-derivable memory | ||
215 | final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, | ||
216 | aggregableValue, isInsertion); | ||
217 | storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); | ||
218 | if (rederivableMemory.isEmpty()) { | ||
219 | // there is nothing left to be re-derived | ||
220 | // this can happen if the accumulator became neutral in response to the INSERT | ||
221 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); | ||
222 | } | ||
223 | } else { | ||
224 | // the group is in the main memory | ||
225 | // at this point, it can happen that we need to initialize with a neutral accumulator | ||
226 | if (oldMainAccumulator == null) { | ||
227 | oldMainAccumulator = operator.createNeutral(); | ||
228 | } | ||
229 | |||
230 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); | ||
231 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, | ||
232 | isInsertion); | ||
233 | storeIfNotNeutral(group, newMainAccumulator, memory); | ||
234 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); | ||
235 | propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); | ||
236 | } | ||
237 | } else { | ||
238 | // DELETE | ||
239 | if (oldRederivableAccumulator != null) { | ||
240 | // the group is in the re-derivable memory | ||
241 | if (oldMainAccumulator != null) { | ||
242 | issueError("[INTERNAL ERROR] Inconsistent state for " + update | ||
243 | + " because it is present both in the main and re-derivable memory in the ColumnAggregatorNode " | ||
244 | + this + " for pattern(s) " + getTraceInfoPatternsEnumerated(), null); | ||
245 | } | ||
246 | try { | ||
247 | final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, | ||
248 | aggregableValue, isInsertion); | ||
249 | storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); | ||
250 | if (rederivableMemory.isEmpty()) { | ||
251 | // there is nothing left to be re-derived | ||
252 | // this can happen if the accumulator became neutral in response to the DELETE | ||
253 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); | ||
254 | } | ||
255 | } catch (final NullPointerException ex) { | ||
256 | issueError("[INTERNAL ERROR] Deleting a domain element in " + update | ||
257 | + " which did not exist before in ColumnAggregatorNode " + this + " for pattern(s) " | ||
258 | + getTraceInfoPatternsEnumerated(), ex); | ||
259 | } | ||
260 | } else { | ||
261 | // the group is in the main memory | ||
262 | // at this point, it can happen that we need to initialize with a neutral accumulator | ||
263 | if (oldMainAccumulator == null) { | ||
264 | oldMainAccumulator = operator.createNeutral(); | ||
265 | } | ||
266 | |||
267 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); | ||
268 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, | ||
269 | isInsertion); | ||
270 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); | ||
271 | |||
272 | if (monotone) { | ||
273 | storeIfNotNeutral(group, newMainAccumulator, memory); | ||
274 | propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); | ||
275 | } else { | ||
276 | final boolean wasEmpty = rederivableMemory.isEmpty(); | ||
277 | if (storeIfNotNeutral(group, newMainAccumulator, rederivableMemory) && wasEmpty) { | ||
278 | ((RecursiveCommunicationGroup) currentGroup).addRederivable(this); | ||
279 | } | ||
280 | memory.remove(group); | ||
281 | propagateAggregateResultUpdate(group, oldValue, NEUTRAL, Timestamp.ZERO); | ||
282 | } | ||
283 | } | ||
284 | } | ||
285 | } | ||
286 | |||
287 | @Override | ||
288 | public void clear() { | ||
289 | this.memory.clear(); | ||
290 | this.rederivableMemory.clear(); | ||
291 | this.childMailboxes.clear(); | ||
292 | } | ||
293 | |||
294 | /** | ||
295 | * Returns true if the accumulator was stored, false otherwise. | ||
296 | * | ||
297 | * @since 1.6 | ||
298 | */ | ||
299 | protected boolean storeIfNotNeutral(final Tuple key, final Accumulator accumulator, | ||
300 | final Map<Tuple, Accumulator> memory) { | ||
301 | if (operator.isNeutral(accumulator)) { | ||
302 | memory.remove(key); | ||
303 | return false; | ||
304 | } else { | ||
305 | memory.put(key, accumulator); | ||
306 | return true; | ||
307 | } | ||
308 | } | ||
309 | |||
310 | @Override | ||
311 | public Tuple getAggregateTuple(final Tuple group) { | ||
312 | final Accumulator accumulator = getMainAccumulator(group); | ||
313 | final AggregateResult result = operator.getAggregate(accumulator); | ||
314 | return tupleFromAggregateResult(group, result); | ||
315 | } | ||
316 | |||
317 | @Override | ||
318 | public AggregateResult getAggregateResult(final Tuple group) { | ||
319 | final Accumulator accumulator = getMainAccumulator(group); | ||
320 | return operator.getAggregate(accumulator); | ||
321 | } | ||
322 | |||
323 | @Override | ||
324 | public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(Tuple key) { | ||
325 | throw new UnsupportedOperationException(); | ||
326 | } | ||
327 | |||
328 | @Override | ||
329 | public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(Tuple key) { | ||
330 | throw new UnsupportedOperationException(); | ||
331 | } | ||
332 | |||
333 | /** | ||
334 | * @since 1.6 | ||
335 | */ | ||
336 | protected Accumulator getMainAccumulator(final Tuple key) { | ||
337 | return getAccumulator(key, memory); | ||
338 | } | ||
339 | |||
340 | /** | ||
341 | * @since 1.6 | ||
342 | */ | ||
343 | protected Accumulator getRederivableAccumulator(final Tuple key) { | ||
344 | return getAccumulator(key, rederivableMemory); | ||
345 | } | ||
346 | |||
347 | /** | ||
348 | * @since 1.6 | ||
349 | */ | ||
350 | protected Accumulator getAccumulator(final Tuple key, final Map<Tuple, Accumulator> memory) { | ||
351 | Accumulator accumulator = memory.get(key); | ||
352 | if (accumulator == null) { | ||
353 | return operator.createNeutral(); | ||
354 | } else { | ||
355 | return accumulator; | ||
356 | } | ||
357 | } | ||
358 | |||
359 | @Override | ||
360 | public CommunicationGroup getCurrentGroup() { | ||
361 | return currentGroup; | ||
362 | } | ||
363 | |||
364 | @Override | ||
365 | public void setCurrentGroup(final CommunicationGroup currentGroup) { | ||
366 | this.currentGroup = currentGroup; | ||
367 | } | ||
368 | |||
369 | } | ||