diff options
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java')
-rw-r--r-- | subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java | 193 |
1 files changed, 193 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java new file mode 100644 index 00000000..9a6a0de9 --- /dev/null +++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java | |||
@@ -0,0 +1,193 @@ | |||
1 | /******************************************************************************* | ||
2 | * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro | ||
3 | * This program and the accompanying materials are made available under the | ||
4 | * terms of the Eclipse Public License v. 2.0 which is available at | ||
5 | * http://www.eclipse.org/legal/epl-v20.html. | ||
6 | * | ||
7 | * SPDX-License-Identifier: EPL-2.0 | ||
8 | *******************************************************************************/ | ||
9 | |||
10 | package tools.refinery.viatra.runtime.rete.index; | ||
11 | |||
12 | import java.util.Collection; | ||
13 | import java.util.Map; | ||
14 | |||
15 | import tools.refinery.viatra.runtime.matchers.tuple.Tuple; | ||
16 | import tools.refinery.viatra.runtime.matchers.tuple.TupleMask; | ||
17 | import tools.refinery.viatra.runtime.matchers.util.Direction; | ||
18 | import tools.refinery.viatra.runtime.matchers.util.Signed; | ||
19 | import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline; | ||
20 | import tools.refinery.viatra.runtime.rete.network.ReteContainer; | ||
21 | import tools.refinery.viatra.runtime.rete.network.communication.Timestamp; | ||
22 | |||
23 | /** | ||
24 | * @author Gabor Bergmann | ||
25 | * | ||
26 | */ | ||
27 | public class JoinNode extends DualInputNode { | ||
28 | |||
29 | public JoinNode(final ReteContainer reteContainer, final TupleMask complementerSecondaryMask) { | ||
30 | super(reteContainer, complementerSecondaryMask); | ||
31 | this.logic = createLogic(); | ||
32 | } | ||
33 | |||
34 | @Override | ||
35 | public Tuple calibrate(final Tuple primary, final Tuple secondary) { | ||
36 | return unify(primary, secondary); | ||
37 | } | ||
38 | |||
39 | private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() { | ||
40 | |||
41 | @Override | ||
42 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
43 | throw new UnsupportedOperationException(); | ||
44 | } | ||
45 | |||
46 | @Override | ||
47 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
48 | if (primarySlot == null || secondarySlot == null) { | ||
49 | return; | ||
50 | } | ||
51 | |||
52 | if (flush) { | ||
53 | reteContainer.flushUpdates(); | ||
54 | } | ||
55 | |||
56 | for (final Tuple signature : primarySlot.getSignatures()) { | ||
57 | // primaries can not be null due to the contract of IterableIndex.getSignatures() | ||
58 | final Collection<Tuple> primaries = primarySlot.get(signature); | ||
59 | final Collection<Tuple> opposites = secondarySlot.get(signature); | ||
60 | if (opposites != null) { | ||
61 | for (final Tuple primary : primaries) { | ||
62 | for (final Tuple opposite : opposites) { | ||
63 | collector.add(unify(primary, opposite)); | ||
64 | } | ||
65 | } | ||
66 | } | ||
67 | } | ||
68 | } | ||
69 | |||
70 | @Override | ||
71 | public void notifyUpdate(final Side side, final Direction direction, final Tuple updateElement, | ||
72 | final Tuple signature, final boolean change, final Timestamp timestamp) { | ||
73 | // in the default case, all timestamps must be zero | ||
74 | assert Timestamp.ZERO.equals(timestamp); | ||
75 | |||
76 | final Collection<Tuple> opposites = retrieveOpposites(side, signature); | ||
77 | |||
78 | if (!coincidence) { | ||
79 | if (opposites != null) { | ||
80 | for (final Tuple opposite : opposites) { | ||
81 | propagateUpdate(direction, unify(side, updateElement, opposite), timestamp); | ||
82 | } | ||
83 | } | ||
84 | } else { | ||
85 | // compensate for coincidence of slots - this is the case when an Indexer is joined with itself | ||
86 | if (opposites != null) { | ||
87 | for (final Tuple opposite : opposites) { | ||
88 | if (opposite.equals(updateElement)) { | ||
89 | // handle self-joins of a single tuple separately | ||
90 | continue; | ||
91 | } | ||
92 | propagateUpdate(direction, unify(opposite, updateElement), timestamp); | ||
93 | propagateUpdate(direction, unify(updateElement, opposite), timestamp); | ||
94 | } | ||
95 | } | ||
96 | |||
97 | // handle self-joins here | ||
98 | propagateUpdate(direction, unify(updateElement, updateElement), timestamp); | ||
99 | } | ||
100 | } | ||
101 | }; | ||
102 | |||
103 | private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() { | ||
104 | |||
105 | @Override | ||
106 | public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) { | ||
107 | if (primarySlot == null || secondarySlot == null) { | ||
108 | return; | ||
109 | } | ||
110 | |||
111 | if (flush) { | ||
112 | reteContainer.flushUpdates(); | ||
113 | } | ||
114 | |||
115 | for (final Tuple signature : primarySlot.getSignatures()) { | ||
116 | // primaries can not be null due to the contract of IterableIndex.getSignatures() | ||
117 | final Map<Tuple, Timeline<Timestamp>> primaries = getTimeline(signature, primarySlot); | ||
118 | final Map<Tuple, Timeline<Timestamp>> opposites = getTimeline(signature, secondarySlot); | ||
119 | if (opposites != null) { | ||
120 | for (final Tuple primary : primaries.keySet()) { | ||
121 | for (final Tuple opposite : opposites.keySet()) { | ||
122 | final Timeline<Timestamp> primaryTimeline = primaries.get(primary); | ||
123 | final Timeline<Timestamp> oppositeTimeline = opposites.get(opposite); | ||
124 | final Timeline<Timestamp> mergedTimeline = primaryTimeline | ||
125 | .mergeMultiplicative(oppositeTimeline); | ||
126 | if (!mergedTimeline.isEmpty()) { | ||
127 | collector.put(unify(primary, opposite), mergedTimeline); | ||
128 | } | ||
129 | } | ||
130 | } | ||
131 | } | ||
132 | } | ||
133 | } | ||
134 | |||
135 | @Override | ||
136 | public void pullInto(final Collection<Tuple> collector, final boolean flush) { | ||
137 | JoinNode.this.TIMELESS.pullInto(collector, flush); | ||
138 | } | ||
139 | |||
140 | @Override | ||
141 | public void notifyUpdate(final Side side, final Direction direction, final Tuple updateElement, | ||
142 | final Tuple signature, final boolean change, final Timestamp timestamp) { | ||
143 | final Indexer oppositeIndexer = getSlot(side.opposite()); | ||
144 | final Map<Tuple, Timeline<Timestamp>> opposites = getTimeline(signature, oppositeIndexer); | ||
145 | |||
146 | if (!coincidence) { | ||
147 | if (opposites != null) { | ||
148 | for (final Tuple opposite : opposites.keySet()) { | ||
149 | final Tuple unifiedTuple = unify(side, updateElement, opposite); | ||
150 | for (final Signed<Timestamp> signed : opposites.get(opposite).asChangeSequence()) { | ||
151 | // TODO only consider signed timestamps that are greater or equal to timestamp | ||
152 | // plus compact the previous timestamps into at most one update | ||
153 | propagateUpdate(signed.getDirection().multiply(direction), unifiedTuple, | ||
154 | timestamp.max(signed.getPayload())); | ||
155 | } | ||
156 | } | ||
157 | } | ||
158 | } else { | ||
159 | // compensate for coincidence of slots - this is the case when an Indexer is joined with itself | ||
160 | if (opposites != null) { | ||
161 | for (final Tuple opposite : opposites.keySet()) { | ||
162 | if (opposite.equals(updateElement)) { | ||
163 | // handle self-joins of a single tuple separately | ||
164 | continue; | ||
165 | } | ||
166 | final Tuple u1 = unify(opposite, updateElement); | ||
167 | final Tuple u2 = unify(updateElement, opposite); | ||
168 | for (final Signed<Timestamp> oppositeSigned : opposites.get(opposite).asChangeSequence()) { | ||
169 | final Direction updateDirection = direction.multiply(oppositeSigned.getDirection()); | ||
170 | final Timestamp updateTimestamp = timestamp.max(oppositeSigned.getPayload()); | ||
171 | propagateUpdate(updateDirection, u1, updateTimestamp); | ||
172 | propagateUpdate(updateDirection, u2, updateTimestamp); | ||
173 | } | ||
174 | } | ||
175 | } | ||
176 | |||
177 | // handle self-join here | ||
178 | propagateUpdate(direction, unify(updateElement, updateElement), timestamp); | ||
179 | } | ||
180 | } | ||
181 | }; | ||
182 | |||
183 | @Override | ||
184 | protected NetworkStructureChangeSensitiveLogic createTimelessLogic() { | ||
185 | return this.TIMELESS; | ||
186 | } | ||
187 | |||
188 | @Override | ||
189 | protected NetworkStructureChangeSensitiveLogic createTimelyLogic() { | ||
190 | return this.TIMELY; | ||
191 | } | ||
192 | |||
193 | } | ||