diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java | 729 |
1 files changed, 729 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java new file mode 100644 index 00000000..16e290fd --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java | |||
@@ -0,0 +1,729 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import java.util.ArrayDeque; | ||
13 | import java.util.ArrayList; | ||
14 | import java.util.Collection; | ||
15 | import java.util.Deque; | ||
16 | import java.util.HashSet; | ||
17 | import java.util.LinkedHashSet; | ||
18 | import java.util.LinkedList; | ||
19 | import java.util.Map; | ||
20 | import java.util.Set; | ||
21 | import java.util.function.Function; | ||
22 | |||
23 | import org.apache.log4j.Logger; | ||
24 | import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext; | ||
25 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
26 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
27 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
28 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
29 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
30 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
31 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; | ||
32 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
33 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
34 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
35 | import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker; | ||
36 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyCommunicationTracker; | ||
37 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedCommand; | ||
38 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand; | ||
39 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedDisconnectCommand; | ||
40 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
41 | import tools.refinery.viatra.runtime.rete.single.SingleInputNode; | ||
42 | import tools.refinery.viatra.runtime.rete.single.TrimmerNode; | ||
43 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
44 | |||
45 | /** | ||
46 | * @author Gabor Bergmann | ||
47 | * | ||
48 | * Mutexes: externalMessageLock - enlisting messages into and retrieving from the external message queue | ||
49 | * @since 2.2 | ||
50 | */ | ||
51 | public final class ReteContainer { | ||
52 | |||
53 | protected Thread consumerThread = null; | ||
54 | protected boolean killed = false; | ||
55 | |||
56 | protected Network network; | ||
57 | |||
58 | protected LinkedList<Clearable> clearables; | ||
59 | protected Map<Long, Node> nodesById; | ||
60 | protected long nextId = 0; | ||
61 | |||
62 | protected ConnectionFactory connectionFactory; | ||
63 | protected NodeProvisioner nodeProvisioner; | ||
64 | |||
65 | protected Deque<UpdateMessage> internalMessageQueue = new ArrayDeque<UpdateMessage>(); | ||
66 | protected/* volatile */Deque<UpdateMessage> externalMessageQueue = new ArrayDeque<UpdateMessage>(); | ||
67 | protected Object externalMessageLock = new Object(); | ||
68 | protected Long clock = 1L; // even: steady state, odd: active queue; access | ||
69 | // ONLY with messageQueue locked! | ||
70 | protected Map<ReteContainer, Long> terminationCriteria = null; | ||
71 | protected final Logger logger; | ||
72 | protected final CommunicationTracker tracker; | ||
73 | |||
74 | protected final IQueryBackendContext backendContext; | ||
75 | |||
76 | protected Set<DelayedCommand> delayedCommandQueue; | ||
77 | protected Set<DelayedCommand> delayedCommandBuffer; | ||
78 | protected boolean executingDelayedCommands; | ||
79 | |||
80 | protected final TimelyConfiguration timelyConfiguration; | ||
81 | |||
82 | /** | ||
83 | * @param threaded | ||
84 | * false if operating in a single-threaded environment | ||
85 | */ | ||
86 | public ReteContainer(Network network, boolean threaded) { | ||
87 | super(); | ||
88 | this.network = network; | ||
89 | this.backendContext = network.getEngine().getBackendContext(); | ||
90 | this.timelyConfiguration = network.getEngine().getTimelyConfiguration(); | ||
91 | |||
92 | this.delayedCommandQueue = new LinkedHashSet<DelayedCommand>(); | ||
93 | this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>(); | ||
94 | this.executingDelayedCommands = false; | ||
95 | |||
96 | if (this.isTimelyEvaluation()) { | ||
97 | this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration()); | ||
98 | } else { | ||
99 | this.tracker = new TimelessCommunicationTracker(); | ||
100 | } | ||
101 | |||
102 | this.nodesById = CollectionsFactory.createMap(); | ||
103 | this.clearables = new LinkedList<Clearable>(); | ||
104 | this.logger = network.getEngine().getLogger(); | ||
105 | |||
106 | this.connectionFactory = new ConnectionFactory(this); | ||
107 | this.nodeProvisioner = new NodeProvisioner(this); | ||
108 | |||
109 | if (threaded) { | ||
110 | this.terminationCriteria = CollectionsFactory.createMap(); | ||
111 | this.consumerThread = new Thread("Rete thread of " + ReteContainer.super.toString()) { | ||
112 | @Override | ||
113 | public void run() { | ||
114 | messageConsumptionCycle(); | ||
115 | } | ||
116 | }; | ||
117 | this.consumerThread.start(); | ||
118 | } | ||
119 | } | ||
120 | |||
121 | /** | ||
122 | * @since 2.4 | ||
123 | */ | ||
124 | public boolean isTimelyEvaluation() { | ||
125 | return this.timelyConfiguration != null; | ||
126 | } | ||
127 | |||
128 | /** | ||
129 | * @since 2.4 | ||
130 | */ | ||
131 | public TimelyConfiguration getTimelyConfiguration() { | ||
132 | return this.timelyConfiguration; | ||
133 | } | ||
134 | |||
135 | /** | ||
136 | * @since 1.6 | ||
137 | * @return the communication graph of the nodes, incl. message scheduling | ||
138 | */ | ||
139 | public CommunicationTracker getCommunicationTracker() { | ||
140 | return tracker; | ||
141 | } | ||
142 | |||
143 | /** | ||
144 | * Stops this container. To be called by Network.kill() | ||
145 | */ | ||
146 | public void kill() { | ||
147 | killed = true; | ||
148 | if (consumerThread != null) | ||
149 | consumerThread.interrupt(); | ||
150 | } | ||
151 | |||
152 | /** | ||
153 | * Establishes connection between a supplier and a receiver node, regardless which container they are in. Assumption | ||
154 | * is that this container is the home of the receiver, but it is not strictly necessary. | ||
155 | * | ||
156 | * @param synchronise | ||
157 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
158 | */ | ||
159 | public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
160 | boolean synchronise) { | ||
161 | if (!isLocal(receiver)) | ||
162 | receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise); | ||
163 | else { | ||
164 | Receiver child = resolveLocal(receiver); | ||
165 | connectRemoteSupplier(supplier, child, synchronise); | ||
166 | } | ||
167 | } | ||
168 | |||
169 | /** | ||
170 | * Severs connection between a supplier and a receiver node, regardless which container they are in. Assumption is | ||
171 | * that this container is the home of the receiver, but it is not strictly necessary. | ||
172 | * | ||
173 | * @param desynchronise | ||
174 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
175 | */ | ||
176 | public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
177 | boolean desynchronise) { | ||
178 | if (!isLocal(receiver)) | ||
179 | receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise); | ||
180 | else { | ||
181 | Receiver child = resolveLocal(receiver); | ||
182 | disconnectRemoteSupplier(supplier, child, desynchronise); | ||
183 | } | ||
184 | } | ||
185 | |||
186 | /** | ||
187 | * Establishes connection between a remote supplier and a local receiver node. | ||
188 | * | ||
189 | * @param synchronise | ||
190 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
191 | */ | ||
192 | public void connectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, boolean synchronise) { | ||
193 | Supplier parent = nodeProvisioner.asSupplier(supplier); | ||
194 | if (synchronise) | ||
195 | connectAndSynchronize(parent, receiver); | ||
196 | else | ||
197 | connect(parent, receiver); | ||
198 | } | ||
199 | |||
200 | /** | ||
201 | * Severs connection between a remote supplier and a local receiver node. | ||
202 | * | ||
203 | * @param desynchronise | ||
204 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
205 | */ | ||
206 | public void disconnectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, | ||
207 | boolean desynchronise) { | ||
208 | Supplier parent = nodeProvisioner.asSupplier(supplier); | ||
209 | if (desynchronise) | ||
210 | disconnectAndDesynchronize(parent, receiver); | ||
211 | else | ||
212 | disconnect(parent, receiver); | ||
213 | } | ||
214 | |||
215 | /** | ||
216 | * Connects a receiver to a supplier | ||
217 | */ | ||
218 | public void connect(Supplier supplier, Receiver receiver) { | ||
219 | supplier.appendChild(receiver); | ||
220 | receiver.appendParent(supplier); | ||
221 | tracker.registerDependency(supplier, receiver); | ||
222 | } | ||
223 | |||
224 | /** | ||
225 | * Disconnects a receiver from a supplier | ||
226 | */ | ||
227 | public void disconnect(Supplier supplier, Receiver receiver) { | ||
228 | supplier.removeChild(receiver); | ||
229 | receiver.removeParent(supplier); | ||
230 | tracker.unregisterDependency(supplier, receiver); | ||
231 | } | ||
232 | |||
233 | /** | ||
234 | * @since 2.3 | ||
235 | */ | ||
236 | public boolean isExecutingDelayedCommands() { | ||
237 | return this.executingDelayedCommands; | ||
238 | } | ||
239 | |||
240 | /** | ||
241 | * @since 2.3 | ||
242 | */ | ||
243 | public Set<DelayedCommand> getDelayedCommandQueue() { | ||
244 | if (this.executingDelayedCommands) { | ||
245 | return this.delayedCommandBuffer; | ||
246 | } else { | ||
247 | return this.delayedCommandQueue; | ||
248 | } | ||
249 | } | ||
250 | |||
251 | /** | ||
252 | * Connects a receiver to a remote supplier, and synchronizes it to the current contents of the supplier | ||
253 | */ | ||
254 | public void connectAndSynchronize(Supplier supplier, Receiver receiver) { | ||
255 | supplier.appendChild(receiver); | ||
256 | receiver.appendParent(supplier); | ||
257 | tracker.registerDependency(supplier, receiver); | ||
258 | getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, receiver, this)); | ||
259 | } | ||
260 | |||
261 | /** | ||
262 | * Disconnects a receiver from a supplier | ||
263 | */ | ||
264 | public void disconnectAndDesynchronize(Supplier supplier, Receiver receiver) { | ||
265 | final boolean wasInSameSCC = this.isTimelyEvaluation() && this.tracker.areInSameGroup(supplier, receiver); | ||
266 | supplier.removeChild(receiver); | ||
267 | receiver.removeParent(supplier); | ||
268 | tracker.unregisterDependency(supplier, receiver); | ||
269 | getDelayedCommandQueue().add(new DelayedDisconnectCommand(supplier, receiver, this, wasInSameSCC)); | ||
270 | } | ||
271 | |||
272 | /** | ||
273 | * @since 2.3 | ||
274 | */ | ||
275 | public void executeDelayedCommands() { | ||
276 | if (!this.delayedCommandQueue.isEmpty()) { | ||
277 | flushUpdates(); | ||
278 | this.executingDelayedCommands = true; | ||
279 | for (final DelayedCommand command : this.delayedCommandQueue) { | ||
280 | command.run(); | ||
281 | } | ||
282 | this.delayedCommandQueue = this.delayedCommandBuffer; | ||
283 | this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>(); | ||
284 | flushUpdates(); | ||
285 | this.executingDelayedCommands = false; | ||
286 | } | ||
287 | } | ||
288 | |||
289 | /** | ||
290 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is | ||
291 | * indicated by the Address. Designed to be called by the Network, DO NOT use in any other way. @pre: | ||
292 | * address.container == this, e.g. address MUST be local | ||
293 | * | ||
294 | * @return the value of the container's clock at the time when the message was accepted into the local message queue | ||
295 | */ | ||
296 | long sendUpdateToLocalAddress(Address<? extends Receiver> address, Direction direction, Tuple updateElement) { | ||
297 | long timestamp; | ||
298 | Receiver receiver = resolveLocal(address); | ||
299 | UpdateMessage message = new UpdateMessage(receiver, direction, updateElement); | ||
300 | synchronized (externalMessageLock) { | ||
301 | externalMessageQueue.add(message); | ||
302 | timestamp = clock; | ||
303 | externalMessageLock.notifyAll(); | ||
304 | } | ||
305 | |||
306 | return timestamp; | ||
307 | |||
308 | } | ||
309 | |||
310 | /** | ||
311 | * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial | ||
312 | * matching. The receiver is indicated by the Address. Designed to be called by the Network, DO NOT use in any other | ||
313 | * way. @pre: address.container == this, e.g. address MUST be local @pre: updateElements is nonempty! | ||
314 | * | ||
315 | * @return the value of the container's clock at the time when the message was accepted into the local message queue | ||
316 | */ | ||
317 | long sendUpdatesToLocalAddress(Address<? extends Receiver> address, Direction direction, | ||
318 | Collection<Tuple> updateElements) { | ||
319 | |||
320 | long timestamp; | ||
321 | Receiver receiver = resolveLocal(address); | ||
322 | // UpdateMessage message = new UpdateMessage(receiver, direction, | ||
323 | // updateElement); | ||
324 | synchronized (externalMessageLock) { | ||
325 | for (Tuple ps : updateElements) | ||
326 | externalMessageQueue.add(new UpdateMessage(receiver, direction, ps)); | ||
327 | // messageQueue.add(new UpdateMessage(resolveLocal(address), | ||
328 | // direction, updateElement)); | ||
329 | // this.sendUpdateInternal(resolveLocal(address), direction, | ||
330 | // updateElement); | ||
331 | timestamp = clock; | ||
332 | externalMessageLock.notifyAll(); | ||
333 | } | ||
334 | |||
335 | return timestamp; | ||
336 | } | ||
337 | |||
338 | /** | ||
339 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is | ||
340 | * indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT use in any | ||
341 | * other way. | ||
342 | */ | ||
343 | void sendUpdateToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction, | ||
344 | Tuple updateElement) { | ||
345 | Receiver receiver = resolveLocal(address); | ||
346 | UpdateMessage message = new UpdateMessage(receiver, direction, updateElement); | ||
347 | internalMessageQueue.add(message); | ||
348 | } | ||
349 | |||
350 | /** | ||
351 | * Sends multiple update messages to the receiver node, indicating a newly found or lost partial matching. The | ||
352 | * receiver is indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT | ||
353 | * use in any other way. | ||
354 | * | ||
355 | * @pre: address.container == this, e.g. address MUST be local | ||
356 | */ | ||
357 | void sendUpdatesToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction, | ||
358 | Collection<Tuple> updateElements) { | ||
359 | Receiver receiver = resolveLocal(address); | ||
360 | for (Tuple ps : updateElements) | ||
361 | internalMessageQueue.add(new UpdateMessage(receiver, direction, ps)); | ||
362 | } | ||
363 | |||
364 | /** | ||
365 | * Sends an update message to a node in a different container. The receiver is indicated by the Address. Designed to | ||
366 | * be called by RemoteReceivers, DO NOT use in any other way. | ||
367 | * | ||
368 | * @since 2.4 | ||
369 | */ | ||
370 | public void sendUpdateToRemoteAddress(Address<? extends Receiver> address, Direction direction, | ||
371 | Tuple updateElement) { | ||
372 | ReteContainer otherContainer = address.getContainer(); | ||
373 | long otherClock = otherContainer.sendUpdateToLocalAddress(address, direction, updateElement); | ||
374 | // Long criterion = terminationCriteria.get(otherContainer); | ||
375 | // if (criterion==null || otherClock > criterion) | ||
376 | terminationCriteria.put(otherContainer, otherClock); | ||
377 | } | ||
378 | |||
379 | /** | ||
380 | * Finalises all update sequences and returns. To be called from user threads (e.g. network construction). | ||
381 | */ | ||
382 | public void flushUpdates() { | ||
383 | network.waitForReteTermination(); | ||
384 | // synchronized (messageQueue) | ||
385 | // { | ||
386 | // while (!messageQueue.isEmpty()) | ||
387 | // { | ||
388 | // try { | ||
389 | // UpdateMessage message = messageQueue.take(); | ||
390 | // message.receiver.update(message.direction, message.updateElement); | ||
391 | // } catch (InterruptedException e) {} | ||
392 | // } | ||
393 | // } | ||
394 | } | ||
395 | |||
396 | /** | ||
397 | * Retrieves a safe copy of the contents of a supplier. | ||
398 | * | ||
399 | * <p> Note that there may be multiple copies of a Tuple in case of a {@link TrimmerNode}, so the result is not always a set. | ||
400 | * | ||
401 | * @param flush if true, a flush is performed before pulling the contents | ||
402 | * @since 2.3 | ||
403 | */ | ||
404 | public Collection<Tuple> pullContents(final Supplier supplier, final boolean flush) { | ||
405 | if (flush) { | ||
406 | flushUpdates(); | ||
407 | } | ||
408 | final Collection<Tuple> collector = new ArrayList<Tuple>(); | ||
409 | supplier.pullInto(collector, flush); | ||
410 | return collector; | ||
411 | } | ||
412 | |||
413 | /** | ||
414 | * @since 2.4 | ||
415 | */ | ||
416 | public Map<Tuple, Timeline<Timestamp>> pullContentsWithTimeline(final Supplier supplier, final boolean flush) { | ||
417 | if (flush) { | ||
418 | flushUpdates(); | ||
419 | } | ||
420 | final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap(); | ||
421 | supplier.pullIntoWithTimeline(collector, flush); | ||
422 | return collector; | ||
423 | } | ||
424 | |||
425 | /** | ||
426 | * Retrieves the contents of a SingleInputNode's parentage. | ||
427 | * | ||
428 | * @since 2.3 | ||
429 | */ | ||
430 | public Collection<Tuple> pullPropagatedContents(final SingleInputNode supplier, final boolean flush) { | ||
431 | if (flush) { | ||
432 | flushUpdates(); | ||
433 | } | ||
434 | final Collection<Tuple> collector = new LinkedList<Tuple>(); | ||
435 | supplier.propagatePullInto(collector, flush); | ||
436 | return collector; | ||
437 | } | ||
438 | |||
439 | /** | ||
440 | * Retrieves the timestamp-aware contents of a SingleInputNode's parentage. | ||
441 | * | ||
442 | * @since 2.3 | ||
443 | */ | ||
444 | public Map<Tuple, Timeline<Timestamp>> pullPropagatedContentsWithTimestamp(final SingleInputNode supplier, | ||
445 | final boolean flush) { | ||
446 | if (flush) { | ||
447 | flushUpdates(); | ||
448 | } | ||
449 | final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap(); | ||
450 | supplier.propagatePullIntoWithTimestamp(collector, flush); | ||
451 | return collector; | ||
452 | } | ||
453 | |||
454 | /** | ||
455 | * Retrieves the contents of a supplier for a remote caller. Assumption is that this container is the home of the | ||
456 | * supplier, but it is not strictly necessary. | ||
457 | * | ||
458 | * @param supplier | ||
459 | * the address of the supplier to be pulled. | ||
460 | * @since 2.3 | ||
461 | */ | ||
462 | public Collection<Tuple> remotePull(Address<? extends Supplier> supplier, boolean flush) { | ||
463 | if (!isLocal(supplier)) | ||
464 | return supplier.getContainer().remotePull(supplier, flush); | ||
465 | return pullContents(resolveLocal(supplier), flush); | ||
466 | } | ||
467 | |||
468 | /** | ||
469 | * Proxies for the getPosMapping() of Production nodes. Retrieves the posmapping of a remote or local Production to | ||
470 | * a remote or local caller. | ||
471 | */ | ||
472 | public Map<String, Integer> remotePosMapping(Address<? extends ProductionNode> production) { | ||
473 | if (!isLocal(production)) | ||
474 | return production.getContainer().remotePosMapping(production); | ||
475 | return resolveLocal(production).getPosMapping(); | ||
476 | } | ||
477 | |||
478 | /** | ||
479 | * Continually consumes update messages. Should be run on a dedicated thread. | ||
480 | */ | ||
481 | void messageConsumptionCycle() { | ||
482 | while (!killed) // deliver messages on and on and on.... | ||
483 | { | ||
484 | long incrementedClock = 0; | ||
485 | UpdateMessage message = null; | ||
486 | |||
487 | if (!internalMessageQueue.isEmpty()) // take internal messages first | ||
488 | message = internalMessageQueue.removeFirst(); | ||
489 | else | ||
490 | // no internal message, take an incoming message | ||
491 | synchronized (externalMessageLock) { // no sleeping allowed, | ||
492 | // because external | ||
493 | // queue is locked for | ||
494 | // precise clocking of | ||
495 | // termination point! | ||
496 | if (!externalMessageQueue.isEmpty()) { // if external queue | ||
497 | // is non-empty, | ||
498 | // retrieve the next | ||
499 | // message instantly | ||
500 | message = takeExternalMessage(); | ||
501 | } else { // if external queue is found empty (and this is | ||
502 | // the first time in a row) | ||
503 | incrementedClock = ++clock; // local termination point | ||
504 | // synchronized(clock){incrementedClock = ++clock;} | ||
505 | } | ||
506 | } | ||
507 | |||
508 | if (message == null) // both queues were empty | ||
509 | { | ||
510 | localUpdateTermination(incrementedClock); // report local | ||
511 | // termination point | ||
512 | while (message == null) // wait for a message while external | ||
513 | // queue is still empty | ||
514 | { | ||
515 | synchronized (externalMessageLock) { | ||
516 | while (externalMessageQueue.isEmpty()) { | ||
517 | try { | ||
518 | externalMessageLock.wait(); | ||
519 | } catch (InterruptedException e) { | ||
520 | if (killed) | ||
521 | return; | ||
522 | } | ||
523 | } | ||
524 | message = takeExternalMessage(); | ||
525 | } | ||
526 | |||
527 | } | ||
528 | } | ||
529 | |||
530 | // now we have a message to deliver | ||
531 | // NOTE: this method is not compatible with differential dataflow | ||
532 | message.receiver.update(message.direction, message.updateElement, Timestamp.ZERO); | ||
533 | } | ||
534 | } | ||
535 | |||
536 | /** | ||
537 | * @since 1.6 | ||
538 | */ | ||
539 | public static final Function<Node, String> NAME_MAPPER = input -> input.toString().substring(0, | ||
540 | Math.min(30, input.toString().length())); | ||
541 | |||
542 | /** | ||
543 | * Sends out all pending messages to their receivers. The delivery is governed by the communication tracker. | ||
544 | * | ||
545 | * @since 1.6 | ||
546 | */ | ||
547 | public void deliverMessagesSingleThreaded() { | ||
548 | if (!backendContext.areUpdatesDelayed()) { | ||
549 | if (Options.MONITOR_VIOLATION_OF_RETE_NODEGROUP_TOPOLOGICAL_SORTING) { | ||
550 | // known unreachable; enable for debugging only | ||
551 | |||
552 | CommunicationGroup lastGroup = null; | ||
553 | Set<CommunicationGroup> seenInThisCycle = new HashSet<>(); | ||
554 | |||
555 | while (!tracker.isEmpty()) { | ||
556 | final CommunicationGroup group = tracker.getAndRemoveFirstGroup(); | ||
557 | |||
558 | /** | ||
559 | * The current group does not violate the communication schema iff (1) it was not seen before OR (2) | ||
560 | * the last one that was seen is exactly the same as the current one this can happen if the group | ||
561 | * was added back because of in-group message passing | ||
562 | */ | ||
563 | boolean okGroup = (group == lastGroup) || seenInThisCycle.add(group); | ||
564 | |||
565 | if (!okGroup) { | ||
566 | logger.error( | ||
567 | "[INTERNAL ERROR] Violation of communication schema! The communication component with representative " | ||
568 | + group.getRepresentative() + " has already been processed!"); | ||
569 | } | ||
570 | |||
571 | group.deliverMessages(); | ||
572 | |||
573 | lastGroup = group; | ||
574 | } | ||
575 | |||
576 | } else { | ||
577 | while (!tracker.isEmpty()) { | ||
578 | final CommunicationGroup group = tracker.getAndRemoveFirstGroup(); | ||
579 | group.deliverMessages(); | ||
580 | } | ||
581 | } | ||
582 | } | ||
583 | } | ||
584 | |||
585 | private void localUpdateTermination(long incrementedClock) { | ||
586 | network.reportLocalUpdateTermination(this, incrementedClock, terminationCriteria); | ||
587 | terminationCriteria.clear(); | ||
588 | |||
589 | // synchronized(clock){++clock;} // +1 incrementing for parity and easy | ||
590 | // comparison | ||
591 | } | ||
592 | |||
593 | // @pre: externalMessageQueue synchronized && nonempty | ||
594 | private UpdateMessage takeExternalMessage() { | ||
595 | UpdateMessage message = externalMessageQueue.removeFirst(); | ||
596 | if (!externalMessageQueue.isEmpty()) { // copy the whole queue over | ||
597 | // for speedup | ||
598 | Deque<UpdateMessage> temp = externalMessageQueue; | ||
599 | externalMessageQueue = internalMessageQueue; | ||
600 | internalMessageQueue = temp; | ||
601 | } | ||
602 | return message; | ||
603 | } | ||
604 | |||
605 | /** | ||
606 | * Provides an external address for the selected node. | ||
607 | * | ||
608 | * @pre node belongs to this container. | ||
609 | */ | ||
610 | public <N extends Node> Address<N> makeAddress(N node) { | ||
611 | return new Address<N>(node); | ||
612 | } | ||
613 | |||
614 | /** | ||
615 | * Checks whether a certain address points to a node at this container. | ||
616 | */ | ||
617 | public boolean isLocal(Address<? extends Node> address) { | ||
618 | return address.getContainer() == this; | ||
619 | } | ||
620 | |||
621 | /** | ||
622 | * Returns an addressed node at this container. | ||
623 | * | ||
624 | * @pre: address.container == this, e.g. address MUST be local | ||
625 | * @throws IllegalArgumentException | ||
626 | * if address is non-local | ||
627 | */ | ||
628 | @SuppressWarnings("unchecked") | ||
629 | public <N extends Node> N resolveLocal(Address<N> address) { | ||
630 | if (this != address.getContainer()) | ||
631 | throw new IllegalArgumentException(String.format("Address %s non-local at container %s", address, this)); | ||
632 | |||
633 | N cached = address.getNodeCache(); | ||
634 | if (cached != null) | ||
635 | return cached; | ||
636 | else { | ||
637 | N node = (N) nodesById.get(address.getNodeId()); | ||
638 | address.setNodeCache(node); | ||
639 | return node; | ||
640 | } | ||
641 | } | ||
642 | |||
643 | /** | ||
644 | * Registers a node into the rete network (should be called by constructor). Every node MUST be registered by its | ||
645 | * constructor. | ||
646 | * | ||
647 | * @return the unique nodeId issued to the node. | ||
648 | */ | ||
649 | public long registerNode(Node n) { | ||
650 | long id = nextId++; | ||
651 | nodesById.put(id, n); | ||
652 | return id; | ||
653 | } | ||
654 | |||
655 | /** | ||
656 | * Unregisters a node from the rete network. Do NOT call if node is still connected to other Nodes, or Adressed or | ||
657 | * otherwise referenced. | ||
658 | */ | ||
659 | public void unregisterNode(Node n) { | ||
660 | nodesById.remove(n.getNodeId()); | ||
661 | } | ||
662 | |||
663 | /** | ||
664 | * Registers a pattern memory into the rete network. Every memory MUST be registered by its owner node. | ||
665 | */ | ||
666 | public void registerClearable(Clearable c) { | ||
667 | clearables.addFirst(c); | ||
668 | } | ||
669 | |||
670 | /** | ||
671 | * Unregisters a pattern memory from the rete network. | ||
672 | */ | ||
673 | public void unregisterClearable(Clearable c) { | ||
674 | clearables.remove(c); | ||
675 | } | ||
676 | |||
677 | /** | ||
678 | * Clears all memory contents in the network. Reverts to initial state. | ||
679 | */ | ||
680 | public void clearAll() { | ||
681 | for (Clearable c : clearables) { | ||
682 | c.clear(); | ||
683 | } | ||
684 | } | ||
685 | |||
686 | public NodeFactory getNodeFactory() { | ||
687 | return network.getNodeFactory(); | ||
688 | } | ||
689 | |||
690 | public ConnectionFactory getConnectionFactory() { | ||
691 | return connectionFactory; | ||
692 | } | ||
693 | |||
694 | public NodeProvisioner getProvisioner() { | ||
695 | return nodeProvisioner; | ||
696 | } | ||
697 | |||
698 | public Network getNetwork() { | ||
699 | return network; | ||
700 | } | ||
701 | |||
702 | @Override | ||
703 | public String toString() { | ||
704 | StringBuilder sb = new StringBuilder(); | ||
705 | String separator = System.getProperty("line.separator"); | ||
706 | sb.append(super.toString() + "[[[" + separator); | ||
707 | java.util.List<Long> keys = new java.util.ArrayList<Long>(nodesById.keySet()); | ||
708 | java.util.Collections.sort(keys); | ||
709 | for (Long key : keys) { | ||
710 | sb.append(key + " -> " + nodesById.get(key) + separator); | ||
711 | } | ||
712 | sb.append("]]] of " + network); | ||
713 | return sb.toString(); | ||
714 | } | ||
715 | |||
716 | /** | ||
717 | * Access all the Rete nodes inside this container. | ||
718 | * | ||
719 | * @return the collection of {@link Node} instances | ||
720 | */ | ||
721 | public Collection<Node> getAllNodes() { | ||
722 | return nodesById.values(); | ||
723 | } | ||
724 | |||
725 | public InputConnector getInputConnectionFactory() { | ||
726 | return network.getInputConnector(); | ||
727 | } | ||
728 | |||
729 | } | ||