aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java209
1 files changed, 209 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java
new file mode 100644
index 00000000..51f89b52
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/boundary/ExternalInputEnumeratorNode.java
@@ -0,0 +1,209 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2015, Bergmann Gabor, Istvan Rath and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9package tools.refinery.viatra.runtime.rete.boundary;
10
11import java.util.Collection;
12import java.util.Collections;
13import java.util.Map;
14
15import tools.refinery.viatra.runtime.matchers.context.IInputKey;
16import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext;
17import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContext;
18import tools.refinery.viatra.runtime.matchers.context.IQueryRuntimeContextListener;
19import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
20import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
21import tools.refinery.viatra.runtime.matchers.tuple.Tuples;
22import tools.refinery.viatra.runtime.matchers.util.Direction;
23import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
24import tools.refinery.viatra.runtime.rete.matcher.ReteEngine;
25import tools.refinery.viatra.runtime.rete.network.Network;
26import tools.refinery.viatra.runtime.rete.network.Receiver;
27import tools.refinery.viatra.runtime.rete.network.ReteContainer;
28import tools.refinery.viatra.runtime.rete.network.StandardNode;
29import tools.refinery.viatra.runtime.rete.network.Supplier;
30import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
31import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;
32import tools.refinery.viatra.runtime.rete.network.mailbox.timeless.BehaviorChangingMailbox;
33import tools.refinery.viatra.runtime.rete.network.mailbox.timely.TimelyMailbox;
34import tools.refinery.viatra.runtime.rete.remote.Address;
35
36/**
37 * An input node representing an enumerable extensional input relation and receiving external updates.
38 *
39 * <p>
40 * Contains those tuples that are in the extensional relation identified by the input key, and also conform to the
41 * global seed (if any).
42 *
43 * @author Bergmann Gabor
44 *
45 */
46public class ExternalInputEnumeratorNode extends StandardNode
47 implements Disconnectable, Receiver, IQueryRuntimeContextListener {
48
49 private IQueryRuntimeContext context = null;
50 private IInputKey inputKey;
51 private Tuple globalSeed;
52 private InputConnector inputConnector;
53 private Network network;
54 private Address<? extends Receiver> myAddress;
55 private boolean parallelExecutionEnabled;
56 /**
57 * @since 1.6
58 */
59 protected final Mailbox mailbox;
60 private final IQueryBackendContext qBackendContext;
61
62 public ExternalInputEnumeratorNode(ReteContainer reteContainer) {
63 super(reteContainer);
64 myAddress = Address.of(this);
65 network = reteContainer.getNetwork();
66 inputConnector = network.getInputConnector();
67 qBackendContext = network.getEngine().getBackendContext();
68 mailbox = instantiateMailbox();
69 reteContainer.registerClearable(mailbox);
70 }
71
72 /**
73 * Instantiates the {@link Mailbox} of this receiver. Subclasses may override this method to provide their own
74 * mailbox implementation.
75 *
76 * @return the mailbox
77 * @since 2.0
78 */
79 protected Mailbox instantiateMailbox() {
80 if (this.reteContainer.isTimelyEvaluation()) {
81 return new TimelyMailbox(this, this.reteContainer);
82 } else {
83 return new BehaviorChangingMailbox(this, this.reteContainer);
84 }
85 }
86
87 @Override
88 public Mailbox getMailbox() {
89 return this.mailbox;
90 }
91
92 public void connectThroughContext(ReteEngine engine, IInputKey inputKey, Tuple globalSeed) {
93 this.inputKey = inputKey;
94 this.globalSeed = globalSeed;
95 setTag(inputKey);
96
97 final IQueryRuntimeContext context = engine.getRuntimeContext();
98 if (!context.getMetaContext().isEnumerable(inputKey))
99 throw new IllegalArgumentException(this.getClass().getSimpleName()
100 + " only applicable for enumerable input keys; received instead " + inputKey);
101
102 this.context = context;
103 this.parallelExecutionEnabled = engine.isParallelExecutionEnabled();
104
105 engine.addDisconnectable(this);
106 context.addUpdateListener(inputKey, globalSeed, this);
107 }
108
109 @Override
110 public void disconnect() {
111 if (context != null) { // if connected
112 context.removeUpdateListener(inputKey, globalSeed, this);
113 context = null;
114 }
115 }
116
117 /**
118 * @since 2.2
119 */
120 protected Iterable<Tuple> getTuplesInternal() {
121 Iterable<Tuple> tuples = null;
122
123 if (context != null) { // if connected
124 if (globalSeed == null) {
125 tuples = context.enumerateTuples(inputKey, TupleMask.empty(inputKey.getArity()),
126 Tuples.staticArityFlatTupleOf());
127 } else {
128 final TupleMask mask = TupleMask.fromNonNullIndices(globalSeed);
129 tuples = context.enumerateTuples(inputKey, mask, mask.transform(globalSeed));
130 }
131 }
132
133 return tuples;
134 }
135
136 @Override
137 public void pullInto(final Collection<Tuple> collector, final boolean flush) {
138 final Iterable<Tuple> tuples = getTuplesInternal();
139 if (tuples != null) {
140 for (final Tuple tuple : tuples) {
141 collector.add(tuple);
142 }
143 }
144 }
145
146 @Override
147 public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) {
148 final Iterable<Tuple> tuples = getTuplesInternal();
149 if (tuples != null) {
150 for (final Tuple tuple : tuples) {
151 collector.put(tuple, Timestamp.INSERT_AT_ZERO_TIMELINE);
152 }
153 }
154 }
155
156 /* Update from runtime context */
157 @Override
158 public void update(IInputKey key, Tuple update, boolean isInsertion) {
159 if (parallelExecutionEnabled) {
160 // send back to myself as an official external update, and then propagate it transparently
161 network.sendExternalUpdate(myAddress, direction(isInsertion), update);
162 } else {
163 if (qBackendContext.areUpdatesDelayed()) {
164 // post the update into the mailbox of the node
165 mailbox.postMessage(direction(isInsertion), update, Timestamp.ZERO);
166 } else {
167 // just propagate the input
168 update(direction(isInsertion), update, Timestamp.ZERO);
169 }
170 // if the the update method is called from within a delayed execution,
171 // the following invocation will be a no-op
172 network.waitForReteTermination();
173 }
174 }
175
176 private static Direction direction(boolean isInsertion) {
177 return isInsertion ? Direction.INSERT : Direction.DELETE;
178 }
179
180 /* Self-addressed from network */
181 @Override
182 public void update(Direction direction, Tuple updateElement, Timestamp timestamp) {
183 propagateUpdate(direction, updateElement, timestamp);
184 }
185
186 @Override
187 public void appendParent(Supplier supplier) {
188 throw new UnsupportedOperationException("Input nodes can't have parents");
189 }
190
191 @Override
192 public void removeParent(Supplier supplier) {
193 throw new UnsupportedOperationException("Input nodes can't have parents");
194 }
195
196 @Override
197 public Collection<Supplier> getParents() {
198 return Collections.emptySet();
199 }
200
201 public IInputKey getInputKey() {
202 return inputKey;
203 }
204
205 public Tuple getGlobalSeed() {
206 return globalSeed;
207 }
208
209}