aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/aggregation/IndexerBasedAggregatorNode.java
blob: d9a94a82b525d2fa91a01fda120df013f21920ea (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/*******************************************************************************
 * Copyright (c) 2004-2009 Gabor Bergmann and Daniel Varro
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-v20.html.
 * 
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/

package tools.refinery.viatra.runtime.rete.aggregation;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;

import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
import tools.refinery.viatra.runtime.matchers.tuple.Tuples;
import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
import tools.refinery.viatra.runtime.matchers.util.Direction;
import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
import tools.refinery.viatra.runtime.rete.index.DefaultIndexerListener;
import tools.refinery.viatra.runtime.rete.index.Indexer;
import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
import tools.refinery.viatra.runtime.rete.index.StandardIndexer;
import tools.refinery.viatra.runtime.rete.network.Node;
import tools.refinery.viatra.runtime.rete.network.ReteContainer;
import tools.refinery.viatra.runtime.rete.network.StandardNode;
import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
import tools.refinery.viatra.runtime.rete.traceability.TraceInfo;

/**
 * Aggregator node fed by a projection indexer: every group of tuples that share the same projected signature is
 * reduced to a single aggregate value. The direct output of this node only carries the aggregates of non-empty groups;
 * the outer indexers additionally answer for signatures whose group is empty.
 * <p>
 * This node cannot be used in recursive differential dataflow evaluation.
 *
 * @author Gabor Bergmann
 * @since 1.4
 */
public abstract class IndexerBasedAggregatorNode extends StandardNode implements IAggregatorNode {

    /** The projection indexer whose groups are aggregated; assigned in {@link #initializeWith(ProjectionIndexer)}. */
    ProjectionIndexer projection;
    /** Self-reference used by the inner indexer classes to reach the enclosing node. */
    IndexerBasedAggregatorNode me;
    /** Number of indices in the mask of the projection indexer. */
    int sourceWidth;
    /** Current aggregate stored per signature; signatures of empty groups have no entry. */
    Map<Tuple, Object> mainAggregates;

    /** Lazily created by {@link #getAggregatorOuterIndexer()}. */
    AggregatorOuterIndexer aggregatorOuterIndexer;
    /** Lazily created by {@link #getAggregatorOuterIdentityIndexer(int)}, one slot per result position. */
    AggregatorOuterIdentityIndexer[] aggregatorOuterIdentityIndexers;

    /**
     * Creates the node without a source. MUST call initializeWith() afterwards!
     *
     * @param reteContainer
     *            the container this node belongs to
     */
    public IndexerBasedAggregatorNode(ReteContainer reteContainer) {
        super(reteContainer);
        mainAggregates = CollectionsFactory.createMap();
        me = this;
    }

    /**
     * Rejects the network structure if evaluation is timely and this node is part of a recursive group, then delegates
     * to the superclass.
     *
     * @throws IllegalStateException
     *             if the node is placed in a recursive group under timely evaluation
     */
    @Override
    public void networkStructureChanged() {
        final boolean timely = this.reteContainer.isTimelyEvaluation();
        if (timely && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) {
            throw new IllegalStateException(this.toString() + " cannot be used in recursive differential dataflow evaluation!");
        }
        super.networkStructureChanged();
    }

    /**
     * Binds this node to its source: computes the aggregate of every signature already present in the projection
     * indexer and subscribes to its subsequent updates.
     *
     * @param projection
     *            the projection indexer whose tuple groups should be aggregated
     */
    public void initializeWith(ProjectionIndexer projection) {
        this.projection = projection;
        this.sourceWidth = projection.getMask().indices.length;

        for (Tuple existingSignature : projection.getSignatures()) {
            final Collection<Tuple> existingGroup = projection.get(existingSignature);
            mainAggregates.put(existingSignature, aggregateGroup(existingSignature, existingGroup));
        }

        projection.attachListener(new DefaultIndexerListener(this) {
            @Override
            public void notifyIndexerUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change,
                    Timestamp timestamp) {
                // the timestamp is not forwarded; aggregate updates are always propagated at timestamp zero
                aggregateUpdate(direction, updateElement, signature, change);
            }
        });
    }

    /**
     * Aggregates (reduces) a group of tuples. The group can be null.
     *
     * @param signature
     *            the signature shared by the members of the group
     * @param group
     *            the tuples of the group, possibly null
     * @return the aggregate value of the group
     */
    public abstract Object aggregateGroup(Tuple signature, Collection<Tuple> group);

    /**
     * Aggregates (reduces) a group of tuples with access to the aggregate value from before the update and to the
     * description of the update itself. The default implementation ignores this extra information and simply calls
     * {@link #aggregateGroup(Tuple, Collection)}; override to increase performance.
     *
     * @since 2.4
     */
    public Object aggregateGroupAfterUpdate(Tuple signature, Collection<Tuple> currentGroup, Object oldAggregate,
            Direction direction, Tuple updateElement, boolean change) {
        return aggregateGroup(signature, currentGroup);
    }

    /**
     * Computes the aggregate of the given group and packs it together with the signature into a result tuple.
     */
    protected Tuple aggregateAndPack(Tuple signature, Collection<Tuple> group) {
        final Object aggregate = aggregateGroup(signature, group);
        return packResult(signature, aggregate);
    }

    /**
     * Returns the outer indexer of this aggregator, creating it and registering it as a dependency on first access.
     */
    @Override
    public Indexer getAggregatorOuterIndexer() {
        if (aggregatorOuterIndexer != null) {
            return aggregatorOuterIndexer;
        }
        aggregatorOuterIndexer = new AggregatorOuterIndexer();
        this.getCommunicationTracker().registerDependency(this, aggregatorOuterIndexer);
        return aggregatorOuterIndexer;
    }

    /**
     * Returns the outer identity indexer for the given result position, creating it and registering it as a dependency
     * on first access. The backing array has {@code sourceWidth + 1} slots and is allocated on the first call.
     *
     * @param resultPositionInSignature
     *            the position of the suspected aggregate value within the indexer signature
     */
    @Override
    public Indexer getAggregatorOuterIdentityIndexer(int resultPositionInSignature) {
        if (aggregatorOuterIdentityIndexers == null) {
            aggregatorOuterIdentityIndexers = new AggregatorOuterIdentityIndexer[sourceWidth + 1];
        }
        AggregatorOuterIdentityIndexer identityIndexer = aggregatorOuterIdentityIndexers[resultPositionInSignature];
        if (identityIndexer == null) {
            identityIndexer = new AggregatorOuterIdentityIndexer(resultPositionInSignature);
            aggregatorOuterIdentityIndexers[resultPositionInSignature] = identityIndexer;
            this.getCommunicationTracker().registerDependency(this, identityIndexer);
        }
        return identityIndexer;
    }

    /**
     * Adds one packed (signature + aggregate) tuple to the collector for every stored aggregate.
     */
    @Override
    public void pullInto(final Collection<Tuple> collector, final boolean flush) {
        for (final Entry<Tuple, Object> stored : mainAggregates.entrySet()) {
            final Tuple packed = packResult(stored.getKey(), stored.getValue());
            collector.add(packed);
        }
    }

    /**
     * Puts one packed (signature + aggregate) tuple into the collector for every stored aggregate, each mapped to the
     * insert-at-zero timeline.
     */
    @Override
    public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) {
        // all timelines start at zero because this node cannot be used in recursive groups anyway
        for (final Entry<Tuple, Object> stored : mainAggregates.entrySet()) {
            final Tuple packed = packResult(stored.getKey(), stored.getValue());
            collector.put(packed, Timestamp.INSERT_AT_ZERO_TIMELINE);
        }
    }

    /**
     * Builds the output tuple consisting of the signature followed by the aggregate result.
     */
    protected Tuple packResult(Tuple signature, Object result) {
        return Tuples.staticArityLeftInheritanceTupleOf(signature, result);
    }

    /**
     * Reacts to an update of the projection indexer: recomputes the aggregate of the affected signature, refreshes the
     * stored value and notifies the direct children as well as any outer indexers that have been created.
     *
     * @since 2.4
     */
    protected void aggregateUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change) {
        final Collection<Tuple> groupAfterUpdate = projection.get(signature);

        // no stored value means the group was empty before this update
        final Object storedAggregate = mainAggregates.get(signature);
        final Object previousValue;
        if (storedAggregate != null) {
            previousValue = storedAggregate;
        } else {
            previousValue = aggregateGroup(signature, null);
        }

        final boolean hasMembers = groupAfterUpdate != null && !groupAfterUpdate.isEmpty();
        final Object updatedAggregate;
        if (hasMembers) {
            updatedAggregate = aggregateGroupAfterUpdate(signature, groupAfterUpdate, previousValue, direction,
                    updateElement, change);
            mainAggregates.put(signature, updatedAggregate);
        } else {
            updatedAggregate = null;
            mainAggregates.remove(signature);
        }

        final Tuple oldTuple = packResult(signature, previousValue);
        final Object currentValue = updatedAggregate != null ? updatedAggregate : aggregateGroup(signature, null);
        final Tuple newTuple = packResult(signature, currentValue);

        // direct outputs only ever contain the aggregates of non-empty groups
        if (storedAggregate != null) {
            propagateUpdate(Direction.DELETE, oldTuple, Timestamp.ZERO);
        }
        if (updatedAggregate != null) {
            propagateUpdate(Direction.INSERT, newTuple, Timestamp.ZERO);
        }

        // outer indexers are notified regardless of whether the group is empty
        if (aggregatorOuterIndexer != null) {
            aggregatorOuterIndexer.propagate(signature, oldTuple, newTuple);
        }
        if (aggregatorOuterIdentityIndexers != null) {
            for (AggregatorOuterIdentityIndexer identityIndexer : aggregatorOuterIdentityIndexers) {
                if (identityIndexer != null) {
                    identityIndexer.propagate(signature, oldTuple, newTuple);
                }
            }
        }
    }

    /**
     * Returns the stored aggregate of the signature, or the aggregate of a null group if nothing is stored.
     */
    private Object getAggregate(Tuple signature) {
        final Object stored = mainAggregates.get(signature);
        if (stored != null) {
            return stored;
        }
        return aggregateGroup(signature, null);
    }

    /**
     * Records the trace info on this node and, if the trace info asks for it and a projection indexer has been set,
     * hands it on to that indexer.
     */
    @Override
    public void assignTraceInfo(TraceInfo traceInfo) {
        super.assignTraceInfo(traceInfo);
        final boolean forwardToParent = traceInfo.propagateToIndexerParent() && projection != null;
        if (forwardToParent) {
            projection.acceptPropagatedTraceInfo(traceInfo);
        }
    }

    /**
     * A special non-iterable index that, for an original signature, retrieves the aggregated and packed result
     * (signature + aggregate).
     *
     * @author Gabor Bergmann
     */
    class AggregatorOuterIndexer extends StandardIndexer {

        public AggregatorOuterIndexer() {
            super(me.reteContainer, TupleMask.omit(sourceWidth, sourceWidth + 1));
            this.parent = me;
        }

        /**
         * Always yields exactly one tuple: the signature packed with its current aggregate.
         */
        @Override
        public Collection<Tuple> get(Tuple signature) {
            final Tuple packed = packResult(signature, getAggregate(signature));
            return Collections.singleton(packed);
        }

        /**
         * Notifies listeners of a replaced result tuple; the insertion of the new tuple is sent before the deletion of
         * the old one, both with the change flag set to false.
         */
        public void propagate(Tuple signature, Tuple oldTuple, Tuple newTuple) {
            propagate(Direction.INSERT, newTuple, signature, false, Timestamp.ZERO);
            propagate(Direction.DELETE, oldTuple, signature, false, Timestamp.ZERO);
        }

        @Override
        public Node getActiveNode() {
            return projection.getActiveNode();
        }

    }

    /**
     * A special non-iterable index that checks a suspected aggregate value for a given signature. The signature of
     * this index is the original signature of the projection index with the suspected result inserted at position
     * resultPositionInSignature.
     *
     * @author Gabor Bergmann
     */
    class AggregatorOuterIdentityIndexer extends StandardIndexer {
        int resultPositionInSignature;
        TupleMask pruneResult;
        TupleMask reorderMask;

        public AggregatorOuterIdentityIndexer(int resultPositionInSignature) {
            super(me.reteContainer, TupleMask.displace(sourceWidth, resultPositionInSignature, sourceWidth + 1));
            this.parent = me;
            this.resultPositionInSignature = resultPositionInSignature;
            this.pruneResult = TupleMask.omit(resultPositionInSignature, sourceWidth + 1);
            // no reordering is needed when the result already sits at the last position
            this.reorderMask = (resultPositionInSignature == sourceWidth) ? null : mask;
        }

        /**
         * Returns the given tuple as a singleton if the value at the result position equals the current aggregate of
         * the remaining signature, and null otherwise.
         */
        @Override
        public Collection<Tuple> get(Tuple signatureWithResult) {
            final Tuple prunedSignature = pruneResult.transform(signatureWithResult);
            final Object result = getAggregate(prunedSignature);
            final Object suspected = signatureWithResult.get(resultPositionInSignature);
            if (!suspected.equals(result)) {
                return null;
            }
            return Collections.singleton(signatureWithResult);
        }

        /**
         * Notifies listeners of a replaced result tuple; the insertion of the reordered new tuple is sent before the
         * deletion of the reordered old one, both with the change flag set to true.
         */
        public void propagate(Tuple signature, Tuple oldTuple, Tuple newTuple) {
            propagate(Direction.INSERT, reorder(newTuple), signature, true, Timestamp.ZERO);
            propagate(Direction.DELETE, reorder(oldTuple), signature, true, Timestamp.ZERO);
        }

        /**
         * Applies the reorder mask to the tuple, or returns the tuple unchanged when no mask is set.
         */
        private Tuple reorder(Tuple signatureWithResult) {
            if (reorderMask == null) {
                return signatureWithResult;
            }
            return reorderMask.transform(signatureWithResult);
        }

        @Override
        public Node getActiveNode() {
            return projection.getActiveNode();
        }
    }

}