diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java new file mode 100644 index 00000000..bf3b8e14 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timely/TimelyMailbox.java | |||
@@ -0,0 +1,150 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timely; | ||
10 | |||
11 | import java.util.Map; | ||
12 | import java.util.TreeMap; | ||
13 | |||
14 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
15 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
16 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
17 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; | ||
18 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
19 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
21 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
22 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.timely.ResumableNode; | ||
24 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
25 | |||
26 | public class TimelyMailbox implements Mailbox { | ||
27 | |||
28 | protected TreeMap<Timestamp, Map<Tuple, Integer>> queue; | ||
29 | protected final Receiver receiver; | ||
30 | protected final ReteContainer container; | ||
31 | protected CommunicationGroup group; | ||
32 | protected boolean fallThrough; | ||
33 | |||
34 | public TimelyMailbox(final Receiver receiver, final ReteContainer container) { | ||
35 | this.receiver = receiver; | ||
36 | this.container = container; | ||
37 | this.queue = CollectionsFactory.createTreeMap(); | ||
38 | } | ||
39 | |||
40 | protected TreeMap<Timestamp, Map<Tuple, Integer>> getActiveQueue() { | ||
41 | return this.queue; | ||
42 | } | ||
43 | |||
44 | @Override | ||
45 | public boolean isEmpty() { | ||
46 | return getActiveQueue().isEmpty(); | ||
47 | } | ||
48 | |||
49 | @Override | ||
50 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
51 | final TreeMap<Timestamp, Map<Tuple, Integer>> activeQueue = getActiveQueue(); | ||
52 | |||
53 | Map<Tuple, Integer> tupleMap = activeQueue.get(timestamp); | ||
54 | final boolean wasEmpty = tupleMap == null; | ||
55 | boolean significantChange = false; | ||
56 | |||
57 | if (tupleMap == null) { | ||
58 | tupleMap = CollectionsFactory.createMap(); | ||
59 | activeQueue.put(timestamp, tupleMap); | ||
60 | significantChange = true; | ||
61 | } | ||
62 | |||
63 | Integer count = tupleMap.get(update); | ||
64 | if (count == null) { | ||
65 | count = 0; | ||
66 | significantChange = true; | ||
67 | } | ||
68 | |||
69 | if (direction == Direction.DELETE) { | ||
70 | count--; | ||
71 | } else { | ||
72 | count++; | ||
73 | } | ||
74 | |||
75 | if (count == 0) { | ||
76 | tupleMap.remove(update); | ||
77 | if (tupleMap.isEmpty()) { | ||
78 | activeQueue.remove(timestamp); | ||
79 | } | ||
80 | significantChange = true; | ||
81 | } else { | ||
82 | tupleMap.put(update, count); | ||
83 | } | ||
84 | |||
85 | if (significantChange) { | ||
86 | if (wasEmpty) { | ||
87 | this.group.notifyHasMessage(this, timestamp); | ||
88 | } else if (tupleMap.isEmpty()) { | ||
89 | final Timestamp resumableTimestamp = (this.receiver instanceof ResumableNode) | ||
90 | ? ((ResumableNode) this.receiver).getResumableTimestamp() | ||
91 | : null; | ||
92 | // check if there is folding left to do before unsubscribing just based on the message queue being empty | ||
93 | if (resumableTimestamp == null || resumableTimestamp.compareTo(timestamp) != 0) { | ||
94 | this.group.notifyLostAllMessages(this, timestamp); | ||
95 | } | ||
96 | } | ||
97 | } | ||
98 | } | ||
99 | |||
100 | @Override | ||
101 | public void deliverAll(final MessageSelector selector) { | ||
102 | if (selector instanceof Timestamp) { | ||
103 | final Timestamp timestamp = (Timestamp) selector; | ||
104 | // REMOVE the tuples associated with the selector, dont just query them | ||
105 | final Map<Tuple, Integer> tupleMap = this.queue.remove(timestamp); | ||
106 | |||
107 | // tupleMap may be empty if we only have lazy folding to do | ||
108 | if (tupleMap != null) { | ||
109 | this.receiver.batchUpdate(tupleMap.entrySet(), timestamp); | ||
110 | } | ||
111 | |||
112 | if (this.container.getTimelyConfiguration() | ||
113 | .getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) { | ||
114 | // (1) either normal delivery, which ended up being a lazy folding state | ||
115 | // (2) and/or lazy folding needs to be resumed | ||
116 | if (this.receiver instanceof ResumableNode) { | ||
117 | ((ResumableNode) this.receiver).resumeAt(timestamp); | ||
118 | } | ||
119 | } | ||
120 | } else { | ||
121 | throw new IllegalArgumentException("Unsupported message selector " + selector); | ||
122 | } | ||
123 | } | ||
124 | |||
125 | @Override | ||
126 | public String toString() { | ||
127 | return "DDF_MBOX (" + this.receiver + ") " + this.getActiveQueue(); | ||
128 | } | ||
129 | |||
130 | @Override | ||
131 | public Receiver getReceiver() { | ||
132 | return this.receiver; | ||
133 | } | ||
134 | |||
135 | @Override | ||
136 | public void clear() { | ||
137 | this.queue.clear(); | ||
138 | } | ||
139 | |||
140 | @Override | ||
141 | public CommunicationGroup getCurrentGroup() { | ||
142 | return this.group; | ||
143 | } | ||
144 | |||
145 | @Override | ||
146 | public void setCurrentGroup(final CommunicationGroup group) { | ||
147 | this.group = group; | ||
148 | } | ||
149 | |||
150 | } | ||