/******************************************************************************* * Copyright (c) quickfixengine.org All rights reserved. * * This file is part of the QuickFIX FIX Engine * * This file may be distributed under the terms of the quickfixengine.org * license as defined by quickfixengine.org and appearing in the file * LICENSE included in the packaging of this file. * * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A * PARTICULAR PURPOSE. * * See http://www.quickfixengine.org/LICENSE for licensing information. * * Contact ask@quickfixengine.org if any conditions of this licensing * are not clear to you. ******************************************************************************/ package quickfix.mina; import quickfix.LogUtil; import quickfix.Message; import quickfix.Session; import quickfix.SessionID; import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; /** * Processes messages in a session-specific thread. */ public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy { private final Map dispatchers = new ConcurrentHashMap(); public void onMessage(Session quickfixSession, Message message) { MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID()); if (dispatcher == null) { dispatcher = new MessageDispatchingThread(quickfixSession); dispatchers.put(quickfixSession.getSessionID(), dispatcher); startDispatcherThread(dispatcher); } dispatcher.enqueue(message); } /** There is no such thing as a SesionConnector for thread-per-session handler - we don't multiplex * between multiple sessions here so this is null */ public SessionConnector getSessionConnector() { return null; } protected void startDispatcherThread(MessageDispatchingThread dispatcher) { dispatcher.start(); } public void stopDispatcherThreads() { Collection dispatchersToShutdown = dispatchers.values(); dispatchers.clear(); for (MessageDispatchingThread dispatcher : dispatchersToShutdown) { dispatcher.stopDispatcher(); } } private final class MessageDispatchingThread extends Thread { private final Session quickfixSession; private final BlockingQueue messages = new LinkedBlockingQueue(); private volatile boolean stopped; private MessageDispatchingThread(Session session) { super("QF/J Session dispatcher: " + session.getSessionID()); quickfixSession = session; } public void enqueue(Message message) { try { messages.put(message); } catch (InterruptedException e) { quickfixSession.getLog().onErrorEvent(e.toString()); } } public int getQueueSize() { return messages.size(); } public void run() { while (!stopped && !isInterrupted()) { try { Message message = getNextMessage(messages); if (quickfixSession.hasResponder()) { if (message != null) { quickfixSession.next(message); } } else { stopped = true; } } catch (InterruptedException e) { LogUtil.logThrowable(quickfixSession.getSessionID(), "Message dispatcher interrupted", e); stopped = true; } catch (Throwable e) { LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e); } } dispatchers.remove(quickfixSession.getSessionID()); } public void stopDispatcher() { stopped = true; } } private BlockingQueue getMessages(SessionID sessionID) { MessageDispatchingThread dispatcher = getDispatcher(sessionID); return dispatcher.messages; } private MessageDispatchingThread getDispatcher(SessionID sessionID) { return dispatchers.get(sessionID); } private Message getNextMessage(BlockingQueue messages) throws InterruptedException { return messages.poll(1, TimeUnit.SECONDS); } public int getQueueSize() { int ret = 0; for(MessageDispatchingThread mdt : dispatchers.values()) { ret+=mdt.getQueueSize(); } return ret; } }