/*******************************************************************************
 * Copyright (c) 2010-2016, Tamas Szabo, Istvan Rath and Daniel Varro
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-v20.html.
 *
 * SPDX-License-Identifier: EPL-2.0
 *******************************************************************************/
package tools.refinery.viatra.runtime.rete.network.mailbox.timeless;

import java.util.Map;

import tools.refinery.viatra.runtime.matchers.tuple.Tuple;
import tools.refinery.viatra.runtime.matchers.util.CollectionsFactory;
import tools.refinery.viatra.runtime.matchers.util.Direction;
import tools.refinery.viatra.runtime.rete.network.Receiver;
import tools.refinery.viatra.runtime.rete.network.ReteContainer;
import tools.refinery.viatra.runtime.rete.network.communication.CommunicationGroup;
import tools.refinery.viatra.runtime.rete.network.communication.MessageSelector;
import tools.refinery.viatra.runtime.rete.network.communication.PhasedSelector;
import tools.refinery.viatra.runtime.rete.network.communication.Timestamp;
import tools.refinery.viatra.runtime.rete.network.mailbox.AdaptableMailbox;
import tools.refinery.viatra.runtime.rete.network.mailbox.Mailbox;

/**
 * Default mailbox implementation.
 * <p>
 * Usually, the mailbox performs counting of messages so that they can cancel each other out. However, if marked as a
 * fall-through mailbox, then update messages are delivered directly to the receiver node to reduce overhead.
 * <p>
 * Pending updates are kept as a signed count per tuple: an insertion adds one, a deletion subtracts one, and a tuple
 * whose count reaches zero is dropped from the mailbox. Two maps are used: {@link #queue} collects messages between
 * deliveries, and {@link #buffer} collects messages that arrive while a delivery is in progress.
 *
 * @author Tamas Szabo
 * @since 2.0
 */
public class DefaultMailbox implements AdaptableMailbox {

    /**
     * Size of the delivered queue above which {@link #deliverAll(MessageSelector)} drops the map and allocates a new
     * one instead of clearing and reusing it.
     */
    // NOTE(review): presumably avoids retaining (or clearing) a large backing map; rationale is not visible here.
    private static final int SIZE_THRESHOLD = 127;

    /** Signed per-tuple update counts awaiting the next delivery. */
    protected Map<Tuple, Integer> queue;

    /** Signed per-tuple update counts posted while {@link #delivering} is set. */
    protected Map<Tuple, Integer> buffer;

    protected final Receiver receiver;
    protected final ReteContainer container;

    /** Set for the duration of the {@code batchUpdate} call in {@link #deliverAll(MessageSelector)}. */
    protected boolean delivering;

    /** The mailbox reported to the communication group; this mailbox itself unless replaced via the setter. */
    protected Mailbox adapter;

    protected CommunicationGroup group;

    /**
     * Creates an empty mailbox that acts as its own adapter.
     *
     * @param receiver
     *            the node that receives the batched updates on delivery
     * @param container
     *            the container stored alongside the mailbox (not otherwise used by this class)
     */
    public DefaultMailbox(final Receiver receiver, final ReteContainer container) {
        this.receiver = receiver;
        this.container = container;
        this.queue = CollectionsFactory.createMap();
        this.buffer = CollectionsFactory.createMap();
        this.adapter = this;
    }

    /**
     * Returns the map that currently accepts new messages.
     *
     * @return {@link #buffer} while a delivery is in progress, {@link #queue} otherwise
     */
    protected Map<Tuple, Integer> getActiveQueue() {
        return this.delivering ? this.buffer : this.queue;
    }

    @Override
    public Mailbox getAdapter() {
        return this.adapter;
    }

    @Override
    public void setAdapter(final Mailbox adapter) {
        this.adapter = adapter;
    }

    /**
     * @return {@code true} if the currently active map (see {@link #getActiveQueue()}) holds no pending updates
     */
    @Override
    public boolean isEmpty() {
        return getActiveQueue().isEmpty();
    }

    /**
     * Records one update in the active map, cancelling it against opposite pending updates of the same tuple, and
     * notifies the adapter's communication group when the active map becomes non-empty or empty.
     *
     * @param direction
     *            {@link Direction#DELETE} decrements the tuple's count; any other value increments it
     * @param update
     *            the tuple being inserted or deleted
     * @param timestamp
     *            not used by this implementation; delivery always uses {@link Timestamp#ZERO}
     */
    @Override
    public void postMessage(final Direction direction, final Tuple update, final Timestamp timestamp) {
        final Map<Tuple, Integer> activeQueue = getActiveQueue();
        final boolean wasEmpty = activeQueue.isEmpty();

        final Integer previous = activeQueue.get(update);
        final int delta = (direction == Direction.DELETE) ? -1 : 1;
        final int count = (previous == null ? 0 : previous) + delta;

        if (count == 0) {
            // opposite updates cancelled each other out
            activeQueue.remove(update);
        } else {
            activeQueue.put(update, count);
        }

        // Only the appearance or disappearance of a tuple can change the emptiness of the map.
        final boolean significantChange = previous == null || count == 0;
        if (significantChange) {
            final Mailbox targetMailbox = this.adapter;
            final CommunicationGroup targetGroup = this.adapter.getCurrentGroup();

            if (wasEmpty) {
                targetGroup.notifyHasMessage(targetMailbox, PhasedSelector.DEFAULT);
            } else if (activeQueue.isEmpty()) {
                targetGroup.notifyLostAllMessages(targetMailbox, PhasedSelector.DEFAULT);
            }
        }
    }

    /**
     * Hands every pending update in {@link #queue} to the receiver in one batch, then makes {@link #buffer} (the
     * messages posted during the delivery) the new queue.
     *
     * @param kind
     *            must be {@link PhasedSelector#DEFAULT}
     * @throws IllegalArgumentException
     *             if {@code kind} is anything other than {@link PhasedSelector#DEFAULT}
     */
    @Override
    public void deliverAll(final MessageSelector kind) {
        if (kind != PhasedSelector.DEFAULT) {
            throw new IllegalArgumentException("Unsupported message kind " + kind);
        }

        // use the buffer during delivering so that there is a clear
        // separation between the stages
        this.delivering = true;
        this.receiver.batchUpdate(this.queue.entrySet(), Timestamp.ZERO);
        this.delivering = false;

        if (this.queue.size() > SIZE_THRESHOLD) {
            // large delivered map: discard it and start over with a fresh buffer
            this.queue = this.buffer;
            this.buffer = CollectionsFactory.createMap();
        } else {
            // small delivered map: empty it and reuse it as the next buffer
            this.queue.clear();
            final Map<Tuple, Integer> delivered = this.queue;
            this.queue = this.buffer;
            this.buffer = delivered;
        }
    }

    @Override
    public String toString() {
        return "D_MBOX (" + this.receiver + ") " + this.getActiveQueue();
    }

    @Override
    public Receiver getReceiver() {
        return this.receiver;
    }

    /**
     * Discards all pending updates from both maps. The communication group is not notified by this method.
     */
    @Override
    public void clear() {
        this.queue.clear();
        this.buffer.clear();
    }

    @Override
    public CommunicationGroup getCurrentGroup() {
        return this.group;
    }

    @Override
    public void setCurrentGroup(final CommunicationGroup group) {
        this.group = group;
    }

}