aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java171
1 files changed, 171 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java
new file mode 100644
index 00000000..b261d19d
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java
@@ -0,0 +1,171 @@
1/*******************************************************************************
2 * Copyright (c) 2010-2014, Bergmann Gabor, Istvan Rath and Daniel Varro
3 * Copyright (c) 2023 The Refinery Authors <https://refinery.tools>
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v. 2.0 which is available at
6 * http://www.eclipse.org/legal/epl-v20.html.
7 *
8 * SPDX-License-Identifier: EPL-2.0
9 *******************************************************************************/
10package tools.refinery.viatra.runtime.rete.network;
11
12import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
13import tools.refinery.viatra.runtime.rete.aggregation.IndexerBasedAggregatorNode;
14import tools.refinery.viatra.runtime.rete.boundary.InputConnector;
15import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode;
16import tools.refinery.viatra.runtime.rete.index.DualInputNode;
17import tools.refinery.viatra.runtime.rete.index.Indexer;
18import tools.refinery.viatra.runtime.rete.index.IterableIndexer;
19import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer;
20import tools.refinery.viatra.runtime.rete.recipes.*;
21import tools.refinery.viatra.runtime.rete.remote.Address;
22import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo;
23
24import java.util.ArrayList;
25import java.util.Collection;
26import java.util.List;
27
28/**
29 * Class responsible for connecting freshly instantiating Rete nodes to their parents.
30 *
31 * @author Bergmann Gabor
32 *
33 */
34class ConnectionFactory {
35 ReteContainer reteContainer;
36
37 public ConnectionFactory(ReteContainer reteContainer) {
38 super();
39 this.reteContainer = reteContainer;
40 }
41
42 // TODO move to node implementation instead?
43 private boolean isStateful(ReteNodeRecipe recipe) {
44 return recipe instanceof ProjectionIndexerRecipe || recipe instanceof IndexerBasedAggregatorRecipe
45 || recipe instanceof SingleColumnAggregatorRecipe || recipe instanceof ExpressionEnforcerRecipe
46 || recipe instanceof TransitiveClosureRecipe || recipe instanceof ProductionRecipe
47 || recipe instanceof UniquenessEnforcerRecipe || recipe instanceof RelationEvaluationRecipe;
48
49 }
50
51 /**
52 * PRE: nodes for parent recipes must already be created and registered
53 * <p>
54 * PRE: must not be an input node (for which {@link InputConnector} is responsible)
55 */
56 public void connectToParents(RecipeTraceInfo recipeTrace, Node freshNode) {
57 final ReteNodeRecipe recipe = recipeTrace.getRecipe();
58 if (recipe instanceof ConstantRecipe) {
59 // NO-OP
60 } else if (recipe instanceof InputRecipe) {
61 throw new IllegalArgumentException(
62 ConnectionFactory.class.getSimpleName() + " not intended for input connection: " + recipe);
63 } else if (recipe instanceof SingleParentNodeRecipe) {
64 final Receiver receiver = (Receiver) freshNode;
65 ReteNodeRecipe parentRecipe = ((SingleParentNodeRecipe) recipe).getParent();
66 connectToParent(recipe, receiver, parentRecipe);
67 } else if (recipe instanceof RelationEvaluationRecipe) {
68 List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents();
69 List<Supplier> parentSuppliers = new ArrayList<Supplier>();
70 for (final ReteNodeRecipe parentRecipe : parentRecipes) {
71 parentSuppliers.add(getSupplierForRecipe(parentRecipe));
72 }
73 ((RelationEvaluatorNode) freshNode).connectToParents(parentSuppliers);
74 } else if (recipe instanceof BetaRecipe) {
75 final DualInputNode beta = (DualInputNode) freshNode;
76 final ArrayList<RecipeTraceInfo> parentTraces = new ArrayList<RecipeTraceInfo>(
77 recipeTrace.getParentRecipeTraces());
78 Slots slots = avoidActiveNodeConflict(parentTraces.get(0), parentTraces.get(1));
79 beta.connectToIndexers(slots.primary, slots.secondary);
80 } else if (recipe instanceof IndexerBasedAggregatorRecipe) {
81 final IndexerBasedAggregatorNode aggregator = (IndexerBasedAggregatorNode) freshNode;
82 final IndexerBasedAggregatorRecipe aggregatorRecipe = (IndexerBasedAggregatorRecipe) recipe;
83 aggregator.initializeWith((ProjectionIndexer) resolveIndexer(aggregatorRecipe.getParent()));
84 } else if (recipe instanceof MultiParentNodeRecipe) {
85 final Receiver receiver = (Receiver) freshNode;
86 List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents();
87 for (ReteNodeRecipe parentRecipe : parentRecipes) {
88 connectToParent(recipe, receiver, parentRecipe);
89 }
90 }
91 }
92
93 private Indexer resolveIndexer(final IndexerRecipe indexerRecipe) {
94 final Address<? extends Node> address = reteContainer.getNetwork().getExistingNodeByRecipe(indexerRecipe);
95 return (Indexer) reteContainer.resolveLocal(address);
96 }
97
98 private void connectToParent(ReteNodeRecipe recipe, Receiver freshNode, ReteNodeRecipe parentRecipe) {
99 final Supplier parentSupplier = getSupplierForRecipe(parentRecipe);
100
101 // special synch
102 if (freshNode instanceof ReinitializedNode) {
103 Collection<Tuple> tuples = new ArrayList<Tuple>();
104 parentSupplier.pullInto(tuples, true);
105 ((ReinitializedNode) freshNode).reinitializeWith(tuples);
106 reteContainer.connect(parentSupplier, freshNode);
107 } else { // default case
108 // stateless nodes do not have to be synced with contents UNLESS they already have children (recursive
109 // corner case)
110 if (isStateful(recipe)
111 || ((freshNode instanceof Supplier) && !((Supplier) freshNode).getReceivers().isEmpty())) {
112 reteContainer.connectAndSynchronize(parentSupplier, freshNode);
113 } else {
114 // stateless node, no synch
115 reteContainer.connect(parentSupplier, freshNode);
116 }
117 }
118 }
119
120 private Supplier getSupplierForRecipe(ReteNodeRecipe recipe) {
121 @SuppressWarnings("unchecked")
122 final Address<? extends Supplier> parentAddress = (Address<? extends Supplier>) reteContainer.getNetwork()
123 .getExistingNodeByRecipe(recipe);
124 final Supplier supplier = reteContainer.getProvisioner().asSupplier(parentAddress);
125 return supplier;
126 }
127
128 /**
129 * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of
130 * the two indexers is supported.
131 *
132 * @return a replacement for the secondary Indexers, if needed
133 */
134 private Slots avoidActiveNodeConflict(final RecipeTraceInfo primarySlot, final RecipeTraceInfo secondarySlot) {
135 Slots result = new Slots() {
136 {
137 primary = (IterableIndexer) resolveIndexer((ProjectionIndexerRecipe) primarySlot.getRecipe());
138 secondary = resolveIndexer((IndexerRecipe) secondarySlot.getRecipe());
139 }
140 };
141 if (activeNodeConflict(result.primary, result.secondary))
142 if (result.secondary instanceof IterableIndexer)
143 result.secondary = resolveActiveIndexer(secondarySlot);
144 else
145 result.primary = (IterableIndexer) resolveActiveIndexer(primarySlot);
146 return result;
147 }
148
149 private Indexer resolveActiveIndexer(final RecipeTraceInfo inactiveIndexerTrace) {
150 final RecipeTraceInfo activeIndexerTrace = reteContainer.getProvisioner()
151 .accessActiveIndexer(inactiveIndexerTrace);
152 reteContainer.getProvisioner().getOrCreateNodeByRecipe(activeIndexerTrace);
153 return resolveIndexer((ProjectionIndexerRecipe) activeIndexerTrace.getRecipe());
154 }
155
156 private static class Slots {
157 IterableIndexer primary;
158 Indexer secondary;
159 }
160
161 /**
162 * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of
163 * the two indexers is supported.
164 *
165 * @return true if there is a conflict of active nodes.
166 */
167 private boolean activeNodeConflict(Indexer primarySlot, Indexer secondarySlot) {
168 return !primarySlot.equals(secondarySlot) && primarySlot.getActiveNode().equals(secondarySlot.getActiveNode());
169 }
170
171}