Index: core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java =================================================================== --- core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java (revision 918) +++ core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java (working copy) @@ -20,10 +20,12 @@ package quickfix.mina; import java.util.Collection; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import quickfix.LogUtil; import quickfix.Message; @@ -34,6 +36,11 @@ * Processes messages in a session-specific thread. */ public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy { + /** + * Constant indicating how long we wait for an incoming message. After this thread has been asked + * to stop, it can take up to this long to terminate. + */ + private static final long THREAD_WAIT_FOR_MESSAGE_MS = 250; private final Map dispatchers = new ConcurrentHashMap(); public void onMessage(Session quickfixSession, Message message) { @@ -52,17 +59,33 @@ } public void stopDispatcherThreads() { + // dispatchersToShutdown is backed by the map itself so changes in one are reflected in the other Collection dispatchersToShutdown = dispatchers.values(); - dispatchers.clear(); for (MessageDispatchingThread dispatcher : dispatchersToShutdown) { dispatcher.stopDispatcher(); } + + // wait for threads to stop + while (dispatchersToShutdown.size() > 0) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + for (Iterator iterator = dispatchersToShutdown.iterator(); iterator.hasNext();) { + MessageDispatchingThread messageDispatchingThread = iterator.next(); + if (messageDispatchingThread.isStopped()) { + iterator.remove(); + } + } + } } class MessageDispatchingThread extends Thread { private final Session quickfixSession; private final BlockingQueue messages = new LinkedBlockingQueue(); - private volatile boolean stopped; + private volatile boolean stopped = false, stopping = false; public MessageDispatchingThread(Session session) { super("QF/J Session dispatcher: " + session.getSessionID()); @@ -78,10 +101,10 @@ } public void run() { - while (!stopped) { + while (!stopping) { try { Message message = getNextMessage(messages); - if (quickfixSession.hasResponder()) { + if (message != null && quickfixSession.hasResponder()) { quickfixSession.next(message); } } catch (InterruptedException e) { @@ -93,11 +116,16 @@ "Error during message processing", e); } } + stopped = true; } public void stopDispatcher() { - stopped = true; + stopping = true; } + + public boolean isStopped() { + return stopped; + } } BlockingQueue getMessages(SessionID sessionID) { @@ -109,9 +137,15 @@ return dispatchers.get(sessionID); } + /** + * Get the next message from the messages {@link java.util.concurrent.BlockingQueue}. + *

We do not block indefinately as that would prevent this thread from ever stopping + * @see #THREAD_WAIT_FOR_MESSAGE_MS + * @param messages + * @return next message or null if nothing arrived within the timeout period + * @throws InterruptedException + */ Message getNextMessage(BlockingQueue messages) throws InterruptedException { - return messages.take(); + return messages.poll(THREAD_WAIT_FOR_MESSAGE_MS, TimeUnit.MILLISECONDS); } - - }