aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java729
1 files changed, 729 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
new file mode 100644
index 00000000..79e0526d
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
@@ -0,0 +1,729 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v. 2.0 which is available at
6 * http://www.eclipse.org/legal/epl-v20.html.
7 *
8 * SPDX-License-Identifier: EPL-2.0
9 *******************************************************************************/
10
11package tools.refinery.viatra.runtime.rete.network;
12
13import org.apache.log4j.Logger;
14import tools.refinery.viatra.runtime.CancellationToken;
15import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext;
16import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
17import tools.refinery.viatra.runtime.matchers.util.Clearable;
18import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
19import tools.refinery.viatra.runtime.matchers.util.Direction;
20import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
21import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
22import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration;
23import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
24import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
25import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
26import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker;
27import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyCommunicationTracker;
28import tools.refinery.viatra.runtime.rete.network.delayed.DelayedCommand;
29import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand;
30import tools.refinery.viatra.runtime.rete.network.delayed.DelayedDisconnectCommand;
31import tools.refinery.viatra.runtime.rete.remote.Address;
32import tools.refinery.viatra.runtime.rete.single.SingleInputNode;
33import tools.refinery.viatra.runtime.rete.single.TrimmerNode;
34import tools.refinery.viatra.runtime.rete.util.Options;
35
36import java.util.*;
37import java.util.function.Function;
38
39/**
40 * @author Gabor Bergmann
41 *
42 * Mutexes: externalMessageLock - enlisting messages into and retrieving from the external message queue
43 * @since 2.2
44 */
45public final class ReteContainer {
46
47 protected Thread consumerThread = null;
48 protected boolean killed = false;
49
50 protected Network network;
51
52 protected LinkedList<Clearable> clearables;
53 protected Map<Long, Node> nodesById;
54 protected long nextId = 0;
55
56 protected ConnectionFactory connectionFactory;
57 protected NodeProvisioner nodeProvisioner;
58
59 protected Deque<UpdateMessage> internalMessageQueue = new ArrayDeque<UpdateMessage>();
60 protected/* volatile */Deque<UpdateMessage> externalMessageQueue = new ArrayDeque<UpdateMessage>();
61 protected Object externalMessageLock = new Object();
62 protected Long clock = 1L; // even: steady state, odd: active queue; access
63 // ONLY with messageQueue locked!
64 protected Map<ReteContainer, Long> terminationCriteria = null;
65 protected final Logger logger;
66 protected final CommunicationTracker tracker;
67
68 protected final IQueryBackendContext backendContext;
69
70 protected Set<DelayedCommand> delayedCommandQueue;
71 protected Set<DelayedCommand> delayedCommandBuffer;
72 protected boolean executingDelayedCommands;
73
74 protected final TimelyConfiguration timelyConfiguration;
75
76 private final CancellationToken cancellationToken;
77
78 /**
79 * @param threaded
80 * false if operating in a single-threaded environment
81 */
82 public ReteContainer(Network network, boolean threaded) {
83 super();
84 this.network = network;
85 this.backendContext = network.getEngine().getBackendContext();
86 this.timelyConfiguration = network.getEngine().getTimelyConfiguration();
87 cancellationToken = backendContext.getRuntimeContext().getCancellationToken();
88
89 this.delayedCommandQueue = new LinkedHashSet<DelayedCommand>();
90 this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>();
91 this.executingDelayedCommands = false;
92
93 if (this.isTimelyEvaluation()) {
94 this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration());
95 } else {
96 this.tracker = new TimelessCommunicationTracker();
97 }
98
99 this.nodesById = CollectionsFactory.createMap();
100 this.clearables = new LinkedList<Clearable>();
101 this.logger = network.getEngine().getLogger();
102
103 this.connectionFactory = new ConnectionFactory(this);
104 this.nodeProvisioner = new NodeProvisioner(this);
105
106 if (threaded) {
107 this.terminationCriteria = CollectionsFactory.createMap();
108 this.consumerThread = new Thread("Rete thread of " + ReteContainer.super.toString()) {
109 @Override
110 public void run() {
111 messageConsumptionCycle();
112 }
113 };
114 this.consumerThread.start();
115 }
116 }
117
118 /**
119 * @since 2.4
120 */
121 public boolean isTimelyEvaluation() {
122 return this.timelyConfiguration != null;
123 }
124
125 /**
126 * @since 2.4
127 */
128 public TimelyConfiguration getTimelyConfiguration() {
129 return this.timelyConfiguration;
130 }
131
132 /**
133 * @since 1.6
134 * @return the communication graph of the nodes, incl. message scheduling
135 */
136 public CommunicationTracker getCommunicationTracker() {
137 return tracker;
138 }
139
140 /**
141 * Stops this container. To be called by Network.kill()
142 */
143 public void kill() {
144 killed = true;
145 if (consumerThread != null)
146 consumerThread.interrupt();
147 }
148
149 /**
150 * Establishes connection between a supplier and a receiver node, regardless which container they are in. Assumption
151 * is that this container is the home of the receiver, but it is not strictly necessary.
152 *
153 * @param synchronise
154 * indicates whether the receiver should be synchronised to the current contents of the supplier
155 */
156 public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
157 boolean synchronise) {
158 if (!isLocal(receiver))
159 receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise);
160 else {
161 Receiver child = resolveLocal(receiver);
162 connectRemoteSupplier(supplier, child, synchronise);
163 }
164 }
165
166 /**
167 * Severs connection between a supplier and a receiver node, regardless which container they are in. Assumption is
168 * that this container is the home of the receiver, but it is not strictly necessary.
169 *
170 * @param desynchronise
171 * indicates whether the current contents of the supplier should be subtracted from the receiver
172 */
173 public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
174 boolean desynchronise) {
175 if (!isLocal(receiver))
176 receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise);
177 else {
178 Receiver child = resolveLocal(receiver);
179 disconnectRemoteSupplier(supplier, child, desynchronise);
180 }
181 }
182
183 /**
184 * Establishes connection between a remote supplier and a local receiver node.
185 *
186 * @param synchronise
187 * indicates whether the receiver should be synchronised to the current contents of the supplier
188 */
189 public void connectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, boolean synchronise) {
190 Supplier parent = nodeProvisioner.asSupplier(supplier);
191 if (synchronise)
192 connectAndSynchronize(parent, receiver);
193 else
194 connect(parent, receiver);
195 }
196
197 /**
198 * Severs connection between a remote supplier and a local receiver node.
199 *
200 * @param desynchronise
201 * indicates whether the current contents of the supplier should be subtracted from the receiver
202 */
203 public void disconnectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver,
204 boolean desynchronise) {
205 Supplier parent = nodeProvisioner.asSupplier(supplier);
206 if (desynchronise)
207 disconnectAndDesynchronize(parent, receiver);
208 else
209 disconnect(parent, receiver);
210 }
211
212 /**
213 * Connects a receiver to a supplier
214 */
215 public void connect(Supplier supplier, Receiver receiver) {
216 supplier.appendChild(receiver);
217 receiver.appendParent(supplier);
218 tracker.registerDependency(supplier, receiver);
219 }
220
221 /**
222 * Disconnects a receiver from a supplier
223 */
224 public void disconnect(Supplier supplier, Receiver receiver) {
225 supplier.removeChild(receiver);
226 receiver.removeParent(supplier);
227 tracker.unregisterDependency(supplier, receiver);
228 }
229
230 /**
231 * @since 2.3
232 */
233 public boolean isExecutingDelayedCommands() {
234 return this.executingDelayedCommands;
235 }
236
237 /**
238 * @since 2.3
239 */
240 public Set<DelayedCommand> getDelayedCommandQueue() {
241 if (this.executingDelayedCommands) {
242 return this.delayedCommandBuffer;
243 } else {
244 return this.delayedCommandQueue;
245 }
246 }
247
248 /**
249 * Connects a receiver to a remote supplier, and synchronizes it to the current contents of the supplier
250 */
251 public void connectAndSynchronize(Supplier supplier, Receiver receiver) {
252 supplier.appendChild(receiver);
253 receiver.appendParent(supplier);
254 tracker.registerDependency(supplier, receiver);
255 getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, receiver, this));
256 }
257
258 /**
259 * Disconnects a receiver from a supplier
260 */
261 public void disconnectAndDesynchronize(Supplier supplier, Receiver receiver) {
262 final boolean wasInSameSCC = this.isTimelyEvaluation() && this.tracker.areInSameGroup(supplier, receiver);
263 supplier.removeChild(receiver);
264 receiver.removeParent(supplier);
265 tracker.unregisterDependency(supplier, receiver);
266 getDelayedCommandQueue().add(new DelayedDisconnectCommand(supplier, receiver, this, wasInSameSCC));
267 }
268
269 /**
270 * @since 2.3
271 */
272 public void executeDelayedCommands() {
273 if (!this.delayedCommandQueue.isEmpty()) {
274 flushUpdates();
275 this.executingDelayedCommands = true;
276 for (final DelayedCommand command : this.delayedCommandQueue) {
277 command.run();
278 }
279 this.delayedCommandQueue = this.delayedCommandBuffer;
280 this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>();
281 flushUpdates();
282 this.executingDelayedCommands = false;
283 }
284 }
285
286 /**
287 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is
288 * indicated by the Address. Designed to be called by the Network, DO NOT use in any other way. @pre:
289 * address.container == this, e.g. address MUST be local
290 *
291 * @return the value of the container's clock at the time when the message was accepted into the local message queue
292 */
293 long sendUpdateToLocalAddress(Address<? extends Receiver> address, Direction direction, Tuple updateElement) {
294 long timestamp;
295 Receiver receiver = resolveLocal(address);
296 UpdateMessage message = new UpdateMessage(receiver, direction, updateElement);
297 synchronized (externalMessageLock) {
298 externalMessageQueue.add(message);
299 timestamp = clock;
300 externalMessageLock.notifyAll();
301 }
302
303 return timestamp;
304
305 }
306
307 /**
308 * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial
309 * matching. The receiver is indicated by the Address. Designed to be called by the Network, DO NOT use in any other
310 * way. @pre: address.container == this, e.g. address MUST be local @pre: updateElements is nonempty!
311 *
312 * @return the value of the container's clock at the time when the message was accepted into the local message queue
313 */
314 long sendUpdatesToLocalAddress(Address<? extends Receiver> address, Direction direction,
315 Collection<Tuple> updateElements) {
316
317 long timestamp;
318 Receiver receiver = resolveLocal(address);
319 // UpdateMessage message = new UpdateMessage(receiver, direction,
320 // updateElement);
321 synchronized (externalMessageLock) {
322 for (Tuple ps : updateElements)
323 externalMessageQueue.add(new UpdateMessage(receiver, direction, ps));
324 // messageQueue.add(new UpdateMessage(resolveLocal(address),
325 // direction, updateElement));
326 // this.sendUpdateInternal(resolveLocal(address), direction,
327 // updateElement);
328 timestamp = clock;
329 externalMessageLock.notifyAll();
330 }
331
332 return timestamp;
333 }
334
335 /**
336 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is
337 * indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT use in any
338 * other way.
339 */
340 void sendUpdateToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction,
341 Tuple updateElement) {
342 Receiver receiver = resolveLocal(address);
343 UpdateMessage message = new UpdateMessage(receiver, direction, updateElement);
344 internalMessageQueue.add(message);
345 }
346
347 /**
348 * Sends multiple update messages to the receiver node, indicating a newly found or lost partial matching. The
349 * receiver is indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT
350 * use in any other way.
351 *
352 * @pre: address.container == this, e.g. address MUST be local
353 */
354 void sendUpdatesToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction,
355 Collection<Tuple> updateElements) {
356 Receiver receiver = resolveLocal(address);
357 for (Tuple ps : updateElements)
358 internalMessageQueue.add(new UpdateMessage(receiver, direction, ps));
359 }
360
361 /**
362 * Sends an update message to a node in a different container. The receiver is indicated by the Address. Designed to
363 * be called by RemoteReceivers, DO NOT use in any other way.
364 *
365 * @since 2.4
366 */
367 public void sendUpdateToRemoteAddress(Address<? extends Receiver> address, Direction direction,
368 Tuple updateElement) {
369 ReteContainer otherContainer = address.getContainer();
370 long otherClock = otherContainer.sendUpdateToLocalAddress(address, direction, updateElement);
371 // Long criterion = terminationCriteria.get(otherContainer);
372 // if (criterion==null || otherClock > criterion)
373 terminationCriteria.put(otherContainer, otherClock);
374 }
375
376 /**
377 * Finalises all update sequences and returns. To be called from user threads (e.g. network construction).
378 */
379 public void flushUpdates() {
380 network.waitForReteTermination();
381 // synchronized (messageQueue)
382 // {
383 // while (!messageQueue.isEmpty())
384 // {
385 // try {
386 // UpdateMessage message = messageQueue.take();
387 // message.receiver.update(message.direction, message.updateElement);
388 // } catch (InterruptedException e) {}
389 // }
390 // }
391 }
392
393 /**
394 * Retrieves a safe copy of the contents of a supplier.
395 *
396 * <p> Note that there may be multiple copies of a Tuple in case of a {@link TrimmerNode}, so the result is not always a set.
397 *
398 * @param flush if true, a flush is performed before pulling the contents
399 * @since 2.3
400 */
401 public Collection<Tuple> pullContents(final Supplier supplier, final boolean flush) {
402 if (flush) {
403 flushUpdates();
404 }
405 final Collection<Tuple> collector = new ArrayList<Tuple>();
406 supplier.pullInto(collector, flush);
407 return collector;
408 }
409
410 /**
411 * @since 2.4
412 */
413 public Map<Tuple, Timeline<Timestamp>> pullContentsWithTimeline(final Supplier supplier, final boolean flush) {
414 if (flush) {
415 flushUpdates();
416 }
417 final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap();
418 supplier.pullIntoWithTimeline(collector, flush);
419 return collector;
420 }
421
422 /**
423 * Retrieves the contents of a SingleInputNode's parentage.
424 *
425 * @since 2.3
426 */
427 public Collection<Tuple> pullPropagatedContents(final SingleInputNode supplier, final boolean flush) {
428 if (flush) {
429 flushUpdates();
430 }
431 final Collection<Tuple> collector = new LinkedList<Tuple>();
432 supplier.propagatePullInto(collector, flush);
433 return collector;
434 }
435
436 /**
437 * Retrieves the timestamp-aware contents of a SingleInputNode's parentage.
438 *
439 * @since 2.3
440 */
441 public Map<Tuple, Timeline<Timestamp>> pullPropagatedContentsWithTimestamp(final SingleInputNode supplier,
442 final boolean flush) {
443 if (flush) {
444 flushUpdates();
445 }
446 final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap();
447 supplier.propagatePullIntoWithTimestamp(collector, flush);
448 return collector;
449 }
450
451 /**
452 * Retrieves the contents of a supplier for a remote caller. Assumption is that this container is the home of the
453 * supplier, but it is not strictly necessary.
454 *
455 * @param supplier
456 * the address of the supplier to be pulled.
457 * @since 2.3
458 */
459 public Collection<Tuple> remotePull(Address<? extends Supplier> supplier, boolean flush) {
460 if (!isLocal(supplier))
461 return supplier.getContainer().remotePull(supplier, flush);
462 return pullContents(resolveLocal(supplier), flush);
463 }
464
465 /**
466 * Proxies for the getPosMapping() of Production nodes. Retrieves the posmapping of a remote or local Production to
467 * a remote or local caller.
468 */
469 public Map<String, Integer> remotePosMapping(Address<? extends ProductionNode> production) {
470 if (!isLocal(production))
471 return production.getContainer().remotePosMapping(production);
472 return resolveLocal(production).getPosMapping();
473 }
474
475 /**
476 * Continually consumes update messages. Should be run on a dedicated thread.
477 */
478 void messageConsumptionCycle() {
479 while (!killed) // deliver messages on and on and on....
480 {
481 long incrementedClock = 0;
482 UpdateMessage message = null;
483
484 if (!internalMessageQueue.isEmpty()) // take internal messages first
485 message = internalMessageQueue.removeFirst();
486 else
487 // no internal message, take an incoming message
488 synchronized (externalMessageLock) { // no sleeping allowed,
489 // because external
490 // queue is locked for
491 // precise clocking of
492 // termination point!
493 if (!externalMessageQueue.isEmpty()) { // if external queue
494 // is non-empty,
495 // retrieve the next
496 // message instantly
497 message = takeExternalMessage();
498 } else { // if external queue is found empty (and this is
499 // the first time in a row)
500 incrementedClock = ++clock; // local termination point
501 // synchronized(clock){incrementedClock = ++clock;}
502 }
503 }
504
505 if (message == null) // both queues were empty
506 {
507 localUpdateTermination(incrementedClock); // report local
508 // termination point
509 while (message == null) // wait for a message while external
510 // queue is still empty
511 {
512 synchronized (externalMessageLock) {
513 while (externalMessageQueue.isEmpty()) {
514 try {
515 externalMessageLock.wait();
516 } catch (InterruptedException e) {
517 if (killed)
518 return;
519 }
520 }
521 message = takeExternalMessage();
522 }
523
524 }
525 }
526
527 // now we have a message to deliver
528 // NOTE: this method is not compatible with differential dataflow
529 message.receiver.update(message.direction, message.updateElement, Timestamp.ZERO);
530 }
531 }
532
533 /**
534 * @since 1.6
535 */
536 public static final Function<Node, String> NAME_MAPPER = input -> input.toString().substring(0,
537 Math.min(30, input.toString().length()));
538
539 /**
540 * Sends out all pending messages to their receivers. The delivery is governed by the communication tracker.
541 *
542 * @since 1.6
543 */
544 public void deliverMessagesSingleThreaded() {
545 if (!backendContext.areUpdatesDelayed()) {
546 if (Options.MONITOR_VIOLATION_OF_RETE_NODEGROUP_TOPOLOGICAL_SORTING) {
547 // known unreachable; enable for debugging only
548
549 CommunicationGroup lastGroup = null;
550 Set<CommunicationGroup> seenInThisCycle = new HashSet<>();
551
552 while (!tracker.isEmpty()) {
553 final CommunicationGroup group = tracker.getAndRemoveFirstGroup();
554
555 /**
556 * The current group does not violate the communication schema iff (1) it was not seen before OR (2)
557 * the last one that was seen is exactly the same as the current one this can happen if the group
558 * was added back because of in-group message passing
559 */
560 boolean okGroup = (group == lastGroup) || seenInThisCycle.add(group);
561
562 if (!okGroup) {
563 logger.error(
564 "[INTERNAL ERROR] Violation of communication schema! The communication component with representative "
565 + group.getRepresentative() + " has already been processed!");
566 }
567
568 group.deliverMessages();
569
570 lastGroup = group;
571 }
572
573 } else {
574 while (!tracker.isEmpty()) {
575 final CommunicationGroup group = tracker.getAndRemoveFirstGroup();
576 group.deliverMessages();
577 }
578 }
579 }
580 }
581
582 private void localUpdateTermination(long incrementedClock) {
583 network.reportLocalUpdateTermination(this, incrementedClock, terminationCriteria);
584 terminationCriteria.clear();
585
586 // synchronized(clock){++clock;} // +1 incrementing for parity and easy
587 // comparison
588 }
589
590 // @pre: externalMessageQueue synchronized && nonempty
591 private UpdateMessage takeExternalMessage() {
592 UpdateMessage message = externalMessageQueue.removeFirst();
593 if (!externalMessageQueue.isEmpty()) { // copy the whole queue over
594 // for speedup
595 Deque<UpdateMessage> temp = externalMessageQueue;
596 externalMessageQueue = internalMessageQueue;
597 internalMessageQueue = temp;
598 }
599 return message;
600 }
601
602 /**
603 * Provides an external address for the selected node.
604 *
605 * @pre node belongs to this container.
606 */
607 public <N extends Node> Address<N> makeAddress(N node) {
608 return new Address<N>(node);
609 }
610
611 /**
612 * Checks whether a certain address points to a node at this container.
613 */
614 public boolean isLocal(Address<? extends Node> address) {
615 return address.getContainer() == this;
616 }
617
618 /**
619 * Returns an addressed node at this container.
620 *
621 * @pre: address.container == this, e.g. address MUST be local
622 * @throws IllegalArgumentException
623 * if address is non-local
624 */
625 @SuppressWarnings("unchecked")
626 public <N extends Node> N resolveLocal(Address<N> address) {
627 if (this != address.getContainer())
628 throw new IllegalArgumentException(String.format("Address %s non-local at container %s", address, this));
629
630 N cached = address.getNodeCache();
631 if (cached != null)
632 return cached;
633 else {
634 N node = (N) nodesById.get(address.getNodeId());
635 address.setNodeCache(node);
636 return node;
637 }
638 }
639
640 /**
641 * Registers a node into the rete network (should be called by constructor). Every node MUST be registered by its
642 * constructor.
643 *
644 * @return the unique nodeId issued to the node.
645 */
646 public long registerNode(Node n) {
647 long id = nextId++;
648 nodesById.put(id, n);
649 return id;
650 }
651
652 /**
653 * Unregisters a node from the rete network. Do NOT call if node is still connected to other Nodes, or Adressed or
654 * otherwise referenced.
655 */
656 public void unregisterNode(Node n) {
657 nodesById.remove(n.getNodeId());
658 }
659
660 /**
661 * Registers a pattern memory into the rete network. Every memory MUST be registered by its owner node.
662 */
663 public void registerClearable(Clearable c) {
664 clearables.addFirst(c);
665 }
666
667 /**
668 * Unregisters a pattern memory from the rete network.
669 */
670 public void unregisterClearable(Clearable c) {
671 clearables.remove(c);
672 }
673
674 /**
675 * Clears all memory contents in the network. Reverts to initial state.
676 */
677 public void clearAll() {
678 for (Clearable c : clearables) {
679 c.clear();
680 }
681 }
682
683 public NodeFactory getNodeFactory() {
684 return network.getNodeFactory();
685 }
686
687 public ConnectionFactory getConnectionFactory() {
688 return connectionFactory;
689 }
690
691 public NodeProvisioner getProvisioner() {
692 return nodeProvisioner;
693 }
694
695 public Network getNetwork() {
696 return network;
697 }
698
699 @Override
700 public String toString() {
701 StringBuilder sb = new StringBuilder();
702 String separator = System.getProperty("line.separator");
703 sb.append(super.toString() + "[[[" + separator);
704 java.util.List<Long> keys = new java.util.ArrayList<Long>(nodesById.keySet());
705 java.util.Collections.sort(keys);
706 for (Long key : keys) {
707 sb.append(key + " -> " + nodesById.get(key) + separator);
708 }
709 sb.append("]]] of " + network);
710 return sb.toString();
711 }
712
713 /**
714 * Access all the Rete nodes inside this container.
715 *
716 * @return the collection of {@link Node} instances
717 */
718 public Collection<Node> getAllNodes() {
719 return nodesById.values();
720 }
721
722 public InputConnector getInputConnectionFactory() {
723 return network.getInputConnector();
724 }
725
726 public void checkCancelled() {
727 cancellationToken.checkCancelled();
728 }
729}