/******************************************************************************* * Copyright (c) 2010-2019, Tamas Szabo, Istvan Rath and Daniel Varro * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at * http://www.eclipse.org/legal/epl-v20.html. * * SPDX-License-Identifier: EPL-2.0 *******************************************************************************/ package tools.refinery.viatra.runtime.rete.network.communication.timely; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.function.Function; import tools.refinery.viatra.runtime.rete.itc.alg.misc.topsort.TopologicalSorting; import tools.refinery.viatra.runtime.rete.itc.graphimpl.Graph; import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory; import tools.refinery.viatra.runtime.rete.index.IndexerListener; import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer; import tools.refinery.viatra.runtime.rete.index.SpecializedProjectionIndexer.ListenerSubscription; import tools.refinery.viatra.runtime.rete.index.StandardIndexer; import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration; import tools.refinery.viatra.runtime.rete.matcher.TimelyConfiguration.TimelineRepresentation; import tools.refinery.viatra.runtime.rete.network.NetworkStructureChangeSensitiveNode; import tools.refinery.viatra.runtime.rete.network.Node; import tools.refinery.viatra.runtime.rete.network.ProductionNode; import tools.refinery.viatra.runtime.rete.network.StandardNode; import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup; import tools.refinery.viatra.runtime.rete.network.communication.CommunicationTracker; import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector; import tools.refinery.viatra.runtime.rete.network.communication.NodeComparator; import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox; import tools.refinery.viatra.runtime.rete.single.DiscriminatorDispatcherNode; /** * Timely (DDF) implementation of the {@link CommunicationTracker}. * * @author Tamas Szabo * @since 2.3 */ public class TimelyCommunicationTracker extends CommunicationTracker { protected final TimelyConfiguration configuration; public TimelyCommunicationTracker(final TimelyConfiguration configuration) { this.configuration = configuration; } @Override protected CommunicationGroup createGroup(final Node representative, final int index) { final boolean isSingleton = this.sccInformationProvider.sccs.getPartition(representative).size() == 1; return new TimelyCommunicationGroup(this, representative, index, isSingleton); } @Override protected void reconstructQueueContents(final Set oldActiveGroups) { for (final CommunicationGroup oldGroup : oldActiveGroups) { for (final Entry> entry : oldGroup.getMailboxes().entrySet()) { for (final Mailbox mailbox : entry.getValue()) { final CommunicationGroup newGroup = this.groupMap.get(mailbox.getReceiver()); newGroup.notifyHasMessage(mailbox, entry.getKey()); } } } } @Override public Mailbox proxifyMailbox(final Node requester, final Mailbox original) { final Mailbox mailboxToProxify = (original instanceof TimelyMailboxProxy) ? ((TimelyMailboxProxy) original).getWrappedMailbox() : original; final TimestampTransformation preprocessor = getPreprocessor(requester, mailboxToProxify.getReceiver()); if (preprocessor == null) { return mailboxToProxify; } else { return new TimelyMailboxProxy(mailboxToProxify, preprocessor); } } @Override public IndexerListener proxifyIndexerListener(final Node requester, final IndexerListener original) { final IndexerListener listenerToProxify = (original instanceof TimelyIndexerListenerProxy) ? ((TimelyIndexerListenerProxy) original).getWrappedIndexerListener() : original; final TimestampTransformation preprocessor = getPreprocessor(requester, listenerToProxify.getOwner()); if (preprocessor == null) { return listenerToProxify; } else { return new TimelyIndexerListenerProxy(listenerToProxify, preprocessor); } } protected TimestampTransformation getPreprocessor(final Node source, final Node target) { final Node effectiveSource = source instanceof SpecializedProjectionIndexer ? ((SpecializedProjectionIndexer) source).getActiveNode() : source; final CommunicationGroup sourceGroup = this.getGroup(effectiveSource); final CommunicationGroup targetGroup = this.getGroup(target); if (sourceGroup != null && targetGroup != null) { // during RETE construction, the groups may be still null if (sourceGroup != targetGroup && sourceGroup.isRecursive()) { // targetGroup is a successor SCC of sourceGroup // and sourceGroup is a recursive SCC // then we need to zero out the timestamps return TimestampTransformation.RESET; } if (sourceGroup == targetGroup && target instanceof ProductionNode) { // if requester and receiver are in the same SCC // and receiver is a production node // then we need to increment the timestamps return TimestampTransformation.INCREMENT; } } return null; } @Override protected void postProcessNode(final Node node) { if (node instanceof NetworkStructureChangeSensitiveNode) { ((NetworkStructureChangeSensitiveNode) node).networkStructureChanged(); } } @Override protected void postProcessGroup(final CommunicationGroup group) { if (this.configuration.getTimelineRepresentation() == TimelineRepresentation.FAITHFUL) { final Node representative = group.getRepresentative(); final Set groupMembers = this.sccInformationProvider.sccs.getPartition(representative); if (groupMembers.size() > 1) { final Graph graph = new Graph(); for (final Node node : groupMembers) { graph.insertNode(node); } for (final Node source : groupMembers) { for (final Node target : this.dependencyGraph.getTargetNodes(source)) { // (1) the edge is not a recursion cut point // (2) the edge is within this group if (!this.isRecursionCutPoint(source, target) && groupMembers.contains(target)) { graph.insertEdge(source, target); } } } final List orderedNodes = TopologicalSorting.compute(graph); final Map nodeMap = CollectionsFactory.createMap(); int identifier = 0; for (final Node orderedNode : orderedNodes) { nodeMap.put(orderedNode, identifier++); } ((TimelyCommunicationGroup) group).setComparatorAndReorderMailboxes(new NodeComparator(nodeMap)); } } } /** * This static field is used for debug purposes in the DotGenerator. */ public static final Function> EDGE_LABEL_FUNCTION = new Function>() { @Override public Function apply(final Node source) { return new Function() { @Override public String apply(final Node target) { if (source instanceof SpecializedProjectionIndexer) { final Collection subscriptions = ((SpecializedProjectionIndexer) source) .getSubscriptions(); for (final ListenerSubscription subscription : subscriptions) { if (subscription.getListener().getOwner() == target && subscription.getListener() instanceof TimelyIndexerListenerProxy) { return ((TimelyIndexerListenerProxy) subscription.getListener()).preprocessor .toString(); } } } if (source instanceof StandardIndexer) { final Collection listeners = ((StandardIndexer) source).getListeners(); for (final IndexerListener listener : listeners) { if (listener.getOwner() == target && listener instanceof TimelyIndexerListenerProxy) { return ((TimelyIndexerListenerProxy) listener).preprocessor.toString(); } } } if (source instanceof StandardNode) { final Collection mailboxes = ((StandardNode) source).getChildMailboxes(); for (final Mailbox mailbox : mailboxes) { if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) { return ((TimelyMailboxProxy) mailbox).preprocessor.toString(); } } } if (source instanceof DiscriminatorDispatcherNode) { final Collection mailboxes = ((DiscriminatorDispatcherNode) source) .getBucketMailboxes().values(); for (final Mailbox mailbox : mailboxes) { if (mailbox.getReceiver() == target && mailbox instanceof TimelyMailboxProxy) { return ((TimelyMailboxProxy) mailbox).preprocessor.toString(); } } } return null; } }; } }; }