diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java new file mode 100644 index 00000000..afa155b2 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/mailbox/timeless/UpdateSplittingMailbox.java | |||
@@ -0,0 +1,135 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.network.mailbox.timeless; | ||
10 | |||
11 | import java.util.Map.Entry; | ||
12 | |||
13 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
14 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
15 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
16 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
17 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
18 | import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; | ||
19 | import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector; | ||
20 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
21 | import tools.refinery.viatra.runtime.rete.network.indexer.DefaultMessageIndexer; | ||
22 | import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox; | ||
23 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
24 | |||
25 | /** | ||
26 | * A mailbox implementation that splits updates messages according to the standard subset ordering into anti-monotonic | ||
27 | * (deletions) and monotonic (insertions) updates. | ||
28 | * | ||
29 | * @author Tamas Szabo | ||
30 | * @since 2.0 | ||
31 | */ | ||
32 | public class UpdateSplittingMailbox extends AbstractUpdateSplittingMailbox<DefaultMessageIndexer, Receiver> | ||
33 | implements AdaptableMailbox { | ||
34 | |||
35 | protected Mailbox adapter; | ||
36 | |||
37 | public UpdateSplittingMailbox(final Receiver receiver, final ReteContainer container) { | ||
38 | super(receiver, container, DefaultMessageIndexer::new); | ||
39 | this.adapter = this; | ||
40 | } | ||
41 | |||
42 | @Override | ||
43 | public Mailbox getAdapter() { | ||
44 | return this.adapter; | ||
45 | } | ||
46 | |||
47 | @Override | ||
48 | public void setAdapter(final Mailbox adapter) { | ||
49 | this.adapter = adapter; | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) { | ||
54 | final DefaultMessageIndexer monotoneQueue = getActiveMonotoneQueue(); | ||
55 | final DefaultMessageIndexer antiMonotoneQueue = getActiveAntiMonotoneQueue(); | ||
56 | final boolean wasPresentAsMonotone = monotoneQueue.getCount(update) != 0; | ||
57 | final boolean wasPresentAsAntiMonotone = antiMonotoneQueue.getCount(update) != 0; | ||
58 | |||
59 | // it cannot happen that it was present in both | ||
60 | assert !(wasPresentAsMonotone && wasPresentAsAntiMonotone); | ||
61 | |||
62 | if (direction == Direction.INSERT) { | ||
63 | if (wasPresentAsAntiMonotone) { | ||
64 | // it was an anti-monotone one before | ||
65 | antiMonotoneQueue.insert(update); | ||
66 | } else { | ||
67 | // it was a monotone one before or did not exist at all | ||
68 | monotoneQueue.insert(update); | ||
69 | } | ||
70 | } else { | ||
71 | if (wasPresentAsMonotone) { | ||
72 | // it was a monotone one before | ||
73 | monotoneQueue.delete(update); | ||
74 | } else { | ||
75 | // it was an anti-monotone one before or did not exist at all | ||
76 | antiMonotoneQueue.delete(update); | ||
77 | } | ||
78 | } | ||
79 | |||
80 | final Mailbox targetMailbox = this.adapter; | ||
81 | final CommunicationGroup targetGroup = this.adapter.getCurrentGroup(); | ||
82 | |||
83 | if (antiMonotoneQueue.isEmpty()) { | ||
84 | targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.ANTI_MONOTONE); | ||
85 | } else { | ||
86 | targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.ANTI_MONOTONE); | ||
87 | } | ||
88 | |||
89 | if (monotoneQueue.isEmpty()) { | ||
90 | targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.MONOTONE); | ||
91 | } else { | ||
92 | targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.MONOTONE); | ||
93 | } | ||
94 | } | ||
95 | |||
96 | @Override | ||
97 | public void deliverAll(final MessageSelector kind) { | ||
98 | if (kind == PhasedSelector.ANTI_MONOTONE) { | ||
99 | // deliver anti-monotone | ||
100 | this.deliveringAntiMonotone = true; | ||
101 | for (final Entry<Tuple, Integer> entry : this.antiMonotoneQueue.getTuples().entrySet()) { | ||
102 | final Tuple update = entry.getKey(); | ||
103 | final int count = entry.getValue(); | ||
104 | assert count < 0; | ||
105 | for (int i = 0; i < Math.abs(count); i++) { | ||
106 | this.receiver.update(Direction.DELETE, update, Timestamp.ZERO); | ||
107 | } | ||
108 | } | ||
109 | this.deliveringAntiMonotone = false; | ||
110 | swapAndClearAntiMonotone(); | ||
111 | } else if (kind == PhasedSelector.MONOTONE) { | ||
112 | // deliver monotone | ||
113 | this.deliveringMonotone = true; | ||
114 | for (final Entry<Tuple, Integer> entry : this.monotoneQueue.getTuples().entrySet()) { | ||
115 | final Tuple update = entry.getKey(); | ||
116 | final int count = entry.getValue(); | ||
117 | assert count > 0; | ||
118 | for (int i = 0; i < count; i++) { | ||
119 | this.receiver.update(Direction.INSERT, update, Timestamp.ZERO); | ||
120 | } | ||
121 | } | ||
122 | this.deliveringMonotone = false; | ||
123 | swapAndClearMonotone(); | ||
124 | } else { | ||
125 | throw new IllegalArgumentException("Unsupported message kind " + kind); | ||
126 | } | ||
127 | } | ||
128 | |||
129 | @Override | ||
130 | public String toString() { | ||
131 | return "US_MBOX (" + this.receiver + ") " + this.getActiveMonotoneQueue() + " " | ||
132 | + this.getActiveAntiMonotoneQueue(); | ||
133 | } | ||
134 | |||
135 | } | ||