diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/eval/OutputCachingEvaluatorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/eval/OutputCachingEvaluatorNode.java | 311 |
1 files changed, 311 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/eval/OutputCachingEvaluatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/eval/OutputCachingEvaluatorNode.java new file mode 100644 index 00000000..40a20c4e --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/eval/OutputCachingEvaluatorNode.java | |||
@@ -0,0 +1,311 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2013, Bergmann Gabor, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.eval; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Iterator; | ||
14 | import java.util.Map; | ||
15 | import java.util.Map.Entry; | ||
16 | |||
17 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
18 | import tools.refinery.viatra.runtime.matchers.tuple.Tuples; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
21 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.Signed; | ||
23 | import tools.refinery.viatra.runtime.matchers.util.TimelyMemory; | ||
24 | import tools.refinery.viatra.runtime.matchers.util.timeline.Diff; | ||
25 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
26 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; | ||
27 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
28 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
29 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
30 | import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode; | ||
31 | |||
32 | /** | ||
33 | * An evaluator node that caches the evaluation result. This node is also capable of caching the timestamps associated | ||
34 | * with the result tuples if it is used in recursive differential dataflow evaluation. | ||
35 | * | ||
36 | * @author Bergmann Gabor | ||
37 | * @author Tamas Szabo | ||
38 | */ | ||
39 | public class OutputCachingEvaluatorNode extends AbstractEvaluatorNode implements Clearable, ResumableNode { | ||
40 | |||
41 | /** | ||
42 | * @since 2.3 | ||
43 | */ | ||
44 | protected NetworkStructureChangeSensitiveLogic logic; | ||
45 | |||
46 | /** | ||
47 | * @since 2.4 | ||
48 | */ | ||
49 | protected Map<Tuple, Iterable<Tuple>> outputCache; | ||
50 | |||
51 | /** | ||
52 | * Maps input tuples to timestamps. It is wrong to map evaluation result to timestamps because the different input | ||
53 | * tuples may yield the same evaluation result. This field is null as long as this node is in a non-recursive group. | ||
54 | * | ||
55 | * @since 2.4 | ||
56 | */ | ||
57 | protected TimelyMemory<Timestamp> memory; | ||
58 | |||
59 | /** | ||
60 | * @since 2.4 | ||
61 | */ | ||
62 | protected CommunicationGroup group; | ||
63 | |||
64 | /** | ||
65 | * @since 1.5 | ||
66 | */ | ||
67 | public OutputCachingEvaluatorNode(final ReteContainer reteContainer, final EvaluatorCore core) { | ||
68 | super(reteContainer, core); | ||
69 | reteContainer.registerClearable(this); | ||
70 | this.outputCache = CollectionsFactory.createMap(); | ||
71 | this.logic = createLogic(); | ||
72 | } | ||
73 | |||
74 | @Override | ||
75 | public CommunicationGroup getCurrentGroup() { | ||
76 | return this.group; | ||
77 | } | ||
78 | |||
79 | @Override | ||
80 | public void setCurrentGroup(final CommunicationGroup group) { | ||
81 | this.group = group; | ||
82 | } | ||
83 | |||
84 | @Override | ||
85 | public void networkStructureChanged() { | ||
86 | super.networkStructureChanged(); | ||
87 | this.logic = createLogic(); | ||
88 | } | ||
89 | |||
90 | @Override | ||
91 | public void clear() { | ||
92 | this.outputCache.clear(); | ||
93 | if (this.memory != null) { | ||
94 | this.memory.clear(); | ||
95 | } | ||
96 | } | ||
97 | |||
98 | /** | ||
99 | * @since 2.3 | ||
100 | */ | ||
101 | protected NetworkStructureChangeSensitiveLogic createLogic() { | ||
102 | if (this.reteContainer.isTimelyEvaluation() | ||
103 | && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) { | ||
104 | if (this.memory == null) { | ||
105 | this.memory = new TimelyMemory<Timestamp>(reteContainer.isTimelyEvaluation() && reteContainer | ||
106 | .getTimelyConfiguration().getTimelineRepresentation() == TimelineRepresentation.FAITHFUL); | ||
107 | } | ||
108 | return TIMELY; | ||
109 | } else { | ||
110 | return TIMELESS; | ||
111 | } | ||
112 | } | ||
113 | |||
114 | @Override | ||
115 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
116 | this.logic.pullInto(collector, flush); | ||
117 | } | ||
118 | |||
119 | @Override | ||
120 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
121 | this.logic.pullIntoWithTimeline(collector, flush); | ||
122 | } | ||
123 | |||
124 | @Override | ||
125 | public void update(final Direction direction, final Tuple input, final Timestamp timestamp) { | ||
126 | this.logic.update(direction, input, timestamp); | ||
127 | } | ||
128 | |||
129 | /** | ||
130 | * @since 2.4 | ||
131 | */ | ||
132 | @Override | ||
133 | public Timestamp getResumableTimestamp() { | ||
134 | if (this.memory == null) { | ||
135 | return null; | ||
136 | } else { | ||
137 | return this.memory.getResumableTimestamp(); | ||
138 | } | ||
139 | } | ||
140 | |||
141 | /** | ||
142 | * @since 2.4 | ||
143 | */ | ||
144 | @Override | ||
145 | public void resumeAt(final Timestamp timestamp) { | ||
146 | this.logic.resumeAt(timestamp); | ||
147 | } | ||
148 | |||
149 | /** | ||
150 | * @since 2.3 | ||
151 | */ | ||
152 | protected static abstract class NetworkStructureChangeSensitiveLogic { | ||
153 | |||
154 | /** | ||
155 | * @since 2.4 | ||
156 | */ | ||
157 | public abstract void update(final Direction direction, final Tuple input, final Timestamp timestamp); | ||
158 | |||
159 | public abstract void pullInto(final Collection<Tuple> collector, final boolean flush); | ||
160 | |||
161 | /** | ||
162 | * @since 2.4 | ||
163 | */ | ||
164 | public abstract void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush); | ||
165 | |||
166 | /** | ||
167 | * @since 2.4 | ||
168 | */ | ||
169 | public abstract void resumeAt(final Timestamp timestamp); | ||
170 | |||
171 | } | ||
172 | |||
173 | private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() { | ||
174 | |||
175 | @Override | ||
176 | public void resumeAt(final Timestamp timestamp) { | ||
177 | // there is nothing to resume in the timeless case because we do not even care about timestamps | ||
178 | } | ||
179 | |||
180 | @Override | ||
181 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
182 | throw new UnsupportedOperationException(); | ||
183 | } | ||
184 | |||
185 | @Override | ||
186 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
187 | for (final Iterable<Tuple> output : outputCache.values()) { | ||
188 | if (output != NORESULT) { | ||
189 | final Iterator<Tuple> itr = output.iterator(); | ||
190 | while (itr.hasNext()) { | ||
191 | collector.add(itr.next()); | ||
192 | } | ||
193 | } | ||
194 | } | ||
195 | } | ||
196 | |||
197 | @Override | ||
198 | public void update(final Direction direction, final Tuple input, final Timestamp timestamp) { | ||
199 | if (direction == Direction.INSERT) { | ||
200 | final Iterable<Tuple> output = core.performEvaluation(input); | ||
201 | if (output != null) { | ||
202 | final Iterable<Tuple> previous = outputCache.put(input, output); | ||
203 | if (previous != null) { | ||
204 | throw new IllegalStateException( | ||
205 | String.format("Duplicate insertion of tuple %s into node %s", input, this)); | ||
206 | } | ||
207 | propagateIterableUpdate(direction, output, timestamp); | ||
208 | } | ||
209 | } else { | ||
210 | final Iterable<Tuple> output = outputCache.remove(input); | ||
211 | if (output != null) { | ||
212 | // may be null if no result was yielded | ||
213 | propagateIterableUpdate(direction, output, timestamp); | ||
214 | } | ||
215 | } | ||
216 | } | ||
217 | }; | ||
218 | |||
219 | private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() { | ||
220 | |||
221 | @Override | ||
222 | public void resumeAt(final Timestamp timestamp) { | ||
223 | final Map<Tuple, Diff<Timestamp>> diffMap = memory.resumeAt(timestamp); | ||
224 | |||
225 | for (final Entry<Tuple, Diff<Timestamp>> entry : diffMap.entrySet()) { | ||
226 | final Tuple input = entry.getKey(); | ||
227 | final Iterable<Tuple> output = outputCache.get(input); | ||
228 | if (output != NORESULT) { | ||
229 | for (final Signed<Timestamp> signed : entry.getValue()) { | ||
230 | propagateIterableUpdate(signed.getDirection(), output, signed.getPayload()); | ||
231 | } | ||
232 | } | ||
233 | |||
234 | if (memory.get(input) == null) { | ||
235 | outputCache.remove(input); | ||
236 | } | ||
237 | } | ||
238 | |||
239 | final Timestamp nextTimestamp = memory.getResumableTimestamp(); | ||
240 | if (nextTimestamp != null) { | ||
241 | group.notifyHasMessage(mailbox, nextTimestamp); | ||
242 | } | ||
243 | } | ||
244 | |||
245 | @Override | ||
246 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
247 | for (final Entry<Tuple, Timeline<Timestamp>> entry : memory.asMap().entrySet()) { | ||
248 | final Tuple input = entry.getKey(); | ||
249 | final Iterable<Tuple> output = outputCache.get(input); | ||
250 | if (output != NORESULT) { | ||
251 | final Timeline<Timestamp> timestamp = entry.getValue(); | ||
252 | final Iterator<Tuple> itr = output.iterator(); | ||
253 | while (itr.hasNext()) { | ||
254 | collector.put(itr.next(), timestamp); | ||
255 | } | ||
256 | } | ||
257 | } | ||
258 | } | ||
259 | |||
260 | @Override | ||
261 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
262 | TIMELESS.pullInto(collector, flush); | ||
263 | } | ||
264 | |||
265 | @Override | ||
266 | public void update(final Direction direction, final Tuple input, final Timestamp timestamp) { | ||
267 | if (direction == Direction.INSERT) { | ||
268 | Iterable<Tuple> output = outputCache.get(input); | ||
269 | if (output == null) { | ||
270 | output = core.performEvaluation(input); | ||
271 | if (output == null) { | ||
272 | // the evaluation result is really null | ||
273 | output = NORESULT; | ||
274 | } | ||
275 | outputCache.put(input, output); | ||
276 | } | ||
277 | final Diff<Timestamp> diff = memory.put(input, timestamp); | ||
278 | if (output != NORESULT) { | ||
279 | for (final Signed<Timestamp> signed : diff) { | ||
280 | propagateIterableUpdate(signed.getDirection(), output, signed.getPayload()); | ||
281 | } | ||
282 | } | ||
283 | } else { | ||
284 | final Iterable<Tuple> output = outputCache.get(input); | ||
285 | final Diff<Timestamp> diff = memory.remove(input, timestamp); | ||
286 | if (memory.get(input) == null) { | ||
287 | outputCache.remove(input); | ||
288 | } | ||
289 | if (output != NORESULT) { | ||
290 | for (final Signed<Timestamp> signed : diff) { | ||
291 | propagateIterableUpdate(signed.getDirection(), output, signed.getPayload()); | ||
292 | } | ||
293 | } | ||
294 | } | ||
295 | } | ||
296 | }; | ||
297 | |||
298 | /** | ||
299 | * This field is used to represent the "null" evaluation result. This is an optimization used in the timely case | ||
300 | * where the same tuple may be inserted multiple times with different timestamps. This way, we can also cache if | ||
301 | * something evaluated to null (instead of just forgetting about the previously computed result), thus avoiding the | ||
302 | * need to re-run a potentially expensive evaluation. | ||
303 | */ | ||
304 | private static final Iterable<Tuple> NORESULT = Collections | ||
305 | .singleton(Tuples.staticArityFlatTupleOf(NoResult.INSTANCE)); | ||
306 | |||
307 | private enum NoResult { | ||
308 | INSTANCE | ||
309 | } | ||
310 | |||
311 | } | ||