aboutsummaryrefslogtreecommitdiffstats
path: root/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java
diff options
context:
space:
mode:
Diffstat (limited to 'subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java')
-rw-r--r--subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java193
1 files changed, 193 insertions, 0 deletions
diff --git a/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java
new file mode 100644
index 00000000..9a6a0de9
--- /dev/null
+++ b/subprojects/viatra-runtime-rete/src/main/java/tools/refinery/viatra/runtime/rete/index/JoinNode.java
@@ -0,0 +1,193 @@
1/*******************************************************************************
2 * Copyright (c) 2004-2008 Gabor Bergmann and Daniel Varro
3 * This program and the accompanying materials are made available under the
4 * terms of the Eclipse Public License v. 2.0 which is available at
5 * http://www.eclipse.org/legal/epl-v20.html.
6 *
7 * SPDX-License-Identifier: EPL-2.0
8 *******************************************************************************/
9
10package tools.refinery.viatra.runtime.rete.index;
11
12import java.util.Collection;
13import java.util.Map;
14
15import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
16import tools.refinery.viatra.runtime.matchers.tuple.TupleMask;
17import tools.refinery.viatra.runtime.matchers.util.Direction;
18import tools.refinery.viatra.runtime.matchers.util.Signed;
19import tools.refinery.viatra.runtime.matchers.util.timeline.Timeline;
20import tools.refinery.viatra.runtime.rete.network.ReteContainer;
21import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
22
23/**
24 * @author Gabor Bergmann
25 *
26 */
27public class JoinNode extends DualInputNode {
28
29 public JoinNode(final ReteContainer reteContainer, final TupleMask complementerSecondaryMask) {
30 super(reteContainer, complementerSecondaryMask);
31 this.logic = createLogic();
32 }
33
34 @Override
35 public Tuple calibrate(final Tuple primary, final Tuple secondary) {
36 return unify(primary, secondary);
37 }
38
39 private final NetworkStructureChangeSensitiveLogic TIMELESS = new NetworkStructureChangeSensitiveLogic() {
40
41 @Override
42 public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) {
43 throw new UnsupportedOperationException();
44 }
45
46 @Override
47 public void pullInto(final Collection<Tuple> collector, final boolean flush) {
48 if (primarySlot == null || secondarySlot == null) {
49 return;
50 }
51
52 if (flush) {
53 reteContainer.flushUpdates();
54 }
55
56 for (final Tuple signature : primarySlot.getSignatures()) {
57 // primaries can not be null due to the contract of IterableIndex.getSignatures()
58 final Collection<Tuple> primaries = primarySlot.get(signature);
59 final Collection<Tuple> opposites = secondarySlot.get(signature);
60 if (opposites != null) {
61 for (final Tuple primary : primaries) {
62 for (final Tuple opposite : opposites) {
63 collector.add(unify(primary, opposite));
64 }
65 }
66 }
67 }
68 }
69
70 @Override
71 public void notifyUpdate(final Side side, final Direction direction, final Tuple updateElement,
72 final Tuple signature, final boolean change, final Timestamp timestamp) {
73 // in the default case, all timestamps must be zero
74 assert Timestamp.ZERO.equals(timestamp);
75
76 final Collection<Tuple> opposites = retrieveOpposites(side, signature);
77
78 if (!coincidence) {
79 if (opposites != null) {
80 for (final Tuple opposite : opposites) {
81 propagateUpdate(direction, unify(side, updateElement, opposite), timestamp);
82 }
83 }
84 } else {
85 // compensate for coincidence of slots - this is the case when an Indexer is joined with itself
86 if (opposites != null) {
87 for (final Tuple opposite : opposites) {
88 if (opposite.equals(updateElement)) {
89 // handle self-joins of a single tuple separately
90 continue;
91 }
92 propagateUpdate(direction, unify(opposite, updateElement), timestamp);
93 propagateUpdate(direction, unify(updateElement, opposite), timestamp);
94 }
95 }
96
97 // handle self-joins here
98 propagateUpdate(direction, unify(updateElement, updateElement), timestamp);
99 }
100 }
101 };
102
103 private final NetworkStructureChangeSensitiveLogic TIMELY = new NetworkStructureChangeSensitiveLogic() {
104
105 @Override
106 public void pullIntoWithTimeline(final Map<Tuple, Timeline<Timestamp>> collector, final boolean flush) {
107 if (primarySlot == null || secondarySlot == null) {
108 return;
109 }
110
111 if (flush) {
112 reteContainer.flushUpdates();
113 }
114
115 for (final Tuple signature : primarySlot.getSignatures()) {
116 // primaries can not be null due to the contract of IterableIndex.getSignatures()
117 final Map<Tuple, Timeline<Timestamp>> primaries = getTimeline(signature, primarySlot);
118 final Map<Tuple, Timeline<Timestamp>> opposites = getTimeline(signature, secondarySlot);
119 if (opposites != null) {
120 for (final Tuple primary : primaries.keySet()) {
121 for (final Tuple opposite : opposites.keySet()) {
122 final Timeline<Timestamp> primaryTimeline = primaries.get(primary);
123 final Timeline<Timestamp> oppositeTimeline = opposites.get(opposite);
124 final Timeline<Timestamp> mergedTimeline = primaryTimeline
125 .mergeMultiplicative(oppositeTimeline);
126 if (!mergedTimeline.isEmpty()) {
127 collector.put(unify(primary, opposite), mergedTimeline);
128 }
129 }
130 }
131 }
132 }
133 }
134
135 @Override
136 public void pullInto(final Collection<Tuple> collector, final boolean flush) {
137 JoinNode.this.TIMELESS.pullInto(collector, flush);
138 }
139
140 @Override
141 public void notifyUpdate(final Side side, final Direction direction, final Tuple updateElement,
142 final Tuple signature, final boolean change, final Timestamp timestamp) {
143 final Indexer oppositeIndexer = getSlot(side.opposite());
144 final Map<Tuple, Timeline<Timestamp>> opposites = getTimeline(signature, oppositeIndexer);
145
146 if (!coincidence) {
147 if (opposites != null) {
148 for (final Tuple opposite : opposites.keySet()) {
149 final Tuple unifiedTuple = unify(side, updateElement, opposite);
150 for (final Signed<Timestamp> signed : opposites.get(opposite).asChangeSequence()) {
151 // TODO only consider signed timestamps that are greater or equal to timestamp
152 // plus compact the previous timestamps into at most one update
153 propagateUpdate(signed.getDirection().multiply(direction), unifiedTuple,
154 timestamp.max(signed.getPayload()));
155 }
156 }
157 }
158 } else {
159 // compensate for coincidence of slots - this is the case when an Indexer is joined with itself
160 if (opposites != null) {
161 for (final Tuple opposite : opposites.keySet()) {
162 if (opposite.equals(updateElement)) {
163 // handle self-joins of a single tuple separately
164 continue;
165 }
166 final Tuple u1 = unify(opposite, updateElement);
167 final Tuple u2 = unify(updateElement, opposite);
168 for (final Signed<Timestamp> oppositeSigned : opposites.get(opposite).asChangeSequence()) {
169 final Direction updateDirection = direction.multiply(oppositeSigned.getDirection());
170 final Timestamp updateTimestamp = timestamp.max(oppositeSigned.getPayload());
171 propagateUpdate(updateDirection, u1, updateTimestamp);
172 propagateUpdate(updateDirection, u2, updateTimestamp);
173 }
174 }
175 }
176
177 // handle self-join here
178 propagateUpdate(direction, unify(updateElement, updateElement), timestamp);
179 }
180 }
181 };
182
183 @Override
184 protected NetworkStructureChangeSensitiveLogic createTimelessLogic() {
185 return this.TIMELESS;
186 }
187
188 @Override
189 protected NetworkStructureChangeSensitiveLogic createTimelyLogic() {
190 return this.TIMELY;
191 }
192
193}