diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java | 729 |
1 files changed, 729 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java new file mode 100644 index 00000000..79e0526d --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java | |||
@@ -0,0 +1,729 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * | ||
8 | * SPDX-License-Identifier: EPL-2.0 | ||
9 | *******************************************************************************/ | ||
10 | |||
11 | package tools.refinery.viatra.runtime.rete.network; | ||
12 | |||
13 | import org.apache.log4j.Logger; | ||
14 | import tools.refinery.viatra.runtime.CancellationToken; | ||
15 | import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.Clearable; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
20 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
21 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
22 | import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; | ||
23 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; | ||
24 | import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; | ||
25 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
26 | import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker; | ||
27 | import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyCommunicationTracker; | ||
28 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedCommand; | ||
29 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand; | ||
30 | import tools.refinery.viatra.runtime.rete.network.delayed.DelayedDisconnectCommand; | ||
31 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
32 | import tools.refinery.viatra.runtime.rete.single.SingleInputNode; | ||
33 | import tools.refinery.viatra.runtime.rete.single.TrimmerNode; | ||
34 | import tools.refinery.viatra.runtime.rete.util.Options; | ||
35 | |||
36 | import java.util.*; | ||
37 | import java.util.function.Function; | ||
38 | |||
39 | /** | ||
40 | * @author Gabor Bergmann | ||
41 | * | ||
42 | * Mutexes: externalMessageLock - enlisting messages into and retrieving from the external message queue | ||
43 | * @since 2.2 | ||
44 | */ | ||
45 | public final class ReteContainer { | ||
46 | |||
47 | protected Thread consumerThread = null; | ||
48 | protected boolean killed = false; | ||
49 | |||
50 | protected Network network; | ||
51 | |||
52 | protected LinkedList<Clearable> clearables; | ||
53 | protected Map<Long, Node> nodesById; | ||
54 | protected long nextId = 0; | ||
55 | |||
56 | protected ConnectionFactory connectionFactory; | ||
57 | protected NodeProvisioner nodeProvisioner; | ||
58 | |||
59 | protected Deque<UpdateMessage> internalMessageQueue = new ArrayDeque<UpdateMessage>(); | ||
60 | protected/* volatile */Deque<UpdateMessage> externalMessageQueue = new ArrayDeque<UpdateMessage>(); | ||
61 | protected Object externalMessageLock = new Object(); | ||
62 | protected Long clock = 1L; // even: steady state, odd: active queue; access | ||
63 | // ONLY with messageQueue locked! | ||
64 | protected Map<ReteContainer, Long> terminationCriteria = null; | ||
65 | protected final Logger logger; | ||
66 | protected final CommunicationTracker tracker; | ||
67 | |||
68 | protected final IQueryBackendContext backendContext; | ||
69 | |||
70 | protected Set<DelayedCommand> delayedCommandQueue; | ||
71 | protected Set<DelayedCommand> delayedCommandBuffer; | ||
72 | protected boolean executingDelayedCommands; | ||
73 | |||
74 | protected final TimelyConfiguration timelyConfiguration; | ||
75 | |||
76 | private final CancellationToken cancellationToken; | ||
77 | |||
78 | /** | ||
79 | * @param threaded | ||
80 | * false if operating in a single-threaded environment | ||
81 | */ | ||
82 | public ReteContainer(Network network, boolean threaded) { | ||
83 | super(); | ||
84 | this.network = network; | ||
85 | this.backendContext = network.getEngine().getBackendContext(); | ||
86 | this.timelyConfiguration = network.getEngine().getTimelyConfiguration(); | ||
87 | cancellationToken = backendContext.getRuntimeContext().getCancellationToken(); | ||
88 | |||
89 | this.delayedCommandQueue = new LinkedHashSet<DelayedCommand>(); | ||
90 | this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>(); | ||
91 | this.executingDelayedCommands = false; | ||
92 | |||
93 | if (this.isTimelyEvaluation()) { | ||
94 | this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration()); | ||
95 | } else { | ||
96 | this.tracker = new TimelessCommunicationTracker(); | ||
97 | } | ||
98 | |||
99 | this.nodesById = CollectionsFactory.createMap(); | ||
100 | this.clearables = new LinkedList<Clearable>(); | ||
101 | this.logger = network.getEngine().getLogger(); | ||
102 | |||
103 | this.connectionFactory = new ConnectionFactory(this); | ||
104 | this.nodeProvisioner = new NodeProvisioner(this); | ||
105 | |||
106 | if (threaded) { | ||
107 | this.terminationCriteria = CollectionsFactory.createMap(); | ||
108 | this.consumerThread = new Thread("Rete thread of " + ReteContainer.super.toString()) { | ||
109 | @Override | ||
110 | public void run() { | ||
111 | messageConsumptionCycle(); | ||
112 | } | ||
113 | }; | ||
114 | this.consumerThread.start(); | ||
115 | } | ||
116 | } | ||
117 | |||
118 | /** | ||
119 | * @since 2.4 | ||
120 | */ | ||
121 | public boolean isTimelyEvaluation() { | ||
122 | return this.timelyConfiguration != null; | ||
123 | } | ||
124 | |||
125 | /** | ||
126 | * @since 2.4 | ||
127 | */ | ||
128 | public TimelyConfiguration getTimelyConfiguration() { | ||
129 | return this.timelyConfiguration; | ||
130 | } | ||
131 | |||
132 | /** | ||
133 | * @since 1.6 | ||
134 | * @return the communication graph of the nodes, incl. message scheduling | ||
135 | */ | ||
136 | public CommunicationTracker getCommunicationTracker() { | ||
137 | return tracker; | ||
138 | } | ||
139 | |||
140 | /** | ||
141 | * Stops this container. To be called by Network.kill() | ||
142 | */ | ||
143 | public void kill() { | ||
144 | killed = true; | ||
145 | if (consumerThread != null) | ||
146 | consumerThread.interrupt(); | ||
147 | } | ||
148 | |||
149 | /** | ||
150 | * Establishes connection between a supplier and a receiver node, regardless which container they are in. Assumption | ||
151 | * is that this container is the home of the receiver, but it is not strictly necessary. | ||
152 | * | ||
153 | * @param synchronise | ||
154 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
155 | */ | ||
156 | public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
157 | boolean synchronise) { | ||
158 | if (!isLocal(receiver)) | ||
159 | receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise); | ||
160 | else { | ||
161 | Receiver child = resolveLocal(receiver); | ||
162 | connectRemoteSupplier(supplier, child, synchronise); | ||
163 | } | ||
164 | } | ||
165 | |||
166 | /** | ||
167 | * Severs connection between a supplier and a receiver node, regardless which container they are in. Assumption is | ||
168 | * that this container is the home of the receiver, but it is not strictly necessary. | ||
169 | * | ||
170 | * @param desynchronise | ||
171 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
172 | */ | ||
173 | public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver, | ||
174 | boolean desynchronise) { | ||
175 | if (!isLocal(receiver)) | ||
176 | receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise); | ||
177 | else { | ||
178 | Receiver child = resolveLocal(receiver); | ||
179 | disconnectRemoteSupplier(supplier, child, desynchronise); | ||
180 | } | ||
181 | } | ||
182 | |||
183 | /** | ||
184 | * Establishes connection between a remote supplier and a local receiver node. | ||
185 | * | ||
186 | * @param synchronise | ||
187 | * indicates whether the receiver should be synchronised to the current contents of the supplier | ||
188 | */ | ||
189 | public void connectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, boolean synchronise) { | ||
190 | Supplier parent = nodeProvisioner.asSupplier(supplier); | ||
191 | if (synchronise) | ||
192 | connectAndSynchronize(parent, receiver); | ||
193 | else | ||
194 | connect(parent, receiver); | ||
195 | } | ||
196 | |||
197 | /** | ||
198 | * Severs connection between a remote supplier and a local receiver node. | ||
199 | * | ||
200 | * @param desynchronise | ||
201 | * indicates whether the current contents of the supplier should be subtracted from the receiver | ||
202 | */ | ||
203 | public void disconnectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, | ||
204 | boolean desynchronise) { | ||
205 | Supplier parent = nodeProvisioner.asSupplier(supplier); | ||
206 | if (desynchronise) | ||
207 | disconnectAndDesynchronize(parent, receiver); | ||
208 | else | ||
209 | disconnect(parent, receiver); | ||
210 | } | ||
211 | |||
212 | /** | ||
213 | * Connects a receiver to a supplier | ||
214 | */ | ||
215 | public void connect(Supplier supplier, Receiver receiver) { | ||
216 | supplier.appendChild(receiver); | ||
217 | receiver.appendParent(supplier); | ||
218 | tracker.registerDependency(supplier, receiver); | ||
219 | } | ||
220 | |||
221 | /** | ||
222 | * Disconnects a receiver from a supplier | ||
223 | */ | ||
224 | public void disconnect(Supplier supplier, Receiver receiver) { | ||
225 | supplier.removeChild(receiver); | ||
226 | receiver.removeParent(supplier); | ||
227 | tracker.unregisterDependency(supplier, receiver); | ||
228 | } | ||
229 | |||
230 | /** | ||
231 | * @since 2.3 | ||
232 | */ | ||
233 | public boolean isExecutingDelayedCommands() { | ||
234 | return this.executingDelayedCommands; | ||
235 | } | ||
236 | |||
237 | /** | ||
238 | * @since 2.3 | ||
239 | */ | ||
240 | public Set<DelayedCommand> getDelayedCommandQueue() { | ||
241 | if (this.executingDelayedCommands) { | ||
242 | return this.delayedCommandBuffer; | ||
243 | } else { | ||
244 | return this.delayedCommandQueue; | ||
245 | } | ||
246 | } | ||
247 | |||
248 | /** | ||
249 | * Connects a receiver to a remote supplier, and synchronizes it to the current contents of the supplier | ||
250 | */ | ||
251 | public void connectAndSynchronize(Supplier supplier, Receiver receiver) { | ||
252 | supplier.appendChild(receiver); | ||
253 | receiver.appendParent(supplier); | ||
254 | tracker.registerDependency(supplier, receiver); | ||
255 | getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, receiver, this)); | ||
256 | } | ||
257 | |||
258 | /** | ||
259 | * Disconnects a receiver from a supplier | ||
260 | */ | ||
261 | public void disconnectAndDesynchronize(Supplier supplier, Receiver receiver) { | ||
262 | final boolean wasInSameSCC = this.isTimelyEvaluation() && this.tracker.areInSameGroup(supplier, receiver); | ||
263 | supplier.removeChild(receiver); | ||
264 | receiver.removeParent(supplier); | ||
265 | tracker.unregisterDependency(supplier, receiver); | ||
266 | getDelayedCommandQueue().add(new DelayedDisconnectCommand(supplier, receiver, this, wasInSameSCC)); | ||
267 | } | ||
268 | |||
269 | /** | ||
270 | * @since 2.3 | ||
271 | */ | ||
272 | public void executeDelayedCommands() { | ||
273 | if (!this.delayedCommandQueue.isEmpty()) { | ||
274 | flushUpdates(); | ||
275 | this.executingDelayedCommands = true; | ||
276 | for (final DelayedCommand command : this.delayedCommandQueue) { | ||
277 | command.run(); | ||
278 | } | ||
279 | this.delayedCommandQueue = this.delayedCommandBuffer; | ||
280 | this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>(); | ||
281 | flushUpdates(); | ||
282 | this.executingDelayedCommands = false; | ||
283 | } | ||
284 | } | ||
285 | |||
286 | /** | ||
287 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is | ||
288 | * indicated by the Address. Designed to be called by the Network, DO NOT use in any other way. @pre: | ||
289 | * address.container == this, e.g. address MUST be local | ||
290 | * | ||
291 | * @return the value of the container's clock at the time when the message was accepted into the local message queue | ||
292 | */ | ||
293 | long sendUpdateToLocalAddress(Address<? extends Receiver> address, Direction direction, Tuple updateElement) { | ||
294 | long timestamp; | ||
295 | Receiver receiver = resolveLocal(address); | ||
296 | UpdateMessage message = new UpdateMessage(receiver, direction, updateElement); | ||
297 | synchronized (externalMessageLock) { | ||
298 | externalMessageQueue.add(message); | ||
299 | timestamp = clock; | ||
300 | externalMessageLock.notifyAll(); | ||
301 | } | ||
302 | |||
303 | return timestamp; | ||
304 | |||
305 | } | ||
306 | |||
307 | /** | ||
308 | * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial | ||
309 | * matching. The receiver is indicated by the Address. Designed to be called by the Network, DO NOT use in any other | ||
310 | * way. @pre: address.container == this, e.g. address MUST be local @pre: updateElements is nonempty! | ||
311 | * | ||
312 | * @return the value of the container's clock at the time when the message was accepted into the local message queue | ||
313 | */ | ||
314 | long sendUpdatesToLocalAddress(Address<? extends Receiver> address, Direction direction, | ||
315 | Collection<Tuple> updateElements) { | ||
316 | |||
317 | long timestamp; | ||
318 | Receiver receiver = resolveLocal(address); | ||
319 | // UpdateMessage message = new UpdateMessage(receiver, direction, | ||
320 | // updateElement); | ||
321 | synchronized (externalMessageLock) { | ||
322 | for (Tuple ps : updateElements) | ||
323 | externalMessageQueue.add(new UpdateMessage(receiver, direction, ps)); | ||
324 | // messageQueue.add(new UpdateMessage(resolveLocal(address), | ||
325 | // direction, updateElement)); | ||
326 | // this.sendUpdateInternal(resolveLocal(address), direction, | ||
327 | // updateElement); | ||
328 | timestamp = clock; | ||
329 | externalMessageLock.notifyAll(); | ||
330 | } | ||
331 | |||
332 | return timestamp; | ||
333 | } | ||
334 | |||
335 | /** | ||
336 | * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is | ||
337 | * indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT use in any | ||
338 | * other way. | ||
339 | */ | ||
340 | void sendUpdateToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction, | ||
341 | Tuple updateElement) { | ||
342 | Receiver receiver = resolveLocal(address); | ||
343 | UpdateMessage message = new UpdateMessage(receiver, direction, updateElement); | ||
344 | internalMessageQueue.add(message); | ||
345 | } | ||
346 | |||
347 | /** | ||
348 | * Sends multiple update messages to the receiver node, indicating a newly found or lost partial matching. The | ||
349 | * receiver is indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT | ||
350 | * use in any other way. | ||
351 | * | ||
352 | * @pre: address.container == this, e.g. address MUST be local | ||
353 | */ | ||
354 | void sendUpdatesToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction, | ||
355 | Collection<Tuple> updateElements) { | ||
356 | Receiver receiver = resolveLocal(address); | ||
357 | for (Tuple ps : updateElements) | ||
358 | internalMessageQueue.add(new UpdateMessage(receiver, direction, ps)); | ||
359 | } | ||
360 | |||
361 | /** | ||
362 | * Sends an update message to a node in a different container. The receiver is indicated by the Address. Designed to | ||
363 | * be called by RemoteReceivers, DO NOT use in any other way. | ||
364 | * | ||
365 | * @since 2.4 | ||
366 | */ | ||
367 | public void sendUpdateToRemoteAddress(Address<? extends Receiver> address, Direction direction, | ||
368 | Tuple updateElement) { | ||
369 | ReteContainer otherContainer = address.getContainer(); | ||
370 | long otherClock = otherContainer.sendUpdateToLocalAddress(address, direction, updateElement); | ||
371 | // Long criterion = terminationCriteria.get(otherContainer); | ||
372 | // if (criterion==null || otherClock > criterion) | ||
373 | terminationCriteria.put(otherContainer, otherClock); | ||
374 | } | ||
375 | |||
376 | /** | ||
377 | * Finalises all update sequences and returns. To be called from user threads (e.g. network construction). | ||
378 | */ | ||
379 | public void flushUpdates() { | ||
380 | network.waitForReteTermination(); | ||
381 | // synchronized (messageQueue) | ||
382 | // { | ||
383 | // while (!messageQueue.isEmpty()) | ||
384 | // { | ||
385 | // try { | ||
386 | // UpdateMessage message = messageQueue.take(); | ||
387 | // message.receiver.update(message.direction, message.updateElement); | ||
388 | // } catch (InterruptedException e) {} | ||
389 | // } | ||
390 | // } | ||
391 | } | ||
392 | |||
393 | /** | ||
394 | * Retrieves a safe copy of the contents of a supplier. | ||
395 | * | ||
396 | * <p> Note that there may be multiple copies of a Tuple in case of a {@link TrimmerNode}, so the result is not always a set. | ||
397 | * | ||
398 | * @param flush if true, a flush is performed before pulling the contents | ||
399 | * @since 2.3 | ||
400 | */ | ||
401 | public Collection<Tuple> pullContents(final Supplier supplier, final boolean flush) { | ||
402 | if (flush) { | ||
403 | flushUpdates(); | ||
404 | } | ||
405 | final Collection<Tuple> collector = new ArrayList<Tuple>(); | ||
406 | supplier.pullInto(collector, flush); | ||
407 | return collector; | ||
408 | } | ||
409 | |||
410 | /** | ||
411 | * @since 2.4 | ||
412 | */ | ||
413 | public Map<Tuple, Timeline<Timestamp>> pullContentsWithTimeline(final Supplier supplier, final boolean flush) { | ||
414 | if (flush) { | ||
415 | flushUpdates(); | ||
416 | } | ||
417 | final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap(); | ||
418 | supplier.pullIntoWithTimeline(collector, flush); | ||
419 | return collector; | ||
420 | } | ||
421 | |||
422 | /** | ||
423 | * Retrieves the contents of a SingleInputNode's parentage. | ||
424 | * | ||
425 | * @since 2.3 | ||
426 | */ | ||
427 | public Collection<Tuple> pullPropagatedContents(final SingleInputNode supplier, final boolean flush) { | ||
428 | if (flush) { | ||
429 | flushUpdates(); | ||
430 | } | ||
431 | final Collection<Tuple> collector = new LinkedList<Tuple>(); | ||
432 | supplier.propagatePullInto(collector, flush); | ||
433 | return collector; | ||
434 | } | ||
435 | |||
436 | /** | ||
437 | * Retrieves the timestamp-aware contents of a SingleInputNode's parentage. | ||
438 | * | ||
439 | * @since 2.3 | ||
440 | */ | ||
441 | public Map<Tuple, Timeline<Timestamp>> pullPropagatedContentsWithTimestamp(final SingleInputNode supplier, | ||
442 | final boolean flush) { | ||
443 | if (flush) { | ||
444 | flushUpdates(); | ||
445 | } | ||
446 | final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap(); | ||
447 | supplier.propagatePullIntoWithTimestamp(collector, flush); | ||
448 | return collector; | ||
449 | } | ||
450 | |||
451 | /** | ||
452 | * Retrieves the contents of a supplier for a remote caller. Assumption is that this container is the home of the | ||
453 | * supplier, but it is not strictly necessary. | ||
454 | * | ||
455 | * @param supplier | ||
456 | * the address of the supplier to be pulled. | ||
457 | * @since 2.3 | ||
458 | */ | ||
459 | public Collection<Tuple> remotePull(Address<? extends Supplier> supplier, boolean flush) { | ||
460 | if (!isLocal(supplier)) | ||
461 | return supplier.getContainer().remotePull(supplier, flush); | ||
462 | return pullContents(resolveLocal(supplier), flush); | ||
463 | } | ||
464 | |||
465 | /** | ||
466 | * Proxies for the getPosMapping() of Production nodes. Retrieves the posmapping of a remote or local Production to | ||
467 | * a remote or local caller. | ||
468 | */ | ||
469 | public Map<String, Integer> remotePosMapping(Address<? extends ProductionNode> production) { | ||
470 | if (!isLocal(production)) | ||
471 | return production.getContainer().remotePosMapping(production); | ||
472 | return resolveLocal(production).getPosMapping(); | ||
473 | } | ||
474 | |||
475 | /** | ||
476 | * Continually consumes update messages. Should be run on a dedicated thread. | ||
477 | */ | ||
478 | void messageConsumptionCycle() { | ||
479 | while (!killed) // deliver messages on and on and on.... | ||
480 | { | ||
481 | long incrementedClock = 0; | ||
482 | UpdateMessage message = null; | ||
483 | |||
484 | if (!internalMessageQueue.isEmpty()) // take internal messages first | ||
485 | message = internalMessageQueue.removeFirst(); | ||
486 | else | ||
487 | // no internal message, take an incoming message | ||
488 | synchronized (externalMessageLock) { // no sleeping allowed, | ||
489 | // because external | ||
490 | // queue is locked for | ||
491 | // precise clocking of | ||
492 | // termination point! | ||
493 | if (!externalMessageQueue.isEmpty()) { // if external queue | ||
494 | // is non-empty, | ||
495 | // retrieve the next | ||
496 | // message instantly | ||
497 | message = takeExternalMessage(); | ||
498 | } else { // if external queue is found empty (and this is | ||
499 | // the first time in a row) | ||
500 | incrementedClock = ++clock; // local termination point | ||
501 | // synchronized(clock){incrementedClock = ++clock;} | ||
502 | } | ||
503 | } | ||
504 | |||
505 | if (message == null) // both queues were empty | ||
506 | { | ||
507 | localUpdateTermination(incrementedClock); // report local | ||
508 | // termination point | ||
509 | while (message == null) // wait for a message while external | ||
510 | // queue is still empty | ||
511 | { | ||
512 | synchronized (externalMessageLock) { | ||
513 | while (externalMessageQueue.isEmpty()) { | ||
514 | try { | ||
515 | externalMessageLock.wait(); | ||
516 | } catch (InterruptedException e) { | ||
517 | if (killed) | ||
518 | return; | ||
519 | } | ||
520 | } | ||
521 | message = takeExternalMessage(); | ||
522 | } | ||
523 | |||
524 | } | ||
525 | } | ||
526 | |||
527 | // now we have a message to deliver | ||
528 | // NOTE: this method is not compatible with differential dataflow | ||
529 | message.receiver.update(message.direction, message.updateElement, Timestamp.ZERO); | ||
530 | } | ||
531 | } | ||
532 | |||
533 | /** | ||
534 | * @since 1.6 | ||
535 | */ | ||
536 | public static final Function<Node, String> NAME_MAPPER = input -> input.toString().substring(0, | ||
537 | Math.min(30, input.toString().length())); | ||
538 | |||
539 | /** | ||
540 | * Sends out all pending messages to their receivers. The delivery is governed by the communication tracker. | ||
541 | * | ||
542 | * @since 1.6 | ||
543 | */ | ||
544 | public void deliverMessagesSingleThreaded() { | ||
545 | if (!backendContext.areUpdatesDelayed()) { | ||
546 | if (Options.MONITOR_VIOLATION_OF_RETE_NODEGROUP_TOPOLOGICAL_SORTING) { | ||
547 | // known unreachable; enable for debugging only | ||
548 | |||
549 | CommunicationGroup lastGroup = null; | ||
550 | Set<CommunicationGroup> seenInThisCycle = new HashSet<>(); | ||
551 | |||
552 | while (!tracker.isEmpty()) { | ||
553 | final CommunicationGroup group = tracker.getAndRemoveFirstGroup(); | ||
554 | |||
555 | /** | ||
556 | * The current group does not violate the communication schema iff (1) it was not seen before OR (2) | ||
557 | * the last one that was seen is exactly the same as the current one this can happen if the group | ||
558 | * was added back because of in-group message passing | ||
559 | */ | ||
560 | boolean okGroup = (group == lastGroup) || seenInThisCycle.add(group); | ||
561 | |||
562 | if (!okGroup) { | ||
563 | logger.error( | ||
564 | "[INTERNAL ERROR] Violation of communication schema! The communication component with representative " | ||
565 | + group.getRepresentative() + " has already been processed!"); | ||
566 | } | ||
567 | |||
568 | group.deliverMessages(); | ||
569 | |||
570 | lastGroup = group; | ||
571 | } | ||
572 | |||
573 | } else { | ||
574 | while (!tracker.isEmpty()) { | ||
575 | final CommunicationGroup group = tracker.getAndRemoveFirstGroup(); | ||
576 | group.deliverMessages(); | ||
577 | } | ||
578 | } | ||
579 | } | ||
580 | } | ||
581 | |||
582 | private void localUpdateTermination(long incrementedClock) { | ||
583 | network.reportLocalUpdateTermination(this, incrementedClock, terminationCriteria); | ||
584 | terminationCriteria.clear(); | ||
585 | |||
586 | // synchronized(clock){++clock;} // +1 incrementing for parity and easy | ||
587 | // comparison | ||
588 | } | ||
589 | |||
590 | // @pre: externalMessageQueue synchronized && nonempty | ||
591 | private UpdateMessage takeExternalMessage() { | ||
592 | UpdateMessage message = externalMessageQueue.removeFirst(); | ||
593 | if (!externalMessageQueue.isEmpty()) { // copy the whole queue over | ||
594 | // for speedup | ||
595 | Deque<UpdateMessage> temp = externalMessageQueue; | ||
596 | externalMessageQueue = internalMessageQueue; | ||
597 | internalMessageQueue = temp; | ||
598 | } | ||
599 | return message; | ||
600 | } | ||
601 | |||
602 | /** | ||
603 | * Provides an external address for the selected node. | ||
604 | * | ||
605 | * @pre node belongs to this container. | ||
606 | */ | ||
607 | public <N extends Node> Address<N> makeAddress(N node) { | ||
608 | return new Address<N>(node); | ||
609 | } | ||
610 | |||
611 | /** | ||
612 | * Checks whether a certain address points to a node at this container. | ||
613 | */ | ||
614 | public boolean isLocal(Address<? extends Node> address) { | ||
615 | return address.getContainer() == this; | ||
616 | } | ||
617 | |||
618 | /** | ||
619 | * Returns an addressed node at this container. | ||
620 | * | ||
621 | * @pre: address.container == this, e.g. address MUST be local | ||
622 | * @throws IllegalArgumentException | ||
623 | * if address is non-local | ||
624 | */ | ||
625 | @SuppressWarnings("unchecked") | ||
626 | public <N extends Node> N resolveLocal(Address<N> address) { | ||
627 | if (this != address.getContainer()) | ||
628 | throw new IllegalArgumentException(String.format("Address %s non-local at container %s", address, this)); | ||
629 | |||
630 | N cached = address.getNodeCache(); | ||
631 | if (cached != null) | ||
632 | return cached; | ||
633 | else { | ||
634 | N node = (N) nodesById.get(address.getNodeId()); | ||
635 | address.setNodeCache(node); | ||
636 | return node; | ||
637 | } | ||
638 | } | ||
639 | |||
640 | /** | ||
641 | * Registers a node into the rete network (should be called by constructor). Every node MUST be registered by its | ||
642 | * constructor. | ||
643 | * | ||
644 | * @return the unique nodeId issued to the node. | ||
645 | */ | ||
646 | public long registerNode(Node n) { | ||
647 | long id = nextId++; | ||
648 | nodesById.put(id, n); | ||
649 | return id; | ||
650 | } | ||
651 | |||
652 | /** | ||
653 | * Unregisters a node from the rete network. Do NOT call if node is still connected to other Nodes, or Adressed or | ||
654 | * otherwise referenced. | ||
655 | */ | ||
656 | public void unregisterNode(Node n) { | ||
657 | nodesById.remove(n.getNodeId()); | ||
658 | } | ||
659 | |||
660 | /** | ||
661 | * Registers a pattern memory into the rete network. Every memory MUST be registered by its owner node. | ||
662 | */ | ||
663 | public void registerClearable(Clearable c) { | ||
664 | clearables.addFirst(c); | ||
665 | } | ||
666 | |||
667 | /** | ||
668 | * Unregisters a pattern memory from the rete network. | ||
669 | */ | ||
670 | public void unregisterClearable(Clearable c) { | ||
671 | clearables.remove(c); | ||
672 | } | ||
673 | |||
674 | /** | ||
675 | * Clears all memory contents in the network. Reverts to initial state. | ||
676 | */ | ||
677 | public void clearAll() { | ||
678 | for (Clearable c : clearables) { | ||
679 | c.clear(); | ||
680 | } | ||
681 | } | ||
682 | |||
683 | public NodeFactory getNodeFactory() { | ||
684 | return network.getNodeFactory(); | ||
685 | } | ||
686 | |||
687 | public ConnectionFactory getConnectionFactory() { | ||
688 | return connectionFactory; | ||
689 | } | ||
690 | |||
691 | public NodeProvisioner getProvisioner() { | ||
692 | return nodeProvisioner; | ||
693 | } | ||
694 | |||
695 | public Network getNetwork() { | ||
696 | return network; | ||
697 | } | ||
698 | |||
699 | @Override | ||
700 | public String toString() { | ||
701 | StringBuilder sb = new StringBuilder(); | ||
702 | String separator = System.getProperty("line.separator"); | ||
703 | sb.append(super.toString() + "[[[" + separator); | ||
704 | java.util.List<Long> keys = new java.util.ArrayList<Long>(nodesById.keySet()); | ||
705 | java.util.Collections.sort(keys); | ||
706 | for (Long key : keys) { | ||
707 | sb.append(key + " -> " + nodesById.get(key) + separator); | ||
708 | } | ||
709 | sb.append("]]] of " + network); | ||
710 | return sb.toString(); | ||
711 | } | ||
712 | |||
713 | /** | ||
714 | * Access all the Rete nodes inside this container. | ||
715 | * | ||
716 | * @return the collection of {@link Node} instances | ||
717 | */ | ||
718 | public Collection<Node> getAllNodes() { | ||
719 | return nodesById.values(); | ||
720 | } | ||
721 | |||
722 | public InputConnector getInputConnectionFactory() { | ||
723 | return network.getInputConnector(); | ||
724 | } | ||
725 | |||
726 | public void checkCancelled() { | ||
727 | cancellationToken.checkCancelled(); | ||
728 | } | ||
729 | } | ||