diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java | 408 |
1 files changed, 408 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java new file mode 100644 index 00000000..64f59ff3 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/Network.java | |||
@@ -0,0 +1,408 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.ArrayList; | ||
13 | import java.util.Collection; | ||
14 | import java.util.Collections; | ||
15 | import java.util.List; | ||
16 | import java.util.Map; | ||
17 | import java.util.Map.Entry; | ||
18 | import java.util.Set; | ||
19 | import java.util.concurrent.locks.Lock; | ||
20 | import java.util.concurrent.locks.ReadWriteLock; | ||
21 | import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
22 | |||
23 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
24 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
25 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
26 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
27 | import tools.refinery.viatra.runtime.rete.matcher.ReteEngine; | ||
28 | import tools.refinery.viatra.runtime.rete.recipes.ReteNodeRecipe; | ||
29 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
30 | import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo; | ||
31 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
32 | |||
33 | /** | ||
34 | * @author Gabor Bergmann | ||
35 | * | ||
36 | */ | ||
37 | public class Network { | ||
38 | final int threads; | ||
39 | |||
40 | protected ArrayList<ReteContainer> containers; | ||
41 | ReteContainer headContainer; | ||
42 | private int firstContainer = 0; | ||
43 | private int nextContainer = 0; | ||
44 | |||
45 | // the following fields exist only if threads > 0 | ||
46 | protected Map<ReteContainer, Long> globalTerminationCriteria = null; | ||
47 | protected Map<ReteContainer, Long> reportedClocks = null; | ||
48 | protected Lock updateLock = null; // grab during normal update operations | ||
49 | protected Lock structuralChangeLock = null; // grab if the network structure | ||
50 | // is to | ||
51 | // be changed | ||
52 | |||
53 | // Knowledge of the outside world | ||
54 | private ReteEngine engine; | ||
55 | protected NodeFactory nodeFactory; | ||
56 | protected InputConnector inputConnector; | ||
57 | |||
58 | // Node and recipe administration | ||
59 | // incl. addresses for existing nodes by recipe (where available) | ||
60 | // Maintained by NodeProvisioner of each container | ||
61 | Map<ReteNodeRecipe, Address<? extends Node>> nodesByRecipe = CollectionsFactory.createMap(); | ||
62 | Set<RecipeTraceInfo> recipeTraces = CollectionsFactory.createSet(); | ||
63 | |||
64 | /** | ||
65 | * @throws IllegalStateException | ||
66 | * if no node has been constructed for the recipe | ||
67 | */ | ||
68 | public synchronized Address<? extends Node> getExistingNodeByRecipe(ReteNodeRecipe recipe) { | ||
69 | final Address<? extends Node> node = nodesByRecipe.get(recipe); | ||
70 | if (node == null) | ||
71 | throw new IllegalStateException(String.format("Rete node for recipe %s not constructed yet.", recipe)); | ||
72 | return node; | ||
73 | } | ||
74 | |||
75 | /** | ||
76 | * @return null if no node has been constructed for the recipe | ||
77 | */ | ||
78 | public synchronized Address<? extends Node> getNodeByRecipeIfExists(ReteNodeRecipe recipe) { | ||
79 | final Address<? extends Node> node = nodesByRecipe.get(recipe); | ||
80 | return node; | ||
81 | } | ||
82 | |||
83 | /** | ||
84 | * @param threads | ||
85 | * the number of threads to operate the network with; 0 means single-threaded operation, 1 starts an | ||
86 | * asynchronous thread to operate the RETE net, >1 uses multiple RETE containers. | ||
87 | */ | ||
88 | public Network(int threads, ReteEngine engine) { | ||
89 | super(); | ||
90 | this.threads = threads; | ||
91 | this.engine = engine; | ||
92 | this.inputConnector = new InputConnector(this); | ||
93 | this.nodeFactory = new NodeFactory(engine.getLogger()); | ||
94 | |||
95 | containers = new ArrayList<ReteContainer>(); | ||
96 | firstContainer = (threads > 1) ? Options.firstFreeContainer : 0; // NOPMD | ||
97 | nextContainer = firstContainer; | ||
98 | |||
99 | if (threads > 0) { | ||
100 | globalTerminationCriteria = CollectionsFactory.createMap(); | ||
101 | reportedClocks = CollectionsFactory.createMap(); | ||
102 | ReadWriteLock rwl = new ReentrantReadWriteLock(); | ||
103 | updateLock = rwl.readLock(); | ||
104 | structuralChangeLock = rwl.writeLock(); | ||
105 | for (int i = 0; i < threads; ++i) | ||
106 | containers.add(new ReteContainer(this, true)); | ||
107 | } else | ||
108 | containers.add(new ReteContainer(this, false)); | ||
109 | |||
110 | headContainer = containers.get(0); | ||
111 | } | ||
112 | |||
113 | /** | ||
114 | * Kills this Network along with all containers and message consumption cycles. | ||
115 | */ | ||
116 | public void kill() { | ||
117 | for (ReteContainer container : containers) { | ||
118 | container.kill(); | ||
119 | } | ||
120 | containers.clear(); | ||
121 | } | ||
122 | |||
123 | /** | ||
124 | * Returns the head container, that is guaranteed to reside in the same JVM as the Network object. | ||
125 | */ | ||
126 | public ReteContainer getHeadContainer() { | ||
127 | return headContainer; | ||
128 | } | ||
129 | |||
130 | /** | ||
131 | * Returns the next container in round-robin fashion. Configurable not to yield head container. | ||
132 | */ | ||
133 | public ReteContainer getNextContainer() { | ||
134 | if (nextContainer >= containers.size()) | ||
135 | nextContainer = firstContainer; | ||
136 | return containers.get(nextContainer++); | ||
137 | } | ||
138 | |||
139 | /** | ||
140 | * Internal message delivery method. | ||
141 | * | ||
142 | * @pre threads > 0 | ||
143 | */ | ||
144 | private void sendUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) { | ||
145 | ReteContainer affectedContainer = receiver.getContainer(); | ||
146 | synchronized (globalTerminationCriteria) { | ||
147 | long newCriterion = affectedContainer.sendUpdateToLocalAddress(receiver, direction, updateElement); | ||
148 | terminationCriterion(affectedContainer, newCriterion); | ||
149 | } | ||
150 | } | ||
151 | |||
152 | /** | ||
153 | * Internal message delivery method for single-threaded operation | ||
154 | * | ||
155 | * @pre threads == 0 | ||
156 | */ | ||
157 | private void sendUpdateSingleThreaded(Address<? extends Receiver> receiver, Direction direction, | ||
158 | Tuple updateElement) { | ||
159 | ReteContainer affectedContainer = receiver.getContainer(); | ||
160 | affectedContainer.sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement); | ||
161 | } | ||
162 | |||
163 | /** | ||
164 | * Internal message delivery method. | ||
165 | * | ||
166 | * @pre threads > 0 | ||
167 | */ | ||
168 | private void sendUpdates(Address<? extends Receiver> receiver, Direction direction, | ||
169 | Collection<Tuple> updateElements) { | ||
170 | if (updateElements.isEmpty()) | ||
171 | return; | ||
172 | ReteContainer affectedContainer = receiver.getContainer(); | ||
173 | synchronized (globalTerminationCriteria) { | ||
174 | long newCriterion = affectedContainer.sendUpdatesToLocalAddress(receiver, direction, updateElements); | ||
175 | terminationCriterion(affectedContainer, newCriterion); | ||
176 | } | ||
177 | } | ||
178 | |||
179 | /** | ||
180 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may | ||
181 | * reside in any of the containers associated with this network. To be called from a user thread during normal | ||
182 | * operation, NOT during construction. | ||
183 | * | ||
184 | * @since 2.4 | ||
185 | */ | ||
186 | public void sendExternalUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) { | ||
187 | if (threads > 0) { | ||
188 | try { | ||
189 | updateLock.lock(); | ||
190 | sendUpdate(receiver, direction, updateElement); | ||
191 | } finally { | ||
192 | updateLock.unlock(); | ||
193 | } | ||
194 | } else { | ||
195 | sendUpdateSingleThreaded(receiver, direction, updateElement); | ||
196 | // getHeadContainer(). | ||
197 | } | ||
198 | } | ||
199 | |||
200 | /** | ||
201 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The node may | ||
202 | * reside in any of the containers associated with this network. To be called from a user thread during | ||
203 | * construction. | ||
204 | * | ||
205 | * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may | ||
206 | * span through network calls, that's why it's not enforced here ) | ||
207 | * | ||
208 | * @return the value of the target container's clock at the time when the message was accepted into its message | ||
209 | * queue | ||
210 | * @since 2.4 | ||
211 | */ | ||
212 | public void sendConstructionUpdate(Address<? extends Receiver> receiver, Direction direction, Tuple updateElement) { | ||
213 | // structuralChangeLock.lock(); | ||
214 | if (threads > 0) | ||
215 | sendUpdate(receiver, direction, updateElement); | ||
216 | else | ||
217 | receiver.getContainer().sendUpdateToLocalAddressSingleThreaded(receiver, direction, updateElement); | ||
218 | // structuralChangeLock.unlock(); | ||
219 | } | ||
220 | |||
221 | /** | ||
222 | * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial | ||
223 | * matching. The node may reside in any of the containers associated with this network. To be called from a user | ||
224 | * thread during construction. | ||
225 | * | ||
226 | * @pre: structuralChangeLock MUST be grabbed by the sequence (but not necessarily this thread, as the sequence may | ||
227 | * span through network calls, that's why it's not enforced here ) | ||
228 | * | ||
229 | * @since 2.4 | ||
230 | */ | ||
231 | public void sendConstructionUpdates(Address<? extends Receiver> receiver, Direction direction, | ||
232 | Collection<Tuple> updateElements) { | ||
233 | // structuralChangeLock.lock(); | ||
234 | if (threads > 0) | ||
235 | sendUpdates(receiver, direction, updateElements); | ||
236 | else | ||
237 | receiver.getContainer().sendUpdatesToLocalAddressSingleThreaded(receiver, direction, updateElements); | ||
238 | // structuralChangeLock.unlock(); | ||
239 | } | ||
240 | |||
241 | /** | ||
242 | * Establishes connection between a supplier and a receiver node, regardless which container they are in. Not to be | ||
243 | * called remotely, because this method enforces the structural lock. | ||
244 | * | ||
245 | * @param supplier | ||
246 | * @param receiver | ||
247 | * @param synchronise | ||
248 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
249 | */ | ||
250 | public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
251 | boolean synchronise) { | ||
252 | try { | ||
253 | if (threads > 0) | ||
254 | structuralChangeLock.lock(); | ||
255 | receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise); | ||
256 | } finally { | ||
257 | if (threads > 0) | ||
258 | structuralChangeLock.unlock(); | ||
259 | } | ||
260 | } | ||
261 | |||
262 | /** | ||
263 | * Severs connection between a supplier and a receiver node, regardless which container they are in. Not to be | ||
264 | * called remotely, because this method enforces the structural lock. | ||
265 | * | ||
266 | * @param supplier | ||
267 | * @param receiver | ||
268 | * @param desynchronise | ||
269 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
270 | */ | ||
271 | public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
272 | boolean desynchronise) { | ||
273 | try { | ||
274 | if (threads > 0) | ||
275 | structuralChangeLock.lock(); | ||
276 | receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise); | ||
277 | } finally { | ||
278 | if (threads > 0) | ||
279 | structuralChangeLock.unlock(); | ||
280 | } | ||
281 | } | ||
282 | |||
283 | /** | ||
284 | * Containers use this method to report whenever they run out of messages in their queue. | ||
285 | * | ||
286 | * To be called from the thread of the reporting container. | ||
287 | * | ||
288 | * @pre threads > 0. | ||
289 | * @param reportingContainer | ||
290 | * the container reporting the emptiness of its message queue. | ||
291 | * @param clock | ||
292 | * the value of the container's clock when reporting. | ||
293 | * @param localTerminationCriteria | ||
294 | * the latest clock values this container has received from other containers since the last time it | ||
295 | * reported termination. | ||
296 | */ | ||
297 | void reportLocalUpdateTermination(ReteContainer reportingContainer, long clock, | ||
298 | Map<ReteContainer, Long> localTerminationCriteria) { | ||
299 | synchronized (globalTerminationCriteria) { | ||
300 | for (Entry<ReteContainer, Long> criterion : localTerminationCriteria.entrySet()) { | ||
301 | terminationCriterion(criterion.getKey(), criterion.getValue()); | ||
302 | } | ||
303 | |||
304 | reportedClocks.put(reportingContainer, clock); | ||
305 | Long criterion = globalTerminationCriteria.get(reportingContainer); | ||
306 | if (criterion != null && criterion < clock) | ||
307 | globalTerminationCriteria.remove(reportingContainer); | ||
308 | |||
309 | if (globalTerminationCriteria.isEmpty()) | ||
310 | globalTerminationCriteria.notifyAll(); | ||
311 | } | ||
312 | } | ||
313 | |||
314 | /** | ||
315 | * @pre threads > 0 | ||
316 | */ | ||
317 | private void terminationCriterion(ReteContainer affectedContainer, long newCriterion) { | ||
318 | synchronized (globalTerminationCriteria) { | ||
319 | Long oldCriterion = globalTerminationCriteria.get(affectedContainer); | ||
320 | Long oldClock = reportedClocks.get(affectedContainer); | ||
321 | long relevantClock = oldClock == null ? 0 : oldClock; | ||
322 | if ((relevantClock <= newCriterion) && (oldCriterion == null || oldCriterion < newCriterion)) { | ||
323 | globalTerminationCriteria.put(affectedContainer, newCriterion); | ||
324 | } | ||
325 | } | ||
326 | } | ||
327 | |||
328 | /** | ||
329 | * Waits until all rete update operations are settled in all containers. Returns immediately, if no updates are | ||
330 | * pending. | ||
331 | * | ||
332 | * To be called from any user thread. | ||
333 | */ | ||
334 | public void waitForReteTermination() { | ||
335 | if (threads > 0) { | ||
336 | synchronized (globalTerminationCriteria) { | ||
337 | while (!globalTerminationCriteria.isEmpty()) { | ||
338 | try { | ||
339 | globalTerminationCriteria.wait(); | ||
340 | } catch (InterruptedException e) { | ||
341 | |||
342 | } | ||
343 | } | ||
344 | } | ||
345 | } else | ||
346 | headContainer.deliverMessagesSingleThreaded(); | ||
347 | } | ||
348 | |||
349 | /** | ||
350 | * Waits to execute action until all rete update operations are settled in all containers. Runs action and returns | ||
351 | * immediately, if no updates are pending. The given action is guaranteed to be run when the terminated state still | ||
352 | * persists. | ||
353 | * | ||
354 | * @param action | ||
355 | * the action to be run when reaching the steady-state. | ||
356 | * | ||
357 | * To be called from any user thread. | ||
358 | */ | ||
359 | public void waitForReteTermination(Runnable action) { | ||
360 | if (threads > 0) { | ||
361 | synchronized (globalTerminationCriteria) { | ||
362 | while (!globalTerminationCriteria.isEmpty()) { | ||
363 | try { | ||
364 | globalTerminationCriteria.wait(); | ||
365 | } catch (InterruptedException e) { | ||
366 | |||
367 | } | ||
368 | } | ||
369 | action.run(); | ||
370 | } | ||
371 | } else { | ||
372 | headContainer.deliverMessagesSingleThreaded(); | ||
373 | action.run(); | ||
374 | } | ||
375 | |||
376 | } | ||
377 | |||
378 | /** | ||
379 | * @return an unmodifiable set of known recipe traces | ||
380 | */ | ||
381 | public Set<RecipeTraceInfo> getRecipeTraces() { | ||
382 | return Collections.unmodifiableSet(recipeTraces); | ||
383 | } | ||
384 | |||
385 | /** | ||
386 | * @return an unmodifiable list of containers | ||
387 | */ | ||
388 | public List<ReteContainer> getContainers() { | ||
389 | return Collections.unmodifiableList(containers); | ||
390 | } | ||
391 | |||
392 | public Lock getStructuralChangeLock() { | ||
393 | return structuralChangeLock; | ||
394 | } | ||
395 | |||
396 | public NodeFactory getNodeFactory() { | ||
397 | return nodeFactory; | ||
398 | } | ||
399 | |||
400 | public InputConnector getInputConnector() { | ||
401 | return inputConnector; | ||
402 | } | ||
403 | |||
404 | public ReteEngine getEngine() { | ||
405 | return engine; | ||
406 | } | ||
407 | |||
408 | } | ||