diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java new file mode 100644 index 00000000..51f89b52 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java | |||
@@ -0,0 +1,209 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2015, Bergmann Gabor, Istvan Rath and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.viatra.runtime.rete.boundary; | ||
10 | |||
11 | import java.util.Collection; | ||
12 | import java.util.Collections; | ||
13 | import java.util.Map; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.matchers.context.IInputKey; | ||
16 | import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext; | ||
17 | import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext; | ||
18 | import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContextListener; | ||
19 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
20 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
21 | import tools.refinery.viatra.runtime.matchers.tuple.Tuples; | ||
22 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
23 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
24 | import tools.refinery.viatra.runtime.rete.matcher.ReteEngine; | ||
25 | import tools.refinery.viatra.runtime.rete.network.Network; | ||
26 | import tools.refinery.viatra.runtime.rete.network.Receiver; | ||
27 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
28 | import tools.refinery.viatra.runtime.rete.network.StandardNode; | ||
29 | import tools.refinery.viatra.runtime.rete.network.Supplier; | ||
30 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
31 | import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; | ||
32 | import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox; | ||
33 | import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox; | ||
34 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
35 | |||
36 | /** | ||
37 | * An input node representing an enumerable extensional input relation and receiving external updates. | ||
38 | * | ||
39 | * <p> | ||
40 | * Contains those tuples that are in the extensional relation identified by the input key, and also conform to the | ||
41 | * global seed (if any). | ||
42 | * | ||
43 | * @author Bergmann Gabor | ||
44 | * | ||
45 | */ | ||
46 | public class ExternalInputEnumeratorNode extends StandardNode | ||
47 | implements Disconnectable, Receiver, IQueryRuntimeContextListener { | ||
48 | |||
49 | private IQueryRuntimeContext context = null; | ||
50 | private IInputKey inputKey; | ||
51 | private Tuple globalSeed; | ||
52 | private InputConnector inputConnector; | ||
53 | private Network network; | ||
54 | private Address<? extends Receiver> myAddress; | ||
55 | private boolean parallelExecutionEnabled; | ||
56 | /** | ||
57 | * @since 1.6 | ||
58 | */ | ||
59 | protected final Mailbox mailbox; | ||
60 | private final IQueryBackendContext qBackendContext; | ||
61 | |||
62 | public ExternalInputEnumeratorNode(ReteContainer reteContainer) { | ||
63 | super(reteContainer); | ||
64 | myAddress = Address.of(this); | ||
65 | network = reteContainer.getNetwork(); | ||
66 | inputConnector = network.getInputConnector(); | ||
67 | qBackendContext = network.getEngine().getBackendContext(); | ||
68 | mailbox = instantiateMailbox(); | ||
69 | reteContainer.registerClearable(mailbox); | ||
70 | } | ||
71 | |||
72 | /** | ||
73 | * Instantiates the {@link Mailbox} of this receiver. Subclasses may override this method to provide their own | ||
74 | * mailbox implementation. | ||
75 | * | ||
76 | * @return the mailbox | ||
77 | * @since 2.0 | ||
78 | */ | ||
79 | protected Mailbox instantiateMailbox() { | ||
80 | if (this.reteContainer.isTimelyEvaluation()) { | ||
81 | return new TimelyMailbox(this, this.reteContainer); | ||
82 | } else { | ||
83 | return new BehaviorChangingMailbox(this, this.reteContainer); | ||
84 | } | ||
85 | } | ||
86 | |||
87 | @Override | ||
88 | public Mailbox getMailbox() { | ||
89 | return this.mailbox; | ||
90 | } | ||
91 | |||
92 | public void connectThroughContext(ReteEngine engine, IInputKey inputKey, Tuple globalSeed) { | ||
93 | this.inputKey = inputKey; | ||
94 | this.globalSeed = globalSeed; | ||
95 | setTag(inputKey); | ||
96 | |||
97 | final IQueryRuntimeContext context = engine.getRuntimeContext(); | ||
98 | if (!context.getMetaContext().isEnumerable(inputKey)) | ||
99 | throw new IllegalArgumentException(this.getClass().getSimpleName() | ||
100 | + " only applicable for enumerable input keys; received instead " + inputKey); | ||
101 | |||
102 | this.context = context; | ||
103 | this.parallelExecutionEnabled = engine.isParallelExecutionEnabled(); | ||
104 | |||
105 | engine.addDisconnectable(this); | ||
106 | context.addUpdateListener(inputKey, globalSeed, this); | ||
107 | } | ||
108 | |||
109 | @Override | ||
110 | public void disconnect() { | ||
111 | if (context != null) { // if connected | ||
112 | context.removeUpdateListener(inputKey, globalSeed, this); | ||
113 | context = null; | ||
114 | } | ||
115 | } | ||
116 | |||
117 | /** | ||
118 | * @since 2.2 | ||
119 | */ | ||
120 | protected Iterable<Tuple> getTuplesInternal() { | ||
121 | Iterable<Tuple> tuples = null; | ||
122 | |||
123 | if (context != null) { // if connected | ||
124 | if (globalSeed == null) { | ||
125 | tuples = context.enumerateTuples(inputKey, TupleMask.empty(inputKey.getArity()), | ||
126 | Tuples.staticArityFlatTupleOf()); | ||
127 | } else { | ||
128 | final TupleMask mask = TupleMask.fromNonNullIndices(globalSeed); | ||
129 | tuples = context.enumerateTuples(inputKey, mask, mask.transform(globalSeed)); | ||
130 | } | ||
131 | } | ||
132 | |||
133 | return tuples; | ||
134 | } | ||
135 | |||
136 | @Override | ||
137 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
138 | final Iterable<Tuple> tuples = getTuplesInternal(); | ||
139 | if (tuples != null) { | ||
140 | for (final Tuple tuple : tuples) { | ||
141 | collector.add(tuple); | ||
142 | } | ||
143 | } | ||
144 | } | ||
145 | |||
146 | @Override | ||
147 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
148 | final Iterable<Tuple> tuples = getTuplesInternal(); | ||
149 | if (tuples != null) { | ||
150 | for (final Tuple tuple : tuples) { | ||
151 | collector.put(tuple, Timestamp.INSERT_AT_ZERO_TIMELINE); | ||
152 | } | ||
153 | } | ||
154 | } | ||
155 | |||
156 | /* Update from runtime context */ | ||
157 | @Override | ||
158 | public void update(IInputKey key, Tuple update, boolean isInsertion) { | ||
159 | if (parallelExecutionEnabled) { | ||
160 | // send back to myself as an official external update, and then propagate it transparently | ||
161 | network.sendExternalUpdate(myAddress, direction(isInsertion), update); | ||
162 | } else { | ||
163 | if (qBackendContext.areUpdatesDelayed()) { | ||
164 | // post the update into the mailbox of the node | ||
165 | mailbox.postMessage(direction(isInsertion), update, Timestamp.ZERO); | ||
166 | } else { | ||
167 | // just propagate the input | ||
168 | update(direction(isInsertion), update, Timestamp.ZERO); | ||
169 | } | ||
170 | // if the the update method is called from within a delayed execution, | ||
171 | // the following invocation will be a no-op | ||
172 | network.waitForReteTermination(); | ||
173 | } | ||
174 | } | ||
175 | |||
176 | private static Direction direction(boolean isInsertion) { | ||
177 | return isInsertion ? Direction.INSERT : Direction.DELETE; | ||
178 | } | ||
179 | |||
180 | /* Self-addressed from network */ | ||
181 | @Override | ||
182 | public void update(Direction direction, Tuple updateElement, Timestamp timestamp) { | ||
183 | propagateUpdate(direction, updateElement, timestamp); | ||
184 | } | ||
185 | |||
186 | @Override | ||
187 | public void appendParent(Supplier supplier) { | ||
188 | throw new UnsupportedOperationException("Input nodes can't have parents"); | ||
189 | } | ||
190 | |||
191 | @Override | ||
192 | public void removeParent(Supplier supplier) { | ||
193 | throw new UnsupportedOperationException("Input nodes can't have parents"); | ||
194 | } | ||
195 | |||
196 | @Override | ||
197 | public Collection<Supplier> getParents() { | ||
198 | return Collections.emptySet(); | ||
199 | } | ||
200 | |||
201 | public IInputKey getInputKey() { | ||
202 | return inputKey; | ||
203 | } | ||
204 | |||
205 | public Tuple getGlobalSeed() { | ||
206 | return globalSeed; | ||
207 | } | ||
208 | |||
209 | } | ||