diff options
Diffstat (limited to 'subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/LeftJoinNode.java')
-rw-r--r-- | subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/LeftJoinNode.java | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/LeftJoinNode.java b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/LeftJoinNode.java new file mode 100644 index 00000000..9871e3bc --- /dev/null +++ b/subprojects/interpreter-rete/src/main/java/tools/refinery/interpreter/rete/aggregation/LeftJoinNode.java | |||
@@ -0,0 +1,167 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2009 Gabor Bergmann and Daniel Varro | ||
3 | * Copyright (c) 2024 The Refinery Authors <https://refinery.tools/> | ||
4 | * This program and the accompanying materials are made available under the | ||
5 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
6 | * http://www.eclipse.org/legal/epl-v20.html. | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | package tools.refinery.interpreter.rete.aggregation; | ||
10 | |||
11 | import tools.refinery.interpreter.matchers.tuple.ITuple; | ||
12 | import tools.refinery.interpreter.matchers.tuple.Tuple; | ||
13 | import tools.refinery.interpreter.matchers.tuple.TupleMask; | ||
14 | import tools.refinery.interpreter.matchers.tuple.Tuples; | ||
15 | import tools.refinery.interpreter.matchers.util.Direction; | ||
16 | import tools.refinery.interpreter.matchers.util.timeline.Timeline; | ||
17 | import tools.refinery.interpreter.rete.index.DefaultIndexerListener; | ||
18 | import tools.refinery.interpreter.rete.index.Indexer; | ||
19 | import tools.refinery.interpreter.rete.index.ProjectionIndexer; | ||
20 | import tools.refinery.interpreter.rete.index.StandardIndexer; | ||
21 | import tools.refinery.interpreter.rete.network.Node; | ||
22 | import tools.refinery.interpreter.rete.network.ReteContainer; | ||
23 | import tools.refinery.interpreter.rete.network.StandardNode; | ||
24 | import tools.refinery.interpreter.rete.network.communication.Timestamp; | ||
25 | |||
26 | import java.util.Collection; | ||
27 | import java.util.List; | ||
28 | import java.util.Map; | ||
29 | import java.util.Set; | ||
30 | |||
31 | public class LeftJoinNode extends StandardNode { | ||
32 | private final Object defaultValue; | ||
33 | private ProjectionIndexer projectionIndexer; | ||
34 | private TupleMask projectionMask; | ||
35 | private boolean leftInheritanceOutputMask; | ||
36 | private OuterIndexer outerIndexer = null; | ||
37 | |||
38 | public LeftJoinNode(ReteContainer reteContainer, Object defaultValue) { | ||
39 | super(reteContainer); | ||
40 | this.defaultValue = defaultValue; | ||
41 | } | ||
42 | |||
43 | @Override | ||
44 | public void networkStructureChanged() { | ||
45 | if (reteContainer.isTimelyEvaluation() && reteContainer.getCommunicationTracker().isInRecursiveGroup(this)) { | ||
46 | throw new IllegalStateException(this + " cannot be used in recursive differential dataflow evaluation!"); | ||
47 | } | ||
48 | super.networkStructureChanged(); | ||
49 | } | ||
50 | |||
51 | public void initializeWith(ProjectionIndexer projectionIndexer) { | ||
52 | this.projectionIndexer = projectionIndexer; | ||
53 | projectionMask = projectionIndexer.getMask(); | ||
54 | leftInheritanceOutputMask = isLeftInheritanceOutputMask(projectionMask); | ||
55 | projectionIndexer.attachListener(new DefaultIndexerListener(this) { | ||
56 | @Override | ||
57 | public void notifyIndexerUpdate(Direction direction, Tuple updateElement, Tuple signature, boolean change, | ||
58 | Timestamp timestamp) { | ||
59 | update(direction, updateElement, signature, change, timestamp); | ||
60 | } | ||
61 | }); | ||
62 | } | ||
63 | |||
64 | private static boolean isLeftInheritanceOutputMask(TupleMask mask) { | ||
65 | int size = mask.getSize(); | ||
66 | int sourceWidth = mask.getSourceWidth(); | ||
67 | if (size != sourceWidth - 1) { | ||
68 | throw new IllegalArgumentException("projectionMask should omit a single index, got " + mask); | ||
69 | } | ||
70 | int[] repetitions = new int[sourceWidth]; | ||
71 | for (int i = 0; i < size; i++) { | ||
72 | int index = mask.indices[i]; | ||
73 | int repetition = repetitions[index] + 1; | ||
74 | if (repetition >= 2) { | ||
75 | throw new IllegalArgumentException("Repeated index %d in projectionMask %s".formatted(index, mask)); | ||
76 | } | ||
77 | repetitions[index] = repetition; | ||
78 | } | ||
79 | for (int i = 0; i < size; i++) { | ||
80 | int index = mask.indices[i]; | ||
81 | if (index != i) { | ||
82 | return false; | ||
83 | } | ||
84 | } | ||
85 | return true; | ||
86 | } | ||
87 | |||
88 | protected void update(Direction direction, Tuple updateElement, Tuple signature, boolean change, | ||
89 | Timestamp timestamp) { | ||
90 | propagateUpdate(direction, updateElement, timestamp); | ||
91 | if (outerIndexer != null) { | ||
92 | outerIndexer.update(direction, updateElement, signature, change, timestamp); | ||
93 | } | ||
94 | } | ||
95 | |||
96 | protected Tuple getDefaultTuple(ITuple key) { | ||
97 | if (leftInheritanceOutputMask) { | ||
98 | return Tuples.staticArityFlatTupleOf(key, defaultValue); | ||
99 | } | ||
100 | var objects = new Object[projectionMask.sourceWidth]; | ||
101 | int targetLength = projectionMask.indices.length; | ||
102 | for (int i = 0; i < targetLength; i++) { | ||
103 | int j = projectionMask.indices[i]; | ||
104 | objects[j] = key.get(j); | ||
105 | } | ||
106 | return Tuples.flatTupleOf(objects); | ||
107 | } | ||
108 | |||
109 | @Override | ||
110 | public void pullInto(Collection<Tuple> collector, boolean flush) { | ||
111 | projectionIndexer.getParent().pullInto(collector, flush); | ||
112 | } | ||
113 | |||
114 | @Override | ||
115 | public void pullIntoWithTimeline(Map<Tuple, Timeline<Timestamp>> collector, boolean flush) { | ||
116 | projectionIndexer.getParent().pullIntoWithTimeline(collector, flush); | ||
117 | } | ||
118 | |||
119 | @Override | ||
120 | public Set<Tuple> getPulledContents(boolean flush) { | ||
121 | return projectionIndexer.getParent().getPulledContents(flush); | ||
122 | } | ||
123 | |||
124 | public Indexer getOuterIndexer() { | ||
125 | if (outerIndexer == null) { | ||
126 | outerIndexer = new OuterIndexer(); | ||
127 | getCommunicationTracker().registerDependency(this, outerIndexer); | ||
128 | } | ||
129 | return outerIndexer; | ||
130 | } | ||
131 | |||
132 | /** | ||
133 | * A special non-iterable index that retrieves the aggregated, packed result (signature+aggregate) for the original | ||
134 | * signature. | ||
135 | * | ||
136 | * @author Gabor Bergmann | ||
137 | */ | ||
138 | class OuterIndexer extends StandardIndexer { | ||
139 | public OuterIndexer() { | ||
140 | super(LeftJoinNode.this.reteContainer, LeftJoinNode.this.projectionMask); | ||
141 | this.parent = LeftJoinNode.this; | ||
142 | } | ||
143 | |||
144 | @Override | ||
145 | public Collection<Tuple> get(Tuple signature) { | ||
146 | var collection = projectionIndexer.get(signature); | ||
147 | if (collection == null || collection.isEmpty()) { | ||
148 | return List.of(getDefaultTuple(signature)); | ||
149 | } | ||
150 | return collection; | ||
151 | } | ||
152 | |||
153 | public void update(Direction direction, Tuple updateElement, Tuple signature, boolean change, | ||
154 | Timestamp timestamp) { | ||
155 | propagate(direction, updateElement, signature, false, timestamp); | ||
156 | if (change) { | ||
157 | var defaultTuple = getDefaultTuple(signature); | ||
158 | propagate(direction.opposite(), defaultTuple, signature, false, timestamp); | ||
159 | } | ||
160 | } | ||
161 | |||
162 | @Override | ||
163 | public Node getActiveNode() { | ||
164 | return projectionIndexer.getActiveNode(); | ||
165 | } | ||
166 | } | ||
167 | } | ||