aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java408
1 files changed, 408 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java
new file mode 100644
index 00000000..64f59ff3
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java
@@ -0,0 +1,408 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.ArrayList;
13import java.util.Collection;
14import java.util.Collections;
15import java.util.List;
16import java.util.Map;
17import java.util.Map.Entry;
18import java.util.Set;
19import java.util.concurrent.locks.Lock;
20import java.util.concurrent.locks.ReadWriteLock;
21import java.util.concurrent.locks.ReentrantReadWriteLock;
22
23import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
24import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
25import tools.refinery.viatra.runtime.matchers.util.Direction;
26import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
27import tools.refinery.viatra.runtime.rete.matcher.ReteEngine;
28import tools.refinery.viatra.runtime.rete.recipes.ReteNodeRecipe;
29import tools.refinery.viatra.runtime.rete.remote.Address;
30import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo;
31import tools.refinery.viatra.runtime.rete.util.Options;
32
33/**
34 * @author Gabor Bergmann
35 *
36 */
37public class Network {
38 final int threads;
39
40 protected ArrayList<ReteContainer> containers;
41 ReteContainer headContainer;
42 private int firstContainer = 0;
43 private int nextContainer = 0;
44
45 // the following fields exist only if threads > 0
46 protected Map<ReteContainer, Long> globalTerminationCriteria = null;
47 protected Map<ReteContainer, Long> reportedClocks = null;
48 protected Lock updateLock = null; // grab during normal update operations
49 protected Lock structuralChangeLock = null; // grab if the network structure
50 // is to
51 // be changed
52
53 // Knowledge of the outside world
54 private ReteEngine engine;
55 protected NodeFactory nodeFactory;
56 protected InputConnector inputConnector;
57
58 // Node and recipe administration
59 // incl. addresses for existing nodes by recipe (where available)
60 // Maintained by NodeProvisioner of each container
61 Map<ReteNodeRecipe, Address<? extends Node>> nodesByRecipe = CollectionsFactory.createMap();
62 Set<RecipeTraceInfo> recipeTraces = CollectionsFactory.createSet();
63
64 /**
65 * @throws IllegalStateException
66 * if no node has been constructed for the recipe
67 */
68 public synchronized Address<? extends Node> getExistingNodeByRecipe(ReteNodeRecipe recipe) {
69 final Address<? extends Node> node = nodesByRecipe.get(recipe);
70 if (node == null)
71 throw new IllegalStateException(String.format("Rete node for recipe %s not constructed yet.", recipe));
72 return node;
73 }
74
75 /**
76 * @return null if no node has been constructed for the recipe
77 */
78 public synchronized Address<? extends Node> getNodeByRecipeIfExists(ReteNodeRecipe recipe) {
79 final Address<? extends Node> node = nodesByRecipe.get(recipe);
80 return node;
81 }
82
83 /**
84 * @param threads
85 * the number of threads to operate the network with; 0 means single-threaded operation, 1 starts an
86 * asynchronous thread to operate the RETE net, >1 uses multiple RETE containers.
87 */
88 public Network(int threads, ReteEngine engine) {
89 super();
90 this.threads = threads;
91 this.engine = engine;
92 this.inputConnector = new InputConnector(this);
93 this.nodeFactory = new NodeFactory(engine.getLogger());
94
95 containers = new ArrayList<ReteContainer>();
96 firstContainer = (threads > 1) ? Options.firstFreeContainer : 0; // NOPMD
97 nextContainer = firstContainer;
98
99 if (threads > 0) {
100 globalTerminationCriteria = CollectionsFactory.createMap();
101 reportedClocks = CollectionsFactory.createMap();
102 ReadWriteLock rwl = new ReentrantReadWriteLock();
103 updateLock = rwl.readLock();
104 structuralChangeLock = rwl.writeLock();
105 for (int i = 0; i < threads; ++i)
106 containers.add(new ReteContainer(this, true));
107 } else
108 containers.add(new ReteContainer(this, false));
109
110 headContainer = containers.get(0);
111 }
112
113 /**
114 * Kills this Network along with all containers and message consumption cycles.
115 */
116 public void kill() {
117 for (ReteContainer container : containers) {
118 container.kill();
119 }
120 containers.clear();
121 }
122
123 /**
124 * Returns the head container, that is guaranteed to reside in the same JVM as the Network object.
125 */
126 public ReteContainer getHeadContainer() {
127 return headContainer;
128 }
129
130 /**
131 * Returns the next container in round-robin fashion. Configurable not to yield head container.
132 */
133 public ReteContainer getNextContainer() {
134 if (nextContainer >= containers.size())
135 nextContainer = firstContainer;
136 return containers.get(nextContainer++);
137 }
138
139 /**
140 * Internal message delivery method.
141 *
142 * @pre threads > 0
143 */
144 private void sendUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) {
145 ReteContainer affectedContainer = receiver.getContainer();
146 synchronized (globalTerminationCriteria) {
147 long newCriterion = affectedContainer.sendUpdateToLocalAddress(receiver, direction, updateElement);
148 terminationCriterion(affectedContainer, newCriterion);
149 }
150 }
151
152 /**
153 * Internal message delivery method for single-threaded operation
154 *
155 * @pre threads == 0
156 */
157 private void sendUpdateSingleThreaded(Address<? extends Receiver> receiver, Direction direction,
158 Tuple updateElement) {
159 ReteContainer affectedContainer = receiver.getContainer();
160 affectedContainer.sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement);
161 }
162
163 /**
164 * Internal message delivery method.
165 *
166 * @pre threads > 0
167 */
168 private void sendUpdates(Address<? extends Receiver> receiver, Direction direction,
169 Collection<Tuple> updateElements) {
170 if (updateElements.isEmpty())
171 return;
172 ReteContainer affectedContainer = receiver.getContainer();
173 synchronized (globalTerminationCriteria) {
174 long newCriterion = affectedContainer.sendUpdatesToLocalAddress(receiver, direction, updateElements);
175 terminationCriterion(affectedContainer, newCriterion);
176 }
177 }
178
179 /**
180 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may
181 * reside in any of the containers associated with this network. To be called from a user thread during normal
182 * operation, NOT during construction.
183 *
184 * @since 2.4
185 */
186 public void sendExternalUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) {
187 if (threads > 0) {
188 try {
189 updateLock.lock();
190 sendUpdate(receiver, direction, updateElement);
191 } finally {
192 updateLock.unlock();
193 }
194 } else {
195 sendUpdateSingleThreaded(receiver, direction, updateElement);
196 // getHeadContainer().
197 }
198 }
199
200 /**
201 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may
202 * reside in any of the containers associated with this network. To be called from a user thread during
203 * construction.
204 *
205 * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may
206 * span through network calls, that's why it's not enforced here )
207 *
208 * @return the value of the target container's clock at the time when the message was accepted into its message
209 * queue
210 * @since 2.4
211 */
212 public void sendConstructionUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) {
213 // structuralChangeLock.lock();
214 if (threads > 0)
215 sendUpdate(receiver, direction, updateElement);
216 else
217 receiver.getContainer().sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement);
218 // structuralChangeLock.unlock();
219 }
220
221 /**
222 * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial
223 * matching. The node may reside in any of the containers associated with this network. To be called from a user
224 * thread during construction.
225 *
226 * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may
227 * span through network calls, that's why it's not enforced here )
228 *
229 * @since 2.4
230 */
231 public void sendConstructionUpdates(Address<? extends Receiver> receiver, Direction direction,
232 Collection<Tuple> updateElements) {
233 // structuralChangeLock.lock();
234 if (threads > 0)
235 sendUpdates(receiver, direction, updateElements);
236 else
237 receiver.getContainer().sendUpdatesToLocalAddressSingleThreaded(receiver, direction, updateElements);
238 // structuralChangeLock.unlock();
239 }
240
241 /**
242 * Establishes connection between a supplier and a receiver node, regardless which container they are in. Not to be
243 * called remotely, because this method enforces the structural lock.
244 *
245 * @param supplier
246 * @param receiver
247 * @param synchronise
248 * indicates whether the receiver should be synchronised to the current contents of the supplier
249 */
250 public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
251 boolean synchronise) {
252 try {
253 if (threads > 0)
254 structuralChangeLock.lock();
255 receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise);
256 } finally {
257 if (threads > 0)
258 structuralChangeLock.unlock();
259 }
260 }
261
262 /**
263 * Severs connection between a supplier and a receiver node, regardless which container they are in. Not to be
264 * called remotely, because this method enforces the structural lock.
265 *
266 * @param supplier
267 * @param receiver
268 * @param desynchronise
269 * indicates whether the current contents of the supplier should be subtracted from the receiver
270 */
271 public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
272 boolean desynchronise) {
273 try {
274 if (threads > 0)
275 structuralChangeLock.lock();
276 receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise);
277 } finally {
278 if (threads > 0)
279 structuralChangeLock.unlock();
280 }
281 }
282
283 /**
284 * Containers use this method to report whenever they run out of messages in their queue.
285 *
286 * To be called from the thread of the reporting container.
287 *
288 * @pre threads > 0.
289 * @param reportingContainer
290 * the container reporting the emptiness of its message queue.
291 * @param clock
292 * the value of the container's clock when reporting.
293 * @param localTerminationCriteria
294 * the latest clock values this container has received from other containers since the last time it
295 * reported termination.
296 */
297 void reportLocalUpdateTermination(ReteContainer reportingContainer, long clock,
298 Map<ReteContainer, Long> localTerminationCriteria) {
299 synchronized (globalTerminationCriteria) {
300 for (Entry<ReteContainer, Long> criterion : localTerminationCriteria.entrySet()) {
301 terminationCriterion(criterion.getKey(), criterion.getValue());
302 }
303
304 reportedClocks.put(reportingContainer, clock);
305 Long criterion = globalTerminationCriteria.get(reportingContainer);
306 if (criterion != null && criterion < clock)
307 globalTerminationCriteria.remove(reportingContainer);
308
309 if (globalTerminationCriteria.isEmpty())
310 globalTerminationCriteria.notifyAll();
311 }
312 }
313
314 /**
315 * @pre threads > 0
316 */
317 private void terminationCriterion(ReteContainer affectedContainer, long newCriterion) {
318 synchronized (globalTerminationCriteria) {
319 Long oldCriterion = globalTerminationCriteria.get(affectedContainer);
320 Long oldClock = reportedClocks.get(affectedContainer);
321 long relevantClock = oldClock == null ? 0 : oldClock;
322 if ((relevantClock <= newCriterion) && (oldCriterion == null || oldCriterion < newCriterion)) {
323 globalTerminationCriteria.put(affectedContainer, newCriterion);
324 }
325 }
326 }
327
328 /**
329 * Waits until all rete update operations are settled in all containers. Returns immediately, if no updates are
330 * pending.
331 *
332 * To be called from any user thread.
333 */
334 public void waitForReteTermination() {
335 if (threads > 0) {
336 synchronized (globalTerminationCriteria) {
337 while (!globalTerminationCriteria.isEmpty()) {
338 try {
339 globalTerminationCriteria.wait();
340 } catch (InterruptedException e) {
341
342 }
343 }
344 }
345 } else
346 headContainer.deliverMessagesSingleThreaded();
347 }
348
349 /**
350 * Waits to execute action until all rete update operations are settled in all containers. Runs action and returns
351 * immediately, if no updates are pending. The given action is guaranteed to be run when the terminated state still
352 * persists.
353 *
354 * @param action
355 * the action to be run when reaching the steady-state.
356 *
357 * To be called from any user thread.
358 */
359 public void waitForReteTermination(Runnable action) {
360 if (threads > 0) {
361 synchronized (globalTerminationCriteria) {
362 while (!globalTerminationCriteria.isEmpty()) {
363 try {
364 globalTerminationCriteria.wait();
365 } catch (InterruptedException e) {
366
367 }
368 }
369 action.run();
370 }
371 } else {
372 headContainer.deliverMessagesSingleThreaded();
373 action.run();
374 }
375
376 }
377
378 /**
379 * @return an unmodifiable set of known recipe traces
380 */
381 public Set<RecipeTraceInfo> getRecipeTraces() {
382 return Collections.unmodifiableSet(recipeTraces);
383 }
384
385 /**
386 * @return an unmodifiable list of containers
387 */
388 public List<ReteContainer> getContainers() {
389 return Collections.unmodifiableList(containers);
390 }
391
392 public Lock getStructuralChangeLock() {
393 return structuralChangeLock;
394 }
395
396 public NodeFactory getNodeFactory() {
397 return nodeFactory;
398 }
399
400 public InputConnector getInputConnector() {
401 return inputConnector;
402 }
403
404 public ReteEngine getEngine() {
405 return engine;
406 }
407
408}