diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java new file mode 100644 index 00000000..0c73000e --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java | |||
@@ -0,0 +1,212 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd. | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.aggregation.timely; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Map; | ||
14 | import java.util.Map.Entry; | ||
15 | import java.util.Objects; | ||
16 | import java.util.TreeMap; | ||
17 | |||
18 | import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator; | ||
19 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
20 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
21 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
23 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
24 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines; | ||
25 | import tools.refinery.viatra.runtime.rete.aggregation.AbstractColumnAggregatorNode; | ||
26 | import tools.refinery.viatra.runtime.rete.aggregation.GroupedMap; | ||
27 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
28 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
29 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
30 | import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox; | ||
31 | |||
32 | /** | ||
33 | * First-only timely implementation of the column aggregator node. Only timestamps of appearance are maintained for | ||
34 | * tuples instead of complete timelines. | ||
35 | * <br><br> | ||
36 | * Subclasses are responsible for implementing the aggregator architecture, and they must make use of the inner class {@link CumulativeAggregate}. | ||
37 | * <br><br> | ||
38 | * This node supports recursive aggregation. | ||
39 | * | ||
40 | * @author Tamas Szabo | ||
41 | * @since 2.4 | ||
42 | */ | ||
43 | public abstract class FirstOnlyTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult> | ||
44 | extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> { | ||
45 | |||
46 | protected final Map<Tuple, TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>>> memory; | ||
47 | |||
48 | public FirstOnlyTimelyColumnAggregatorNode(final ReteContainer reteContainer, | ||
49 | final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator, | ||
50 | final TupleMask groupMask, final TupleMask columnMask) { | ||
51 | super(reteContainer, operator, groupMask, columnMask); | ||
52 | this.memory = CollectionsFactory.createMap(); | ||
53 | // mailbox MUST be instantiated after the fields are all set | ||
54 | this.mailbox = instantiateMailbox(); | ||
55 | } | ||
56 | |||
57 | protected static class CumulativeAggregate<Accumulator, AggregateResult> { | ||
58 | // the accumulator storing the aggregands | ||
59 | protected Accumulator accumulator; | ||
60 | // the aggregate result at the timestamp where this cumulative aggregate is stored | ||
61 | protected AggregateResult result; | ||
62 | |||
63 | private CumulativeAggregate(final Accumulator accumulator, final AggregateResult result) { | ||
64 | this.accumulator = accumulator; | ||
65 | this.result = result; | ||
66 | } | ||
67 | |||
68 | } | ||
69 | |||
70 | public Collection<Tuple> getGroups() { | ||
71 | return this.memory.keySet(); | ||
72 | } | ||
73 | |||
74 | public AggregateResult getLastResult(final Tuple group) { | ||
75 | final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> groupMap = this.memory.get(group); | ||
76 | if (groupMap == null) { | ||
77 | return null; | ||
78 | } else { | ||
79 | return groupMap.lastEntry().getValue().result; | ||
80 | } | ||
81 | } | ||
82 | |||
83 | public Timestamp getLastTimestamp(final Tuple group) { | ||
84 | final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> groupMap = this.memory.get(group); | ||
85 | if (groupMap == null) { | ||
86 | return null; | ||
87 | } else { | ||
88 | return groupMap.lastEntry().getKey(); | ||
89 | } | ||
90 | } | ||
91 | |||
92 | @Override | ||
93 | protected Mailbox instantiateMailbox() { | ||
94 | return new TimelyMailbox(this, this.reteContainer); | ||
95 | } | ||
96 | |||
97 | @Override | ||
98 | public void clear() { | ||
99 | this.mailbox.clear(); | ||
100 | this.memory.clear(); | ||
101 | this.children.clear(); | ||
102 | this.childMailboxes.clear(); | ||
103 | } | ||
104 | |||
105 | protected void propagateWithChecks(final Tuple group, final Timestamp timestamp, | ||
106 | final AggregateResult previousOldResult, final AggregateResult previousNewResult, | ||
107 | final AggregateResult currentOldResult, final AggregateResult currentNewResult) { | ||
108 | final boolean jumpDown = Objects.equals(previousNewResult, currentOldResult); | ||
109 | final boolean jumpUp = Objects.equals(previousOldResult, currentNewResult); | ||
110 | final boolean resultsDiffer = !Objects.equals(currentOldResult, currentNewResult); | ||
111 | |||
112 | // uniqueness enforcement is happening here | ||
113 | if ((resultsDiffer || jumpDown) && !Objects.equals(previousOldResult, currentOldResult)) { | ||
114 | propagate(Direction.DELETE, group, currentOldResult, timestamp); | ||
115 | } | ||
116 | if ((resultsDiffer || jumpUp) && !Objects.equals(previousNewResult, currentNewResult)) { | ||
117 | propagate(Direction.INSERT, group, currentNewResult, timestamp); | ||
118 | } | ||
119 | } | ||
120 | |||
121 | /** | ||
122 | * Returns the aggregation architecture-specific accumulator at the specified timestamp for the given group. | ||
123 | */ | ||
124 | protected abstract Accumulator getAccumulator(final Tuple group, final Timestamp timestamp); | ||
125 | |||
126 | protected AggregateResult getResultRaw(final Tuple group, final Timestamp timestamp, final boolean lower) { | ||
127 | final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group); | ||
128 | if (entryMap == null) { | ||
129 | return null; | ||
130 | } else { | ||
131 | CumulativeAggregate<Accumulator, AggregateResult> entry = null; | ||
132 | if (lower) { | ||
133 | final Entry<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> lowerEntry = entryMap | ||
134 | .lowerEntry(timestamp); | ||
135 | if (lowerEntry != null) { | ||
136 | entry = lowerEntry.getValue(); | ||
137 | } | ||
138 | } else { | ||
139 | entry = entryMap.get(timestamp); | ||
140 | } | ||
141 | if (entry == null) { | ||
142 | return null; | ||
143 | } else { | ||
144 | return entry.result; | ||
145 | } | ||
146 | } | ||
147 | } | ||
148 | |||
149 | protected AggregateResult getResult(final Tuple group, final Timestamp timestamp, final boolean lower) { | ||
150 | final AggregateResult result = getResultRaw(group, timestamp, lower); | ||
151 | if (result == null) { | ||
152 | return NEUTRAL; | ||
153 | } else { | ||
154 | return result; | ||
155 | } | ||
156 | } | ||
157 | |||
158 | protected AggregateResult getResult(final Tuple group, final Timestamp timestamp) { | ||
159 | return getResult(group, timestamp, false); | ||
160 | } | ||
161 | |||
162 | protected void storeIfNotNeutral(final Tuple group, final Accumulator accumulator, final AggregateResult value, | ||
163 | final Timestamp timestamp) { | ||
164 | TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group); | ||
165 | if (operator.isNeutral(accumulator)) { | ||
166 | if (entryMap != null) { | ||
167 | entryMap.remove(timestamp); | ||
168 | if (entryMap.isEmpty()) { | ||
169 | this.memory.remove(group); | ||
170 | } | ||
171 | } | ||
172 | } else { | ||
173 | if (entryMap == null) { | ||
174 | entryMap = CollectionsFactory.createTreeMap(); | ||
175 | this.memory.put(group, entryMap); | ||
176 | } | ||
177 | entryMap.put(timestamp, new CumulativeAggregate<>(accumulator, value)); | ||
178 | } | ||
179 | } | ||
180 | |||
181 | @Override | ||
182 | public Tuple getAggregateTuple(final Tuple group) { | ||
183 | return tupleFromAggregateResult(group, getResult(group, Timestamp.ZERO)); | ||
184 | } | ||
185 | |||
186 | @Override | ||
187 | public AggregateResult getAggregateResult(final Tuple group) { | ||
188 | return getResult(group, Timestamp.ZERO); | ||
189 | } | ||
190 | |||
191 | @Override | ||
192 | public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(final Tuple group) { | ||
193 | final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group); | ||
194 | if (entryMap == null) { | ||
195 | return Collections.emptyMap(); | ||
196 | } else { | ||
197 | final Map<AggregateResult, Timeline<Timestamp>> result = CollectionsFactory.createMap(); | ||
198 | for (final Entry<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entry : entryMap | ||
199 | .descendingMap().entrySet()) { | ||
200 | result.put(entry.getValue().result, Timelines.createFrom(entry.getKey())); | ||
201 | } | ||
202 | return result; | ||
203 | } | ||
204 | } | ||
205 | |||
206 | @Override | ||
207 | public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(final Tuple group) { | ||
208 | final Map<AggregateResult, Timeline<Timestamp>> resultTimelines = getAggregateResultTimeline(group); | ||
209 | return new GroupedMap<AggregateResult, Timeline<Timestamp>>(group, resultTimelines, this.runtimeContext); | ||
210 | } | ||
211 | |||
212 | } | ||