aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java278
1 files changed, 278 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java
new file mode 100644
index 00000000..d9a94a82
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java
@@ -0,0 +1,278 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2009 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.aggregation;
11
12import java.util.Collection;
13import java.util.Collections;
14import java.util.Map;
15import java.util.Map.Entry;
16
17import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
18import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
19import tools.refinery.viatra.runtime.matchers.tuple.Tuples;
20import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
21import tools.refinery.viatra.runtime.matchers.util.Direction;
22import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
23import tools.refinery.viatra.runtime.rete.index.DefaultIndexerListener;
24import tools.refinery.viatra.runtime.rete.index.Indexer;
25import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
26import tools.refinery.viatra.runtime.rete.index.StandardIndexer;
27import tools.refinery.viatra.runtime.rete.network.Node;
28import tools.refinery.viatra.runtime.rete.network.ReteContainer;
29import tools.refinery.viatra.runtime.rete.network.StandardNode;
30import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
31import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;
32
33/**
34 * A special node depending on a projection indexer to aggregate tuple groups with the same projection. Only propagates
35 * the aggregates of non-empty groups. Use the outer indexers to circumvent.
36 * <p>
37 * This node cannot be used in recursive differential dataflow evaluation.
38 *
39 * @author Gabor Bergmann
40 * @since 1.4
41 */
42public abstract class IndexerBasedAggregatorNode extends StandardNode implements IAggregatorNode {
43
44 ProjectionIndexer projection;
45 IndexerBasedAggregatorNode me;
46 int sourceWidth;
47 Map<Tuple, Object> mainAggregates;
48
49 AggregatorOuterIndexer aggregatorOuterIndexer = null;
50 AggregatorOuterIdentityIndexer[] aggregatorOuterIdentityIndexers = null;
51
52 /**
53 * MUST call initializeWith() afterwards!
54 */
55 public IndexerBasedAggregatorNode(ReteContainer reteContainer) {
56 super(reteContainer);
57 this.me = this;
58 mainAggregates = CollectionsFactory.createMap();
59 }
60
61 @Override
62 public void networkStructureChanged() {
63 if (this.reteContainer.isTimelyEvaluation() && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) {
64 throw new IllegalStateException(this.toString() + " cannot be used in recursive differential dataflow evaluation!");
65 }
66 super.networkStructureChanged();
67 }
68
69 /**
70 * @param projection
71 * the projection indexer whose tuple groups should be aggregated
72 */
73 public void initializeWith(ProjectionIndexer projection) {
74 this.projection = projection;
75 this.sourceWidth = projection.getMask().indices.length;
76
77 for (Tuple signature : projection.getSignatures()) {
78 mainAggregates.put(signature, aggregateGroup(signature, projection.get(signature)));
79 }
80 projection.attachListener(new DefaultIndexerListener(this) {
81 @Override
82 public void notifyIndexerUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change, Timestamp timestamp) {
83 aggregateUpdate(direction, updateElement, signature, change);
84 }
85 });
86 }
87
88 /**
89 * Aggregates (reduces) a group of tuples. The group can be null.
90 */
91 public abstract Object aggregateGroup(Tuple signature, Collection<Tuple> group);
92
93
94 /**
95 * Aggregates (reduces) a group of tuples, having access to the previous aggregated value (before the update) and
96 * the update definition. Defaults to aggregateGroup(). Override to increase performance.
97 * @since 2.4
98 */
99 public Object aggregateGroupAfterUpdate(Tuple signature, Collection<Tuple> currentGroup, Object oldAggregate,
100 Direction direction, Tuple updateElement, boolean change) {
101 return aggregateGroup(signature, currentGroup);
102 }
103
104 protected Tuple aggregateAndPack(Tuple signature, Collection<Tuple> group) {
105 return packResult(signature, aggregateGroup(signature, group));
106 }
107
108 @Override
109 public Indexer getAggregatorOuterIndexer() {
110 if (aggregatorOuterIndexer == null) {
111 aggregatorOuterIndexer = new AggregatorOuterIndexer();
112 this.getCommunicationTracker().registerDependency(this, aggregatorOuterIndexer);
113 // reteContainer.connectAndSynchronize(this, aggregatorOuterIndexer);
114 }
115 return aggregatorOuterIndexer;
116 }
117
118 @Override
119 public Indexer getAggregatorOuterIdentityIndexer(int resultPositionInSignature) {
120 if (aggregatorOuterIdentityIndexers == null)
121 aggregatorOuterIdentityIndexers = new AggregatorOuterIdentityIndexer[sourceWidth + 1];
122 if (aggregatorOuterIdentityIndexers[resultPositionInSignature] == null) {
123 aggregatorOuterIdentityIndexers[resultPositionInSignature] = new AggregatorOuterIdentityIndexer(
124 resultPositionInSignature);
125 this.getCommunicationTracker().registerDependency(this, aggregatorOuterIdentityIndexers[resultPositionInSignature]);
126 // reteContainer.connectAndSynchronize(this, aggregatorOuterIdentityIndexers[resultPositionInSignature]);
127 }
128 return aggregatorOuterIdentityIndexers[resultPositionInSignature];
129 }
130
131 @Override
132 public void pullInto(final Collection<Tuple> collector, final boolean flush) {
133 for (final Entry<Tuple, Object> aggregateEntry : mainAggregates.entrySet()) {
134 collector.add(packResult(aggregateEntry.getKey(), aggregateEntry.getValue()));
135 }
136 }
137
138 @Override
139 public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) {
140 // use all zero timestamps because this node cannot be used in recursive groups anyway
141 for (final Entry<Tuple, Object> aggregateEntry : mainAggregates.entrySet()) {
142 collector.put(packResult(aggregateEntry.getKey(), aggregateEntry.getValue()), Timestamp.INSERT_AT_ZERO_TIMELINE);
143 }
144 }
145
146 protected Tuple packResult(Tuple signature, Object result) {
147 return Tuples.staticArityLeftInheritanceTupleOf(signature, result);
148 }
149
150 /**
151 * @since 2.4
152 */
153 protected void aggregateUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change) {
154 Collection<Tuple> currentGroup = projection.get(signature);
155 // these will be null if group is empty
156 Object oldAggregate = mainAggregates.get(signature);
157 Object safeOldAggregate = oldAggregate == null ? aggregateGroup(signature, null) : oldAggregate;
158 boolean empty = currentGroup == null || currentGroup.isEmpty();
159 Object newAggregate = empty ? null : aggregateGroupAfterUpdate(signature, currentGroup, safeOldAggregate/*
160 * non-null
161 */,
162 direction, updateElement, change);
163 if (!empty)
164 mainAggregates.put(signature, newAggregate);
165 else
166 mainAggregates.remove(signature);
167 Tuple oldTuple = packResult(signature, safeOldAggregate);
168 Tuple newTuple = packResult(signature, newAggregate == null ? aggregateGroup(signature, null) : newAggregate);
169 if (oldAggregate != null)
170 propagateUpdate(Direction.DELETE, oldTuple, Timestamp.ZERO); // direct outputs lack non-empty groups
171 if (newAggregate != null)
172 propagateUpdate(Direction.INSERT, newTuple, Timestamp.ZERO); // direct outputs lack non-empty groups
173 if (aggregatorOuterIndexer != null)
174 aggregatorOuterIndexer.propagate(signature, oldTuple, newTuple);
175 if (aggregatorOuterIdentityIndexers != null)
176 for (AggregatorOuterIdentityIndexer aggregatorOuterIdentityIndexer : aggregatorOuterIdentityIndexers)
177 if (aggregatorOuterIdentityIndexer != null)
178 aggregatorOuterIdentityIndexer.propagate(signature, oldTuple, newTuple);
179 }
180
181 private Object getAggregate(Tuple signature) {
182 Object aggregate = mainAggregates.get(signature);
183 return aggregate == null ? aggregateGroup(signature, null) : aggregate;
184 }
185
186 @Override
187 public void assignTraceInfo(TraceInfo traceInfo) {
188 super.assignTraceInfo(traceInfo);
189 if (traceInfo.propagateToIndexerParent() && projection != null)
190 projection.acceptPropagatedTraceInfo(traceInfo);
191 }
192
193 /**
194 * A special non-iterable index that retrieves the aggregated, packed result (signature+aggregate) for the original
195 * signature.
196 *
197 * @author Gabor Bergmann
198 */
199 class AggregatorOuterIndexer extends StandardIndexer {
200
201 public AggregatorOuterIndexer() {
202 super(me.reteContainer, TupleMask.omit(sourceWidth, sourceWidth + 1));
203 this.parent = me;
204 }
205
206 @Override
207 public Collection<Tuple> get(Tuple signature) {
208 return Collections.singleton(packResult(signature, getAggregate(signature)));
209 }
210
211 public void propagate(Tuple signature, Tuple oldTuple, Tuple newTuple) {
212 propagate(Direction.INSERT, newTuple, signature, false, Timestamp.ZERO);
213 propagate(Direction.DELETE, oldTuple, signature, false, Timestamp.ZERO);
214 }
215
216 @Override
217 public Node getActiveNode() {
218 return projection.getActiveNode();
219 }
220
221 }
222
223 /**
224 * A special non-iterable index that checks a suspected aggregate value for a given signature. The signature for
225 * this index is the original signature of the projection index, with the suspected result inserted at position
226 * resultPositionInSignature.
227 *
228 * @author Gabor Bergmann
229 */
230
231 class AggregatorOuterIdentityIndexer extends StandardIndexer {
232 int resultPositionInSignature;
233 TupleMask pruneResult;
234 TupleMask reorderMask;
235
236 public AggregatorOuterIdentityIndexer(int resultPositionInSignature) {
237 super(me.reteContainer, TupleMask.displace(sourceWidth, resultPositionInSignature, sourceWidth + 1));
238 this.parent = me;
239 // this.localAggregates = new HashMap<Tuple, Tuple>();
240 this.resultPositionInSignature = resultPositionInSignature;
241 this.pruneResult = TupleMask.omit(resultPositionInSignature, sourceWidth + 1);
242 if (resultPositionInSignature == sourceWidth)
243 this.reorderMask = null;
244 else
245 this.reorderMask = mask;
246 }
247
248 @Override
249 public Collection<Tuple> get(Tuple signatureWithResult) {
250 Tuple prunedSignature = pruneResult.transform(signatureWithResult);
251 Object result = getAggregate(prunedSignature);
252 if (signatureWithResult.get(resultPositionInSignature).equals(result))
253 return Collections.singleton(signatureWithResult);
254 else
255 return null;
256 }
257
258 public void propagate(Tuple signature, Tuple oldTuple, Tuple newTuple) {
259 propagate(Direction.INSERT, reorder(newTuple), signature, true, Timestamp.ZERO);
260 propagate(Direction.DELETE, reorder(oldTuple), signature, true, Timestamp.ZERO);
261 }
262
263 private Tuple reorder(Tuple signatureWithResult) {
264 Tuple transformed;
265 if (reorderMask == null)
266 transformed = signatureWithResult;
267 else
268 transformed = reorderMask.transform(signatureWithResult);
269 return transformed;
270 }
271
272 @Override
273 public Node getActiveNode() {
274 return projection.getActiveNode();
275 }
276 }
277
278}