diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java new file mode 100644 index 00000000..d9a94a82 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java | |||
@@ -0,0 +1,278 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2009 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.aggregation; | ||
11 | |||
12 | import java.util.Collection; | ||
13 | import java.util.Collections; | ||
14 | import java.util.Map; | ||
15 | import java.util.Map.Entry; | ||
16 | |||
17 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
18 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
19 | import tools.refinery.viatra.runtime.matchers.tuple.Tuples; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
21 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
23 | import tools.refinery.viatra.runtime.rete.index.DefaultIndexerListener; | ||
24 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
25 | import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer; | ||
26 | import tools.refinery.viatra.runtime.rete.index.StandardIndexer; | ||
27 | import tools.refinery.viatra.runtime.rete.network.Node; | ||
28 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
29 | import tools.refinery.viatra.runtime.rete.network.StandardNode; | ||
30 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
31 | import tools.refinery.viatra.runtime.rete.traceability.TraceInfo; | ||
32 | |||
33 | /** | ||
34 | * A special node depending on a projection indexer to aggregate tuple groups with the same projection. Only propagates | ||
35 | * the aggregates of non-empty groups. Use the outer indexers to circumvent. | ||
36 | * <p> | ||
37 | * This node cannot be used in recursive differential dataflow evaluation. | ||
38 | * | ||
39 | * @author Gabor Bergmann | ||
40 | * @since 1.4 | ||
41 | */ | ||
42 | public abstract class IndexerBasedAggregatorNode extends StandardNode implements IAggregatorNode { | ||
43 | |||
44 | ProjectionIndexer projection; | ||
45 | IndexerBasedAggregatorNode me; | ||
46 | int sourceWidth; | ||
47 | Map<Tuple, Object> mainAggregates; | ||
48 | |||
49 | AggregatorOuterIndexer aggregatorOuterIndexer = null; | ||
50 | AggregatorOuterIdentityIndexer[] aggregatorOuterIdentityIndexers = null; | ||
51 | |||
52 | /** | ||
53 | * MUST call initializeWith() afterwards! | ||
54 | */ | ||
55 | public IndexerBasedAggregatorNode(ReteContainer reteContainer) { | ||
56 | super(reteContainer); | ||
57 | this.me = this; | ||
58 | mainAggregates = CollectionsFactory.createMap(); | ||
59 | } | ||
60 | |||
61 | @Override | ||
62 | public void networkStructureChanged() { | ||
63 | if (this.reteContainer.isTimelyEvaluation() && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) { | ||
64 | throw new IllegalStateException(this.toString() + " cannot be used in recursive differential dataflow evaluation!"); | ||
65 | } | ||
66 | super.networkStructureChanged(); | ||
67 | } | ||
68 | |||
69 | /** | ||
70 | * @param projection | ||
71 | * the projection indexer whose tuple groups should be aggregated | ||
72 | */ | ||
73 | public void initializeWith(ProjectionIndexer projection) { | ||
74 | this.projection = projection; | ||
75 | this.sourceWidth = projection.getMask().indices.length; | ||
76 | |||
77 | for (Tuple signature : projection.getSignatures()) { | ||
78 | mainAggregates.put(signature, aggregateGroup(signature, projection.get(signature))); | ||
79 | } | ||
80 | projection.attachListener(new DefaultIndexerListener(this) { | ||
81 | @Override | ||
82 | public void notifyIndexerUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change, Timestamp timestamp) { | ||
83 | aggregateUpdate(direction, updateElement, signature, change); | ||
84 | } | ||
85 | }); | ||
86 | } | ||
87 | |||
88 | /** | ||
89 | * Aggregates (reduces) a group of tuples. The group can be null. | ||
90 | */ | ||
91 | public abstract Object aggregateGroup(Tuple signature, Collection<Tuple> group); | ||
92 | |||
93 | |||
94 | /** | ||
95 | * Aggregates (reduces) a group of tuples, having access to the previous aggregated value (before the update) and | ||
96 | * the update definition. Defaults to aggregateGroup(). Override to increase performance. | ||
97 | * @since 2.4 | ||
98 | */ | ||
99 | public Object aggregateGroupAfterUpdate(Tuple signature, Collection<Tuple> currentGroup, Object oldAggregate, | ||
100 | Direction direction, Tuple updateElement, boolean change) { | ||
101 | return aggregateGroup(signature, currentGroup); | ||
102 | } | ||
103 | |||
104 | protected Tuple aggregateAndPack(Tuple signature, Collection<Tuple> group) { | ||
105 | return packResult(signature, aggregateGroup(signature, group)); | ||
106 | } | ||
107 | |||
108 | @Override | ||
109 | public Indexer getAggregatorOuterIndexer() { | ||
110 | if (aggregatorOuterIndexer == null) { | ||
111 | aggregatorOuterIndexer = new AggregatorOuterIndexer(); | ||
112 | this.getCommunicationTracker().registerDependency(this, aggregatorOuterIndexer); | ||
113 | // reteContainer.connectAndSynchronize(this, aggregatorOuterIndexer); | ||
114 | } | ||
115 | return aggregatorOuterIndexer; | ||
116 | } | ||
117 | |||
118 | @Override | ||
119 | public Indexer getAggregatorOuterIdentityIndexer(int resultPositionInSignature) { | ||
120 | if (aggregatorOuterIdentityIndexers == null) | ||
121 | aggregatorOuterIdentityIndexers = new AggregatorOuterIdentityIndexer[sourceWidth + 1]; | ||
122 | if (aggregatorOuterIdentityIndexers[resultPositionInSignature] == null) { | ||
123 | aggregatorOuterIdentityIndexers[resultPositionInSignature] = new AggregatorOuterIdentityIndexer( | ||
124 | resultPositionInSignature); | ||
125 | this.getCommunicationTracker().registerDependency(this, aggregatorOuterIdentityIndexers[resultPositionInSignature]); | ||
126 | // reteContainer.connectAndSynchronize(this, aggregatorOuterIdentityIndexers[resultPositionInSignature]); | ||
127 | } | ||
128 | return aggregatorOuterIdentityIndexers[resultPositionInSignature]; | ||
129 | } | ||
130 | |||
131 | @Override | ||
132 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
133 | for (final Entry<Tuple, Object> aggregateEntry : mainAggregates.entrySet()) { | ||
134 | collector.add(packResult(aggregateEntry.getKey(), aggregateEntry.getValue())); | ||
135 | } | ||
136 | } | ||
137 | |||
138 | @Override | ||
139 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
140 | // use all zero timestamps because this node cannot be used in recursive groups anyway | ||
141 | for (final Entry<Tuple, Object> aggregateEntry : mainAggregates.entrySet()) { | ||
142 | collector.put(packResult(aggregateEntry.getKey(), aggregateEntry.getValue()), Timestamp.INSERT_AT_ZERO_TIMELINE); | ||
143 | } | ||
144 | } | ||
145 | |||
146 | protected Tuple packResult(Tuple signature, Object result) { | ||
147 | return Tuples.staticArityLeftInheritanceTupleOf(signature, result); | ||
148 | } | ||
149 | |||
150 | /** | ||
151 | * @since 2.4 | ||
152 | */ | ||
153 | protected void aggregateUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change) { | ||
154 | Collection<Tuple> currentGroup = projection.get(signature); | ||
155 | // these will be null if group is empty | ||
156 | Object oldAggregate = mainAggregates.get(signature); | ||
157 | Object safeOldAggregate = oldAggregate == null ? aggregateGroup(signature, null) : oldAggregate; | ||
158 | boolean empty = currentGroup == null || currentGroup.isEmpty(); | ||
159 | Object newAggregate = empty ? null : aggregateGroupAfterUpdate(signature, currentGroup, safeOldAggregate/* | ||
160 | * non-null | ||
161 | */, | ||
162 | direction, updateElement, change); | ||
163 | if (!empty) | ||
164 | mainAggregates.put(signature, newAggregate); | ||
165 | else | ||
166 | mainAggregates.remove(signature); | ||
167 | Tuple oldTuple = packResult(signature, safeOldAggregate); | ||
168 | Tuple newTuple = packResult(signature, newAggregate == null ? aggregateGroup(signature, null) : newAggregate); | ||
169 | if (oldAggregate != null) | ||
170 | propagateUpdate(Direction.DELETE, oldTuple, Timestamp.ZERO); // direct outputs lack non-empty groups | ||
171 | if (newAggregate != null) | ||
172 | propagateUpdate(Direction.INSERT, newTuple, Timestamp.ZERO); // direct outputs lack non-empty groups | ||
173 | if (aggregatorOuterIndexer != null) | ||
174 | aggregatorOuterIndexer.propagate(signature, oldTuple, newTuple); | ||
175 | if (aggregatorOuterIdentityIndexers != null) | ||
176 | for (AggregatorOuterIdentityIndexer aggregatorOuterIdentityIndexer : aggregatorOuterIdentityIndexers) | ||
177 | if (aggregatorOuterIdentityIndexer != null) | ||
178 | aggregatorOuterIdentityIndexer.propagate(signature, oldTuple, newTuple); | ||
179 | } | ||
180 | |||
181 | private Object getAggregate(Tuple signature) { | ||
182 | Object aggregate = mainAggregates.get(signature); | ||
183 | return aggregate == null ? aggregateGroup(signature, null) : aggregate; | ||
184 | } | ||
185 | |||
186 | @Override | ||
187 | public void assignTraceInfo(TraceInfo traceInfo) { | ||
188 | super.assignTraceInfo(traceInfo); | ||
189 | if (traceInfo.propagateToIndexerParent() && projection != null) | ||
190 | projection.acceptPropagatedTraceInfo(traceInfo); | ||
191 | } | ||
192 | |||
193 | /** | ||
194 | * A special non-iterable index that retrieves the aggregated, packed result (signature+aggregate) for the original | ||
195 | * signature. | ||
196 | * | ||
197 | * @author Gabor Bergmann | ||
198 | */ | ||
199 | class AggregatorOuterIndexer extends StandardIndexer { | ||
200 | |||
201 | public AggregatorOuterIndexer() { | ||
202 | super(me.reteContainer, TupleMask.omit(sourceWidth, sourceWidth + 1)); | ||
203 | this.parent = me; | ||
204 | } | ||
205 | |||
206 | @Override | ||
207 | public Collection<Tuple> get(Tuple signature) { | ||
208 | return Collections.singleton(packResult(signature, getAggregate(signature))); | ||
209 | } | ||
210 | |||
211 | public void propagate(Tuple signature, Tuple oldTuple, Tuple newTuple) { | ||
212 | propagate(Direction.INSERT, newTuple, signature, false, Timestamp.ZERO); | ||
213 | propagate(Direction.DELETE, oldTuple, signature, false, Timestamp.ZERO); | ||
214 | } | ||
215 | |||
216 | @Override | ||
217 | public Node getActiveNode() { | ||
218 | return projection.getActiveNode(); | ||
219 | } | ||
220 | |||
221 | } | ||
222 | |||
223 | /** | ||
224 | * A special non-iterable index that checks a suspected aggregate value for a given signature. The signature for | ||
225 | * this index is the original signature of the projection index, with the suspected result inserted at position | ||
226 | * resultPositionInSignature. | ||
227 | * | ||
228 | * @author Gabor Bergmann | ||
229 | */ | ||
230 | |||
231 | class AggregatorOuterIdentityIndexer extends StandardIndexer { | ||
232 | int resultPositionInSignature; | ||
233 | TupleMask pruneResult; | ||
234 | TupleMask reorderMask; | ||
235 | |||
236 | public AggregatorOuterIdentityIndexer(int resultPositionInSignature) { | ||
237 | super(me.reteContainer, TupleMask.displace(sourceWidth, resultPositionInSignature, sourceWidth + 1)); | ||
238 | this.parent = me; | ||
239 | // this.localAggregates = new HashMap<Tuple, Tuple>(); | ||
240 | this.resultPositionInSignature = resultPositionInSignature; | ||
241 | this.pruneResult = TupleMask.omit(resultPositionInSignature, sourceWidth + 1); | ||
242 | if (resultPositionInSignature == sourceWidth) | ||
243 | this.reorderMask = null; | ||
244 | else | ||
245 | this.reorderMask = mask; | ||
246 | } | ||
247 | |||
248 | @Override | ||
249 | public Collection<Tuple> get(Tuple signatureWithResult) { | ||
250 | Tuple prunedSignature = pruneResult.transform(signatureWithResult); | ||
251 | Object result = getAggregate(prunedSignature); | ||
252 | if (signatureWithResult.get(resultPositionInSignature).equals(result)) | ||
253 | return Collections.singleton(signatureWithResult); | ||
254 | else | ||
255 | return null; | ||
256 | } | ||
257 | |||
258 | public void propagate(Tuple signature, Tuple oldTuple, Tuple newTuple) { | ||
259 | propagate(Direction.INSERT, reorder(newTuple), signature, true, Timestamp.ZERO); | ||
260 | propagate(Direction.DELETE, reorder(oldTuple), signature, true, Timestamp.ZERO); | ||
261 | } | ||
262 | |||
263 | private Tuple reorder(Tuple signatureWithResult) { | ||
264 | Tuple transformed; | ||
265 | if (reorderMask == null) | ||
266 | transformed = signatureWithResult; | ||
267 | else | ||
268 | transformed = reorderMask.transform(signatureWithResult); | ||
269 | return transformed; | ||
270 | } | ||
271 | |||
272 | @Override | ||
273 | public Node getActiveNode() { | ||
274 | return projection.getActiveNode(); | ||
275 | } | ||
276 | } | ||
277 | |||
278 | } | ||