// Source path: subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java
// Blob: bf3b8e1423b70d7fe8298030ad7c21894e6a1eb0
/*******************************************************************************
 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-v20.html.
 * 
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
package tools.refinery.viatra.runtime.rete.network.mailbox.timely;

import java.util.Map;
import java.util.TreeMap;

import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
import tools.refinery.viatra.runtime.matchers.util.Direction;
import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation;
import tools.refinery.viatra.runtime.rete.network.Receiver;
import tools.refinery.viatra.runtime.rete.network.ReteContainer;
import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode;
import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;

/**
 * A {@link Mailbox} that buffers pending updates grouped by {@link Timestamp}.
 * <p>
 * For every timestamp the mailbox keeps a map from tuple to a signed count: posting with
 * {@link Direction#DELETE} decrements the count, posting with any other direction increments it.
 * A tuple whose count reaches zero is dropped, and a timestamp with no remaining tuples is dropped
 * from the queue. The current {@link CommunicationGroup} is notified when a timestamp gains its
 * first message and (subject to the receiver's resumable state) when it loses its last one.
 */
public class TimelyMailbox implements Mailbox {

    /** Pending updates: timestamp -> (tuple -> signed count). */
    protected TreeMap<Timestamp, Map<Tuple, Integer>> queue;
    /** The node that messages in this mailbox are delivered to. */
    protected final Receiver receiver;
    /** The container consulted for the timely configuration during delivery. */
    protected final ReteContainer container;
    /** The group notified about this mailbox gaining or losing messages; set via {@link #setCurrentGroup}. */
    protected CommunicationGroup group;
    /** Neither read nor written by this class. NOTE(review): presumably kept for subclasses - confirm. */
    protected boolean fallThrough;

    /**
     * Creates an empty mailbox for the given receiver.
     *
     * @param receiver  the node that will receive the delivered updates
     * @param container the container whose timely configuration is consulted in {@link #deliverAll}
     */
    public TimelyMailbox(final Receiver receiver, final ReteContainer container) {
        this.queue = CollectionsFactory.createTreeMap();
        this.container = container;
        this.receiver = receiver;
    }

    /**
     * @return the queue that {@link #postMessage}, {@link #isEmpty} and {@link #toString} operate on
     */
    protected TreeMap<Timestamp, Map<Tuple, Integer>> getActiveQueue() {
        return this.queue;
    }

    /**
     * @return {@code true} if no timestamp has any pending tuple in the active queue
     */
    @Override
    public boolean isEmpty() {
        final TreeMap<Timestamp, Map<Tuple, Integer>> pending = getActiveQueue();
        return pending.isEmpty();
    }

    /**
     * Records one update for the given timestamp and notifies the current group if the timestamp
     * just gained its first message or just lost its last one.
     *
     * @param direction {@link Direction#DELETE} decrements the tuple's count, anything else increments it
     * @param update    the tuple being inserted or deleted
     * @param timestamp the timestamp the update belongs to
     */
    @Override
    public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
        final TreeMap<Timestamp, Map<Tuple, Integer>> pending = getActiveQueue();

        // Look up (or lazily create) the per-timestamp counter map.
        final Map<Tuple, Integer> existing = pending.get(timestamp);
        final boolean firstForTimestamp = existing == null;
        final Map<Tuple, Integer> counts;
        if (firstForTimestamp) {
            counts = CollectionsFactory.createMap();
            pending.put(timestamp, counts);
        } else {
            counts = existing;
        }

        // An absent tuple counts as zero; apply the signed delta.
        final Integer previous = counts.get(update);
        final int delta = (direction == Direction.DELETE) ? -1 : 1;
        final int updated = (previous == null ? 0 : previous) + delta;
        final boolean cancelledOut = updated == 0;

        if (cancelledOut) {
            // Opposite updates cancelled each other: forget the tuple, and the timestamp if nothing is left.
            counts.remove(update);
            if (counts.isEmpty()) {
                pending.remove(timestamp);
            }
        } else {
            counts.put(update, updated);
        }

        if (firstForTimestamp) {
            // A freshly created entry always ends with a count of +1 or -1, so there is something to deliver.
            this.group.notifyHasMessage(this, timestamp);
        } else if (cancelledOut && counts.isEmpty() && !isFoldingPendingAt(timestamp)) {
            // The queue for this timestamp drained and the receiver has no folding left at it.
            this.group.notifyLostAllMessages(this, timestamp);
        }
    }

    /**
     * Tells whether the receiver is a {@link ResumableNode} whose resumable timestamp compares equal
     * to the given one, in which case the mailbox must stay subscribed even with an empty queue.
     */
    private boolean isFoldingPendingAt(final Timestamp timestamp) {
        if (!(this.receiver instanceof ResumableNode)) {
            return false;
        }
        final Timestamp resumableTimestamp = ((ResumableNode) this.receiver).getResumableTimestamp();
        return resumableTimestamp != null && resumableTimestamp.compareTo(timestamp) == 0;
    }

    /**
     * Removes everything queued for the selected timestamp and hands it to the receiver in one batch.
     * With a {@link TimelineRepresentation#FAITHFUL} timeline, a {@link ResumableNode} receiver is
     * additionally resumed at that timestamp.
     *
     * @param selector must be a {@link Timestamp}
     * @throws IllegalArgumentException if the selector is not a {@link Timestamp}
     */
    @Override
    public void deliverAll(final MessageSelector selector) {
        if (!(selector instanceof Timestamp)) {
            throw new IllegalArgumentException("Unsupported message selector " + selector);
        }
        final Timestamp timestamp = (Timestamp) selector;

        // Take the entry out of the queue rather than just reading it.
        final Map<Tuple, Integer> delivered = this.queue.remove(timestamp);

        // Nothing may be queued for this timestamp (null), e.g. when only lazy folding is left to do.
        if (delivered != null) {
            this.receiver.batchUpdate(delivered.entrySet(), timestamp);
        }

        final boolean faithfulTimeline = this.container.getTimelyConfiguration()
                .getTimelineRepresentation() == TimelineRepresentation.FAITHFUL;
        // Either the delivery above left the receiver in a lazy folding state,
        // and/or an earlier lazy folding has to be resumed.
        if (faithfulTimeline && this.receiver instanceof ResumableNode) {
            ((ResumableNode) this.receiver).resumeAt(timestamp);
        }
    }

    /**
     * @return a debug representation containing the receiver and the active queue
     */
    @Override
    public String toString() {
        return "DDF_MBOX (" + this.receiver + ") " + this.getActiveQueue();
    }

    /**
     * @return the receiver this mailbox delivers to
     */
    @Override
    public Receiver getReceiver() {
        return this.receiver;
    }

    /**
     * Discards every queued update without notifying the group or the receiver.
     */
    @Override
    public void clear() {
        this.queue.clear();
    }

    /**
     * @return the communication group currently associated with this mailbox (may be unset)
     */
    @Override
    public CommunicationGroup getCurrentGroup() {
        return this.group;
    }

    /**
     * @param group the communication group to notify from now on
     */
    @Override
    public void setCurrentGroup(final CommunicationGroup group) {
        this.group = group;
    }

}