diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/single/DiscriminatorDispatcherNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/single/DiscriminatorDispatcherNode.java | 154 |
1 files changed, 154 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/single/DiscriminatorDispatcherNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/single/DiscriminatorDispatcherNode.java new file mode 100644 index 00000000..a8e11fcd --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/single/DiscriminatorDispatcherNode.java | |||
@@ -0,0 +1,154 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2016, Gabor Bergmann, IncQueryLabs Ltd. | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.single; | ||
10 | |||
11 | import java.util.ArrayList; | ||
12 | import java.util.Collection; | ||
13 | import java.util.HashMap; | ||
14 | import java.util.Map; | ||
15 | import java.util.Map.Entry; | ||
16 | |||
17 | import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext; | ||
18 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
21 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
22 | import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode; | ||
23 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
24 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
26 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
27 | |||
28 | /** | ||
29 | * Node that sends tuples off to different buckets (attached as children of type {@link DiscriminatorBucketNode}), based | ||
30 | * on the value of a given column. | ||
31 | * | ||
32 | * <p> | ||
33 | * Tuple contents and bucket keys have already been wrapped using {@link IQueryRuntimeContext#wrapElement(Object)} | ||
34 | * | ||
35 | * @author Gabor Bergmann | ||
36 | * @since 1.5 | ||
37 | */ | ||
38 | public class DiscriminatorDispatcherNode extends SingleInputNode implements NetworkStructureChangeSensitiveNode { | ||
39 | |||
40 | private int discriminationColumnIndex; | ||
41 | private Map<Object, DiscriminatorBucketNode> buckets = new HashMap<>(); | ||
42 | private Map<Object, Mailbox> bucketMailboxes = new HashMap<>(); | ||
43 | |||
44 | /** | ||
45 | * @param reteContainer | ||
46 | */ | ||
47 | public DiscriminatorDispatcherNode(ReteContainer reteContainer, int discriminationColumnIndex) { | ||
48 | super(reteContainer); | ||
49 | this.discriminationColumnIndex = discriminationColumnIndex; | ||
50 | } | ||
51 | |||
52 | @Override | ||
53 | public void update(Direction direction, Tuple updateElement, Timestamp timestamp) { | ||
54 | Object dispatchKey = updateElement.get(discriminationColumnIndex); | ||
55 | Mailbox bucketMailBox = bucketMailboxes.get(dispatchKey); | ||
56 | if (bucketMailBox != null) { | ||
57 | bucketMailBox.postMessage(direction, updateElement, timestamp); | ||
58 | } | ||
59 | } | ||
60 | |||
61 | public int getDiscriminationColumnIndex() { | ||
62 | return discriminationColumnIndex; | ||
63 | } | ||
64 | |||
65 | @Override | ||
66 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
67 | propagatePullInto(collector, flush); | ||
68 | } | ||
69 | |||
70 | @Override | ||
71 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
72 | propagatePullIntoWithTimestamp(collector, flush); | ||
73 | } | ||
74 | |||
75 | /** | ||
76 | * @since 2.3 | ||
77 | */ | ||
78 | public void pullIntoFiltered(final Collection<Tuple> collector, final Object bucketKey, final boolean flush) { | ||
79 | final ArrayList<Tuple> unfiltered = new ArrayList<Tuple>(); | ||
80 | propagatePullInto(unfiltered, flush); | ||
81 | for (Tuple tuple : unfiltered) { | ||
82 | if (bucketKey.equals(tuple.get(discriminationColumnIndex))) { | ||
83 | collector.add(tuple); | ||
84 | } | ||
85 | } | ||
86 | } | ||
87 | |||
88 | /** | ||
89 | * @since 2.3 | ||
90 | */ | ||
91 | public void pullIntoWithTimestampFiltered(final Map<Tuple, Timeline<Timestamp>> collector, final Object bucketKey, | ||
92 | final boolean flush) { | ||
93 | final Map<Tuple, Timeline<Timestamp>> unfiltered = CollectionsFactory.createMap(); | ||
94 | propagatePullIntoWithTimestamp(unfiltered, flush); | ||
95 | for (final Entry<Tuple, Timeline<Timestamp>> entry : unfiltered.entrySet()) { | ||
96 | if (bucketKey.equals(entry.getKey().get(discriminationColumnIndex))) { | ||
97 | collector.put(entry.getKey(), entry.getValue()); | ||
98 | } | ||
99 | } | ||
100 | } | ||
101 | |||
102 | @Override | ||
103 | public void appendChild(Receiver receiver) { | ||
104 | super.appendChild(receiver); | ||
105 | if (receiver instanceof DiscriminatorBucketNode) { | ||
106 | DiscriminatorBucketNode bucket = (DiscriminatorBucketNode) receiver; | ||
107 | Object bucketKey = bucket.getBucketKey(); | ||
108 | DiscriminatorBucketNode old = buckets.put(bucketKey, bucket); | ||
109 | if (old != null) { | ||
110 | throw new IllegalStateException(); | ||
111 | } | ||
112 | bucketMailboxes.put(bucketKey, this.getCommunicationTracker().proxifyMailbox(this, bucket.getMailbox())); | ||
113 | } | ||
114 | } | ||
115 | |||
116 | /** | ||
117 | * @since 2.2 | ||
118 | */ | ||
119 | public Map<Object, Mailbox> getBucketMailboxes() { | ||
120 | return this.bucketMailboxes; | ||
121 | } | ||
122 | |||
123 | @Override | ||
124 | public void networkStructureChanged() { | ||
125 | bucketMailboxes.clear(); | ||
126 | for (Receiver receiver : children) { | ||
127 | if (receiver instanceof DiscriminatorBucketNode) { | ||
128 | DiscriminatorBucketNode bucket = (DiscriminatorBucketNode) receiver; | ||
129 | Object bucketKey = bucket.getBucketKey(); | ||
130 | bucketMailboxes.put(bucketKey, | ||
131 | this.getCommunicationTracker().proxifyMailbox(this, bucket.getMailbox())); | ||
132 | } | ||
133 | } | ||
134 | } | ||
135 | |||
136 | @Override | ||
137 | public void removeChild(Receiver receiver) { | ||
138 | super.removeChild(receiver); | ||
139 | if (receiver instanceof DiscriminatorBucketNode) { | ||
140 | DiscriminatorBucketNode bucket = (DiscriminatorBucketNode) receiver; | ||
141 | Object bucketKey = bucket.getBucketKey(); | ||
142 | DiscriminatorBucketNode old = buckets.remove(bucketKey); | ||
143 | if (old != bucket) | ||
144 | throw new IllegalStateException(); | ||
145 | bucketMailboxes.remove(bucketKey); | ||
146 | } | ||
147 | } | ||
148 | |||
149 | @Override | ||
150 | protected String toStringCore() { | ||
151 | return super.toStringCore() + '<' + discriminationColumnIndex + '>'; | ||
152 | } | ||
153 | |||
154 | } | ||