aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java729
1 files changed, 729 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
new file mode 100644
index 00000000..16e290fd
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ReteContainer.java
@@ -0,0 +1,729 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.network;
11
12import java.util.ArrayDeque;
13import java.util.ArrayList;
14import java.util.Collection;
15import java.util.Deque;
16import java.util.HashSet;
17import java.util.LinkedHashSet;
18import java.util.LinkedList;
19import java.util.Map;
20import java.util.Set;
21import java.util.function.Function;
22
23import org.apache.log4j.Logger;
24import tools.refinery.viatra.runtime.matchers.context.IQueryBackendContext;
25import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
26import tools.refinery.viatra.runtime.matchers.util.Clearable;
27import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
28import tools.refinery.viatra.runtime.matchers.util.Direction;
29import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
30import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
31import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration;
32import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
33import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker;
34import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
35import tools.refinery.viatra.runtime.rete.network.communication.timeless.TimelessCommunicationTracker;
36import tools.refinery.viatra.runtime.rete.network.communication.timely.TimelyCommunicationTracker;
37import tools.refinery.viatra.runtime.rete.network.delayed.DelayedCommand;
38import tools.refinery.viatra.runtime.rete.network.delayed.DelayedConnectCommand;
39import tools.refinery.viatra.runtime.rete.network.delayed.DelayedDisconnectCommand;
40import tools.refinery.viatra.runtime.rete.remote.Address;
41import tools.refinery.viatra.runtime.rete.single.SingleInputNode;
42import tools.refinery.viatra.runtime.rete.single.TrimmerNode;
43import tools.refinery.viatra.runtime.rete.util.Options;
44
45/**
46 * @author Gabor Bergmann
47 *
48 * Mutexes: externalMessageLock - enlisting messages into and retrieving from the external message queue
49 * @since 2.2
50 */
51public final class ReteContainer {
52
53 protected Thread consumerThread = null;
54 protected boolean killed = false;
55
56 protected Network network;
57
58 protected LinkedList<Clearable> clearables;
59 protected Map<Long, Node> nodesById;
60 protected long nextId = 0;
61
62 protected ConnectionFactory connectionFactory;
63 protected NodeProvisioner nodeProvisioner;
64
65 protected Deque<UpdateMessage> internalMessageQueue = new ArrayDeque<UpdateMessage>();
66 protected/* volatile */Deque<UpdateMessage> externalMessageQueue = new ArrayDeque<UpdateMessage>();
67 protected Object externalMessageLock = new Object();
68 protected Long clock = 1L; // even: steady state, odd: active queue; access
69 // ONLY with messageQueue locked!
70 protected Map<ReteContainer, Long> terminationCriteria = null;
71 protected final Logger logger;
72 protected final CommunicationTracker tracker;
73
74 protected final IQueryBackendContext backendContext;
75
76 protected Set<DelayedCommand> delayedCommandQueue;
77 protected Set<DelayedCommand> delayedCommandBuffer;
78 protected boolean executingDelayedCommands;
79
80 protected final TimelyConfiguration timelyConfiguration;
81
82 /**
83 * @param threaded
84 * false if operating in a single-threaded environment
85 */
86 public ReteContainer(Network network, boolean threaded) {
87 super();
88 this.network = network;
89 this.backendContext = network.getEngine().getBackendContext();
90 this.timelyConfiguration = network.getEngine().getTimelyConfiguration();
91
92 this.delayedCommandQueue = new LinkedHashSet<DelayedCommand>();
93 this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>();
94 this.executingDelayedCommands = false;
95
96 if (this.isTimelyEvaluation()) {
97 this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration());
98 } else {
99 this.tracker = new TimelessCommunicationTracker();
100 }
101
102 this.nodesById = CollectionsFactory.createMap();
103 this.clearables = new LinkedList<Clearable>();
104 this.logger = network.getEngine().getLogger();
105
106 this.connectionFactory = new ConnectionFactory(this);
107 this.nodeProvisioner = new NodeProvisioner(this);
108
109 if (threaded) {
110 this.terminationCriteria = CollectionsFactory.createMap();
111 this.consumerThread = new Thread("Rete thread of " + ReteContainer.super.toString()) {
112 @Override
113 public void run() {
114 messageConsumptionCycle();
115 }
116 };
117 this.consumerThread.start();
118 }
119 }
120
121 /**
122 * @since 2.4
123 */
124 public boolean isTimelyEvaluation() {
125 return this.timelyConfiguration != null;
126 }
127
128 /**
129 * @since 2.4
130 */
131 public TimelyConfiguration getTimelyConfiguration() {
132 return this.timelyConfiguration;
133 }
134
135 /**
136 * @since 1.6
137 * @return the communication graph of the nodes, incl. message scheduling
138 */
139 public CommunicationTracker getCommunicationTracker() {
140 return tracker;
141 }
142
143 /**
144 * Stops this container. To be called by Network.kill()
145 */
146 public void kill() {
147 killed = true;
148 if (consumerThread != null)
149 consumerThread.interrupt();
150 }
151
152 /**
153 * Establishes connection between a supplier and a receiver node, regardless which container they are in. Assumption
154 * is that this container is the home of the receiver, but it is not strictly necessary.
155 *
156 * @param synchronise
157 * indicates whether the receiver should be synchronised to the current contents of the supplier
158 */
159 public void connectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
160 boolean synchronise) {
161 if (!isLocal(receiver))
162 receiver.getContainer().connectRemoteNodes(supplier, receiver, synchronise);
163 else {
164 Receiver child = resolveLocal(receiver);
165 connectRemoteSupplier(supplier, child, synchronise);
166 }
167 }
168
169 /**
170 * Severs connection between a supplier and a receiver node, regardless which container they are in. Assumption is
171 * that this container is the home of the receiver, but it is not strictly necessary.
172 *
173 * @param desynchronise
174 * indicates whether the current contents of the supplier should be subtracted from the receiver
175 */
176 public void disconnectRemoteNodes(Address<? extends Supplier> supplier, Address<? extends Receiver> receiver,
177 boolean desynchronise) {
178 if (!isLocal(receiver))
179 receiver.getContainer().disconnectRemoteNodes(supplier, receiver, desynchronise);
180 else {
181 Receiver child = resolveLocal(receiver);
182 disconnectRemoteSupplier(supplier, child, desynchronise);
183 }
184 }
185
186 /**
187 * Establishes connection between a remote supplier and a local receiver node.
188 *
189 * @param synchronise
190 * indicates whether the receiver should be synchronised to the current contents of the supplier
191 */
192 public void connectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver, boolean synchronise) {
193 Supplier parent = nodeProvisioner.asSupplier(supplier);
194 if (synchronise)
195 connectAndSynchronize(parent, receiver);
196 else
197 connect(parent, receiver);
198 }
199
200 /**
201 * Severs connection between a remote supplier and a local receiver node.
202 *
203 * @param desynchronise
204 * indicates whether the current contents of the supplier should be subtracted from the receiver
205 */
206 public void disconnectRemoteSupplier(Address<? extends Supplier> supplier, Receiver receiver,
207 boolean desynchronise) {
208 Supplier parent = nodeProvisioner.asSupplier(supplier);
209 if (desynchronise)
210 disconnectAndDesynchronize(parent, receiver);
211 else
212 disconnect(parent, receiver);
213 }
214
215 /**
216 * Connects a receiver to a supplier
217 */
218 public void connect(Supplier supplier, Receiver receiver) {
219 supplier.appendChild(receiver);
220 receiver.appendParent(supplier);
221 tracker.registerDependency(supplier, receiver);
222 }
223
224 /**
225 * Disconnects a receiver from a supplier
226 */
227 public void disconnect(Supplier supplier, Receiver receiver) {
228 supplier.removeChild(receiver);
229 receiver.removeParent(supplier);
230 tracker.unregisterDependency(supplier, receiver);
231 }
232
233 /**
234 * @since 2.3
235 */
236 public boolean isExecutingDelayedCommands() {
237 return this.executingDelayedCommands;
238 }
239
240 /**
241 * @since 2.3
242 */
243 public Set<DelayedCommand> getDelayedCommandQueue() {
244 if (this.executingDelayedCommands) {
245 return this.delayedCommandBuffer;
246 } else {
247 return this.delayedCommandQueue;
248 }
249 }
250
251 /**
252 * Connects a receiver to a remote supplier, and synchronizes it to the current contents of the supplier
253 */
254 public void connectAndSynchronize(Supplier supplier, Receiver receiver) {
255 supplier.appendChild(receiver);
256 receiver.appendParent(supplier);
257 tracker.registerDependency(supplier, receiver);
258 getDelayedCommandQueue().add(new DelayedConnectCommand(supplier, receiver, this));
259 }
260
261 /**
262 * Disconnects a receiver from a supplier
263 */
264 public void disconnectAndDesynchronize(Supplier supplier, Receiver receiver) {
265 final boolean wasInSameSCC = this.isTimelyEvaluation() && this.tracker.areInSameGroup(supplier, receiver);
266 supplier.removeChild(receiver);
267 receiver.removeParent(supplier);
268 tracker.unregisterDependency(supplier, receiver);
269 getDelayedCommandQueue().add(new DelayedDisconnectCommand(supplier, receiver, this, wasInSameSCC));
270 }
271
272 /**
273 * @since 2.3
274 */
275 public void executeDelayedCommands() {
276 if (!this.delayedCommandQueue.isEmpty()) {
277 flushUpdates();
278 this.executingDelayedCommands = true;
279 for (final DelayedCommand command : this.delayedCommandQueue) {
280 command.run();
281 }
282 this.delayedCommandQueue = this.delayedCommandBuffer;
283 this.delayedCommandBuffer = new LinkedHashSet<DelayedCommand>();
284 flushUpdates();
285 this.executingDelayedCommands = false;
286 }
287 }
288
289 /**
290 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is
291 * indicated by the Address. Designed to be called by the Network, DO NOT use in any other way. @pre:
292 * address.container == this, e.g. address MUST be local
293 *
294 * @return the value of the container's clock at the time when the message was accepted into the local message queue
295 */
296 long sendUpdateToLocalAddress(Address<? extends Receiver> address, Direction direction, Tuple updateElement) {
297 long timestamp;
298 Receiver receiver = resolveLocal(address);
299 UpdateMessage message = new UpdateMessage(receiver, direction, updateElement);
300 synchronized (externalMessageLock) {
301 externalMessageQueue.add(message);
302 timestamp = clock;
303 externalMessageLock.notifyAll();
304 }
305
306 return timestamp;
307
308 }
309
310 /**
311 * Sends multiple update messages atomically to the receiver node, indicating a newly found or lost partial
312 * matching. The receiver is indicated by the Address. Designed to be called by the Network, DO NOT use in any other
313 * way. @pre: address.container == this, e.g. address MUST be local @pre: updateElements is nonempty!
314 *
315 * @return the value of the container's clock at the time when the message was accepted into the local message queue
316 */
317 long sendUpdatesToLocalAddress(Address<? extends Receiver> address, Direction direction,
318 Collection<Tuple> updateElements) {
319
320 long timestamp;
321 Receiver receiver = resolveLocal(address);
322 // UpdateMessage message = new UpdateMessage(receiver, direction,
323 // updateElement);
324 synchronized (externalMessageLock) {
325 for (Tuple ps : updateElements)
326 externalMessageQueue.add(new UpdateMessage(receiver, direction, ps));
327 // messageQueue.add(new UpdateMessage(resolveLocal(address),
328 // direction, updateElement));
329 // this.sendUpdateInternal(resolveLocal(address), direction,
330 // updateElement);
331 timestamp = clock;
332 externalMessageLock.notifyAll();
333 }
334
335 return timestamp;
336 }
337
338 /**
339 * Sends an update message to the receiver node, indicating a newly found or lost partial matching. The receiver is
340 * indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT use in any
341 * other way.
342 */
343 void sendUpdateToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction,
344 Tuple updateElement) {
345 Receiver receiver = resolveLocal(address);
346 UpdateMessage message = new UpdateMessage(receiver, direction, updateElement);
347 internalMessageQueue.add(message);
348 }
349
350 /**
351 * Sends multiple update messages to the receiver node, indicating a newly found or lost partial matching. The
352 * receiver is indicated by the Address. Designed to be called by the Network in single-threaded operation, DO NOT
353 * use in any other way.
354 *
355 * @pre: address.container == this, e.g. address MUST be local
356 */
357 void sendUpdatesToLocalAddressSingleThreaded(Address<? extends Receiver> address, Direction direction,
358 Collection<Tuple> updateElements) {
359 Receiver receiver = resolveLocal(address);
360 for (Tuple ps : updateElements)
361 internalMessageQueue.add(new UpdateMessage(receiver, direction, ps));
362 }
363
364 /**
365 * Sends an update message to a node in a different container. The receiver is indicated by the Address. Designed to
366 * be called by RemoteReceivers, DO NOT use in any other way.
367 *
368 * @since 2.4
369 */
370 public void sendUpdateToRemoteAddress(Address<? extends Receiver> address, Direction direction,
371 Tuple updateElement) {
372 ReteContainer otherContainer = address.getContainer();
373 long otherClock = otherContainer.sendUpdateToLocalAddress(address, direction, updateElement);
374 // Long criterion = terminationCriteria.get(otherContainer);
375 // if (criterion==null || otherClock > criterion)
376 terminationCriteria.put(otherContainer, otherClock);
377 }
378
379 /**
380 * Finalises all update sequences and returns. To be called from user threads (e.g. network construction).
381 */
382 public void flushUpdates() {
383 network.waitForReteTermination();
384 // synchronized (messageQueue)
385 // {
386 // while (!messageQueue.isEmpty())
387 // {
388 // try {
389 // UpdateMessage message = messageQueue.take();
390 // message.receiver.update(message.direction, message.updateElement);
391 // } catch (InterruptedException e) {}
392 // }
393 // }
394 }
395
396 /**
397 * Retrieves a safe copy of the contents of a supplier.
398 *
399 * <p> Note that there may be multiple copies of a Tuple in case of a {@link TrimmerNode}, so the result is not always a set.
400 *
401 * @param flush if true, a flush is performed before pulling the contents
402 * @since 2.3
403 */
404 public Collection<Tuple> pullContents(final Supplier supplier, final boolean flush) {
405 if (flush) {
406 flushUpdates();
407 }
408 final Collection<Tuple> collector = new ArrayList<Tuple>();
409 supplier.pullInto(collector, flush);
410 return collector;
411 }
412
413 /**
414 * @since 2.4
415 */
416 public Map<Tuple, Timeline<Timestamp>> pullContentsWithTimeline(final Supplier supplier, final boolean flush) {
417 if (flush) {
418 flushUpdates();
419 }
420 final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap();
421 supplier.pullIntoWithTimeline(collector, flush);
422 return collector;
423 }
424
425 /**
426 * Retrieves the contents of a SingleInputNode's parentage.
427 *
428 * @since 2.3
429 */
430 public Collection<Tuple> pullPropagatedContents(final SingleInputNode supplier, final boolean flush) {
431 if (flush) {
432 flushUpdates();
433 }
434 final Collection<Tuple> collector = new LinkedList<Tuple>();
435 supplier.propagatePullInto(collector, flush);
436 return collector;
437 }
438
439 /**
440 * Retrieves the timestamp-aware contents of a SingleInputNode's parentage.
441 *
442 * @since 2.3
443 */
444 public Map<Tuple, Timeline<Timestamp>> pullPropagatedContentsWithTimestamp(final SingleInputNode supplier,
445 final boolean flush) {
446 if (flush) {
447 flushUpdates();
448 }
449 final Map<Tuple, Timeline<Timestamp>> collector = CollectionsFactory.createMap();
450 supplier.propagatePullIntoWithTimestamp(collector, flush);
451 return collector;
452 }
453
454 /**
455 * Retrieves the contents of a supplier for a remote caller. Assumption is that this container is the home of the
456 * supplier, but it is not strictly necessary.
457 *
458 * @param supplier
459 * the address of the supplier to be pulled.
460 * @since 2.3
461 */
462 public Collection<Tuple> remotePull(Address<? extends Supplier> supplier, boolean flush) {
463 if (!isLocal(supplier))
464 return supplier.getContainer().remotePull(supplier, flush);
465 return pullContents(resolveLocal(supplier), flush);
466 }
467
468 /**
469 * Proxies for the getPosMapping() of Production nodes. Retrieves the posmapping of a remote or local Production to
470 * a remote or local caller.
471 */
472 public Map<String, Integer> remotePosMapping(Address<? extends ProductionNode> production) {
473 if (!isLocal(production))
474 return production.getContainer().remotePosMapping(production);
475 return resolveLocal(production).getPosMapping();
476 }
477
478 /**
479 * Continually consumes update messages. Should be run on a dedicated thread.
480 */
481 void messageConsumptionCycle() {
482 while (!killed) // deliver messages on and on and on....
483 {
484 long incrementedClock = 0;
485 UpdateMessage message = null;
486
487 if (!internalMessageQueue.isEmpty()) // take internal messages first
488 message = internalMessageQueue.removeFirst();
489 else
490 // no internal message, take an incoming message
491 synchronized (externalMessageLock) { // no sleeping allowed,
492 // because external
493 // queue is locked for
494 // precise clocking of
495 // termination point!
496 if (!externalMessageQueue.isEmpty()) { // if external queue
497 // is non-empty,
498 // retrieve the next
499 // message instantly
500 message = takeExternalMessage();
501 } else { // if external queue is found empty (and this is
502 // the first time in a row)
503 incrementedClock = ++clock; // local termination point
504 // synchronized(clock){incrementedClock = ++clock;}
505 }
506 }
507
508 if (message == null) // both queues were empty
509 {
510 localUpdateTermination(incrementedClock); // report local
511 // termination point
512 while (message == null) // wait for a message while external
513 // queue is still empty
514 {
515 synchronized (externalMessageLock) {
516 while (externalMessageQueue.isEmpty()) {
517 try {
518 externalMessageLock.wait();
519 } catch (InterruptedException e) {
520 if (killed)
521 return;
522 }
523 }
524 message = takeExternalMessage();
525 }
526
527 }
528 }
529
530 // now we have a message to deliver
531 // NOTE: this method is not compatible with differential dataflow
532 message.receiver.update(message.direction, message.updateElement, Timestamp.ZERO);
533 }
534 }
535
536 /**
537 * @since 1.6
538 */
539 public static final Function<Node, String> NAME_MAPPER = input -> input.toString().substring(0,
540 Math.min(30, input.toString().length()));
541
542 /**
543 * Sends out all pending messages to their receivers. The delivery is governed by the communication tracker.
544 *
545 * @since 1.6
546 */
547 public void deliverMessagesSingleThreaded() {
548 if (!backendContext.areUpdatesDelayed()) {
549 if (Options.MONITOR_VIOLATION_OF_RETE_NODEGROUP_TOPOLOGICAL_SORTING) {
550 // known unreachable; enable for debugging only
551
552 CommunicationGroup lastGroup = null;
553 Set<CommunicationGroup> seenInThisCycle = new HashSet<>();
554
555 while (!tracker.isEmpty()) {
556 final CommunicationGroup group = tracker.getAndRemoveFirstGroup();
557
558 /**
559 * The current group does not violate the communication schema iff (1) it was not seen before OR (2)
560 * the last one that was seen is exactly the same as the current one this can happen if the group
561 * was added back because of in-group message passing
562 */
563 boolean okGroup = (group == lastGroup) || seenInThisCycle.add(group);
564
565 if (!okGroup) {
566 logger.error(
567 "[INTERNAL ERROR] Violation of communication schema! The communication component with representative "
568 + group.getRepresentative() + " has already been processed!");
569 }
570
571 group.deliverMessages();
572
573 lastGroup = group;
574 }
575
576 } else {
577 while (!tracker.isEmpty()) {
578 final CommunicationGroup group = tracker.getAndRemoveFirstGroup();
579 group.deliverMessages();
580 }
581 }
582 }
583 }
584
585 private void localUpdateTermination(long incrementedClock) {
586 network.reportLocalUpdateTermination(this, incrementedClock, terminationCriteria);
587 terminationCriteria.clear();
588
589 // synchronized(clock){++clock;} // +1 incrementing for parity and easy
590 // comparison
591 }
592
593 // @pre: externalMessageQueue synchronized && nonempty
594 private UpdateMessage takeExternalMessage() {
595 UpdateMessage message = externalMessageQueue.removeFirst();
596 if (!externalMessageQueue.isEmpty()) { // copy the whole queue over
597 // for speedup
598 Deque<UpdateMessage> temp = externalMessageQueue;
599 externalMessageQueue = internalMessageQueue;
600 internalMessageQueue = temp;
601 }
602 return message;
603 }
604
605 /**
606 * Provides an external address for the selected node.
607 *
608 * @pre node belongs to this container.
609 */
610 public <N extends Node> Address<N> makeAddress(N node) {
611 return new Address<N>(node);
612 }
613
614 /**
615 * Checks whether a certain address points to a node at this container.
616 */
617 public boolean isLocal(Address<? extends Node> address) {
618 return address.getContainer() == this;
619 }
620
621 /**
622 * Returns an addressed node at this container.
623 *
624 * @pre: address.container == this, e.g. address MUST be local
625 * @throws IllegalArgumentException
626 * if address is non-local
627 */
628 @SuppressWarnings("unchecked")
629 public <N extends Node> N resolveLocal(Address<N> address) {
630 if (this != address.getContainer())
631 throw new IllegalArgumentException(String.format("Address %s non-local at container %s", address, this));
632
633 N cached = address.getNodeCache();
634 if (cached != null)
635 return cached;
636 else {
637 N node = (N) nodesById.get(address.getNodeId());
638 address.setNodeCache(node);
639 return node;
640 }
641 }
642
643 /**
644 * Registers a node into the rete network (should be called by constructor). Every node MUST be registered by its
645 * constructor.
646 *
647 * @return the unique nodeId issued to the node.
648 */
649 public long registerNode(Node n) {
650 long id = nextId++;
651 nodesById.put(id, n);
652 return id;
653 }
654
655 /**
656 * Unregisters a node from the rete network. Do NOT call if node is still connected to other Nodes, or Adressed or
657 * otherwise referenced.
658 */
659 public void unregisterNode(Node n) {
660 nodesById.remove(n.getNodeId());
661 }
662
663 /**
664 * Registers a pattern memory into the rete network. Every memory MUST be registered by its owner node.
665 */
666 public void registerClearable(Clearable c) {
667 clearables.addFirst(c);
668 }
669
670 /**
671 * Unregisters a pattern memory from the rete network.
672 */
673 public void unregisterClearable(Clearable c) {
674 clearables.remove(c);
675 }
676
677 /**
678 * Clears all memory contents in the network. Reverts to initial state.
679 */
680 public void clearAll() {
681 for (Clearable c : clearables) {
682 c.clear();
683 }
684 }
685
686 public NodeFactory getNodeFactory() {
687 return network.getNodeFactory();
688 }
689
690 public ConnectionFactory getConnectionFactory() {
691 return connectionFactory;
692 }
693
694 public NodeProvisioner getProvisioner() {
695 return nodeProvisioner;
696 }
697
698 public Network getNetwork() {
699 return network;
700 }
701
702 @Override
703 public String toString() {
704 StringBuilder sb = new StringBuilder();
705 String separator = System.getProperty("line.separator");
706 sb.append(super.toString() + "[[[" + separator);
707 java.util.List<Long> keys = new java.util.ArrayList<Long>(nodesById.keySet());
708 java.util.Collections.sort(keys);
709 for (Long key : keys) {
710 sb.append(key + " -> " + nodesById.get(key) + separator);
711 }
712 sb.append("]]] of " + network);
713 return sb.toString();
714 }
715
716 /**
717 * Access all the Rete nodes inside this container.
718 *
719 * @return the collection of {@link Node} instances
720 */
721 public Collection<Node> getAllNodes() {
722 return nodesById.values();
723 }
724
725 public InputConnector getInputConnectionFactory() {
726 return network.getInputConnector();
727 }
728
729}