diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java new file mode 100644 index 00000000..b261d19d --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/network/ConnectionFactory.java | |||
@@ -0,0 +1,171 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2010-2014, Bergmann Gabor, Istvan Rath and Daniel Varro | ||
3 | * Copyright (c) 2023 The Refinery Authors <https://refinery.tools> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * | ||
8 | * SPDX-License-Identifier: EPL-2.0 | ||
9 | *******************************************************************************/ | ||
10 | package tools.refinery.viatra.runtime.rete.network; | ||
11 | |||
12 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
13 | import tools.refinery.viatra.runtime.rete.aggregation.IndexerBasedAggregatorNode; | ||
14 | import tools.refinery.viatra.runtime.rete.boundary.InputConnector; | ||
15 | import tools.refinery.viatra.runtime.rete.eval.RelationEvaluatorNode; | ||
16 | import tools.refinery.viatra.runtime.rete.index.DualInputNode; | ||
17 | import tools.refinery.viatra.runtime.rete.index.Indexer; | ||
18 | import tools.refinery.viatra.runtime.rete.index.IterableIndexer; | ||
19 | import tools.refinery.viatra.runtime.rete.index.ProjectionIndexer; | ||
20 | import tools.refinery.viatra.runtime.rete.recipes.*; | ||
21 | import tools.refinery.viatra.runtime.rete.remote.Address; | ||
22 | import tools.refinery.viatra.runtime.rete.traceability.RecipeTraceInfo; | ||
23 | |||
24 | import java.util.ArrayList; | ||
25 | import java.util.Collection; | ||
26 | import java.util.List; | ||
27 | |||
28 | /** | ||
29 | * Class responsible for connecting freshly instantiating Rete nodes to their parents. | ||
30 | * | ||
31 | * @author Bergmann Gabor | ||
32 | * | ||
33 | */ | ||
34 | class ConnectionFactory { | ||
35 | ReteContainer reteContainer; | ||
36 | |||
37 | public ConnectionFactory(ReteContainer reteContainer) { | ||
38 | super(); | ||
39 | this.reteContainer = reteContainer; | ||
40 | } | ||
41 | |||
42 | // TODO move to node implementation instead? | ||
43 | private boolean isStateful(ReteNodeRecipe recipe) { | ||
44 | return recipe instanceof ProjectionIndexerRecipe || recipe instanceof IndexerBasedAggregatorRecipe | ||
45 | || recipe instanceof SingleColumnAggregatorRecipe || recipe instanceof ExpressionEnforcerRecipe | ||
46 | || recipe instanceof TransitiveClosureRecipe || recipe instanceof ProductionRecipe | ||
47 | || recipe instanceof UniquenessEnforcerRecipe || recipe instanceof RelationEvaluationRecipe; | ||
48 | |||
49 | } | ||
50 | |||
51 | /** | ||
52 | * PRE: nodes for parent recipes must already be created and registered | ||
53 | * <p> | ||
54 | * PRE: must not be an input node (for which {@link InputConnector} is responsible) | ||
55 | */ | ||
56 | public void connectToParents(RecipeTraceInfo recipeTrace, Node freshNode) { | ||
57 | final ReteNodeRecipe recipe = recipeTrace.getRecipe(); | ||
58 | if (recipe instanceof ConstantRecipe) { | ||
59 | // NO-OP | ||
60 | } else if (recipe instanceof InputRecipe) { | ||
61 | throw new IllegalArgumentException( | ||
62 | ConnectionFactory.class.getSimpleName() + " not intended for input connection: " + recipe); | ||
63 | } else if (recipe instanceof SingleParentNodeRecipe) { | ||
64 | final Receiver receiver = (Receiver) freshNode; | ||
65 | ReteNodeRecipe parentRecipe = ((SingleParentNodeRecipe) recipe).getParent(); | ||
66 | connectToParent(recipe, receiver, parentRecipe); | ||
67 | } else if (recipe instanceof RelationEvaluationRecipe) { | ||
68 | List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents(); | ||
69 | List<Supplier> parentSuppliers = new ArrayList<Supplier>(); | ||
70 | for (final ReteNodeRecipe parentRecipe : parentRecipes) { | ||
71 | parentSuppliers.add(getSupplierForRecipe(parentRecipe)); | ||
72 | } | ||
73 | ((RelationEvaluatorNode) freshNode).connectToParents(parentSuppliers); | ||
74 | } else if (recipe instanceof BetaRecipe) { | ||
75 | final DualInputNode beta = (DualInputNode) freshNode; | ||
76 | final ArrayList<RecipeTraceInfo> parentTraces = new ArrayList<RecipeTraceInfo>( | ||
77 | recipeTrace.getParentRecipeTraces()); | ||
78 | Slots slots = avoidActiveNodeConflict(parentTraces.get(0), parentTraces.get(1)); | ||
79 | beta.connectToIndexers(slots.primary, slots.secondary); | ||
80 | } else if (recipe instanceof IndexerBasedAggregatorRecipe) { | ||
81 | final IndexerBasedAggregatorNode aggregator = (IndexerBasedAggregatorNode) freshNode; | ||
82 | final IndexerBasedAggregatorRecipe aggregatorRecipe = (IndexerBasedAggregatorRecipe) recipe; | ||
83 | aggregator.initializeWith((ProjectionIndexer) resolveIndexer(aggregatorRecipe.getParent())); | ||
84 | } else if (recipe instanceof MultiParentNodeRecipe) { | ||
85 | final Receiver receiver = (Receiver) freshNode; | ||
86 | List<ReteNodeRecipe> parentRecipes = ((MultiParentNodeRecipe) recipe).getParents(); | ||
87 | for (ReteNodeRecipe parentRecipe : parentRecipes) { | ||
88 | connectToParent(recipe, receiver, parentRecipe); | ||
89 | } | ||
90 | } | ||
91 | } | ||
92 | |||
93 | private Indexer resolveIndexer(final IndexerRecipe indexerRecipe) { | ||
94 | final Address<? extends Node> address = reteContainer.getNetwork().getExistingNodeByRecipe(indexerRecipe); | ||
95 | return (Indexer) reteContainer.resolveLocal(address); | ||
96 | } | ||
97 | |||
98 | private void connectToParent(ReteNodeRecipe recipe, Receiver freshNode, ReteNodeRecipe parentRecipe) { | ||
99 | final Supplier parentSupplier = getSupplierForRecipe(parentRecipe); | ||
100 | |||
101 | // special synch | ||
102 | if (freshNode instanceof ReinitializedNode) { | ||
103 | Collection<Tuple> tuples = new ArrayList<Tuple>(); | ||
104 | parentSupplier.pullInto(tuples, true); | ||
105 | ((ReinitializedNode) freshNode).reinitializeWith(tuples); | ||
106 | reteContainer.connect(parentSupplier, freshNode); | ||
107 | } else { // default case | ||
108 | // stateless nodes do not have to be synced with contents UNLESS they already have children (recursive | ||
109 | // corner case) | ||
110 | if (isStateful(recipe) | ||
111 | || ((freshNode instanceof Supplier) && !((Supplier) freshNode).getReceivers().isEmpty())) { | ||
112 | reteContainer.connectAndSynchronize(parentSupplier, freshNode); | ||
113 | } else { | ||
114 | // stateless node, no synch | ||
115 | reteContainer.connect(parentSupplier, freshNode); | ||
116 | } | ||
117 | } | ||
118 | } | ||
119 | |||
120 | private Supplier getSupplierForRecipe(ReteNodeRecipe recipe) { | ||
121 | @SuppressWarnings("unchecked") | ||
122 | final Address<? extends Supplier> parentAddress = (Address<? extends Supplier>) reteContainer.getNetwork() | ||
123 | .getExistingNodeByRecipe(recipe); | ||
124 | final Supplier supplier = reteContainer.getProvisioner().asSupplier(parentAddress); | ||
125 | return supplier; | ||
126 | } | ||
127 | |||
128 | /** | ||
129 | * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of | ||
130 | * the two indexers is supported. | ||
131 | * | ||
132 | * @return a replacement for the secondary Indexers, if needed | ||
133 | */ | ||
134 | private Slots avoidActiveNodeConflict(final RecipeTraceInfo primarySlot, final RecipeTraceInfo secondarySlot) { | ||
135 | Slots result = new Slots() { | ||
136 | { | ||
137 | primary = (IterableIndexer) resolveIndexer((ProjectionIndexerRecipe) primarySlot.getRecipe()); | ||
138 | secondary = resolveIndexer((IndexerRecipe) secondarySlot.getRecipe()); | ||
139 | } | ||
140 | }; | ||
141 | if (activeNodeConflict(result.primary, result.secondary)) | ||
142 | if (result.secondary instanceof IterableIndexer) | ||
143 | result.secondary = resolveActiveIndexer(secondarySlot); | ||
144 | else | ||
145 | result.primary = (IterableIndexer) resolveActiveIndexer(primarySlot); | ||
146 | return result; | ||
147 | } | ||
148 | |||
149 | private Indexer resolveActiveIndexer(final RecipeTraceInfo inactiveIndexerTrace) { | ||
150 | final RecipeTraceInfo activeIndexerTrace = reteContainer.getProvisioner() | ||
151 | .accessActiveIndexer(inactiveIndexerTrace); | ||
152 | reteContainer.getProvisioner().getOrCreateNodeByRecipe(activeIndexerTrace); | ||
153 | return resolveIndexer((ProjectionIndexerRecipe) activeIndexerTrace.getRecipe()); | ||
154 | } | ||
155 | |||
156 | private static class Slots { | ||
157 | IterableIndexer primary; | ||
158 | Indexer secondary; | ||
159 | } | ||
160 | |||
161 | /** | ||
162 | * If two indexers share their active node, joining them via DualInputNode is error-prone. Exception: coincidence of | ||
163 | * the two indexers is supported. | ||
164 | * | ||
165 | * @return true if there is a conflict of active nodes. | ||
166 | */ | ||
167 | private boolean activeNodeConflict(Indexer primarySlot, Indexer secondarySlot) { | ||
168 | return !primarySlot.equals(secondarySlot) && primarySlot.getActiveNode().equals(secondarySlot.getActiveNode()); | ||
169 | } | ||
170 | |||
171 | } | ||