From acb5e04319883ceb0dffe25635db5dc22448f247 Mon Sep 17 00:00:00 2001 From: Kristóf Marussy Date: Sun, 15 Oct 2023 15:57:52 +0200 Subject: refactor(interpreter): communication tracker algorithm Use a faster algorithm to detect cycles in the RETE network. Only if cycles are detected fall back to the transitive closure algorithm to construct the SCCs and the reduced graph. --- .../interpreter/rete/network/ReteContainer.java | 7 +- .../communication/CommunicationTracker.java | 46 +++++++++--- .../communication/NetworkComponentDetector.java | 84 ++++++++++++++++++++++ .../timeless/TimelessCommunicationTracker.java | 20 +++--- .../timely/TimelyCommunicationTracker.java | 29 ++++---- 5 files changed, 151 insertions(+), 35 deletions(-) create mode 100644 subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/NetworkComponentDetector.java diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/ReteContainer.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/ReteContainer.java index b5c91d32..3eafc149 100644 --- a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/ReteContainer.java +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/ReteContainer.java @@ -90,15 +90,16 @@ public final class ReteContainer { this.delayedCommandBuffer = new LinkedHashSet(); this.executingDelayedCommands = false; + this.logger = network.getEngine().getLogger(); + if (this.isTimelyEvaluation()) { - this.tracker = new TimelyCommunicationTracker(this.getTimelyConfiguration()); + this.tracker = new TimelyCommunicationTracker(logger, this.getTimelyConfiguration()); } else { - this.tracker = new TimelessCommunicationTracker(); + this.tracker = new TimelessCommunicationTracker(logger); } this.nodesById = CollectionsFactory.createMap(); this.clearables = new LinkedList(); - this.logger = network.getEngine().getLogger(); this.connectionFactory = new ConnectionFactory(this); this.nodeProvisioner = new NodeProvisioner(this); diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/CommunicationTracker.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/CommunicationTracker.java index 2e8eb338..1d089ba6 100644 --- a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/CommunicationTracker.java +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/CommunicationTracker.java @@ -1,5 +1,6 @@ /******************************************************************************* * Copyright (c) 2010-2017, Tamas Szabo, Istvan Rath and Daniel Varro + * Copyright (c) 2023 The Refinery Authors * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at * http://www.eclipse.org/legal/epl-v20.html. @@ -8,12 +9,13 @@ *******************************************************************************/ package tools.refinery.interpreter.rete.network.communication; +import org.apache.log4j.Logger; +import org.jetbrains.annotations.Nullable; import tools.refinery.interpreter.matchers.tuple.TupleMask; import tools.refinery.interpreter.rete.aggregation.IAggregatorNode; import tools.refinery.interpreter.rete.boundary.ExternalInputEnumeratorNode; import tools.refinery.interpreter.rete.eval.RelationEvaluatorNode; import tools.refinery.interpreter.rete.index.*; -import tools.refinery.interpreter.rete.itc.alg.incscc.IncSCCAlg; import tools.refinery.interpreter.rete.itc.alg.misc.topsort.TopologicalSorting; import tools.refinery.interpreter.rete.itc.graphimpl.Graph; import tools.refinery.interpreter.rete.network.*; @@ -61,7 +63,7 @@ public abstract class CommunicationTracker { /** * Incremental SCC information about the dependency graph */ - protected final IncSCCAlg sccInformationProvider; + private final NetworkComponentDetector componentDetector; /** * Precomputed node -> communication group map @@ -76,9 +78,9 @@ public abstract class CommunicationTracker { // groups should have a simple integer flag which represents its position in a priority queue // priority queue only contains the ACTIVE groups - public CommunicationTracker() { + public CommunicationTracker(Logger logger) { this.dependencyGraph = new Graph(); - this.sccInformationProvider = new IncSCCAlg(this.dependencyGraph); + this.componentDetector = new NetworkComponentDetector(logger, dependencyGraph); this.groupQueue = new PriorityQueue(); this.groupMap = new HashMap(); } @@ -91,11 +93,33 @@ public abstract class CommunicationTracker { return this.groupMap.get(node); } + @Nullable + protected Set getPartition(Node node) { + return componentDetector.getPartition(node); + } + + protected boolean isSingleton(Node node) { + var partition = getPartition(node); + return partition == null || partition.isEmpty(); + } + + protected Node getRepresentative(Node node) { + return componentDetector.getRepresentative(node); + } + + protected boolean hasOutgoingEdges(Node representative) { + return componentDetector.hasOutgoingEdges(representative); + } + + protected Graph getReducedGraph() { + return componentDetector.getReducedGraph(); + } + private void precomputeGroups() { groupMap.clear(); // reconstruct group map from dependency graph - final Graph reducedGraph = sccInformationProvider.getReducedGraph(); + final Graph reducedGraph = getReducedGraph(); final List representatives = TopologicalSorting.compute(reducedGraph); for (int i = 0; i < representatives.size(); i++) { // groups for SCC representatives @@ -107,7 +131,7 @@ public abstract class CommunicationTracker { maxGroupId = representatives.size() - 1; for (final Node node : dependencyGraph.getAllNodes()) { // extend group map to the rest of nodes - final Node representative = sccInformationProvider.getRepresentative(node); + final Node representative = getRepresentative(node); final CommunicationGroup group = groupMap.get(representative); if (representative != node) { addToGroup(node, group); @@ -285,9 +309,9 @@ public abstract class CommunicationTracker { // query all these information before the actual edge insertion // because SCCs may be unified during the process - final Node sourceRepresentative = sccInformationProvider.getRepresentative(source); - final Node targetRepresentative = sccInformationProvider.getRepresentative(target); - final boolean targetHadOutgoingEdges = sccInformationProvider.hasOutgoingEdges(targetRepresentative); + final Node sourceRepresentative = getRepresentative(source); + final Node targetRepresentative = getRepresentative(target); + final boolean targetHadOutgoingEdges = hasOutgoingEdges(targetRepresentative); // insert the edge dependencyGraph.insertEdge(source, target); @@ -372,8 +396,8 @@ public abstract class CommunicationTracker { // delete the edge first, and then query the SCC info provider this.dependencyGraph.deleteEdgeIfExists(source, target); - final Node sourceRepresentative = sccInformationProvider.getRepresentative(source); - final Node targetRepresentative = sccInformationProvider.getRepresentative(target); + final Node sourceRepresentative = getRepresentative(source); + final Node targetRepresentative = getRepresentative(target); // if they are still in the same SCC, // then this deletion did not affect the SCCs, diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/NetworkComponentDetector.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/NetworkComponentDetector.java new file mode 100644 index 00000000..6ed84e45 --- /dev/null +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/NetworkComponentDetector.java @@ -0,0 +1,84 @@ +/* + * SPDX-FileCopyrightText: 2023 The Refinery Authors + * + * SPDX-License-Identifier: EPL-2.0 + */ +package tools.refinery.interpreter.rete.network.communication; + +import org.apache.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import tools.refinery.interpreter.matchers.util.Direction; +import tools.refinery.interpreter.rete.itc.alg.incscc.IncSCCAlg; +import tools.refinery.interpreter.rete.itc.alg.representative.RepresentativeObserver; +import tools.refinery.interpreter.rete.itc.alg.representative.StronglyConnectedComponentAlgorithm; +import tools.refinery.interpreter.rete.itc.graphimpl.Graph; +import tools.refinery.interpreter.rete.network.Node; + +import java.util.Set; + +public class NetworkComponentDetector implements RepresentativeObserver { + private final Logger logger; + private final Graph dependencyGraph; + private StronglyConnectedComponentAlgorithm stronglyConnectedComponentAlgorithm; + private IncSCCAlg sccInformationProvider; + + public NetworkComponentDetector(Logger logger, Graph dependencyGraph) { + this.logger = logger; + this.dependencyGraph = dependencyGraph; + stronglyConnectedComponentAlgorithm = new StronglyConnectedComponentAlgorithm<>(dependencyGraph); + stronglyConnectedComponentAlgorithm.setObserver(this); + if (stronglyConnectedComponentAlgorithm.getComponents() + .values() + .stream() + .anyMatch(component -> component.size() > 1)) { + switchToTransitiveClosureAlgorithm(); + } + } + + @Nullable + public Set getPartition(Node node) { + if (sccInformationProvider == null) { + return null; + } + return sccInformationProvider.sccs.getPartition(node); + } + + public Node getRepresentative(Node node) { + if (sccInformationProvider == null) { + return node; + } + return sccInformationProvider.getRepresentative(node); + } + + public boolean hasOutgoingEdges(Node representative) { + if (sccInformationProvider == null) { + return !dependencyGraph.getTargetNodes(representative).isEmpty(); + } + return sccInformationProvider.hasOutgoingEdges(representative); + } + + public Graph getReducedGraph() { + if (sccInformationProvider == null) { + return dependencyGraph; + } + return sccInformationProvider.getReducedGraph(); + } + + @Override + public void tupleChanged(Node node, Node representative, Direction direction) { + if (direction == Direction.INSERT && !node.equals(representative)) { + switchToTransitiveClosureAlgorithm(); + } + } + + private void switchToTransitiveClosureAlgorithm() { + logger.warn("RETE network cycle detected, switching to transitive closure algorithm for communication groups"); + if (stronglyConnectedComponentAlgorithm != null) { + stronglyConnectedComponentAlgorithm.dispose(); + stronglyConnectedComponentAlgorithm = null; + } + if (sccInformationProvider == null) { + sccInformationProvider = new IncSCCAlg<>(dependencyGraph); + } + } +} diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timeless/TimelessCommunicationTracker.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timeless/TimelessCommunicationTracker.java index 77f40fc3..02116d71 100644 --- a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timeless/TimelessCommunicationTracker.java +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timeless/TimelessCommunicationTracker.java @@ -1,5 +1,6 @@ /******************************************************************************* * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro + * Copyright (c) 2023 The Refinery Authors * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at * http://www.eclipse.org/legal/epl-v20.html. @@ -8,11 +9,7 @@ *******************************************************************************/ package tools.refinery.interpreter.rete.network.communication.timeless; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.Map.Entry; - +import org.apache.log4j.Logger; import tools.refinery.interpreter.rete.index.DualInputNode; import tools.refinery.interpreter.rete.index.Indexer; import tools.refinery.interpreter.rete.index.IndexerListener; @@ -26,6 +23,11 @@ import tools.refinery.interpreter.rete.network.communication.MessageSelector; import tools.refinery.interpreter.rete.network.mailbox.Mailbox; import tools.refinery.interpreter.rete.network.mailbox.timeless.BehaviorChangingMailbox; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + /** * Timeless implementation of the communication tracker. * @@ -33,10 +35,13 @@ import tools.refinery.interpreter.rete.network.mailbox.timeless.BehaviorChanging * @since 2.2 */ public class TimelessCommunicationTracker extends CommunicationTracker { + public TimelessCommunicationTracker(Logger logger) { + super(logger); + } @Override protected CommunicationGroup createGroup(Node representative, int index) { - final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; + final boolean isSingleton = isSingleton(representative); final boolean isReceiver = representative instanceof Receiver; final boolean isPosetIndifferent = isReceiver && ((Receiver) representative).getMailbox() instanceof BehaviorChangingMailbox; @@ -95,14 +100,13 @@ public class TimelessCommunicationTracker extends CommunicationTracker { final Mailbox mailbox = ((Receiver) node).getMailbox(); if (mailbox instanceof BehaviorChangingMailbox) { final CommunicationGroup group = this.groupMap.get(node); - final Set sccNodes = this.sccInformationProvider.sccs.getPartition(node); // a default mailbox must split its messages iff // (1) its receiver is in a recursive group and final boolean c1 = group.isRecursive(); // (2) its receiver is at the SCC boundary of that group final boolean c2 = isAtSCCBoundary(node); // (3) its group consists of more than one node - final boolean c3 = sccNodes.size() > 1; + final boolean c3 = isSingleton(node); ((BehaviorChangingMailbox) mailbox).setSplitFlag(c1 && c2 && c3); } } diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timely/TimelyCommunicationTracker.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timely/TimelyCommunicationTracker.java index a0cdc701..6a92ac91 100644 --- a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timely/TimelyCommunicationTracker.java +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/network/communication/timely/TimelyCommunicationTracker.java @@ -1,5 +1,6 @@ /******************************************************************************* * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro + * Copyright (c) 2023 The Refinery Authors * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at * http://www.eclipse.org/legal/epl-v20.html. @@ -8,20 +9,14 @@ *******************************************************************************/ package tools.refinery.interpreter.rete.network.communication.timely; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.function.Function; - -import tools.refinery.interpreter.rete.itc.alg.misc.topsort.TopologicalSorting; -import tools.refinery.interpreter.rete.itc.graphimpl.Graph; +import org.apache.log4j.Logger; import tools.refinery.interpreter.matchers.util.CollectionsFactory; import tools.refinery.interpreter.rete.index.IndexerListener; import tools.refinery.interpreter.rete.index.SpecializedProjectionIndexer; import tools.refinery.interpreter.rete.index.SpecializedProjectionIndexer.ListenerSubscription; import tools.refinery.interpreter.rete.index.StandardIndexer; +import tools.refinery.interpreter.rete.itc.alg.misc.topsort.TopologicalSorting; +import tools.refinery.interpreter.rete.itc.graphimpl.Graph; import tools.refinery.interpreter.rete.matcher.TimelyConfiguration; import tools.refinery.interpreter.rete.matcher.TimelyConfiguration.TimelineRepresentation; import tools.refinery.interpreter.rete.network.NetworkStructureChangeSensitiveNode; @@ -35,6 +30,13 @@ import tools.refinery.interpreter.rete.network.communication.NodeComparator; import tools.refinery.interpreter.rete.network.mailbox.Mailbox; import tools.refinery.interpreter.rete.single.DiscriminatorDispatcherNode; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.function.Function; + /** * Timely (DDF) implementation of the {@link CommunicationTracker}. * @@ -45,13 +47,14 @@ public class TimelyCommunicationTracker extends CommunicationTracker { protected final TimelyConfiguration configuration; - public TimelyCommunicationTracker(final TimelyConfiguration configuration) { + public TimelyCommunicationTracker(final Logger logger, final TimelyConfiguration configuration) { + super(logger); this.configuration = configuration; } @Override protected CommunicationGroup createGroup(final Node representative, final int index) { - final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; + final boolean isSingleton = isSingleton(representative); return new TimelyCommunicationGroup(this, representative, index, isSingleton); } @@ -130,8 +133,8 @@ public class TimelyCommunicationTracker extends CommunicationTracker { protected void postProcessGroup(final CommunicationGroup group) { if (this.configuration.getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) { final Node representative = group.getRepresentative(); - final Set groupMembers = this.sccInformationProvider.sccs.getPartition(representative); - if (groupMembers.size() > 1) { + final Set groupMembers = getPartition(representative); + if (groupMembers != null && groupMembers.size() > 1) { final Graph graph = new Graph(); for (final Node node : groupMembers) { -- cgit v1.2.3-54-g00ecf