aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java212
1 files changed, 212 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java
new file mode 100644
index 00000000..0c73000e
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/timely/FirstOnlyTimelyColumnAggregatorNode.java
@@ -0,0 +1,212 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2019, Tamas Szabo, itemis AG, Gabor Bergmann, IncQuery Labs Ltd.
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.aggregation.timely;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.Map;
14import java.util.Map.Entry;
15import java.util.Objects;
16import java.util.TreeMap;
17
18import tools.refinery.viatra.runtime.matchers.psystem.aggregations.IMultisetAggregationOperator;
19import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
20import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
21import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
22import tools.refinery.viatra.runtime.matchers.util.Direction;
23import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
24import tools.refinery.viatra.runtime.matchers.util.timeline.Timelines;
25import tools.refinery.viatra.runtime.rete.aggregation.AbstractColumnAggregatorNode;
26import tools.refinery.viatra.runtime.rete.aggregation.GroupedMap;
27import tools.refinery.viatra.runtime.rete.network.ReteContainer;
28import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
29import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
30import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox;
31
32/**
33 * First-only timely implementation of the column aggregator node. Only timestamps of appearance are maintained for
34 * tuples instead of complete timelines.
35 * <br><br>
36 * Subclasses are responsible for implementing the aggregator architecture, and they must make use of the inner class {@link CumulativeAggregate}.
37 * <br><br>
38 * This node supports recursive aggregation.
39 *
40 * @author Tamas Szabo
41 * @since 2.4
42 */
43public abstract class FirstOnlyTimelyColumnAggregatorNode<Domain, Accumulator, AggregateResult>
44 extends AbstractColumnAggregatorNode<Domain, Accumulator, AggregateResult> {
45
46 protected final Map<Tuple, TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>>> memory;
47
48 public FirstOnlyTimelyColumnAggregatorNode(final ReteContainer reteContainer,
49 final IMultisetAggregationOperator<Domain, Accumulator, AggregateResult> operator,
50 final TupleMask groupMask, final TupleMask columnMask) {
51 super(reteContainer, operator, groupMask, columnMask);
52 this.memory = CollectionsFactory.createMap();
53 // mailbox MUST be instantiated after the fields are all set
54 this.mailbox = instantiateMailbox();
55 }
56
57 protected static class CumulativeAggregate<Accumulator, AggregateResult> {
58 // the accumulator storing the aggregands
59 protected Accumulator accumulator;
60 // the aggregate result at the timestamp where this cumulative aggregate is stored
61 protected AggregateResult result;
62
63 private CumulativeAggregate(final Accumulator accumulator, final AggregateResult result) {
64 this.accumulator = accumulator;
65 this.result = result;
66 }
67
68 }
69
70 public Collection<Tuple> getGroups() {
71 return this.memory.keySet();
72 }
73
74 public AggregateResult getLastResult(final Tuple group) {
75 final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> groupMap = this.memory.get(group);
76 if (groupMap == null) {
77 return null;
78 } else {
79 return groupMap.lastEntry().getValue().result;
80 }
81 }
82
83 public Timestamp getLastTimestamp(final Tuple group) {
84 final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> groupMap = this.memory.get(group);
85 if (groupMap == null) {
86 return null;
87 } else {
88 return groupMap.lastEntry().getKey();
89 }
90 }
91
92 @Override
93 protected Mailbox instantiateMailbox() {
94 return new TimelyMailbox(this, this.reteContainer);
95 }
96
97 @Override
98 public void clear() {
99 this.mailbox.clear();
100 this.memory.clear();
101 this.children.clear();
102 this.childMailboxes.clear();
103 }
104
105 protected void propagateWithChecks(final Tuple group, final Timestamp timestamp,
106 final AggregateResult previousOldResult, final AggregateResult previousNewResult,
107 final AggregateResult currentOldResult, final AggregateResult currentNewResult) {
108 final boolean jumpDown = Objects.equals(previousNewResult, currentOldResult);
109 final boolean jumpUp = Objects.equals(previousOldResult, currentNewResult);
110 final boolean resultsDiffer = !Objects.equals(currentOldResult, currentNewResult);
111
112 // uniqueness enforcement is happening here
113 if ((resultsDiffer || jumpDown) && !Objects.equals(previousOldResult, currentOldResult)) {
114 propagate(Direction.DELETE, group, currentOldResult, timestamp);
115 }
116 if ((resultsDiffer || jumpUp) && !Objects.equals(previousNewResult, currentNewResult)) {
117 propagate(Direction.INSERT, group, currentNewResult, timestamp);
118 }
119 }
120
121 /**
122 * Returns the aggregation architecture-specific accumulator at the specified timestamp for the given group.
123 */
124 protected abstract Accumulator getAccumulator(final Tuple group, final Timestamp timestamp);
125
126 protected AggregateResult getResultRaw(final Tuple group, final Timestamp timestamp, final boolean lower) {
127 final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group);
128 if (entryMap == null) {
129 return null;
130 } else {
131 CumulativeAggregate<Accumulator, AggregateResult> entry = null;
132 if (lower) {
133 final Entry<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> lowerEntry = entryMap
134 .lowerEntry(timestamp);
135 if (lowerEntry != null) {
136 entry = lowerEntry.getValue();
137 }
138 } else {
139 entry = entryMap.get(timestamp);
140 }
141 if (entry == null) {
142 return null;
143 } else {
144 return entry.result;
145 }
146 }
147 }
148
149 protected AggregateResult getResult(final Tuple group, final Timestamp timestamp, final boolean lower) {
150 final AggregateResult result = getResultRaw(group, timestamp, lower);
151 if (result == null) {
152 return NEUTRAL;
153 } else {
154 return result;
155 }
156 }
157
158 protected AggregateResult getResult(final Tuple group, final Timestamp timestamp) {
159 return getResult(group, timestamp, false);
160 }
161
162 protected void storeIfNotNeutral(final Tuple group, final Accumulator accumulator, final AggregateResult value,
163 final Timestamp timestamp) {
164 TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group);
165 if (operator.isNeutral(accumulator)) {
166 if (entryMap != null) {
167 entryMap.remove(timestamp);
168 if (entryMap.isEmpty()) {
169 this.memory.remove(group);
170 }
171 }
172 } else {
173 if (entryMap == null) {
174 entryMap = CollectionsFactory.createTreeMap();
175 this.memory.put(group, entryMap);
176 }
177 entryMap.put(timestamp, new CumulativeAggregate<>(accumulator, value));
178 }
179 }
180
181 @Override
182 public Tuple getAggregateTuple(final Tuple group) {
183 return tupleFromAggregateResult(group, getResult(group, Timestamp.ZERO));
184 }
185
186 @Override
187 public AggregateResult getAggregateResult(final Tuple group) {
188 return getResult(group, Timestamp.ZERO);
189 }
190
191 @Override
192 public Map<AggregateResult, Timeline<Timestamp>> getAggregateResultTimeline(final Tuple group) {
193 final TreeMap<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entryMap = this.memory.get(group);
194 if (entryMap == null) {
195 return Collections.emptyMap();
196 } else {
197 final Map<AggregateResult, Timeline<Timestamp>> result = CollectionsFactory.createMap();
198 for (final Entry<Timestamp, CumulativeAggregate<Accumulator, AggregateResult>> entry : entryMap
199 .descendingMap().entrySet()) {
200 result.put(entry.getValue().result, Timelines.createFrom(entry.getKey()));
201 }
202 return result;
203 }
204 }
205
206 @Override
207 public Map<Tuple, Timeline<Timestamp>> getAggregateTupleTimeline(final Tuple group) {
208 final Map<AggregateResult, Timeline<Timestamp>> resultTimelines = getAggregateResultTimeline(group);
209 return new GroupedMap<AggregateResult, Timeline<Timestamp>>(group, resultTimelines, this.runtimeContext);
210 }
211
212}