aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/IndexerWithMemory.java
blob: a31562e96b4f0f3e92749c11eac5e632b4ee6034 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
/*******************************************************************************
 * Copyright (c) 2004-2009 Gabor Bergmann and Daniel Varro
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-v20.html.
 * 
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/

package tools.refinery.viatra.runtime.rete.index;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;

import tools.refinery.viatra.runtime.matchers.memories.MaskedTupleMemory;
import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory.MemoryType;
import tools.refinery.viatra.runtime.matchers.util.Direction;
import tools.refinery.viatra.runtime.matchers.util.Signed;
import tools.refinery.viatra.runtime.matchers.util.timeline.Diff;
import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation;
import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode;
import tools.refinery.viatra.runtime.rete.network.Receiver;
import tools.refinery.viatra.runtime.rete.network.ReteContainer;
import tools.refinery.viatra.runtime.rete.network.Supplier;
import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode;
import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox;

/**
 * @author Gabor Bergmann
 * @author Tamas Szabo
 */
public abstract class IndexerWithMemory extends StandardIndexer
        implements Receiver, NetworkStructureChangeSensitiveNode, ResumableNode {

    protected MaskedTupleMemory<Timestamp> memory;

    /**
     * @since 2.3
     */
    protected NetworkStructureChangeSensitiveLogic logic;

    /**
     * @since 1.6
     */
    protected final Mailbox mailbox;

    /**
     * @since 2.4
     */
    protected CommunicationGroup group;

    public IndexerWithMemory(final ReteContainer reteContainer, final TupleMask mask) {
        super(reteContainer, mask);
        final boolean isTimely = reteContainer.isTimelyEvaluation()
                && reteContainer.getCommunicationTracker().isInRecursiveGroup(this);
        memory = MaskedTupleMemory.create(mask, MemoryType.SETS, this, isTimely, isTimely && reteContainer
                .getTimelyConfiguration().getTimelineRepresentation() == TimelineRepresentation.FAITHFUL);
        reteContainer.registerClearable(memory);
        mailbox = instantiateMailbox();
        reteContainer.registerClearable(mailbox);
        this.logic = createLogic();
    }

    @Override
    public CommunicationGroup getCurrentGroup() {
        return this.group;
    }

    @Override
    public void setCurrentGroup(final CommunicationGroup group) {
        this.group = group;
    }

    @Override
    public void networkStructureChanged() {
        super.networkStructureChanged();
        final boolean wasTimely = this.memory.isTimely();
        final boolean isTimely = this.reteContainer.isTimelyEvaluation()
                && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this);
        if (wasTimely != isTimely) {
            final MaskedTupleMemory<Timestamp> newMemory = MaskedTupleMemory.create(mask, MemoryType.SETS, this,
                    isTimely, isTimely && reteContainer.getTimelyConfiguration()
                            .getTimelineRepresentation() == TimelineRepresentation.FAITHFUL);
            newMemory.initializeWith(this.memory, Timestamp.ZERO);
            memory.clear();
            memory = newMemory;
        }
        this.logic = createLogic();
    }

    /**
     * Instantiates the {@link Mailbox} of this receiver. Subclasses may override this method to provide their own
     * mailbox implementation.
     * 
     * @return the mailbox
     * @since 2.0
     */
    protected Mailbox instantiateMailbox() {
        if (this.reteContainer.isTimelyEvaluation()) {
            return new TimelyMailbox(this, this.reteContainer);
        } else {
            return new BehaviorChangingMailbox(this, this.reteContainer);
        }
    }

    @Override
    public Mailbox getMailbox() {
        return this.mailbox;
    }

    /**
     * @since 2.0
     */
    public MaskedTupleMemory<Timestamp> getMemory() {
        return memory;
    }

    @Override
    public void update(final Direction direction, final Tuple updateElement, final Timestamp timestamp) {
        this.logic.update(direction, updateElement, timestamp);
    }

    /**
     * Refined version of update
     * 
     * @since 2.4
     */
    protected abstract void update(final Direction direction, final Tuple updateElement, final Tuple signature,
            final boolean change, final Timestamp timestamp);

    @Override
    public void appendParent(final Supplier supplier) {
        if (parent == null) {
            parent = supplier;
        } else {
            throw new UnsupportedOperationException("Illegal RETE edge: " + this + " already has a parent (" + parent
                    + ") and cannot connect to additional parent (" + supplier + "). ");
        }
    }

    @Override
    public void removeParent(final Supplier supplier) {
        if (parent == supplier) {
            parent = null;
        } else {
            throw new IllegalArgumentException(
                    "Illegal RETE edge removal: the parent of " + this + " is not " + supplier);
        }
    }

    /**
     * @since 2.4
     */
    @Override
    public Collection<Supplier> getParents() {
        return Collections.singleton(parent);
    }

    /**
     * @since 2.4
     */
    @Override
    public void resumeAt(final Timestamp timestamp) {
        this.logic.resumeAt(timestamp);
    }

    /**
     * @since 2.4
     */
    @Override
    public Timestamp getResumableTimestamp() {
        return this.memory.getResumableTimestamp();
    }

    /**
     * @since 2.3
     */
    protected static abstract class NetworkStructureChangeSensitiveLogic {

        /**
         * @since 2.4
         */
        public abstract void update(final Direction direction, final Tuple updateElement, final Timestamp timestamp);

        /**
         * @since 2.4
         */
        public abstract void resumeAt(final Timestamp timestamp);

    }

    /**
     * @since 2.3
     */
    protected NetworkStructureChangeSensitiveLogic createLogic() {
        if (this.reteContainer.isTimelyEvaluation()
                && this.reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) {
            return TIMELY;
        } else {
            return TIMELESS;
        }
    }

    private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() {

        @Override
        public void resumeAt(final Timestamp timestamp) {
            final Iterable<Tuple> signatures = memory.getResumableSignatures();

            final Map<Tuple, Boolean> wasPresent = CollectionsFactory.createMap();
            for (final Tuple signature : signatures) {
                wasPresent.put(signature, memory.isPresentAtInfinity(signature));
            }

            final Map<Tuple, Map<Tuple, Diff<Timestamp>>> signatureMap = memory.resumeAt(timestamp);

            for (final Entry<Tuple, Map<Tuple, Diff<Timestamp>>> outerEntry : signatureMap.entrySet()) {
                final Tuple signature = outerEntry.getKey();
                final Map<Tuple, Diff<Timestamp>> diffMap = outerEntry.getValue();
                final boolean isPresent = memory.isPresentAtInfinity(signature);
                // only send out a potential true value the first time for a given signature, then set it to false
                boolean change = wasPresent.get(signature) ^ isPresent;

                for (final Entry<Tuple, Diff<Timestamp>> innerEntry : diffMap.entrySet()) {
                    final Tuple tuple = innerEntry.getKey();
                    final Diff<Timestamp> diffs = innerEntry.getValue();
                    for (final Signed<Timestamp> signed : diffs) {
                        IndexerWithMemory.this.update(signed.getDirection(), tuple, signature, change,
                                signed.getPayload());
                    }
                    // change is a signature-wise flag, so it is ok to "try" to signal it for the first tuple only
                    change = false;
                }
            }

            final Timestamp nextTimestamp = memory.getResumableTimestamp();
            if (nextTimestamp != null) {
                group.notifyHasMessage(mailbox, nextTimestamp);
            }
        }

        @Override
        public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
            final Tuple signature = mask.transform(update);
            final boolean wasPresent = memory.isPresentAtInfinity(signature);
            final Diff<Timestamp> resultDiff = direction == Direction.INSERT
                    ? memory.addWithTimestamp(update, signature, timestamp)
                    : memory.removeWithTimestamp(update, signature, timestamp);
            final boolean isPresent = memory.isPresentAtInfinity(signature);
            final boolean change = wasPresent ^ isPresent;
            for (final Signed<Timestamp> signed : resultDiff) {
                IndexerWithMemory.this.update(signed.getDirection(), update, signature, change, signed.getPayload());
            }
        }

    };

    private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() {

        @Override
        public void update(final Direction direction, final Tuple update, final Timestamp timestamp) {
            final Tuple signature = mask.transform(update);
            final boolean change = direction == Direction.INSERT ? memory.add(update, signature)
                    : memory.remove(update, signature);
            IndexerWithMemory.this.update(direction, update, signature, change, timestamp);
        }

        @Override
        public void resumeAt(final Timestamp timestamp) {
            // there is nothing to resume in the timeless case because we do not even care about timestamps
        }

    };

}