Uploaded image for project: 'QuickFIX/J'
  1. QuickFIX/J
  2. QFJ-510

ThreadPerSessionEventHandlingStrategy.MessageDispatchingThread threads live after disconnect

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Default
    • Resolution: Duplicate
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: Engine
    • Labels:
      None

      Description

      This patch ends the dispatcher thread after logout.

      /*******************************************************************************
       * Copyright (c) quickfixengine.org All rights reserved.
       *
       * This file is part of the QuickFIX FIX Engine
       *
       * This file may be distributed under the terms of the quickfixengine.org
       * license as defined by quickfixengine.org and appearing in the file
       * LICENSE included in the packaging of this file.
       *
       * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
       * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
       * PARTICULAR PURPOSE.
       *
       * See http://www.quickfixengine.org/LICENSE for licensing information.
       *
       * Contact [email protected] if any conditions of this licensing
       * are not clear to you.
       ******************************************************************************/

      package quickfix.mina;

      import quickfix.LogUtil;
      import quickfix.Message;
      import quickfix.Session;
      import quickfix.SessionID;
      import quickfix.SessionStateListener;

      import java.util.Collection;
      import java.util.Map;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.LinkedBlockingQueue;

      /**

      • Processes messages in a session-specific thread.
        */
        public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
        private final Map<SessionID, MessageDispatchingThread> dispatchers = new ConcurrentHashMap<SessionID, MessageDispatchingThread>();

      public void onMessage(Session quickfixSession, Message message) {
      MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID());
      if (dispatcher == null)

      { dispatcher = new MessageDispatchingThread(quickfixSession); dispatchers.put(quickfixSession.getSessionID(), dispatcher); startDispatcherThread(dispatcher); }

      dispatcher.enqueue(message);
      }

      /** There is no such thing as a SesionConnector for thread-per-session handler - we don't multiplex

      • between multiple sessions here so this is null
        */
        public SessionConnector getSessionConnector() { return null; }

      protected void startDispatcherThread(MessageDispatchingThread dispatcher)

      { dispatcher.start(); }

      public void stopDispatcherThreads() {
      Collection<MessageDispatchingThread> dispatchersToShutdown = dispatchers.values();
      dispatchers.clear();
      for (MessageDispatchingThread dispatcher : dispatchersToShutdown)

      { dispatcher.stopDispatcher(); }

      }

      private final class MessageDispatchingThread extends Thread {
      private final Session quickfixSession;
      private final BlockingQueue<Message> messages = new LinkedBlockingQueue<Message>();
      private volatile boolean stopped;

      private MessageDispatchingThread(Session session) {
      super("QF/J Session dispatcher: " + session.getSessionID());
      quickfixSession = session;
      quickfixSession.addStateListener(new SessionStateListener() {
      public void onConnect() {
      }

      public void onDisconnect() {
      }

      public void onLogon() {
      }

      public void onLogout()

      { stopped = true; interrupt(); dispatchers.remove(quickfixSession.getSessionID()); }

      public void onReset() {
      }

      public void onRefresh() {
      }

      public void onMissedHeartBeat() {
      }

      public void onHeartBeatTimeout() {
      }
      });
      }

      public void enqueue(Message message) {
      try

      { messages.put(message); }

      catch (InterruptedException e)

      { quickfixSession.getLog().onErrorEvent(e.toString()); }

      }

      public int getQueueSize()

      { return messages.size(); }

      public void run() {
      while (!stopped) {
      try {
      Message message = getNextMessage(messages);
      if (quickfixSession.hasResponder())

      { quickfixSession.next(message); }

      } catch (InterruptedException e) {
      if (!stopped)

      { LogUtil.logThrowable(quickfixSession.getSessionID(), "Message dispatcher interrupted", e); return; }

      } catch (Throwable e)

      { LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e); }

      }
      }

      public void stopDispatcher()

      { stopped = true; }

      }

      private BlockingQueue<Message> getMessages(SessionID sessionID)

      { MessageDispatchingThread dispatcher = getDispatcher(sessionID); return dispatcher.messages; }

      private MessageDispatchingThread getDispatcher(SessionID sessionID)

      { return dispatchers.get(sessionID); }

      private Message getNextMessage(BlockingQueue<Message> messages) throws InterruptedException

      { return messages.take(); }

      public int getQueueSize() {
      int ret = 0;
      for(MessageDispatchingThread mdt : dispatchers.values())

      { ret+=mdt.getQueueSize(); }

      return ret;
      }

      }

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Unassigned
                Reporter:
                andmer Andre Mermegas
              • Votes:
                0 Vote for this issue
                Watchers:
                0 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: