aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLibravatar Kristóf Marussy <kristof@marussy.com>2023-10-26 20:37:40 +0200
committerLibravatar Kristóf Marussy <kristof@marussy.com>2023-10-26 20:42:51 +0200
commitc80751c8ec35aab8b3a2bfaf96e4ca82e29815a0 (patch)
tree61e068e0454d3a290dc7bb4ad09e42050b0b2145
parentchore(deps): dedupe Yarn dependencies (diff)
downloadrefinery-c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0.tar.gz
refinery-c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0.tar.zst
refinery-c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0.zip
refactor(interpreter): aggreagator batching
Optimize calls to potentially costly aggregators by only extracting the value from a stateful aggregator when it is needed by subsequent RETE nodes. This optimization only works with timeless evaluation and delete-and-rederive evaluation disabled, i.e., only for queries without any recursion. Potentially, it could also be extended to other mailboxes if needed. We replace the BehaviorChangingMailbox of ColumnAggregatorNode with a DefaultMailbox to force update batching. Batched updates only extract the value from the aggregator when it has been already updated with all received tuples.
-rw-r--r--subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java705
-rw-r--r--subprojects/store-query-interpreter/src/test/java/tools/refinery/store/query/interpreter/AggregatorBatchingTest.java186
2 files changed, 563 insertions, 328 deletions
diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java
index b1a25807..d5f4a0e4 100644
--- a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java
+++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java
@@ -1,5 +1,6 @@
1/******************************************************************************* 1/*******************************************************************************
2 * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd. 2 * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd.
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
3 * This program and the accompanying materials are made available under the 4 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at 5 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html. 6 * http://www.eclipse.org/legal/epl-v20.html.
@@ -8,9 +9,6 @@
8 *******************************************************************************/ 9 *******************************************************************************/
9package tools.refinery.interpreter.rete.aggregation; 10package tools.refinery.interpreter.rete.aggregation;
10 11
11import java.util.Map;
12import java.util.Map.Entry;
13
14import tools.refinery.interpreter.matchers.context.IPosetComparator; 12import tools.refinery.interpreter.matchers.context.IPosetComparator;
15import tools.refinery.interpreter.matchers.psystem.aggregations.IMultisetAggregationOperator; 13import tools.refinery.interpreter.matchers.psystem.aggregations.IMultisetAggregationOperator;
16import tools.refinery.interpreter.matchers.tuple.Tuple; 14import tools.refinery.interpreter.matchers.tuple.Tuple;
@@ -26,8 +24,13 @@ import tools.refinery.interpreter.rete.network.communication.Timestamp;
26import tools.refinery.interpreter.rete.network.communication.timeless.RecursiveCommunicationGroup; 24import tools.refinery.interpreter.rete.network.communication.timeless.RecursiveCommunicationGroup;
27import tools.refinery.interpreter.rete.network.mailbox.Mailbox; 25import tools.refinery.interpreter.rete.network.mailbox.Mailbox;
28import tools.refinery.interpreter.rete.network.mailbox.timeless.BehaviorChangingMailbox; 26import tools.refinery.interpreter.rete.network.mailbox.timeless.BehaviorChangingMailbox;
27import tools.refinery.interpreter.rete.network.mailbox.timeless.DefaultMailbox;
29import tools.refinery.interpreter.rete.network.mailbox.timeless.PosetAwareMailbox; 28import tools.refinery.interpreter.rete.network.mailbox.timeless.PosetAwareMailbox;
30 29
30import java.util.Collection;
31import java.util.Map;
32import java.util.Map.Entry;
33
31/** 34/**
32 * Timeless implementation of the column aggregator node. 35 * Timeless implementation of the column aggregator node.
33 * <p> 36 * <p>
@@ -40,330 +43,376 @@ import tools.refinery.interpreter.rete.network.mailbox.timeless.PosetAwareMailbo
40 * @since 1.4 43 * @since 1.4
41 */ 44 */
42public class ColumnAggregatorNode<Domain, Accumulator, AggregateResult> 45public class ColumnAggregatorNode<Domain, Accumulator, AggregateResult>
43 extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> 46 extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult>
44 implements RederivableNode, PosetAwareReceiver { 47 implements RederivableNode, PosetAwareReceiver {
45 48
46 /** 49 /**
47 * @since 1.6 50 * @since 1.6
48 */ 51 */
49 protected final IPosetComparator posetComparator; 52 protected final IPosetComparator posetComparator;
50 53
51 /** 54 /**
52 * @since 1.6 55 * @since 1.6
53 */ 56 */
54 protected final boolean deleteRederiveEvaluation; 57 protected final boolean deleteRederiveEvaluation;
55 58
56 // invariant: neutral values are not stored 59 // invariant: neutral values are not stored
57 /** 60 /**
58 * @since 1.6 61 * @since 1.6
59 */ 62 */
60 protected final Map<Tuple, Accumulator> memory; 63 protected final Map<Tuple, Accumulator> memory;
61 /** 64 /**
62 * @since 1.6 65 * @since 1.6
63 */ 66 */
64 protected final Map<Tuple, Accumulator> rederivableMemory; 67 protected final Map<Tuple, Accumulator> rederivableMemory;
65 68
66 /** 69 /**
67 * @since 1.7 70 * @since 1.7
68 */ 71 */
69 protected CommunicationGroup currentGroup; 72 protected CommunicationGroup currentGroup;
70 73
71 /** 74 /**
72 * Creates a new column aggregator node. 75 * Creates a new column aggregator node.
73 * 76 *
74 * @param reteContainer 77 * @param reteContainer the RETE container of the node
75 * the RETE container of the node 78 * @param operator the aggregation operator
76 * @param operator 79 * @param deleteRederiveEvaluation true if the node should run in DRED mode, false otherwise
77 * the aggregation operator 80 * @param groupMask the mask that masks a tuple to obtain the key that we are grouping-by
78 * @param deleteRederiveEvaluation 81 * @param columnMask the mask that masks a tuple to obtain the tuple element(s) that we are
79 * true if the node should run in DRED mode, false otherwise 82 * aggregating over
80 * @param groupMask 83 * @param posetComparator the poset comparator for the column, if known, otherwise it can be null
81 * the mask that masks a tuple to obtain the key that we are grouping-by 84 * @since 1.6
82 * @param columnMask 85 */
83 * the mask that masks a tuple to obtain the tuple element(s) that we are aggregating over 86 public ColumnAggregatorNode(final ReteContainer reteContainer,
84 * @param posetComparator 87 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
85 * the poset comparator for the column, if known, otherwise it can be null 88 final boolean deleteRederiveEvaluation, final TupleMask groupMask,
86 * @since 1.6 89 final TupleMask columnMask,
87 */ 90 final IPosetComparator posetComparator) {
88 public ColumnAggregatorNode(final ReteContainer reteContainer, 91 super(reteContainer, operator, groupMask, columnMask);
89 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, 92 this.memory = CollectionsFactory.createMap();
90 final boolean deleteRederiveEvaluation, final TupleMask groupMask, final TupleMask columnMask, 93 this.rederivableMemory = CollectionsFactory.createMap();
91 final IPosetComparator posetComparator) { 94 this.deleteRederiveEvaluation = deleteRederiveEvaluation;
92 super(reteContainer, operator, groupMask, columnMask); 95 this.posetComparator = posetComparator;
93 this.memory = CollectionsFactory.createMap(); 96 // mailbox MUST be instantiated after the fields are all set
94 this.rederivableMemory = CollectionsFactory.createMap(); 97 this.mailbox = instantiateMailbox();
95 this.deleteRederiveEvaluation = deleteRederiveEvaluation; 98 }
96 this.posetComparator = posetComparator; 99
97 // mailbox MUST be instantiated after the fields are all set 100 /**
98 this.mailbox = instantiateMailbox(); 101 * Creates a new column aggregator node.
99 } 102 *
100 103 * @param reteContainer the RETE container of the node
101 /** 104 * @param operator the aggregation operator
102 * Creates a new column aggregator node. 105 * @param groupMask the mask that masks a tuple to obtain the key that we are grouping-by
103 * 106 * @param aggregatedColumn the index of the column that the aggregator node is aggregating over
104 * @param reteContainer 107 */
105 * the RETE container of the node 108 public ColumnAggregatorNode(final ReteContainer reteContainer,
106 * @param operator 109 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
107 * the aggregation operator 110 final TupleMask groupMask, final int aggregatedColumn) {
108 * @param groupMask 111 this(reteContainer, operator, false, groupMask, TupleMask.selectSingle(aggregatedColumn,
109 * the mask that masks a tuple to obtain the key that we are grouping-by 112 groupMask.sourceWidth),
110 * @param aggregatedColumn 113 null);
111 * the index of the column that the aggregator node is aggregating over 114 }
112 */ 115
113 public ColumnAggregatorNode(final ReteContainer reteContainer, 116 @Override
114 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, 117 public boolean isInDRedMode() {
115 final TupleMask groupMask, final int aggregatedColumn) { 118 return this.deleteRederiveEvaluation;
116 this(reteContainer, operator, false, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth), 119 }
117 null); 120
118 } 121 @Override
119 122 protected Mailbox instantiateMailbox() {
120 @Override 123 if (groupMask != null && columnMask != null && posetComparator != null) {
121 public boolean isInDRedMode() { 124 return new PosetAwareMailbox(this, this.reteContainer);
122 return this.deleteRederiveEvaluation; 125 } else if (deleteRederiveEvaluation) {
123 } 126 return new BehaviorChangingMailbox(this, this.reteContainer);
124 127 } else {
125 @Override 128 // Disable fall-through to enabled batched updates.
126 protected Mailbox instantiateMailbox() { 129 return new DefaultMailbox(this, this.reteContainer);
127 if (groupMask != null && columnMask != null && posetComparator != null) { 130 }
128 return new PosetAwareMailbox(this, this.reteContainer); 131 }
129 } else { 132
130 return new BehaviorChangingMailbox(this, this.reteContainer); 133 @Override
131 } 134 public TupleMask getCoreMask() {
132 } 135 return groupMask;
133 136 }
134 @Override 137
135 public TupleMask getCoreMask() { 138 @Override
136 return groupMask; 139 public TupleMask getPosetMask() {
137 } 140 return columnMask;
138 141 }
139 @Override 142
140 public TupleMask getPosetMask() { 143 @Override
141 return columnMask; 144 public IPosetComparator getPosetComparator() {
142 } 145 return posetComparator;
143 146 }
144 @Override 147
145 public IPosetComparator getPosetComparator() { 148 @Override
146 return posetComparator; 149 public void rederiveOne() {
147 } 150 final Entry<Tuple, Accumulator> entry = rederivableMemory.entrySet().iterator().next();
148 151 final Tuple group = entry.getKey();
149 @Override 152 final Accumulator accumulator = entry.getValue();
150 public void rederiveOne() { 153 rederivableMemory.remove(group);
151 final Entry<Tuple, Accumulator> entry = rederivableMemory.entrySet().iterator().next(); 154 memory.put(group, accumulator);
152 final Tuple group = entry.getKey(); 155 // unregister the node if there is nothing left to be re-derived
153 final Accumulator accumulator = entry.getValue(); 156 if (this.rederivableMemory.isEmpty()) {
154 rederivableMemory.remove(group); 157 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this);
155 memory.put(group, accumulator); 158 }
156 // unregister the node if there is nothing left to be re-derived 159 final AggregateResult value = operator.getAggregate(accumulator);
157 if (this.rederivableMemory.isEmpty()) { 160 propagateAggregateResultUpdate(group, NEUTRAL, value, Timestamp.ZERO);
158 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); 161 }
159 } 162
160 final AggregateResult value = operator.getAggregate(accumulator); 163 @Override
161 propagateAggregateResultUpdate(group, NEUTRAL, value, Timestamp.ZERO); 164 public void updateWithPosetInfo(final Direction direction, final Tuple update, final boolean monotone) {
162 } 165 if (this.deleteRederiveEvaluation) {
163 166 updateWithDeleteAndRederive(direction, update, monotone);
164 @Override 167 } else {
165 public void updateWithPosetInfo(final Direction direction, final Tuple update, final boolean monotone) { 168 updateDefault(direction, update, Timestamp.ZERO);
166 if (this.deleteRederiveEvaluation) { 169 }
167 updateWithDeleteAndRederive(direction, update, monotone); 170 }
168 } else { 171
169 updateDefault(direction, update, Timestamp.ZERO); 172 @Override
170 } 173 public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
171 } 174 updateWithPosetInfo(direction, update, false);
172 175 }
173 @Override 176
174 public void update(final Direction direction, final Tuple update, final Timestamp timestamp) { 177 /**
175 updateWithPosetInfo(direction, update, false); 178 * @since 2.4
176 } 179 */
177 180 protected void updateDefault(final Direction direction, final Tuple update, final Timestamp timestamp) {
178 /** 181 final Tuple key = groupMask.transform(update);
179 * @since 2.4 182 final Tuple value = columnMask.transform(update);
180 */ 183 @SuppressWarnings("unchecked") final Domain aggregableValue =
181 protected void updateDefault(final Direction direction, final Tuple update, final Timestamp timestamp) { 184 (Domain) runtimeContext.unwrapElement(value.get(0));
182 final Tuple key = groupMask.transform(update); 185 final boolean isInsertion = direction == Direction.INSERT;
183 final Tuple value = columnMask.transform(update); 186
184 @SuppressWarnings("unchecked") 187 final Accumulator oldMainAccumulator = getMainAccumulator(key);
185 final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0)); 188 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator);
186 final boolean isInsertion = direction == Direction.INSERT; 189
187 190 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, isInsertion);
188 final Accumulator oldMainAccumulator = getMainAccumulator(key); 191 storeIfNotNeutral(key, newMainAccumulator, memory);
189 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); 192 final AggregateResult newValue = operator.getAggregate(newMainAccumulator);
190 193
191 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, isInsertion); 194 propagateAggregateResultUpdate(key, oldValue, newValue, timestamp);
192 storeIfNotNeutral(key, newMainAccumulator, memory); 195 }
193 final AggregateResult newValue = operator.getAggregate(newMainAccumulator); 196
194 197 /**
195 propagateAggregateResultUpdate(key, oldValue, newValue, timestamp); 198 * @since 2.4
196 } 199 */
197 200 protected void updateWithDeleteAndRederive(final Direction direction, final Tuple update, final boolean monotone) {
198 /** 201 final Tuple group = groupMask.transform(update);
199 * @since 2.4 202 final Tuple value = columnMask.transform(update);
200 */ 203 @SuppressWarnings("unchecked") final Domain aggregableValue =
201 protected void updateWithDeleteAndRederive(final Direction direction, final Tuple update, final boolean monotone) { 204 (Domain) runtimeContext.unwrapElement(value.get(0));
202 final Tuple group = groupMask.transform(update); 205 final boolean isInsertion = direction == Direction.INSERT;
203 final Tuple value = columnMask.transform(update); 206
204 @SuppressWarnings("unchecked") 207 Accumulator oldMainAccumulator = memory.get(group);
205 final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0)); 208 Accumulator oldRederivableAccumulator = rederivableMemory.get(group);
206 final boolean isInsertion = direction == Direction.INSERT; 209
207 210 if (direction == Direction.INSERT) {
208 Accumulator oldMainAccumulator = memory.get(group); 211 // INSERT
209 Accumulator oldRederivableAccumulator = rederivableMemory.get(group); 212 if (oldRederivableAccumulator != null) {
210 213 // the group is in the re-derivable memory
211 if (direction == Direction.INSERT) { 214 final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator,
212 // INSERT 215 aggregableValue, isInsertion);
213 if (oldRederivableAccumulator != null) { 216 storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory);
214 // the group is in the re-derivable memory 217 if (rederivableMemory.isEmpty()) {
215 final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, 218 // there is nothing left to be re-derived
216 aggregableValue, isInsertion); 219 // this can happen if the accumulator became neutral in response to the INSERT
217 storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); 220 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this);
218 if (rederivableMemory.isEmpty()) { 221 }
219 // there is nothing left to be re-derived 222 } else {
220 // this can happen if the accumulator became neutral in response to the INSERT 223 // the group is in the main memory
221 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); 224 // at this point, it can happen that we need to initialize with a neutral accumulator
222 } 225 if (oldMainAccumulator == null) {
223 } else { 226 oldMainAccumulator = operator.createNeutral();
224 // the group is in the main memory 227 }
225 // at this point, it can happen that we need to initialize with a neutral accumulator 228
226 if (oldMainAccumulator == null) { 229 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator);
227 oldMainAccumulator = operator.createNeutral(); 230 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue,
228 } 231 isInsertion);
229 232 storeIfNotNeutral(group, newMainAccumulator, memory);
230 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); 233 final AggregateResult newValue = operator.getAggregate(newMainAccumulator);
231 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, 234 propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO);
232 isInsertion); 235 }
233 storeIfNotNeutral(group, newMainAccumulator, memory); 236 } else {
234 final AggregateResult newValue = operator.getAggregate(newMainAccumulator); 237 // DELETE
235 propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); 238 if (oldRederivableAccumulator != null) {
236 } 239 // the group is in the re-derivable memory
237 } else { 240 if (oldMainAccumulator != null) {
238 // DELETE 241 issueError("[INTERNAL ERROR] Inconsistent state for " + update
239 if (oldRederivableAccumulator != null) { 242 + " because it is present both in the main and re-derivable memory in the " +
240 // the group is in the re-derivable memory 243 "ColumnAggregatorNode "
241 if (oldMainAccumulator != null) { 244 + this + " for pattern(s) " + getTraceInfoPatternsEnumerated(), null);
242 issueError("[INTERNAL ERROR] Inconsistent state for " + update 245 }
243 + " because it is present both in the main and re-derivable memory in the ColumnAggregatorNode " 246 try {
244 + this + " for pattern(s) " + getTraceInfoPatternsEnumerated(), null); 247 final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator,
245 } 248 aggregableValue, isInsertion);
246 try { 249 storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory);
247 final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, 250 if (rederivableMemory.isEmpty()) {
248 aggregableValue, isInsertion); 251 // there is nothing left to be re-derived
249 storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); 252 // this can happen if the accumulator became neutral in response to the DELETE
250 if (rederivableMemory.isEmpty()) { 253 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this);
251 // there is nothing left to be re-derived 254 }
252 // this can happen if the accumulator became neutral in response to the DELETE 255 } catch (final NullPointerException ex) {
253 ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); 256 issueError("[INTERNAL ERROR] Deleting a domain element in " + update
254 } 257 + " which did not exist before in ColumnAggregatorNode " + this + " for pattern(s) "
255 } catch (final NullPointerException ex) { 258 + getTraceInfoPatternsEnumerated(), ex);
256 issueError("[INTERNAL ERROR] Deleting a domain element in " + update 259 }
257 + " which did not exist before in ColumnAggregatorNode " + this + " for pattern(s) " 260 } else {
258 + getTraceInfoPatternsEnumerated(), ex); 261 // the group is in the main memory
259 } 262 // at this point, it can happen that we need to initialize with a neutral accumulator
260 } else { 263 if (oldMainAccumulator == null) {
261 // the group is in the main memory 264 oldMainAccumulator = operator.createNeutral();
262 // at this point, it can happen that we need to initialize with a neutral accumulator 265 }
263 if (oldMainAccumulator == null) { 266
264 oldMainAccumulator = operator.createNeutral(); 267 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator);
265 } 268 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue,
266 269 isInsertion);
267 final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); 270 final AggregateResult newValue = operator.getAggregate(newMainAccumulator);
268 final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, 271
269 isInsertion); 272 if (monotone) {
270 final AggregateResult newValue = operator.getAggregate(newMainAccumulator); 273 storeIfNotNeutral(group, newMainAccumulator, memory);
271 274 propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO);
272 if (monotone) { 275 } else {
273 storeIfNotNeutral(group, newMainAccumulator, memory); 276 final boolean wasEmpty = rederivableMemory.isEmpty();
274 propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); 277 if (storeIfNotNeutral(group, newMainAccumulator, rederivableMemory) && wasEmpty) {
275 } else { 278 ((RecursiveCommunicationGroup) currentGroup).addRederivable(this);
276 final boolean wasEmpty = rederivableMemory.isEmpty(); 279 }
277 if (storeIfNotNeutral(group, newMainAccumulator, rederivableMemory) && wasEmpty) { 280 memory.remove(group);
278 ((RecursiveCommunicationGroup) currentGroup).addRederivable(this); 281 propagateAggregateResultUpdate(group, oldValue, NEUTRAL, Timestamp.ZERO);
279 } 282 }
280 memory.remove(group); 283 }
281 propagateAggregateResultUpdate(group, oldValue, NEUTRAL, Timestamp.ZERO); 284 }
282 } 285 }
283 } 286
284 } 287 @Override
285 } 288 public void batchUpdate(Collection<Entry<Tuple, Integer>> updates, Timestamp timestamp) {
286 289 if (!Timestamp.ZERO.equals(timestamp)) {
287 @Override 290 throw new IllegalArgumentException("Timely operation is not supported");
288 public void clear() { 291 }
289 this.memory.clear(); 292 if (deleteRederiveEvaluation || posetComparator != null) {
290 this.rederivableMemory.clear(); 293 super.batchUpdate(updates, timestamp);
291 this.childMailboxes.clear(); 294 return;
292 } 295 }
293 296 propagateBatchUpdate(updates, timestamp);
294 /** 297 }
295 * Returns true if the accumulator was stored, false otherwise. 298
296 * 299 private void propagateBatchUpdate(Collection<Entry<Tuple, Integer>> updates, Timestamp timestamp) {
297 * @since 1.6 300 if (updates.isEmpty()) {
298 */ 301 return;
299 protected boolean storeIfNotNeutral(final Tuple key, final Accumulator accumulator, 302 }
300 final Map<Tuple, Accumulator> memory) { 303 var oldValues = CollectionsFactory.<Tuple, AggregateResult>createMap();
301 if (operator.isNeutral(accumulator)) { 304 for (var entry : updates) {
302 memory.remove(key); 305 var update = entry.getKey();
303 return false; 306 var key = groupMask.transform(update);
304 } else { 307 var value = columnMask.transform(update);
305 memory.put(key, accumulator); 308 @SuppressWarnings("unchecked")
306 return true; 309 var valueToAggregate = (Domain) runtimeContext.unwrapElement(value.get(0));
307 } 310 int count = entry.getValue();
308 } 311 boolean isInsertion = true;
309 312 if (count < 0) {
310 @Override 313 isInsertion = false;
311 public Tuple getAggregateTuple(final Tuple group) { 314 count = -count;
312 final Accumulator accumulator = getMainAccumulator(group); 315 }
313 final AggregateResult result = operator.getAggregate(accumulator); 316
314 return tupleFromAggregateResult(group, result); 317 var oldMainAccumulator = memory.get(key);
315 } 318 oldValues.computeIfAbsent(key, ignoredKey ->
316 319 oldMainAccumulator == null ? NEUTRAL : operator.getAggregate(oldMainAccumulator));
317 @Override 320 Accumulator newMainAccumulator = oldMainAccumulator == null ? operator.createNeutral() :
318 public AggregateResult getAggregateResult(final Tuple group) { 321 oldMainAccumulator;
319 final Accumulator accumulator = getMainAccumulator(group); 322 for (int i = 0; i < count; i++) {
320 return operator.getAggregate(accumulator); 323 newMainAccumulator = operator.update(newMainAccumulator, valueToAggregate, isInsertion);
321 } 324 }
322 325 storeIfNotNeutral(key, newMainAccumulator, memory);
323 @Override 326 }
324 public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(Tuple key) { 327 for (var entry : oldValues.entrySet()) {
325 throw new UnsupportedOperationException(); 328 var key = entry.getKey();
326 } 329 var oldValue = entry.getValue();
327 330 var newMainAccumulator = getMainAccumulator(key);
328 @Override 331 var newValue = operator.getAggregate(newMainAccumulator);
329 public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(Tuple key) { 332 propagateAggregateResultUpdate(key, oldValue, newValue, timestamp);
330 throw new UnsupportedOperationException(); 333 }
331 } 334 }
332 335
333 /** 336 @Override
334 * @since 1.6 337 public void clear() {
335 */ 338 this.memory.clear();
336 protected Accumulator getMainAccumulator(final Tuple key) { 339 this.rederivableMemory.clear();
337 return getAccumulator(key, memory); 340 this.childMailboxes.clear();
338 } 341 }
339 342
340 /** 343 /**
341 * @since 1.6 344 * Returns true if the accumulator was stored, false otherwise.
342 */ 345 *
343 protected Accumulator getRederivableAccumulator(final Tuple key) { 346 * @since 1.6
344 return getAccumulator(key, rederivableMemory); 347 */
345 } 348 protected boolean storeIfNotNeutral(final Tuple key, final Accumulator accumulator,
346 349 final Map<Tuple, Accumulator> memory) {
347 /** 350 if (operator.isNeutral(accumulator)) {
348 * @since 1.6 351 memory.remove(key);
349 */ 352 return false;
350 protected Accumulator getAccumulator(final Tuple key, final Map<Tuple, Accumulator> memory) { 353 } else {
351 Accumulator accumulator = memory.get(key); 354 memory.put(key, accumulator);
352 if (accumulator == null) { 355 return true;
353 return operator.createNeutral(); 356 }
354 } else { 357 }
355 return accumulator; 358
356 } 359 @Override
357 } 360 public Tuple getAggregateTuple(final Tuple group) {
358 361 final Accumulator accumulator = getMainAccumulator(group);
359 @Override 362 final AggregateResult result = operator.getAggregate(accumulator);
360 public CommunicationGroup getCurrentGroup() { 363 return tupleFromAggregateResult(group, result);
361 return currentGroup; 364 }
362 } 365
363 366 @Override
364 @Override 367 public AggregateResult getAggregateResult(final Tuple group) {
365 public void setCurrentGroup(final CommunicationGroup currentGroup) { 368 final Accumulator accumulator = getMainAccumulator(group);
366 this.currentGroup = currentGroup; 369 return operator.getAggregate(accumulator);
367 } 370 }
371
372 @Override
373 public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(Tuple key) {
374 throw new UnsupportedOperationException();
375 }
376
377 @Override
378 public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(Tuple key) {
379 throw new UnsupportedOperationException();
380 }
381
382 /**
383 * @since 1.6
384 */
385 protected Accumulator getMainAccumulator(final Tuple key) {
386 return getAccumulator(key, memory);
387 }
388
389 /**
390 * @since 1.6
391 */
392 protected Accumulator getRederivableAccumulator(final Tuple key) {
393 return getAccumulator(key, rederivableMemory);
394 }
395
396 /**
397 * @since 1.6
398 */
399 protected Accumulator getAccumulator(final Tuple key, final Map<Tuple, Accumulator> memory) {
400 Accumulator accumulator = memory.get(key);
401 if (accumulator == null) {
402 return operator.createNeutral();
403 } else {
404 return accumulator;
405 }
406 }
407
408 @Override
409 public CommunicationGroup getCurrentGroup() {
410 return currentGroup;
411 }
412
413 @Override
414 public void setCurrentGroup(final CommunicationGroup currentGroup) {
415 this.currentGroup = currentGroup;
416 }
368 417
369} 418}
diff --git a/subprojects/store-query-interpreter/src/test/java/tools/refinery/store/query/interpreter/AggregatorBatchingTest.java b/subprojects/store-query-interpreter/src/test/java/tools/refinery/store/query/interpreter/AggregatorBatchingTest.java
new file mode 100644
index 00000000..d8e06d82
--- /dev/null
+++ b/subprojects/store-query-interpreter/src/test/java/tools/refinery/store/query/interpreter/AggregatorBatchingTest.java
@@ -0,0 +1,186 @@
1/*
2 * SPDX-FileCopyrightText: 2023 The Refinery Authors <https://refinery.tools/>
3 *
4 * SPDX-License-Identifier: EPL-2.0
5 */
6package tools.refinery.store.query.interpreter;
7
8import org.junit.jupiter.api.Test;
9import tools.refinery.store.model.Model;
10import tools.refinery.store.model.ModelStore;
11import tools.refinery.store.query.ModelQueryAdapter;
12import tools.refinery.store.query.dnf.Query;
13import tools.refinery.store.query.term.StatefulAggregate;
14import tools.refinery.store.query.term.StatefulAggregator;
15import tools.refinery.store.query.term.Variable;
16import tools.refinery.store.query.view.AnySymbolView;
17import tools.refinery.store.query.view.FunctionView;
18import tools.refinery.store.query.view.KeyOnlyView;
19import tools.refinery.store.representation.Symbol;
20import tools.refinery.store.tuple.Tuple;
21
22import java.util.Map;
23import java.util.Optional;
24
25import static org.hamcrest.MatcherAssert.assertThat;
26import static org.hamcrest.Matchers.is;
27import static tools.refinery.store.query.interpreter.tests.QueryAssertions.assertNullableResults;
28
29class AggregatorBatchingTest {
30 private static final Symbol<Boolean> person = Symbol.of("Person", 1);
31 private static final Symbol<Integer> values = Symbol.of("values", 2, Integer.class, null);
32 private static final AnySymbolView personView = new KeyOnlyView<>(person);
33 private static final FunctionView<Integer> valuesView = new FunctionView<>(values);
34
35 private final Query<Integer> query = Query.of(Integer.class, (builder, p1, output) -> builder
36 .clause(
37 personView.call(p1),
38 output.assign(valuesView.aggregate(new InstrumentedAggregator(), p1, Variable.of()))
39 ));
40
41 private int extractCount = 0;
42
43 @Test
44 void batchTest() {
45 var model = createModel();
46 var personInterpretation = model.getInterpretation(person);
47 var valuesInterpretation = model.getInterpretation(values);
48 var queryEngine = model.getAdapter(ModelQueryAdapter.class);
49 var resultSet = queryEngine.getResultSet(query);
50
51 assertThat(extractCount, is(1));
52
53 personInterpretation.put(Tuple.of(0), true);
54 personInterpretation.put(Tuple.of(1), true);
55
56 valuesInterpretation.put(Tuple.of(0, 0), 1);
57 valuesInterpretation.put(Tuple.of(0, 1), 2);
58 valuesInterpretation.put(Tuple.of(0, 2), 3);
59 valuesInterpretation.put(Tuple.of(1, 0), 1);
60 valuesInterpretation.put(Tuple.of(1, 1), -1);
61
62 queryEngine.flushChanges();
63
64 assertThat(extractCount, is(5));
65
66 assertNullableResults(Map.of(
67 Tuple.of(0), Optional.of(6),
68 Tuple.of(1), Optional.of(0),
69 Tuple.of(2), Optional.empty()
70 ), resultSet);
71 }
72
73 @Test
74 void separateTest() {
75 var model = createModel();
76 var personInterpretation = model.getInterpretation(person);
77 var valuesInterpretation = model.getInterpretation(values);
78 var queryEngine = model.getAdapter(ModelQueryAdapter.class);
79 var resultSet = queryEngine.getResultSet(query);
80
81 assertThat(extractCount, is(1));
82
83 personInterpretation.put(Tuple.of(0), true);
84 personInterpretation.put(Tuple.of(1), true);
85
86 queryEngine.flushChanges();
87 assertThat(extractCount, is(3));
88
89 valuesInterpretation.put(Tuple.of(0, 0), 1);
90 valuesInterpretation.put(Tuple.of(1, 0), 1);
91
92 queryEngine.flushChanges();
93 assertThat(extractCount, is(5));
94 assertNullableResults(Map.of(
95 Tuple.of(0), Optional.of(1),
96 Tuple.of(1), Optional.of(1),
97 Tuple.of(2), Optional.empty()
98 ), resultSet);
99
100 valuesInterpretation.put(Tuple.of(0, 1), 2);
101 valuesInterpretation.put(Tuple.of(1, 1), -1);
102
103 queryEngine.flushChanges();
104 assertThat(extractCount, is(9));
105 assertNullableResults(Map.of(
106 Tuple.of(0), Optional.of(3),
107 Tuple.of(1), Optional.of(0),
108 Tuple.of(2), Optional.empty()
109 ), resultSet);
110
111 valuesInterpretation.put(Tuple.of(0, 2), 3);
112
113 queryEngine.flushChanges();
114 assertThat(extractCount, is(11));
115 assertNullableResults(Map.of(
116 Tuple.of(0), Optional.of(6),
117 Tuple.of(1), Optional.of(0),
118 Tuple.of(2), Optional.empty()
119 ), resultSet);
120 }
121
122 private Model createModel() {
123 var store = ModelStore.builder()
124 .symbols(person, values)
125 .with(QueryInterpreterAdapter.builder()
126 .query(query))
127 .build();
128 return store.createEmptyModel();
129 }
130
131 class InstrumentedAggregator implements StatefulAggregator<Integer, Integer> {
132 @Override
133 public Class<Integer> getResultType() {
134 return Integer.class;
135 }
136
137 @Override
138 public Class<Integer> getInputType() {
139 return Integer.class;
140 }
141
142 @Override
143 public StatefulAggregate<Integer, Integer> createEmptyAggregate() {
144 return new InstrumentedAggregate();
145 }
146 }
147
148 class InstrumentedAggregate implements StatefulAggregate<Integer, Integer> {
149 private int sum;
150
151 public InstrumentedAggregate() {
152 this(0);
153 }
154
155 private InstrumentedAggregate(int sum) {
156 this.sum = sum;
157 }
158
159
160 @Override
161 public void add(Integer value) {
162 sum += value;
163 }
164
165 @Override
166 public void remove(Integer value) {
167 sum -= value;
168 }
169
170 @Override
171 public Integer getResult() {
172 extractCount++;
173 return sum;
174 }
175
176 @Override
177 public boolean isEmpty() {
178 return sum == 0;
179 }
180
181 @Override
182 public StatefulAggregate<Integer, Integer> deepCopy() {
183 return new InstrumentedAggregate(sum);
184 }
185 }
186}