diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java new file mode 100644 index 00000000..2588bde1 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/AbstractColumnAggregatorNode.java | |||
@@ -0,0 +1,474 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd. | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.aggregation; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Map; | ||
14 | import java.util.Map.Entry; | ||
15 | import java.util.Objects; | ||
16 | |||
17 | import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext; | ||
18 | import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator; | ||
19 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
20 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
21 | import tools.refinery.viatra.runtime.matchers.tuple.Tuples; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
23 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
24 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
25 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
26 | import tools.refinery.viatra.runtime.rete.index.StandardIndexer; | ||
27 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
28 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
29 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
30 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
31 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
32 | import tools.refinery.viatra.runtime.rete.single.SingleInputNode; | ||
33 | |||
34 | /** | ||
35 | * Groups incoming tuples by the given mask, and aggregates values at a specific index in each group. | ||
36 | * <p> | ||
37 | * Direct children are not supported, use via outer join indexers instead. | ||
38 | * <p> | ||
39 | * There are both timeless and timely implementations. | ||
40 | * | ||
41 | * @author Tamas Szabo | ||
42 | * @since 2.2 | ||
43 | * | ||
44 | */ | ||
45 | public abstract class AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> extends SingleInputNode | ||
46 | implements Clearable, IAggregatorNode { | ||
47 | |||
48 | /** | ||
49 | * @since 1.6 | ||
50 | */ | ||
51 | protected final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator; | ||
52 | |||
53 | /** | ||
54 | * @since 1.6 | ||
55 | */ | ||
56 | protected final TupleMask groupMask; | ||
57 | |||
58 | /** | ||
59 | * @since 1.6 | ||
60 | */ | ||
61 | protected final TupleMask columnMask; | ||
62 | |||
63 | /** | ||
64 | * @since 1.6 | ||
65 | */ | ||
66 | protected final int sourceWidth; | ||
67 | |||
68 | /** | ||
69 | * @since 1.6 | ||
70 | */ | ||
71 | protected final IQueryRuntimeContext runtimeContext; | ||
72 | |||
73 | protected final AggregateResult NEUTRAL; | ||
74 | |||
75 | protected AggregatorOuterIndexer aggregatorOuterIndexer; | ||
76 | |||
77 | @SuppressWarnings("rawtypes") | ||
78 | protected AbstractColumnAggregatorNode.AggregatorOuterIdentityIndexer[] aggregatorOuterIdentityIndexers; | ||
79 | |||
80 | /** | ||
81 | * Creates a new column aggregator node. | ||
82 | * | ||
83 | * @param reteContainer | ||
84 | * the RETE container of the node | ||
85 | * @param operator | ||
86 | * the aggregation operator | ||
87 | * @param deleteRederiveEvaluation | ||
88 | * true if the node should run in DRED mode, false otherwise | ||
89 | * @param groupMask | ||
90 | * the mask that masks a tuple to obtain the key that we are grouping-by | ||
91 | * @param columnMask | ||
92 | * the mask that masks a tuple to obtain the tuple element(s) that we are aggregating over | ||
93 | * @param posetComparator | ||
94 | * the poset comparator for the column, if known, otherwise it can be null | ||
95 | * @since 1.6 | ||
96 | */ | ||
97 | public AbstractColumnAggregatorNode(final ReteContainer reteContainer, | ||
98 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | ||
99 | final TupleMask groupMask, final TupleMask columnMask) { | ||
100 | super(reteContainer); | ||
101 | this.operator = operator; | ||
102 | this.groupMask = groupMask; | ||
103 | this.columnMask = columnMask; | ||
104 | this.sourceWidth = groupMask.indices.length; | ||
105 | this.runtimeContext = reteContainer.getNetwork().getEngine().getRuntimeContext(); | ||
106 | this.NEUTRAL = operator.getAggregate(operator.createNeutral()); | ||
107 | reteContainer.registerClearable(this); | ||
108 | } | ||
109 | |||
110 | /** | ||
111 | * Creates a new column aggregator node. | ||
112 | * | ||
113 | * @param reteContainer | ||
114 | * the RETE container of the node | ||
115 | * @param operator | ||
116 | * the aggregation operator | ||
117 | * @param groupMask | ||
118 | * the mask that masks a tuple to obtain the key that we are grouping-by | ||
119 | * @param aggregatedColumn | ||
120 | * the index of the column that the aggregator node is aggregating over | ||
121 | */ | ||
122 | public AbstractColumnAggregatorNode(final ReteContainer reteContainer, | ||
123 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | ||
124 | final TupleMask groupMask, final int aggregatedColumn) { | ||
125 | this(reteContainer, operator, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth)); | ||
126 | } | ||
127 | |||
128 | @Override | ||
129 | public CommunicationTracker getCommunicationTracker() { | ||
130 | return this.reteContainer.getCommunicationTracker(); | ||
131 | } | ||
132 | |||
133 | @Override | ||
134 | public void pullInto(Collection<Tuple> collector, boolean flush) { | ||
135 | // DIRECT CHILDREN NOT SUPPORTED | ||
136 | throw new UnsupportedOperationException(); | ||
137 | } | ||
138 | |||
139 | @Override | ||
140 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
141 | // DIRECT CHILDREN NOT SUPPORTED | ||
142 | throw new UnsupportedOperationException(); | ||
143 | } | ||
144 | |||
145 | @Override | ||
146 | public void appendChild(Receiver receiver) { | ||
147 | // DIRECT CHILDREN NOT SUPPORTED | ||
148 | throw new UnsupportedOperationException(); | ||
149 | } | ||
150 | |||
151 | @Override | ||
152 | public Indexer getAggregatorOuterIndexer() { | ||
153 | if (aggregatorOuterIndexer == null) { | ||
154 | aggregatorOuterIndexer = new AggregatorOuterIndexer(); | ||
155 | this.getCommunicationTracker().registerDependency(this, aggregatorOuterIndexer); | ||
156 | } | ||
157 | return aggregatorOuterIndexer; | ||
158 | } | ||
159 | |||
160 | @Override | ||
161 | public Indexer getAggregatorOuterIdentityIndexer(final int resultPositionInSignature) { | ||
162 | if (aggregatorOuterIdentityIndexers == null) { | ||
163 | aggregatorOuterIdentityIndexers = new AbstractColumnAggregatorNode.AggregatorOuterIdentityIndexer[sourceWidth | ||
164 | + 1]; | ||
165 | } | ||
166 | if (aggregatorOuterIdentityIndexers[resultPositionInSignature] == null) { | ||
167 | aggregatorOuterIdentityIndexers[resultPositionInSignature] = new AggregatorOuterIdentityIndexer( | ||
168 | resultPositionInSignature); | ||
169 | this.getCommunicationTracker().registerDependency(this, | ||
170 | aggregatorOuterIdentityIndexers[resultPositionInSignature]); | ||
171 | } | ||
172 | return aggregatorOuterIdentityIndexers[resultPositionInSignature]; | ||
173 | } | ||
174 | |||
175 | /** | ||
176 | * @since 2.4 | ||
177 | */ | ||
178 | public void propagateAggregateResultUpdate(final Tuple group, final AggregateResult oldValue, | ||
179 | final AggregateResult newValue, final Timestamp timestamp) { | ||
180 | if (!Objects.equals(oldValue, newValue)) { | ||
181 | propagate(Direction.DELETE, group, oldValue, timestamp); | ||
182 | propagate(Direction.INSERT, group, newValue, timestamp); | ||
183 | } | ||
184 | } | ||
185 | |||
186 | /** | ||
187 | * @since 2.4 | ||
188 | */ | ||
189 | @SuppressWarnings("unchecked") | ||
190 | public void propagate(final Direction direction, final Tuple group, final AggregateResult value, | ||
191 | final Timestamp timestamp) { | ||
192 | final Tuple tuple = tupleFromAggregateResult(group, value); | ||
193 | |||
194 | if (aggregatorOuterIndexer != null) { | ||
195 | aggregatorOuterIndexer.propagate(direction, tuple, group, timestamp); | ||
196 | } | ||
197 | if (aggregatorOuterIdentityIndexers != null) { | ||
198 | for (final AggregatorOuterIdentityIndexer aggregatorOuterIdentityIndexer : aggregatorOuterIdentityIndexers) { | ||
199 | if (aggregatorOuterIdentityIndexer != null) { | ||
200 | aggregatorOuterIdentityIndexer.propagate(direction, tuple, group, timestamp); | ||
201 | } | ||
202 | } | ||
203 | } | ||
204 | } | ||
205 | |||
206 | public abstract Tuple getAggregateTuple(final Tuple key); | ||
207 | |||
208 | /** | ||
209 | * @since 2.4 | ||
210 | */ | ||
211 | public abstract Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(final Tuple key); | ||
212 | |||
213 | public abstract AggregateResult getAggregateResult(final Tuple key); | ||
214 | |||
215 | /** | ||
216 | * @since 2.4 | ||
217 | */ | ||
218 | public abstract Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(final Tuple key); | ||
219 | |||
220 | protected Tuple tupleFromAggregateResult(final Tuple groupTuple, final AggregateResult aggregateResult) { | ||
221 | if (aggregateResult == null) { | ||
222 | return null; | ||
223 | } else { | ||
224 | return Tuples.staticArityLeftInheritanceTupleOf(groupTuple, runtimeContext.wrapElement(aggregateResult)); | ||
225 | } | ||
226 | } | ||
227 | |||
228 | /** | ||
229 | * A special non-iterable index that retrieves the aggregated, packed result (signature+aggregate) for the original | ||
230 | * signature. | ||
231 | * | ||
232 | * @author Gabor Bergmann | ||
233 | * @author Tamas Szabo | ||
234 | * | ||
235 | */ | ||
236 | protected class AggregatorOuterIndexer extends StandardIndexer { | ||
237 | |||
238 | /** | ||
239 | * @since 2.4 | ||
240 | */ | ||
241 | protected NetworkStructureChangeSensitiveLogic logic; | ||
242 | |||
243 | public AggregatorOuterIndexer() { | ||
244 | super(AbstractColumnAggregatorNode.this.reteContainer, TupleMask.omit(sourceWidth, sourceWidth + 1)); | ||
245 | this.parent = AbstractColumnAggregatorNode.this; | ||
246 | this.logic = createLogic(); | ||
247 | } | ||
248 | |||
249 | @Override | ||
250 | public void networkStructureChanged() { | ||
251 | super.networkStructureChanged(); | ||
252 | this.logic = createLogic(); | ||
253 | } | ||
254 | |||
255 | @Override | ||
256 | public Collection<Tuple> get(final Tuple signature) { | ||
257 | return this.logic.get(signature); | ||
258 | } | ||
259 | |||
260 | @Override | ||
261 | public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) { | ||
262 | return this.logic.getTimeline(signature); | ||
263 | } | ||
264 | |||
265 | /** | ||
266 | * @since 2.4 | ||
267 | */ | ||
268 | public void propagate(final Direction direction, final Tuple tuple, final Tuple group, | ||
269 | final Timestamp timestamp) { | ||
270 | if (tuple != null) { | ||
271 | propagate(direction, tuple, group, true, timestamp); | ||
272 | } | ||
273 | } | ||
274 | |||
275 | @Override | ||
276 | public Node getActiveNode() { | ||
277 | return AbstractColumnAggregatorNode.this; | ||
278 | } | ||
279 | |||
280 | /** | ||
281 | * @since 2.4 | ||
282 | */ | ||
283 | protected NetworkStructureChangeSensitiveLogic createLogic() { | ||
284 | if (this.reteContainer.isTimelyEvaluation() | ||
285 | && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) { | ||
286 | return this.TIMELY; | ||
287 | } else { | ||
288 | return this.TIMELESS; | ||
289 | } | ||
290 | } | ||
291 | |||
292 | private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() { | ||
293 | |||
294 | @Override | ||
295 | public Collection<Tuple> get(final Tuple signature) { | ||
296 | final Tuple aggregateTuple = getAggregateTuple(signature); | ||
297 | if (aggregateTuple == null) { | ||
298 | return null; | ||
299 | } else { | ||
300 | return Collections.singleton(aggregateTuple); | ||
301 | } | ||
302 | } | ||
303 | |||
304 | @Override | ||
305 | public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) { | ||
306 | throw new UnsupportedOperationException(); | ||
307 | } | ||
308 | |||
309 | }; | ||
310 | |||
311 | private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() { | ||
312 | |||
313 | @Override | ||
314 | public Collection<Tuple> get(final Tuple signatureWithResult) { | ||
315 | return TIMELESS.get(signatureWithResult); | ||
316 | } | ||
317 | |||
318 | @Override | ||
319 | public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) { | ||
320 | final Map<Tuple, Timeline<Timestamp>> aggregateTuples = getAggregateTupleTimeline(signature); | ||
321 | if (aggregateTuples.isEmpty()) { | ||
322 | return null; | ||
323 | } else { | ||
324 | return aggregateTuples; | ||
325 | } | ||
326 | } | ||
327 | |||
328 | }; | ||
329 | |||
330 | } | ||
331 | |||
332 | /** | ||
333 | * A special non-iterable index that checks a suspected aggregate value for a given signature. The signature for | ||
334 | * this index is the original 'group by' masked tuple, with the suspected result inserted at position | ||
335 | * resultPositionInSignature. | ||
336 | * | ||
337 | * @author Gabor Bergmann | ||
338 | * @author Tamas Szabo | ||
339 | * | ||
340 | */ | ||
341 | protected class AggregatorOuterIdentityIndexer extends StandardIndexer { | ||
342 | |||
343 | protected final int resultPositionInSignature; | ||
344 | protected final TupleMask pruneResult; | ||
345 | protected final TupleMask reorderMask; | ||
346 | /** | ||
347 | * @since 2.4 | ||
348 | */ | ||
349 | protected NetworkStructureChangeSensitiveLogic logic; | ||
350 | |||
351 | public AggregatorOuterIdentityIndexer(final int resultPositionInSignature) { | ||
352 | super(AbstractColumnAggregatorNode.this.reteContainer, | ||
353 | TupleMask.displace(sourceWidth, resultPositionInSignature, sourceWidth + 1)); | ||
354 | this.resultPositionInSignature = resultPositionInSignature; | ||
355 | this.pruneResult = TupleMask.omit(resultPositionInSignature, sourceWidth + 1); | ||
356 | if (resultPositionInSignature == sourceWidth) { | ||
357 | this.reorderMask = null; | ||
358 | } else { | ||
359 | this.reorderMask = mask; | ||
360 | } | ||
361 | this.logic = createLogic(); | ||
362 | } | ||
363 | |||
364 | @Override | ||
365 | public void networkStructureChanged() { | ||
366 | super.networkStructureChanged(); | ||
367 | this.logic = createLogic(); | ||
368 | } | ||
369 | |||
370 | @Override | ||
371 | public Collection<Tuple> get(final Tuple signatureWithResult) { | ||
372 | return this.logic.get(signatureWithResult); | ||
373 | } | ||
374 | |||
375 | /** | ||
376 | * @since 2.4 | ||
377 | */ | ||
378 | @Override | ||
379 | public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) { | ||
380 | return this.logic.getTimeline(signature); | ||
381 | } | ||
382 | |||
383 | /** | ||
384 | * @since 2.4 | ||
385 | */ | ||
386 | public void propagate(final Direction direction, final Tuple tuple, final Tuple group, | ||
387 | final Timestamp timestamp) { | ||
388 | if (tuple != null) { | ||
389 | propagate(direction, reorder(tuple), group, true, timestamp); | ||
390 | } | ||
391 | } | ||
392 | |||
393 | private Tuple reorder(final Tuple signatureWithResult) { | ||
394 | Tuple transformed; | ||
395 | if (reorderMask == null) { | ||
396 | transformed = signatureWithResult; | ||
397 | } else { | ||
398 | transformed = reorderMask.transform(signatureWithResult); | ||
399 | } | ||
400 | return transformed; | ||
401 | } | ||
402 | |||
403 | @Override | ||
404 | public Node getActiveNode() { | ||
405 | return this.parent; | ||
406 | } | ||
407 | |||
408 | /** | ||
409 | * @since 2.4 | ||
410 | */ | ||
411 | protected NetworkStructureChangeSensitiveLogic createLogic() { | ||
412 | if (this.reteContainer.isTimelyEvaluation() | ||
413 | && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) { | ||
414 | return this.TIMELY; | ||
415 | } else { | ||
416 | return this.TIMELESS; | ||
417 | } | ||
418 | } | ||
419 | |||
420 | private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() { | ||
421 | |||
422 | @Override | ||
423 | public Collection<Tuple> get(final Tuple signatureWithResult) { | ||
424 | final Tuple prunedSignature = pruneResult.transform(signatureWithResult); | ||
425 | final AggregateResult result = getAggregateResult(prunedSignature); | ||
426 | if (result != null && Objects.equals(signatureWithResult.get(resultPositionInSignature), result)) { | ||
427 | return Collections.singleton(signatureWithResult); | ||
428 | } else { | ||
429 | return null; | ||
430 | } | ||
431 | } | ||
432 | |||
433 | @Override | ||
434 | public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature) { | ||
435 | throw new UnsupportedOperationException(); | ||
436 | } | ||
437 | |||
438 | }; | ||
439 | |||
440 | private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() { | ||
441 | |||
442 | @Override | ||
443 | public Collection<Tuple> get(final Tuple signatureWithResult) { | ||
444 | return TIMELESS.get(signatureWithResult); | ||
445 | } | ||
446 | |||
447 | @Override | ||
448 | public Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signatureWithResult) { | ||
449 | final Tuple prunedSignature = pruneResult.transform(signatureWithResult); | ||
450 | final Map<AggregateResult, Timeline<Timestamp>> result = getAggregateResultTimeline(prunedSignature); | ||
451 | for (final Entry<AggregateResult, Timeline<Timestamp>> entry : result.entrySet()) { | ||
452 | if (Objects.equals(signatureWithResult.get(resultPositionInSignature), entry.getKey())) { | ||
453 | return Collections.singletonMap(signatureWithResult, entry.getValue()); | ||
454 | } | ||
455 | } | ||
456 | return null; | ||
457 | } | ||
458 | |||
459 | }; | ||
460 | |||
461 | } | ||
462 | |||
463 | /** | ||
464 | * @since 2.4 | ||
465 | */ | ||
466 | protected static abstract class NetworkStructureChangeSensitiveLogic { | ||
467 | |||
468 | public abstract Collection<Tuple> get(final Tuple signatureWithResult); | ||
469 | |||
470 | public abstract Map<Tuple, Timeline<Timestamp>> getTimeline(final Tuple signature); | ||
471 | |||
472 | } | ||
473 | |||
474 | } | ||