diff --git a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java index 869dca7..2242478 100644 --- a/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java +++ b/quickfixj-core/src/main/java/quickfix/DefaultSessionFactory.java @@ -194,6 +194,9 @@ session.setLogonTimeout(logonTimeout); session.setLogoutTimeout(logoutTimeout); + final int maxScheduledWriteRequests = getSetting(settings, sessionID, Session.SETTING_MAX_SCHEDULED_WRITE_REQUESTS, 0); + session.setMaxScheduledWriteRequests(maxScheduledWriteRequests); + // // Session registration and creation callback is done here instead of in // session constructor to eliminate the possibility of other threads diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 18ad6c1..5761d86 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -335,6 +335,8 @@ */ public static final String SETTING_RESEND_REQUEST_CHUNK_SIZE = "ResendRequestChunkSize"; + public static final String SETTING_MAX_SCHEDULED_WRITE_REQUESTS = "MaxScheduledWriteRequests"; + private static final ConcurrentMap sessions = new ConcurrentHashMap(); private final Application application; @@ -379,6 +381,8 @@ private boolean forceResendWhenCorruptedStore = false; private boolean enableNextExpectedMsgSeqNum = false; private boolean enableLastMsgSeqNumProcessed = false; + + private int maxScheduledWriteRequests = 0; private final AtomicBoolean isResetting = new AtomicBoolean(); @@ -2775,6 +2779,15 @@ } } + public int getMaxScheduledWriteRequests() { + return maxScheduledWriteRequests; + } + + public void setMaxScheduledWriteRequests(int maxScheduledWriteRequests) { + this.maxScheduledWriteRequests = maxScheduledWriteRequests; + } + + public void setIgnoreHeartBeatFailure(boolean ignoreHeartBeatFailure) { disableHeartBeatCheck = ignoreHeartBeatFailure; } diff --git a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java index 897f14b..c8d7658 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java +++ b/quickfixj-core/src/main/java/quickfix/mina/IoSessionResponder.java @@ -19,10 +19,12 @@ package quickfix.mina; +import java.io.IOException; import java.net.SocketAddress; +import quickfix.Session; -import org.apache.mina.core.session.IoSession; import org.apache.mina.core.future.WriteFuture; +import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,15 +39,27 @@ private final IoSession ioSession; private final boolean synchronousWrites; private final long synchronousWriteTimeout; + private final int maxScheduledWriteRequests; - public IoSessionResponder(IoSession session, boolean synchronousWrites, long synchronousWriteTimeout) { + public IoSessionResponder(IoSession session, boolean synchronousWrites, long synchronousWriteTimeout, int maxScheduledWriteRequests) { ioSession = session; this.synchronousWrites = synchronousWrites; this.synchronousWriteTimeout = synchronousWriteTimeout; + this.maxScheduledWriteRequests = maxScheduledWriteRequests; } @Override public boolean send(String data) { + // Check for and disconnect slow consumers. + if (maxScheduledWriteRequests != 0 && ioSession.getScheduledWriteMessages() >= maxScheduledWriteRequests) { + Session qfjSession = (Session) ioSession.getAttribute(SessionConnector.QF_SESSION); + try { + qfjSession.disconnect("Slow consumer", true); + } catch (IOException e) { + } + return false; + } + // The data is written asynchronously in a MINA thread WriteFuture future = ioSession.write(data); if (synchronousWrites) { diff --git a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java index 39aed98..8eddf1a 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java +++ b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java @@ -85,7 +85,7 @@ final NetworkingOptions networkingOptions = getNetworkingOptions(); qfSession.setResponder(new IoSessionResponder(protocolSession, networkingOptions.getSynchronousWrites(), networkingOptions - .getSynchronousWriteTimeout())); + .getSynchronousWriteTimeout(), qfSession.getMaxScheduledWriteRequests())); if (sessionID.isFIXT()) { // QFJ-592 if (message.isSetField(DefaultApplVerID.FIELD)) { final ApplVerID applVerID = new ApplVerID( diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java index 0b7c3c4..5782118 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java @@ -47,7 +47,8 @@ NetworkingOptions networkingOptions = getNetworkingOptions(); quickfixSession.setResponder(new IoSessionResponder(session, networkingOptions.getSynchronousWrites(), - networkingOptions.getSynchronousWriteTimeout())); + networkingOptions.getSynchronousWriteTimeout(), + quickfixSession.getMaxScheduledWriteRequests())); log.info("MINA session created for " + quickfixSession.getSessionID() + ": local=" + session.getLocalAddress() + ", " + session.getClass() + ", remote=" + session.getRemoteAddress()); diff --git a/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java b/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java index a2cdffc..d2864f8 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/IoSessionResponderTest.java @@ -34,7 +34,7 @@ WriteFuture mockWriteFuture = mock(WriteFuture.class); stub(mockWriteFuture.isWritten()).toReturn(true); stub(mockIoSession.write("abcd")).toReturn(mockWriteFuture); - IoSessionResponder responder = new IoSessionResponder(mockIoSession, false, 0); + IoSessionResponder responder = new IoSessionResponder(mockIoSession, false, 0, 0); boolean result = responder.send("abcd"); @@ -50,7 +50,7 @@ WriteFuture mockWriteFuture = mock(WriteFuture.class); stub(mockIoSession.write("abcd")).toReturn(mockWriteFuture); stub(mockWriteFuture.awaitUninterruptibly(timeout)).toReturn(true); - IoSessionResponder responder = new IoSessionResponder(mockIoSession, true, timeout); + IoSessionResponder responder = new IoSessionResponder(mockIoSession, true, timeout, 0); boolean result = responder.send("abcd"); @@ -68,7 +68,7 @@ WriteFuture mockWriteFuture = mock(WriteFuture.class); stub(mockIoSession.write("abcd")).toReturn(mockWriteFuture); stubVoid(mockWriteFuture).toThrow(new RuntimeException("TEST")).on().awaitUninterruptibly(timeout); - IoSessionResponder responder = new IoSessionResponder(mockIoSession, true, timeout); + IoSessionResponder responder = new IoSessionResponder(mockIoSession, true, timeout, 0); boolean result = responder.send("abcd"); @@ -86,7 +86,7 @@ WriteFuture mockWriteFuture = mock(WriteFuture.class); stub(mockIoSession.write("abcd")).toReturn(mockWriteFuture); stub(mockWriteFuture.awaitUninterruptibly(timeout)).toReturn(false); - IoSessionResponder responder = new IoSessionResponder(mockIoSession, true, timeout); + IoSessionResponder responder = new IoSessionResponder(mockIoSession, true, timeout, 0); boolean result = responder.send("abcd"); @@ -102,7 +102,7 @@ stub(mockProtocolSession.getScheduledWriteMessages()).toReturn(0); stub(mockProtocolSession.close(true)).toReturn(null); - IoSessionResponder responder = new IoSessionResponder(mockProtocolSession, false, 0); + IoSessionResponder responder = new IoSessionResponder(mockProtocolSession, false, 0, 0); responder.disconnect(); verify(mockProtocolSession).getScheduledWriteMessages(); @@ -116,7 +116,7 @@ stub(mockProtocolSession.getRemoteAddress()).toReturn( new InetSocketAddress("1.2.3.4", 5432)); - IoSessionResponder responder = new IoSessionResponder(mockProtocolSession, false, 0); + IoSessionResponder responder = new IoSessionResponder(mockProtocolSession, false, 0, 0); assertEquals("/1.2.3.4:5432", responder.getRemoteAddress()); verify(mockProtocolSession).getRemoteAddress();