diff options
author | Kristóf Marussy <kristof@marussy.com> | 2023-10-26 20:37:40 +0200 |
---|---|---|
committer | Kristóf Marussy <kristof@marussy.com> | 2023-10-26 20:42:51 +0200 |
commit | c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0 (patch) | |
tree | 61e068e0454d3a290dc7bb4ad09e42050b0b2145 /subprojects/interpreter-rete | |
parent | chore(deps): dedupe Yarn dependencies (diff) | |
download | refinery-c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0.tar.gz refinery-c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0.tar.zst refinery-c80751c8ec35aab8b3a2bfaf96e4ca82e29815a0.zip |
refactor(interpreter): aggreagator batching
Optimize calls to potentially costly aggregators by only extracting the value
from a stateful aggregator when it is needed by subsequent RETE nodes.
This optimization only works with timeless evaluation and delete-and-rederive
evaluation disabled, i.e., only for queries without any recursion. Potentially,
it could also be extended to other mailboxes if needed.
We replace the BehaviorChangingMailbox of ColumnAggregatorNode with a
DefaultMailbox to force update batching. Batched updates only extract the value
from the aggregator when it has been already updated with all received tuples.
Diffstat (limited to 'subprojects/interpreter-rete')
-rw-r--r-- | subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java | 705 |
1 files changed, 377 insertions, 328 deletions
diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java index b1a25807..d5f4a0e4 100644 --- a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/ColumnAggregatorNode.java | |||
@@ -1,5 +1,6 @@ | |||
1 | /******************************************************************************* | 1 | /******************************************************************************* |
2 | * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd. | 2 | * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd. |
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
3 | * This program and the accompanying materials are made available under the | 4 | * This program and the accompanying materials are made available under the |
4 | * terms of the Eclipse Public License v. 2.0 which is available at | 5 | * terms of the Eclipse Public License v. 2.0 which is available at |
5 | * http://www.eclipse.org/legal/epl-v20.html. | 6 | * http://www.eclipse.org/legal/epl-v20.html. |
@@ -8,9 +9,6 @@ | |||
8 | *******************************************************************************/ | 9 | *******************************************************************************/ |
9 | package tools.refinery.interpreter.rete.aggregation; | 10 | package tools.refinery.interpreter.rete.aggregation; |
10 | 11 | ||
11 | import java.util.Map; | ||
12 | import java.util.Map.Entry; | ||
13 | |||
14 | import tools.refinery.interpreter.matchers.context.IPosetComparator; | 12 | import tools.refinery.interpreter.matchers.context.IPosetComparator; |
15 | import tools.refinery.interpreter.matchers.psystem.aggregations.IMultisetAggregationOperator; | 13 | import tools.refinery.interpreter.matchers.psystem.aggregations.IMultisetAggregationOperator; |
16 | import tools.refinery.interpreter.matchers.tuple.Tuple; | 14 | import tools.refinery.interpreter.matchers.tuple.Tuple; |
@@ -26,8 +24,13 @@ import tools.refinery.interpreter.rete.network.communication.Timestamp; | |||
26 | import tools.refinery.interpreter.rete.network.communication.timeless.RecursiveCommunicationGroup; | 24 | import tools.refinery.interpreter.rete.network.communication.timeless.RecursiveCommunicationGroup; |
27 | import tools.refinery.interpreter.rete.network.mailbox.Mailbox; | 25 | import tools.refinery.interpreter.rete.network.mailbox.Mailbox; |
28 | import tools.refinery.interpreter.rete.network.mailbox.timeless.BehaviorChangingMailbox; | 26 | import tools.refinery.interpreter.rete.network.mailbox.timeless.BehaviorChangingMailbox; |
27 | import tools.refinery.interpreter.rete.network.mailbox.timeless.DefaultMailbox; | ||
29 | import tools.refinery.interpreter.rete.network.mailbox.timeless.PosetAwareMailbox; | 28 | import tools.refinery.interpreter.rete.network.mailbox.timeless.PosetAwareMailbox; |
30 | 29 | ||
30 | import java.util.Collection; | ||
31 | import java.util.Map; | ||
32 | import java.util.Map.Entry; | ||
33 | |||
31 | /** | 34 | /** |
32 | * Timeless implementation of the column aggregator node. | 35 | * Timeless implementation of the column aggregator node. |
33 | * <p> | 36 | * <p> |
@@ -40,330 +43,376 @@ import tools.refinery.interpreter.rete.network.mailbox.timeless.PosetAwareMailbo | |||
40 | * @since 1.4 | 43 | * @since 1.4 |
41 | */ | 44 | */ |
42 | public class ColumnAggregatorNode<Domain, Accumulator, AggregateResult> | 45 | public class ColumnAggregatorNode<Domain, Accumulator, AggregateResult> |
43 | extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> | 46 | extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> |
44 | implements RederivableNode, PosetAwareReceiver { | 47 | implements RederivableNode, PosetAwareReceiver { |
45 | 48 | ||
46 | /** | 49 | /** |
47 | * @since 1.6 | 50 | * @since 1.6 |
48 | */ | 51 | */ |
49 | protected final IPosetComparator posetComparator; | 52 | protected final IPosetComparator posetComparator; |
50 | 53 | ||
51 | /** | 54 | /** |
52 | * @since 1.6 | 55 | * @since 1.6 |
53 | */ | 56 | */ |
54 | protected final boolean deleteRederiveEvaluation; | 57 | protected final boolean deleteRederiveEvaluation; |
55 | 58 | ||
56 | // invariant: neutral values are not stored | 59 | // invariant: neutral values are not stored |
57 | /** | 60 | /** |
58 | * @since 1.6 | 61 | * @since 1.6 |
59 | */ | 62 | */ |
60 | protected final Map<Tuple, Accumulator> memory; | 63 | protected final Map<Tuple, Accumulator> memory; |
61 | /** | 64 | /** |
62 | * @since 1.6 | 65 | * @since 1.6 |
63 | */ | 66 | */ |
64 | protected final Map<Tuple, Accumulator> rederivableMemory; | 67 | protected final Map<Tuple, Accumulator> rederivableMemory; |
65 | 68 | ||
66 | /** | 69 | /** |
67 | * @since 1.7 | 70 | * @since 1.7 |
68 | */ | 71 | */ |
69 | protected CommunicationGroup currentGroup; | 72 | protected CommunicationGroup currentGroup; |
70 | 73 | ||
71 | /** | 74 | /** |
72 | * Creates a new column aggregator node. | 75 | * Creates a new column aggregator node. |
73 | * | 76 | * |
74 | * @param reteContainer | 77 | * @param reteContainer the RETE container of the node |
75 | * the RETE container of the node | 78 | * @param operator the aggregation operator |
76 | * @param operator | 79 | * @param deleteRederiveEvaluation true if the node should run in DRED mode, false otherwise |
77 | * the aggregation operator | 80 | * @param groupMask the mask that masks a tuple to obtain the key that we are grouping-by |
78 | * @param deleteRederiveEvaluation | 81 | * @param columnMask the mask that masks a tuple to obtain the tuple element(s) that we are |
79 | * true if the node should run in DRED mode, false otherwise | 82 | * aggregating over |
80 | * @param groupMask | 83 | * @param posetComparator the poset comparator for the column, if known, otherwise it can be null |
81 | * the mask that masks a tuple to obtain the key that we are grouping-by | 84 | * @since 1.6 |
82 | * @param columnMask | 85 | */ |
83 | * the mask that masks a tuple to obtain the tuple element(s) that we are aggregating over | 86 | public ColumnAggregatorNode(final ReteContainer reteContainer, |
84 | * @param posetComparator | 87 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, |
85 | * the poset comparator for the column, if known, otherwise it can be null | 88 | final boolean deleteRederiveEvaluation, final TupleMask groupMask, |
86 | * @since 1.6 | 89 | final TupleMask columnMask, |
87 | */ | 90 | final IPosetComparator posetComparator) { |
88 | public ColumnAggregatorNode(final ReteContainer reteContainer, | 91 | super(reteContainer, operator, groupMask, columnMask); |
89 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | 92 | this.memory = CollectionsFactory.createMap(); |
90 | final boolean deleteRederiveEvaluation, final TupleMask groupMask, final TupleMask columnMask, | 93 | this.rederivableMemory = CollectionsFactory.createMap(); |
91 | final IPosetComparator posetComparator) { | 94 | this.deleteRederiveEvaluation = deleteRederiveEvaluation; |
92 | super(reteContainer, operator, groupMask, columnMask); | 95 | this.posetComparator = posetComparator; |
93 | this.memory = CollectionsFactory.createMap(); | 96 | // mailbox MUST be instantiated after the fields are all set |
94 | this.rederivableMemory = CollectionsFactory.createMap(); | 97 | this.mailbox = instantiateMailbox(); |
95 | this.deleteRederiveEvaluation = deleteRederiveEvaluation; | 98 | } |
96 | this.posetComparator = posetComparator; | 99 | |
97 | // mailbox MUST be instantiated after the fields are all set | 100 | /** |
98 | this.mailbox = instantiateMailbox(); | 101 | * Creates a new column aggregator node. |
99 | } | 102 | * |
100 | 103 | * @param reteContainer the RETE container of the node | |
101 | /** | 104 | * @param operator the aggregation operator |
102 | * Creates a new column aggregator node. | 105 | * @param groupMask the mask that masks a tuple to obtain the key that we are grouping-by |
103 | * | 106 | * @param aggregatedColumn the index of the column that the aggregator node is aggregating over |
104 | * @param reteContainer | 107 | */ |
105 | * the RETE container of the node | 108 | public ColumnAggregatorNode(final ReteContainer reteContainer, |
106 | * @param operator | 109 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, |
107 | * the aggregation operator | 110 | final TupleMask groupMask, final int aggregatedColumn) { |
108 | * @param groupMask | 111 | this(reteContainer, operator, false, groupMask, TupleMask.selectSingle(aggregatedColumn, |
109 | * the mask that masks a tuple to obtain the key that we are grouping-by | 112 | groupMask.sourceWidth), |
110 | * @param aggregatedColumn | 113 | null); |
111 | * the index of the column that the aggregator node is aggregating over | 114 | } |
112 | */ | 115 | |
113 | public ColumnAggregatorNode(final ReteContainer reteContainer, | 116 | @Override |
114 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | 117 | public boolean isInDRedMode() { |
115 | final TupleMask groupMask, final int aggregatedColumn) { | 118 | return this.deleteRederiveEvaluation; |
116 | this(reteContainer, operator, false, groupMask, TupleMask.selectSingle(aggregatedColumn, groupMask.sourceWidth), | 119 | } |
117 | null); | 120 | |
118 | } | 121 | @Override |
119 | 122 | protected Mailbox instantiateMailbox() { | |
120 | @Override | 123 | if (groupMask != null && columnMask != null && posetComparator != null) { |
121 | public boolean isInDRedMode() { | 124 | return new PosetAwareMailbox(this, this.reteContainer); |
122 | return this.deleteRederiveEvaluation; | 125 | } else if (deleteRederiveEvaluation) { |
123 | } | 126 | return new BehaviorChangingMailbox(this, this.reteContainer); |
124 | 127 | } else { | |
125 | @Override | 128 | // Disable fall-through to enabled batched updates. |
126 | protected Mailbox instantiateMailbox() { | 129 | return new DefaultMailbox(this, this.reteContainer); |
127 | if (groupMask != null && columnMask != null && posetComparator != null) { | 130 | } |
128 | return new PosetAwareMailbox(this, this.reteContainer); | 131 | } |
129 | } else { | 132 | |
130 | return new BehaviorChangingMailbox(this, this.reteContainer); | 133 | @Override |
131 | } | 134 | public TupleMask getCoreMask() { |
132 | } | 135 | return groupMask; |
133 | 136 | } | |
134 | @Override | 137 | |
135 | public TupleMask getCoreMask() { | 138 | @Override |
136 | return groupMask; | 139 | public TupleMask getPosetMask() { |
137 | } | 140 | return columnMask; |
138 | 141 | } | |
139 | @Override | 142 | |
140 | public TupleMask getPosetMask() { | 143 | @Override |
141 | return columnMask; | 144 | public IPosetComparator getPosetComparator() { |
142 | } | 145 | return posetComparator; |
143 | 146 | } | |
144 | @Override | 147 | |
145 | public IPosetComparator getPosetComparator() { | 148 | @Override |
146 | return posetComparator; | 149 | public void rederiveOne() { |
147 | } | 150 | final Entry<Tuple, Accumulator> entry = rederivableMemory.entrySet().iterator().next(); |
148 | 151 | final Tuple group = entry.getKey(); | |
149 | @Override | 152 | final Accumulator accumulator = entry.getValue(); |
150 | public void rederiveOne() { | 153 | rederivableMemory.remove(group); |
151 | final Entry<Tuple, Accumulator> entry = rederivableMemory.entrySet().iterator().next(); | 154 | memory.put(group, accumulator); |
152 | final Tuple group = entry.getKey(); | 155 | // unregister the node if there is nothing left to be re-derived |
153 | final Accumulator accumulator = entry.getValue(); | 156 | if (this.rederivableMemory.isEmpty()) { |
154 | rederivableMemory.remove(group); | 157 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); |
155 | memory.put(group, accumulator); | 158 | } |
156 | // unregister the node if there is nothing left to be re-derived | 159 | final AggregateResult value = operator.getAggregate(accumulator); |
157 | if (this.rederivableMemory.isEmpty()) { | 160 | propagateAggregateResultUpdate(group, NEUTRAL, value, Timestamp.ZERO); |
158 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); | 161 | } |
159 | } | 162 | |
160 | final AggregateResult value = operator.getAggregate(accumulator); | 163 | @Override |
161 | propagateAggregateResultUpdate(group, NEUTRAL, value, Timestamp.ZERO); | 164 | public void updateWithPosetInfo(final Direction direction, final Tuple update, final boolean monotone) { |
162 | } | 165 | if (this.deleteRederiveEvaluation) { |
163 | 166 | updateWithDeleteAndRederive(direction, update, monotone); | |
164 | @Override | 167 | } else { |
165 | public void updateWithPosetInfo(final Direction direction, final Tuple update, final boolean monotone) { | 168 | updateDefault(direction, update, Timestamp.ZERO); |
166 | if (this.deleteRederiveEvaluation) { | 169 | } |
167 | updateWithDeleteAndRederive(direction, update, monotone); | 170 | } |
168 | } else { | 171 | |
169 | updateDefault(direction, update, Timestamp.ZERO); | 172 | @Override |
170 | } | 173 | public void update(final Direction direction, final Tuple update, final Timestamp timestamp) { |
171 | } | 174 | updateWithPosetInfo(direction, update, false); |
172 | 175 | } | |
173 | @Override | 176 | |
174 | public void update(final Direction direction, final Tuple update, final Timestamp timestamp) { | 177 | /** |
175 | updateWithPosetInfo(direction, update, false); | 178 | * @since 2.4 |
176 | } | 179 | */ |
177 | 180 | protected void updateDefault(final Direction direction, final Tuple update, final Timestamp timestamp) { | |
178 | /** | 181 | final Tuple key = groupMask.transform(update); |
179 | * @since 2.4 | 182 | final Tuple value = columnMask.transform(update); |
180 | */ | 183 | @SuppressWarnings("unchecked") final Domain aggregableValue = |
181 | protected void updateDefault(final Direction direction, final Tuple update, final Timestamp timestamp) { | 184 | (Domain) runtimeContext.unwrapElement(value.get(0)); |
182 | final Tuple key = groupMask.transform(update); | 185 | final boolean isInsertion = direction == Direction.INSERT; |
183 | final Tuple value = columnMask.transform(update); | 186 | |
184 | @SuppressWarnings("unchecked") | 187 | final Accumulator oldMainAccumulator = getMainAccumulator(key); |
185 | final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0)); | 188 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); |
186 | final boolean isInsertion = direction == Direction.INSERT; | 189 | |
187 | 190 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, isInsertion); | |
188 | final Accumulator oldMainAccumulator = getMainAccumulator(key); | 191 | storeIfNotNeutral(key, newMainAccumulator, memory); |
189 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); | 192 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); |
190 | 193 | ||
191 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, isInsertion); | 194 | propagateAggregateResultUpdate(key, oldValue, newValue, timestamp); |
192 | storeIfNotNeutral(key, newMainAccumulator, memory); | 195 | } |
193 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); | 196 | |
194 | 197 | /** | |
195 | propagateAggregateResultUpdate(key, oldValue, newValue, timestamp); | 198 | * @since 2.4 |
196 | } | 199 | */ |
197 | 200 | protected void updateWithDeleteAndRederive(final Direction direction, final Tuple update, final boolean monotone) { | |
198 | /** | 201 | final Tuple group = groupMask.transform(update); |
199 | * @since 2.4 | 202 | final Tuple value = columnMask.transform(update); |
200 | */ | 203 | @SuppressWarnings("unchecked") final Domain aggregableValue = |
201 | protected void updateWithDeleteAndRederive(final Direction direction, final Tuple update, final boolean monotone) { | 204 | (Domain) runtimeContext.unwrapElement(value.get(0)); |
202 | final Tuple group = groupMask.transform(update); | 205 | final boolean isInsertion = direction == Direction.INSERT; |
203 | final Tuple value = columnMask.transform(update); | 206 | |
204 | @SuppressWarnings("unchecked") | 207 | Accumulator oldMainAccumulator = memory.get(group); |
205 | final Domain aggregableValue = (Domain) runtimeContext.unwrapElement(value.get(0)); | 208 | Accumulator oldRederivableAccumulator = rederivableMemory.get(group); |
206 | final boolean isInsertion = direction == Direction.INSERT; | 209 | |
207 | 210 | if (direction == Direction.INSERT) { | |
208 | Accumulator oldMainAccumulator = memory.get(group); | 211 | // INSERT |
209 | Accumulator oldRederivableAccumulator = rederivableMemory.get(group); | 212 | if (oldRederivableAccumulator != null) { |
210 | 213 | // the group is in the re-derivable memory | |
211 | if (direction == Direction.INSERT) { | 214 | final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, |
212 | // INSERT | 215 | aggregableValue, isInsertion); |
213 | if (oldRederivableAccumulator != null) { | 216 | storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); |
214 | // the group is in the re-derivable memory | 217 | if (rederivableMemory.isEmpty()) { |
215 | final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, | 218 | // there is nothing left to be re-derived |
216 | aggregableValue, isInsertion); | 219 | // this can happen if the accumulator became neutral in response to the INSERT |
217 | storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); | 220 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); |
218 | if (rederivableMemory.isEmpty()) { | 221 | } |
219 | // there is nothing left to be re-derived | 222 | } else { |
220 | // this can happen if the accumulator became neutral in response to the INSERT | 223 | // the group is in the main memory |
221 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); | 224 | // at this point, it can happen that we need to initialize with a neutral accumulator |
222 | } | 225 | if (oldMainAccumulator == null) { |
223 | } else { | 226 | oldMainAccumulator = operator.createNeutral(); |
224 | // the group is in the main memory | 227 | } |
225 | // at this point, it can happen that we need to initialize with a neutral accumulator | 228 | |
226 | if (oldMainAccumulator == null) { | 229 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); |
227 | oldMainAccumulator = operator.createNeutral(); | 230 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, |
228 | } | 231 | isInsertion); |
229 | 232 | storeIfNotNeutral(group, newMainAccumulator, memory); | |
230 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); | 233 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); |
231 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, | 234 | propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); |
232 | isInsertion); | 235 | } |
233 | storeIfNotNeutral(group, newMainAccumulator, memory); | 236 | } else { |
234 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); | 237 | // DELETE |
235 | propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); | 238 | if (oldRederivableAccumulator != null) { |
236 | } | 239 | // the group is in the re-derivable memory |
237 | } else { | 240 | if (oldMainAccumulator != null) { |
238 | // DELETE | 241 | issueError("[INTERNAL ERROR] Inconsistent state for " + update |
239 | if (oldRederivableAccumulator != null) { | 242 | + " because it is present both in the main and re-derivable memory in the " + |
240 | // the group is in the re-derivable memory | 243 | "ColumnAggregatorNode " |
241 | if (oldMainAccumulator != null) { | 244 | + this + " for pattern(s) " + getTraceInfoPatternsEnumerated(), null); |
242 | issueError("[INTERNAL ERROR] Inconsistent state for " + update | 245 | } |
243 | + " because it is present both in the main and re-derivable memory in the ColumnAggregatorNode " | 246 | try { |
244 | + this + " for pattern(s) " + getTraceInfoPatternsEnumerated(), null); | 247 | final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, |
245 | } | 248 | aggregableValue, isInsertion); |
246 | try { | 249 | storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); |
247 | final Accumulator newRederivableAccumulator = operator.update(oldRederivableAccumulator, | 250 | if (rederivableMemory.isEmpty()) { |
248 | aggregableValue, isInsertion); | 251 | // there is nothing left to be re-derived |
249 | storeIfNotNeutral(group, newRederivableAccumulator, rederivableMemory); | 252 | // this can happen if the accumulator became neutral in response to the DELETE |
250 | if (rederivableMemory.isEmpty()) { | 253 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); |
251 | // there is nothing left to be re-derived | 254 | } |
252 | // this can happen if the accumulator became neutral in response to the DELETE | 255 | } catch (final NullPointerException ex) { |
253 | ((RecursiveCommunicationGroup) currentGroup).removeRederivable(this); | 256 | issueError("[INTERNAL ERROR] Deleting a domain element in " + update |
254 | } | 257 | + " which did not exist before in ColumnAggregatorNode " + this + " for pattern(s) " |
255 | } catch (final NullPointerException ex) { | 258 | + getTraceInfoPatternsEnumerated(), ex); |
256 | issueError("[INTERNAL ERROR] Deleting a domain element in " + update | 259 | } |
257 | + " which did not exist before in ColumnAggregatorNode " + this + " for pattern(s) " | 260 | } else { |
258 | + getTraceInfoPatternsEnumerated(), ex); | 261 | // the group is in the main memory |
259 | } | 262 | // at this point, it can happen that we need to initialize with a neutral accumulator |
260 | } else { | 263 | if (oldMainAccumulator == null) { |
261 | // the group is in the main memory | 264 | oldMainAccumulator = operator.createNeutral(); |
262 | // at this point, it can happen that we need to initialize with a neutral accumulator | 265 | } |
263 | if (oldMainAccumulator == null) { | 266 | |
264 | oldMainAccumulator = operator.createNeutral(); | 267 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); |
265 | } | 268 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, |
266 | 269 | isInsertion); | |
267 | final AggregateResult oldValue = operator.getAggregate(oldMainAccumulator); | 270 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); |
268 | final Accumulator newMainAccumulator = operator.update(oldMainAccumulator, aggregableValue, | 271 | |
269 | isInsertion); | 272 | if (monotone) { |
270 | final AggregateResult newValue = operator.getAggregate(newMainAccumulator); | 273 | storeIfNotNeutral(group, newMainAccumulator, memory); |
271 | 274 | propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); | |
272 | if (monotone) { | 275 | } else { |
273 | storeIfNotNeutral(group, newMainAccumulator, memory); | 276 | final boolean wasEmpty = rederivableMemory.isEmpty(); |
274 | propagateAggregateResultUpdate(group, oldValue, newValue, Timestamp.ZERO); | 277 | if (storeIfNotNeutral(group, newMainAccumulator, rederivableMemory) && wasEmpty) { |
275 | } else { | 278 | ((RecursiveCommunicationGroup) currentGroup).addRederivable(this); |
276 | final boolean wasEmpty = rederivableMemory.isEmpty(); | 279 | } |
277 | if (storeIfNotNeutral(group, newMainAccumulator, rederivableMemory) && wasEmpty) { | 280 | memory.remove(group); |
278 | ((RecursiveCommunicationGroup) currentGroup).addRederivable(this); | 281 | propagateAggregateResultUpdate(group, oldValue, NEUTRAL, Timestamp.ZERO); |
279 | } | 282 | } |
280 | memory.remove(group); | 283 | } |
281 | propagateAggregateResultUpdate(group, oldValue, NEUTRAL, Timestamp.ZERO); | 284 | } |
282 | } | 285 | } |
283 | } | 286 | |
284 | } | 287 | @Override |
285 | } | 288 | public void batchUpdate(Collection<Entry<Tuple, Integer>> updates, Timestamp timestamp) { |
286 | 289 | if (!Timestamp.ZERO.equals(timestamp)) { | |
287 | @Override | 290 | throw new IllegalArgumentException("Timely operation is not supported"); |
288 | public void clear() { | 291 | } |
289 | this.memory.clear(); | 292 | if (deleteRederiveEvaluation || posetComparator != null) { |
290 | this.rederivableMemory.clear(); | 293 | super.batchUpdate(updates, timestamp); |
291 | this.childMailboxes.clear(); | 294 | return; |
292 | } | 295 | } |
293 | 296 | propagateBatchUpdate(updates, timestamp); | |
294 | /** | 297 | } |
295 | * Returns true if the accumulator was stored, false otherwise. | 298 | |
296 | * | 299 | private void propagateBatchUpdate(Collection<Entry<Tuple, Integer>> updates, Timestamp timestamp) { |
297 | * @since 1.6 | 300 | if (updates.isEmpty()) { |
298 | */ | 301 | return; |
299 | protected boolean storeIfNotNeutral(final Tuple key, final Accumulator accumulator, | 302 | } |
300 | final Map<Tuple, Accumulator> memory) { | 303 | var oldValues = CollectionsFactory.<Tuple, AggregateResult>createMap(); |
301 | if (operator.isNeutral(accumulator)) { | 304 | for (var entry : updates) { |
302 | memory.remove(key); | 305 | var update = entry.getKey(); |
303 | return false; | 306 | var key = groupMask.transform(update); |
304 | } else { | 307 | var value = columnMask.transform(update); |
305 | memory.put(key, accumulator); | 308 | @SuppressWarnings("unchecked") |
306 | return true; | 309 | var valueToAggregate = (Domain) runtimeContext.unwrapElement(value.get(0)); |
307 | } | 310 | int count = entry.getValue(); |
308 | } | 311 | boolean isInsertion = true; |
309 | 312 | if (count < 0) { | |
310 | @Override | 313 | isInsertion = false; |
311 | public Tuple getAggregateTuple(final Tuple group) { | 314 | count = -count; |
312 | final Accumulator accumulator = getMainAccumulator(group); | 315 | } |
313 | final AggregateResult result = operator.getAggregate(accumulator); | 316 | |
314 | return tupleFromAggregateResult(group, result); | 317 | var oldMainAccumulator = memory.get(key); |
315 | } | 318 | oldValues.computeIfAbsent(key, ignoredKey -> |
316 | 319 | oldMainAccumulator == null ? NEUTRAL : operator.getAggregate(oldMainAccumulator)); | |
317 | @Override | 320 | Accumulator newMainAccumulator = oldMainAccumulator == null ? operator.createNeutral() : |
318 | public AggregateResult getAggregateResult(final Tuple group) { | 321 | oldMainAccumulator; |
319 | final Accumulator accumulator = getMainAccumulator(group); | 322 | for (int i = 0; i < count; i++) { |
320 | return operator.getAggregate(accumulator); | 323 | newMainAccumulator = operator.update(newMainAccumulator, valueToAggregate, isInsertion); |
321 | } | 324 | } |
322 | 325 | storeIfNotNeutral(key, newMainAccumulator, memory); | |
323 | @Override | 326 | } |
324 | public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(Tuple key) { | 327 | for (var entry : oldValues.entrySet()) { |
325 | throw new UnsupportedOperationException(); | 328 | var key = entry.getKey(); |
326 | } | 329 | var oldValue = entry.getValue(); |
327 | 330 | var newMainAccumulator = getMainAccumulator(key); | |
328 | @Override | 331 | var newValue = operator.getAggregate(newMainAccumulator); |
329 | public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(Tuple key) { | 332 | propagateAggregateResultUpdate(key, oldValue, newValue, timestamp); |
330 | throw new UnsupportedOperationException(); | 333 | } |
331 | } | 334 | } |
332 | 335 | ||
333 | /** | 336 | @Override |
334 | * @since 1.6 | 337 | public void clear() { |
335 | */ | 338 | this.memory.clear(); |
336 | protected Accumulator getMainAccumulator(final Tuple key) { | 339 | this.rederivableMemory.clear(); |
337 | return getAccumulator(key, memory); | 340 | this.childMailboxes.clear(); |
338 | } | 341 | } |
339 | 342 | ||
340 | /** | 343 | /** |
341 | * @since 1.6 | 344 | * Returns true if the accumulator was stored, false otherwise. |
342 | */ | 345 | * |
343 | protected Accumulator getRederivableAccumulator(final Tuple key) { | 346 | * @since 1.6 |
344 | return getAccumulator(key, rederivableMemory); | 347 | */ |
345 | } | 348 | protected boolean storeIfNotNeutral(final Tuple key, final Accumulator accumulator, |
346 | 349 | final Map<Tuple, Accumulator> memory) { | |
347 | /** | 350 | if (operator.isNeutral(accumulator)) { |
348 | * @since 1.6 | 351 | memory.remove(key); |
349 | */ | 352 | return false; |
350 | protected Accumulator getAccumulator(final Tuple key, final Map<Tuple, Accumulator> memory) { | 353 | } else { |
351 | Accumulator accumulator = memory.get(key); | 354 | memory.put(key, accumulator); |
352 | if (accumulator == null) { | 355 | return true; |
353 | return operator.createNeutral(); | 356 | } |
354 | } else { | 357 | } |
355 | return accumulator; | 358 | |
356 | } | 359 | @Override |
357 | } | 360 | public Tuple getAggregateTuple(final Tuple group) { |
358 | 361 | final Accumulator accumulator = getMainAccumulator(group); | |
359 | @Override | 362 | final AggregateResult result = operator.getAggregate(accumulator); |
360 | public CommunicationGroup getCurrentGroup() { | 363 | return tupleFromAggregateResult(group, result); |
361 | return currentGroup; | 364 | } |
362 | } | 365 | |
363 | 366 | @Override | |
364 | @Override | 367 | public AggregateResult getAggregateResult(final Tuple group) { |
365 | public void setCurrentGroup(final CommunicationGroup currentGroup) { | 368 | final Accumulator accumulator = getMainAccumulator(group); |
366 | this.currentGroup = currentGroup; | 369 | return operator.getAggregate(accumulator); |
367 | } | 370 | } |
371 | |||
372 | @Override | ||
373 | public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(Tuple key) { | ||
374 | throw new UnsupportedOperationException(); | ||
375 | } | ||
376 | |||
377 | @Override | ||
378 | public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(Tuple key) { | ||
379 | throw new UnsupportedOperationException(); | ||
380 | } | ||
381 | |||
382 | /** | ||
383 | * @since 1.6 | ||
384 | */ | ||
385 | protected Accumulator getMainAccumulator(final Tuple key) { | ||
386 | return getAccumulator(key, memory); | ||
387 | } | ||
388 | |||
389 | /** | ||
390 | * @since 1.6 | ||
391 | */ | ||
392 | protected Accumulator getRederivableAccumulator(final Tuple key) { | ||
393 | return getAccumulator(key, rederivableMemory); | ||
394 | } | ||
395 | |||
396 | /** | ||
397 | * @since 1.6 | ||
398 | */ | ||
399 | protected Accumulator getAccumulator(final Tuple key, final Map<Tuple, Accumulator> memory) { | ||
400 | Accumulator accumulator = memory.get(key); | ||
401 | if (accumulator == null) { | ||
402 | return operator.createNeutral(); | ||
403 | } else { | ||
404 | return accumulator; | ||
405 | } | ||
406 | } | ||
407 | |||
408 | @Override | ||
409 | public CommunicationGroup getCurrentGroup() { | ||
410 | return currentGroup; | ||
411 | } | ||
412 | |||
413 | @Override | ||
414 | public void setCurrentGroup(final CommunicationGroup currentGroup) { | ||
415 | this.currentGroup = currentGroup; | ||
416 | } | ||
368 | 417 | ||
369 | } | 418 | } |