Index: trunk/core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/acceptor/AcceptorIoHandlerTest.java (working copy) @@ -20,13 +20,13 @@ package quickfix.mina.acceptor; import static org.mockito.Mockito.*; - +import java.util.Map; import java.util.HashMap; import java.util.Properties; - +import java.net.SocketAddress; import junit.framework.TestCase; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.session.IoSession; import quickfix.Session; import quickfix.SessionFactoryTestSupport; @@ -37,8 +37,12 @@ import quickfix.mina.EventHandlingStrategy; import quickfix.mina.NetworkingOptions; import quickfix.mina.acceptor.AbstractSocketAcceptor.StaticAcceptorSessionProvider; +import quickfix.mina.acceptor.AcceptorSessionProvider; public class AcceptorIoHandlerTest extends TestCase { + + private final Map sessionProviders = new HashMap(); + public void testMessageBeforeLogon() throws Exception { IoSession mockIoSession = mock(IoSession.class); stub(mockIoSession.getAttribute("QF_SESSION")).toReturn(null); @@ -46,7 +50,7 @@ EventHandlingStrategy mockEventHandlingStrategy = mock(EventHandlingStrategy.class); HashMap acceptorSessions = new HashMap(); - + AcceptorIoHandler handler = new AcceptorIoHandler(createSessionProvider(acceptorSessions), new NetworkingOptions(new Properties()), mockEventHandlingStrategy); @@ -75,9 +79,9 @@ .setString(TargetCompID.FIELD, qfSession.getSessionID().getTargetCompID()); HashMap acceptorSessions = new HashMap(); - + AcceptorIoHandler handler = new AcceptorIoHandler(createSessionProvider(acceptorSessions), - new NetworkingOptions(new Properties()), mockEventHandlingStrategy); + new NetworkingOptions(new Properties()), mockEventHandlingStrategy); handler.processMessage(mockIoSession, logout); @@ -105,7 +109,6 @@ HashMap acceptorSessions = new HashMap(); acceptorSessions.put(qfSession.getSessionID(), qfSession); - AcceptorIoHandler handler = new AcceptorIoHandler(createSessionProvider(acceptorSessions), new NetworkingOptions(new Properties()), mockEventHandlingStrategy); Index: trunk/core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java (working copy) @@ -25,11 +25,11 @@ import junit.framework.TestCase; -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.filterchain.IoFilterAdapter; +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.filterchain.IoFilterChainBuilder; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.service.TransportMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,10 +46,11 @@ import quickfix.ThreadedSocketInitiator; import quickfix.test.acceptance.ATServer; import quickfix.test.util.ExpectedTestFailure; +import quickfix.mina.ProtocolFactory; public class SecureSocketTest extends TestCase { private final Logger log = LoggerFactory.getLogger(getClass()); - private final TransportType transportProtocol = TransportType.SOCKET; + private final int transportProtocol = ProtocolFactory.SOCKET; protected void setUp() throws Exception { SystemTime.setTimeSource(null); @@ -172,7 +173,7 @@ SessionSettings settings = new SessionSettings(); HashMap defaults = new HashMap(); defaults.put("ConnectionType", "initiator"); - defaults.put("SocketConnectProtocol", transportProtocol.toString()); + defaults.put("SocketConnectProtocol", ProtocolFactory.getTypeString(transportProtocol)); defaults.put("SocketUseSSL", "Y"); defaults.put("SocketConnectHost", "localhost"); defaults.put("SocketConnectPort", "9877"); Index: trunk/core/src/test/java/quickfix/mina/message/ProtocolDecoderOutputForTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/message/ProtocolDecoderOutputForTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/message/ProtocolDecoderOutputForTest.java (working copy) @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import org.apache.mina.core.filterchain.IoFilter.NextFilter; import org.apache.mina.filter.codec.ProtocolDecoderOutput; @@ -50,7 +51,7 @@ messages.clear(); } - public void flush() { + public void flush(org.apache.mina.core.filterchain.IoFilter.NextFilter filter,org.apache.mina.core.session.IoSession session) { // empty } -} \ No newline at end of file +} Index: trunk/core/src/test/java/quickfix/mina/message/FIXMessageDecoderTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/message/FIXMessageDecoderTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/message/FIXMessageDecoderTest.java (working copy) @@ -33,7 +33,7 @@ import java.util.ArrayList; import java.util.List; -import org.apache.mina.common.ByteBuffer; +import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; @@ -53,19 +53,19 @@ public class FIXMessageDecoderTest { private FIXMessageDecoder decoder; - private ByteBuffer buffer; + private IoBuffer buffer; private ProtocolDecoderOutputForTest decoderOutput; @Before public void setUp() throws Exception { decoder = new FIXMessageDecoder(); - buffer = ByteBuffer.allocate(8192); + buffer = IoBuffer.allocate(8192); decoderOutput = new ProtocolDecoderOutputForTest(); } @After public void tearDown() throws Exception { - buffer.release(); + buffer.clear(); } @Test @@ -115,7 +115,7 @@ private void doWesternEuropeanDecodingTest() throws UnsupportedEncodingException, ProtocolCodecException, InvalidMessage, Exception, FieldNotFound { FIXMessageDecoder decoder = new FIXMessageDecoder(); - ByteBuffer byteBuffer = ByteBuffer.allocate(1024); + IoBuffer byteBuffer = IoBuffer.allocate(1024); // äbcfödçé String headline = "\u00E4bcf\u00F6d\u00E7\u00E9"; byteBuffer.put(("8=FIX.4.49=1835=B148=" + headline + "10=253").getBytes("ISO-8859-1")); @@ -327,12 +327,13 @@ @Test public void testMinaDemux() throws Exception { DemuxingProtocolCodecFactory codecFactory = new DemuxingProtocolCodecFactory(); - codecFactory.register(FIXMessageDecoder.class); - ProtocolDecoder decoder = codecFactory.getDecoder(); + final IoSessionStub mockSession = new IoSessionStub(); + + codecFactory.addMessageDecoder(FIXMessageDecoder.class); + + ProtocolDecoder decoder = codecFactory.getDecoder(null); ProtocolDecoderOutputForTest output = new ProtocolDecoderOutputForTest(); - - final IoSessionStub mockSession = new IoSessionStub(); int count = 5; String data = ""; @@ -352,7 +353,6 @@ decoder.decode(mockSession, buffer, output); assertEquals("wrong message count", count, output.getMessageCount()); - assertTrue(mockSession.getAttributeCalled); output.reset(); buffer.clear(); @@ -395,7 +395,7 @@ public void testCompleteHeader() { //8=FIXT.1.1_9= byte[] completeHeader = {0x38, 0x3D, 0x46, 0x49, 0x58, 0x54, 0x2E, 0x31, 0x2E, 0x31, 0x01, 0x39, 0x3D}; - ByteBuffer in = ByteBuffer.wrap(completeHeader); + IoBuffer in = IoBuffer.wrap(completeHeader); BufPos bufPos = indexOf(in, 0, HEADER_PATTERN); Assert.assertTrue("We should have a complete header", bufPos._offset != -1); } @@ -404,7 +404,7 @@ public void testLongCompleteHeader() { //8=FIXT.1.1_9====== byte[] completeHeader = {0x38, 0x3D, 0x46, 0x49, 0x58, 0x54, 0x2E, 0x31, 0x2E, 0x31, 0x01, 0x39, 0x3D, 0x3D, 0x3D, 0x3D, 0x3D, 0x3D}; - ByteBuffer in = ByteBuffer.wrap(completeHeader); + IoBuffer in = IoBuffer.wrap(completeHeader); BufPos bufPos = indexOf(in, 0, HEADER_PATTERN); Assert.assertTrue("We should have a complete header", bufPos._offset != -1); } @@ -413,7 +413,7 @@ public void testIncompleteHeader() { //8=FIXT.1.1_9 byte[] incompleteHeader = {0x38, 0x3D, 0x46, 0x49, 0x58, 0x54, 0x2E, 0x31, 0x2E, 0x31, 0x01, 0x39}; - ByteBuffer in = ByteBuffer.wrap(incompleteHeader); + IoBuffer in = IoBuffer.wrap(incompleteHeader); BufPos bufPos = indexOf(in, 0, HEADER_PATTERN); Assert.assertTrue("There should be no header detected", bufPos._offset == -1); } @@ -422,7 +422,7 @@ public void testCompleteHeader4() { //8=FIX.4.4_9= byte[] completeHeader = {0x38, 0x3D, 0x46, 0x49, 0x58, 0x2E, 0x34, 0x2E, 0x34, 0x01, 0x39, 0x3D}; - ByteBuffer in = ByteBuffer.wrap(completeHeader); + IoBuffer in = IoBuffer.wrap(completeHeader); BufPos bufPos = indexOf(in, 0, HEADER_PATTERN); Assert.assertTrue("We should have a complete header", bufPos._offset != -1); } @@ -431,7 +431,7 @@ public void testLongCompleteHeader4() { //8=FIX.4.4_9====== byte[] completeHeader = {0x38, 0x3D, 0x46, 0x49, 0x58, 0x2E, 0x34, 0x2E, 0x34, 0x01, 0x39, 0x3D, 0x3D, 0x3D, 0x3D, 0x3D, 0x3D}; - ByteBuffer in = ByteBuffer.wrap(completeHeader); + IoBuffer in = IoBuffer.wrap(completeHeader); BufPos bufPos = indexOf(in, 0, HEADER_PATTERN); Assert.assertTrue("We should have a complete header", bufPos._offset != -1); } @@ -441,7 +441,7 @@ public void testIncompleteHeader4() { //8=FIX.4.4_9 byte[] incompleteHeader = {0x38, 0x3D, 0x46, 0x49, 0x58, 0x2E, 0x34, 0x2E, 0x34, 0x01, 0x39}; - ByteBuffer in = ByteBuffer.wrap(incompleteHeader); + IoBuffer in = IoBuffer.wrap(incompleteHeader); BufPos bufPos = indexOf(in, 0, HEADER_PATTERN); Assert.assertTrue("There should be no header detected", bufPos._offset == -1); } @@ -456,7 +456,7 @@ } - private static int startsWith(ByteBuffer buffer, int bufferOffset, byte[] data) { + private static int startsWith(IoBuffer buffer, int bufferOffset, byte[] data) { if (bufferOffset + minMaskLength(data) > buffer.limit()) { return -1; } @@ -485,7 +485,7 @@ return bufferOffset - initOffset; } - private static BufPos indexOf(ByteBuffer buffer, int position, byte[] data) { + private static BufPos indexOf(IoBuffer buffer, int position, byte[] data) { for (int offset = position, limit = buffer.limit() - minMaskLength(data) + 1; offset < limit; offset++) { int length; if (buffer.get(offset) == data[0] && (length = startsWith(buffer, offset, data)) > 0) { Index: trunk/core/src/test/java/quickfix/mina/message/IoSessionStub.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/message/IoSessionStub.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/message/IoSessionStub.java (working copy) @@ -21,42 +21,15 @@ import java.net.SocketAddress; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoService; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSessionConfig; -import org.apache.mina.common.TransportType; -import org.apache.mina.common.support.BaseIoSession; +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.service.IoHandler; +import org.apache.mina.core.service.IoService; +import org.apache.mina.core.service.IoProcessor; +import org.apache.mina.core.service.TransportMetadata; +import org.apache.mina.core.session.DummySession; -public class IoSessionStub extends BaseIoSession { +public class IoSessionStub extends DummySession { - public boolean getAttributeCalled; - - @Override - public Object getAttribute(String key) { - getAttributeCalled = true; - return super.getAttribute(key); - } - - public boolean setAttributeCalled; - - @Override - public Object setAttribute(String key) { - setAttributeCalled = true; - return super.setAttribute(key); - } - - @Override - protected void updateTrafficMask() { - throw new UnsupportedOperationException(); - - } - - public IoSessionConfig getConfig() { - throw new UnsupportedOperationException(); - } - public IoFilterChain getFilterChain() { throw new UnsupportedOperationException(); } @@ -73,14 +46,6 @@ throw new UnsupportedOperationException(); } - public int getScheduledWriteBytes() { - throw new UnsupportedOperationException(); - } - - public int getScheduledWriteRequests() { - throw new UnsupportedOperationException(); - } - public IoService getService() { throw new UnsupportedOperationException(); } @@ -88,13 +53,4 @@ public SocketAddress getServiceAddress() { throw new UnsupportedOperationException(); } - - public IoServiceConfig getServiceConfig() { - throw new UnsupportedOperationException(); - } - - public TransportType getTransportType() { - throw new UnsupportedOperationException(); - } - } Index: trunk/core/src/test/java/quickfix/mina/message/ProtocolEncoderOutputForTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/message/ProtocolEncoderOutputForTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/message/ProtocolEncoderOutputForTest.java (working copy) @@ -19,21 +19,22 @@ package quickfix.mina.message; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.WriteFuture; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.future.WriteFuture; import org.apache.mina.filter.codec.ProtocolEncoderOutput; public final class ProtocolEncoderOutputForTest implements ProtocolEncoderOutput { - public ByteBuffer buffer; + public IoBuffer buffer; public void mergeAll() { } - public void write(ByteBuffer buf) { - buffer = buf; + public void write(Object buf) { + if (buf instanceof IoBuffer) + buffer = (IoBuffer) buf; } public WriteFuture flush() { throw new UnsupportedOperationException(); } -} \ No newline at end of file +} Index: trunk/core/src/test/java/quickfix/mina/message/FIXProtocolCodecFactoryTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/message/FIXProtocolCodecFactoryTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/message/FIXProtocolCodecFactoryTest.java (working copy) @@ -26,7 +26,7 @@ // Unfortunately, there's not a lot of testing that can be done here FIXProtocolCodecFactory factory = new FIXProtocolCodecFactory(); assertNotNull(factory); - assertNotNull(factory.getDecoder()); - assertNotNull(factory.getEncoder()); + assertNotNull(factory.getDecoder(null)); + assertNotNull(factory.getEncoder(null)); } } Index: trunk/core/src/test/java/quickfix/mina/IoSessionResponderTest.java =================================================================== --- trunk/core/src/test/java/quickfix/mina/IoSessionResponderTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/mina/IoSessionResponderTest.java (working copy) @@ -25,8 +25,8 @@ import junit.framework.TestCase; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.future.WriteFuture; public class IoSessionResponderTest extends TestCase { public void testAsynchronousSend() throws Exception { @@ -99,14 +99,14 @@ public void testDisconnect() throws Exception { IoSession mockProtocolSession = mock(IoSession.class); - stub(mockProtocolSession.getScheduledWriteRequests()).toReturn(0); + stub(mockProtocolSession.getScheduledWriteMessages()).toReturn(0); stub(mockProtocolSession.close()).toReturn(null); IoSessionResponder responder = new IoSessionResponder(mockProtocolSession, false, 0); responder.disconnect(); - verify(mockProtocolSession).getScheduledWriteRequests(); + verify(mockProtocolSession).getScheduledWriteMessages(); verify(mockProtocolSession).close(); verifyNoMoreInteractions(mockProtocolSession); Index: trunk/core/src/test/java/quickfix/MultiAcceptorTest.java =================================================================== --- trunk/core/src/test/java/quickfix/MultiAcceptorTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/MultiAcceptorTest.java (working copy) @@ -25,12 +25,13 @@ import junit.framework.TestCase; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.service.TransportMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import quickfix.field.TestReqID; import quickfix.fix42.TestRequest; +import quickfix.mina.ProtocolFactory; public class MultiAcceptorTest extends TestCase { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -96,15 +97,23 @@ private void doSessionDispatchingTest(int i) throws SessionNotFound, InterruptedException, FieldNotFound { - TestRequest message = new TestRequest(); - message.set(new TestReqID("TEST" + i)); - SessionID sessionID = getSessionIDForClient(i); + try { + TestRequest message = new TestRequest(); + message.set(new TestReqID("TEST" + i)); + SessionID sessionID = getSessionIDForClient(i); - testAcceptorApplication.setMessageLatch(new CountDownLatch(1)); - Session.sendToTarget(message, sessionID); + testAcceptorApplication.setMessageLatch(new CountDownLatch(1)); + Session.sendToTarget(message, sessionID); - testAcceptorApplication.waitForMessages(); - testAcceptorApplication.assertTestRequestOnSession("TEST" + i, sessionID); + testAcceptorApplication.waitForMessages(); + testAcceptorApplication.assertTestRequestOnSession("TEST" + i, sessionID); + } catch (SessionNotFound se) + { + log.error(se.getMessage(), se); + } catch (FieldNotFound fe) + { + log.error(fe.getMessage(), fe); + } } private SessionID getSessionIDForClient(int i) { @@ -197,7 +206,7 @@ private void configureInitiatorForSession(SessionSettings settings, int i, int port) { SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "INITIATOR", "ACCEPTOR-" + i); - settings.setString(sessionID, "SocketConnectProtocol", TransportType.VM_PIPE.toString()); + settings.setString(sessionID, "SocketConnectProtocol",ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); settings.setString(sessionID, "SocketConnectHost", "127.0.0.1"); settings.setString(sessionID, "SocketConnectPort", Integer.toString(port)); } @@ -226,7 +235,7 @@ private void configureAcceptorForSession(SessionSettings settings, int i, int port) { SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "ACCEPTOR-" + i, "INITIATOR"); - settings.setString(sessionID, "SocketAcceptProtocol", TransportType.VM_PIPE.toString()); + settings.setString(sessionID, "SocketAcceptProtocol", ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); settings.setString(sessionID, "SocketAcceptPort", Integer.toString(port)); } } Index: trunk/core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java =================================================================== --- trunk/core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java (revision 1070) +++ trunk/core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java (working copy) @@ -26,19 +26,20 @@ import junit.framework.Assert; import junit.framework.TestResult; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.service.TransportMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import quickfix.mina.ProtocolFactory; public class ConnectToServerStep implements TestStep { private Logger log = LoggerFactory.getLogger(getClass()); private static final Pattern CONNECT_PATTERN = Pattern.compile("i(\\d+)*,?CONNECT"); private String command; private int clientId = 0; - private TransportType transportType = TransportType.SOCKET; + private int transportType = ProtocolFactory.SOCKET; private final int port; - public ConnectToServerStep(String command, TransportType transportType, int port) { + public ConnectToServerStep(String command, int transportType, int port) { this.command = command; this.transportType = transportType; this.port = port; Index: trunk/core/src/test/java/quickfix/test/acceptance/TestConnection.java =================================================================== --- trunk/core/src/test/java/quickfix/test/acceptance/TestConnection.java (revision 1070) +++ trunk/core/src/test/java/quickfix/test/acceptance/TestConnection.java (working copy) @@ -32,23 +32,22 @@ import junit.framework.Assert; -import org.apache.mina.common.CloseFuture; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.future.CloseFuture; +import org.apache.mina.core.future.ConnectFuture; +import org.apache.mina.core.service.IoConnector; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import quickfix.mina.ProtocolFactory; import quickfix.mina.message.FIXProtocolCodecFactory; public class TestConnection { - private static HashMap connectors = new HashMap(); + private static HashMap connectors = new HashMap(); private Logger log = LoggerFactory.getLogger(getClass()); private HashMap ioHandlers = new HashMap(); @@ -80,24 +79,24 @@ getIoHandler(clientId).waitForDisconnect(); } - public void connect(int clientId, TransportType transportType, int port) + public void connect(int clientId, int transportType, int port) throws UnknownHostException, IOException { - IoConnector connector = connectors.get(transportType); + IoConnector connector = connectors.get(Integer.toString(clientId)); if (connector == null) { - if (transportType == TransportType.SOCKET) { - connector = new SocketConnector(); - } else if (transportType == TransportType.VM_PIPE) { + if (transportType == ProtocolFactory.SOCKET) { + connector = new NioSocketConnector(); + } else if (transportType == ProtocolFactory.VM_PIPE) { connector = new VmPipeConnector(); } else { throw new RuntimeException("Unsupported transport type: " + transportType); } - connectors.put(transportType, connector); + connectors.put(Integer.toString(clientId), connector); } SocketAddress address; - if (transportType == TransportType.SOCKET) { + if (transportType == ProtocolFactory.SOCKET) { address = new InetSocketAddress("localhost", port); - } else if (transportType == TransportType.VM_PIPE) { + } else if (transportType == ProtocolFactory.VM_PIPE) { address = new VmPipeAddress(port); } else { throw new RuntimeException("Unsupported transport type: " + transportType); @@ -106,7 +105,8 @@ TestIoHandler testIoHandler = new TestIoHandler(); synchronized (ioHandlers) { ioHandlers.put(Integer.valueOf(clientId), testIoHandler); - ConnectFuture future = connector.connect(address, testIoHandler); + connector.setHandler(testIoHandler); + ConnectFuture future = connector.connect(address); future.join(); Assert.assertTrue("connection to server failed", future.isConnected()); } @@ -159,4 +159,4 @@ } } } -} \ No newline at end of file +} Index: trunk/core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java =================================================================== --- trunk/core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java (revision 1070) +++ trunk/core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java (working copy) @@ -17,13 +17,13 @@ import junit.framework.TestResult; import junit.framework.TestSuite; -import org.apache.mina.common.TransportType; import org.apache.mina.util.AvailablePortFinder; import org.logicalcobwebs.proxool.ProxoolException; import org.logicalcobwebs.proxool.ProxoolFacade; import org.logicalcobwebs.proxool.admin.SnapshotIF; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import quickfix.mina.ProtocolFactory; import quickfix.Session; @@ -35,7 +35,7 @@ private static final String acceptanceTestResourcePath = "quickfix/test/acceptance/definitions/"; private static final String acceptanceTestBaseDir; - private static TransportType transportType = TransportType.SOCKET; + private static int transportType = ProtocolFactory.SOCKET; private static int port = 9887; private boolean skipSlowTests; @@ -122,7 +122,7 @@ } else if (line.startsWith("E")) { steps.add(new ExpectMessageStep(line)); } else if (line.matches("^i\\d*,?CONNECT")) { - steps.add(new ConnectToServerStep(line, transportType, port)); + steps.add(new ConnectToServerStep(line,transportType, port)); } else if (line.matches("^iSET_SESSION.*")) { steps.add(new ConfigureSessionStep(line)); } else if (line.matches("^e\\d*,?DISCONNECT")) { @@ -246,8 +246,7 @@ } public static Test suite() { - transportType = TransportType - .getInstance(System.getProperty(ATEST_TRANSPORT_KEY, "SOCKET")); + transportType = ProtocolFactory.getTransportType(System.getProperty(ATEST_TRANSPORT_KEY, ProtocolFactory.getTypeString(ProtocolFactory.SOCKET))); port = AvailablePortFinder.getNextAvailable(port); TestSuite acceptanceTests = new TestSuite(); //default server @@ -275,4 +274,4 @@ public Map getOverridenProperties() { return overridenProperties; } -} \ No newline at end of file +} Index: trunk/core/src/test/java/quickfix/test/acceptance/ATServer.java =================================================================== --- trunk/core/src/test/java/quickfix/test/acceptance/ATServer.java (revision 1070) +++ trunk/core/src/test/java/quickfix/test/acceptance/ATServer.java (working copy) @@ -30,8 +30,8 @@ import junit.framework.Assert; import junit.framework.TestSuite; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.filterchain.IoFilterChainBuilder; +import org.apache.mina.core.service.TransportMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +47,7 @@ import quickfix.ThreadedSocketAcceptor; import quickfix.mina.acceptor.AbstractSocketAcceptor; import quickfix.mina.ssl.SSLSupport; +import quickfix.mina.ProtocolFactory; public class ATServer implements Runnable { private final Logger log = LoggerFactory.getLogger(ATServer.class); @@ -57,7 +58,7 @@ private boolean resetOnDisconnect; private boolean usingMemoryStore; private AbstractSocketAcceptor acceptor; - private TransportType transportType = TransportType.SOCKET; + private int transportType = ProtocolFactory.SOCKET; private int port = 9877; private boolean threaded; private IoFilterChainBuilder ioFilterChainBuilder; @@ -70,11 +71,11 @@ // defaults } - public ATServer(TestSuite suite, boolean threaded, TransportType transportType, int port) { + public ATServer(TestSuite suite, boolean threaded, int transportType, int port) { this(suite, threaded, transportType, port, null); } - public ATServer(TestSuite suite, boolean threaded, TransportType transportType, int port, Map overridenProperties) { + public ATServer(TestSuite suite, boolean threaded, int transportType, int port, Map overridenProperties) { this.threaded = threaded; this.overridenProperties = overridenProperties; this.transportType = transportType; @@ -91,7 +92,7 @@ try { HashMap defaults = new HashMap(); defaults.put("ConnectionType", "acceptor"); - defaults.put("SocketAcceptProtocol", transportType.toString()); + defaults.put("SocketAcceptProtocol", ProtocolFactory.getTypeString(transportType)); defaults.put("SocketAcceptPort", Integer.toString(port)); defaults.put("SocketTcpNoDelay", "Y"); defaults.put("StartTime", "00:00:00"); @@ -102,6 +103,8 @@ defaults.put("JdbcURL", "jdbc:mysql://localhost/quickfix"); defaults.put("JdbcUser", "quickfixj"); defaults.put("JdbcPassword", "quickfixj"); + + if (useSSL) { defaults.put("SocketUseSSL", "Y"); } @@ -249,4 +252,4 @@ public void setKeyStorePassword(String keyStorePassword) { this.keyStorePassword = keyStorePassword; } -} \ No newline at end of file +} Index: trunk/core/src/test/java/quickfix/SocketAcceptorTest.java =================================================================== --- trunk/core/src/test/java/quickfix/SocketAcceptorTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/SocketAcceptorTest.java (working copy) @@ -25,10 +25,11 @@ import junit.framework.TestCase; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.service.TransportMetadata; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import quickfix.mina.ProtocolFactory; /** * QFJ-643: Unable to restart a stopped acceptor (SocketAcceptor) @@ -120,8 +121,7 @@ defaults.put("StartTime", "00:00:00"); defaults.put("EndTime", "00:00:00"); defaults.put("BeginString", "FIX.4.2"); - settings.setString(acceptorSessionID, "SocketAcceptProtocol", - TransportType.VM_PIPE.toString()); + settings.setString(acceptorSessionID, "SocketAcceptProtocol",ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); settings.setString(acceptorSessionID, "SocketAcceptPort", "10000"); settings.set(defaults); @@ -142,8 +142,7 @@ defaults.put("FileStorePath", "output/data/client"); defaults.put("ValidateUserDefinedFields", "Y"); settings.setString("BeginString", FixVersions.BEGINSTRING_FIX42); - settings.setString(initiatorSessionID, "SocketConnectProtocol", - TransportType.VM_PIPE.toString()); + settings.setString(initiatorSessionID, "SocketConnectProtocol",ProtocolFactory.getTypeString(ProtocolFactory.VM_PIPE)); settings.setString(initiatorSessionID, "SocketConnectHost", "127.0.0.1"); settings.setString(initiatorSessionID, "SocketConnectPort", "10000"); settings.set(defaults); Index: trunk/core/src/test/java/quickfix/SocketInitiatorTest.java =================================================================== --- trunk/core/src/test/java/quickfix/SocketInitiatorTest.java (revision 1070) +++ trunk/core/src/test/java/quickfix/SocketInitiatorTest.java (working copy) @@ -25,19 +25,19 @@ import junit.framework.TestCase; -import org.apache.mina.common.IoFilterAdapter; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.filterchain.IoFilterAdapter; +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.filterchain.IoFilterChainBuilder; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.apache.mina.core.service.TransportMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import quickfix.test.acceptance.ATServer; +import quickfix.mina.ProtocolFactory; public class SocketInitiatorTest extends TestCase { private final Logger log = LoggerFactory.getLogger(getClass()); - private final TransportType transportProtocol = TransportType.SOCKET; protected void setUp() throws Exception { SystemTime.setTimeSource(null); @@ -179,7 +179,7 @@ SessionSettings settings = new SessionSettings(); HashMap defaults = new HashMap(); defaults.put("ConnectionType", "initiator"); - defaults.put("SocketConnectProtocol", transportProtocol.toString()); + defaults.put("SocketConnectProtocol", ProtocolFactory.getTypeString(ProtocolFactory.SOCKET)); defaults.put("SocketConnectHost", "localhost"); defaults.put("SocketConnectPort", "9877"); defaults.put("StartTime", "00:00:00"); Index: trunk/core/src/main/java/quickfix/mina/NetworkingOptions.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/NetworkingOptions.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/NetworkingOptions.java (working copy) @@ -24,9 +24,9 @@ import java.util.Map; import java.util.Properties; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.IoSessionConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.session.IoSessionConfig; +import org.apache.mina.transport.socket.SocketSessionConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Index: trunk/core/src/main/java/quickfix/mina/message/FIXMessageDecoder.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/message/FIXMessageDecoder.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/message/FIXMessageDecoder.java (working copy) @@ -28,8 +28,9 @@ import java.util.ArrayList; import java.util.List; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; @@ -108,14 +109,14 @@ resetState(); } - public MessageDecoderResult decodable(IoSession session, ByteBuffer in) { + public MessageDecoderResult decodable(IoSession session, IoBuffer in) { BufPos bufPos = indexOf(in, in.position(), HEADER_PATTERN); int headerOffset = bufPos._offset; return headerOffset != -1 ? MessageDecoderResult.OK : (in.remaining() > MAX_UNDECODED_DATA_LENGTH ? MessageDecoderResult.NOT_OK : MessageDecoderResult.NEED_DATA); } - public MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) + public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws ProtocolCodecException { int messageCount = 0; while (parseMessage(in, out)) { @@ -139,7 +140,7 @@ * error has occurred. Otherwise, MINA will compact the buffer and we lose * data. */ - private boolean parseMessage(ByteBuffer in, ProtocolDecoderOutput out) + private boolean parseMessage(IoBuffer in, ProtocolDecoderOutput out) throws ProtocolCodecException { try { boolean messageFound = false; @@ -246,20 +247,20 @@ } } - private int remaining(ByteBuffer in) { + private int remaining(IoBuffer in) { return in.limit() - position; } - private String getBufferDebugInfo(ByteBuffer in) { + private String getBufferDebugInfo(IoBuffer in) { return "pos=" + in.position() + ",lim=" + in.limit() + ",rem=" + in.remaining() + ",offset=" + position + ",state=" + state; } - private byte get(ByteBuffer in) { + private byte get(IoBuffer in) { return in.get(position++); } - private boolean hasRemaining(ByteBuffer in) { + private boolean hasRemaining(IoBuffer in) { return position < in.limit(); } @@ -273,13 +274,13 @@ return len; } - private String getMessageString(ByteBuffer buffer) throws UnsupportedEncodingException { + private String getMessageString(IoBuffer buffer) throws UnsupportedEncodingException { byte[] data = new byte[position - buffer.position()]; buffer.get(data); return new String(data, charsetEncoding); } - private String getMessageStringForError(ByteBuffer buffer) throws UnsupportedEncodingException { + private String getMessageStringForError(IoBuffer buffer) throws UnsupportedEncodingException { int initialPosition = buffer.position(); byte[] data = new byte[buffer.limit() - initialPosition]; buffer.get(data); @@ -287,7 +288,7 @@ return new String(data, charsetEncoding); } - private void handleError(ByteBuffer buffer, int recoveryPosition, String text, + private void handleError(IoBuffer buffer, int recoveryPosition, String text, boolean disconnect) throws ProtocolCodecException { buffer.position(recoveryPosition); position = recoveryPosition; @@ -300,12 +301,12 @@ } } - private boolean isLogon(ByteBuffer buffer) { + private boolean isLogon(IoBuffer buffer) { BufPos bufPos = indexOf(buffer, buffer.position(), LOGON_PATTERN); return bufPos._offset != -1; } - private static BufPos indexOf(ByteBuffer buffer, int position, byte[] data) { + private static BufPos indexOf(IoBuffer buffer, int position, byte[] data) { for (int offset = position, limit = buffer.limit() - minMaskLength(data) + 1; offset < limit; offset++) { int length; if (buffer.get(offset) == data[0] && (length = startsWith(buffer, offset, data)) > 0) { @@ -324,7 +325,7 @@ * @param data * @return */ - private static int startsWith(ByteBuffer buffer, int bufferOffset, byte[] data) { + private static int startsWith(IoBuffer buffer, int bufferOffset, byte[] data) { if (bufferOffset + minMaskLength(data) > buffer.limit()) { return -1; } @@ -403,13 +404,13 @@ MappedByteBuffer memoryMappedBuffer = readOnlyChannel.map(FileChannel.MapMode.READ_ONLY, 0, (int) readOnlyChannel.size()); - decode(null, ByteBuffer.wrap(memoryMappedBuffer), new ProtocolDecoderOutput() { + decode(null, IoBuffer.wrap(memoryMappedBuffer), new ProtocolDecoderOutput() { public void write(Object message) { listener.onMessage((String) message); } - public void flush() { + public void flush(IoFilter.NextFilter nextFilter, IoSession ioSession) { // ignored } Index: trunk/core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/message/FIXMessageEncoder.java (working copy) @@ -24,8 +24,8 @@ import java.util.HashSet; import java.util.Set; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; @@ -53,7 +53,7 @@ charsetEncoding = CharsetSupport.getCharset(); } - public Set> getMessageTypes() { + public static Set> getMessageTypes() { return TYPES; } @@ -71,7 +71,7 @@ } } - ByteBuffer buffer = ByteBuffer.allocate(fixMessageString.length()); + IoBuffer buffer = IoBuffer.allocate(fixMessageString.length()); try { buffer.put(fixMessageString.getBytes(charsetEncoding)); } catch (UnsupportedEncodingException e) { Index: trunk/core/src/main/java/quickfix/mina/message/FIXProtocolCodecFactory.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/message/FIXProtocolCodecFactory.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/message/FIXProtocolCodecFactory.java (working copy) @@ -28,7 +28,7 @@ public static final String FILTER_NAME = "FIXCodec"; public FIXProtocolCodecFactory() { - super.register(FIXMessageDecoder.class); - super.register(FIXMessageEncoder.class); + addMessageDecoder(FIXMessageDecoder.class); + addMessageEncoder(FIXMessageEncoder.getMessageTypes(),FIXMessageEncoder.class); } } Index: trunk/core/src/main/java/quickfix/mina/CompositeIoFilterChainBuilder.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/CompositeIoFilterChainBuilder.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/CompositeIoFilterChainBuilder.java (working copy) @@ -19,9 +19,9 @@ package quickfix.mina; -import org.apache.mina.common.DefaultIoFilterChainBuilder; -import org.apache.mina.common.IoFilterChain; -import org.apache.mina.common.IoFilterChainBuilder; +import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.filterchain.IoFilterChainBuilder; /** * (For internal usage only.) This is the filter that initializes the FIX Index: trunk/core/src/main/java/quickfix/mina/acceptor/AcceptorSessionProvider.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/acceptor/AcceptorSessionProvider.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/acceptor/AcceptorSessionProvider.java (working copy) @@ -37,4 +37,5 @@ * associated with the given ID. */ Session getSession(SessionID sessionID, SessionConnector connector); + } Index: trunk/core/src/main/java/quickfix/mina/acceptor/DynamicAcceptorSessionProvider.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/acceptor/DynamicAcceptorSessionProvider.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/acceptor/DynamicAcceptorSessionProvider.java (working copy) @@ -39,7 +39,12 @@ import quickfix.SessionID; import quickfix.SessionSettings; import quickfix.mina.SessionConnector; +import quickfix.LogUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Dynamically defines sessions for an acceptor. This can be useful for * applications like simulators that want to accept any connection and @@ -50,6 +55,8 @@ */ public class DynamicAcceptorSessionProvider implements AcceptorSessionProvider { public static final String WILDCARD = "*"; + private final static Logger log = LoggerFactory.getLogger(LogUtil.class); + private static final SessionID ANY_SESSION = new SessionID(WILDCARD, WILDCARD, WILDCARD, WILDCARD, WILDCARD, WILDCARD, WILDCARD, null); @@ -154,7 +161,7 @@ } catch (ConfigError e) { throw new QFJException(e); } - } + } return s; } Index: trunk/core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/acceptor/AcceptorIoHandler.java (working copy) @@ -22,8 +22,9 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Map; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.session.IoSession; import quickfix.Log; import quickfix.Message; @@ -52,6 +53,7 @@ @Override public void sessionCreated(IoSession session) throws Exception { + log.info("MINA session open : " + session.getRemoteAddress()); super.sessionCreated(session); log.info("MINA session created: " + session.getRemoteAddress()); } @@ -62,14 +64,14 @@ if (qfSession == null) { if (message.getHeader().getString(MsgType.FIELD).equals(MsgType.LOGON)) { final SessionID sessionID = MessageUtils.getReverseSessionID(message); - qfSession = sessionProvider.getSession(sessionID, - eventHandlingStrategy.getSessionConnector()); + qfSession = sessionProvider.getSession(sessionID,eventHandlingStrategy.getSessionConnector()); if (qfSession != null) { final Log sessionLog = qfSession.getLog(); if (qfSession.hasResponder()) { // Session is already bound to another connection sessionLog .onErrorEvent("Multiple logons/connections for this session are not allowed"); + log.info("Multiple logons - protocolSession.close"); protocolSession.close(); return; } Index: trunk/core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java (working copy) @@ -11,7 +11,7 @@ * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A * PARTICULAR PURPOSE. * - * See http://www.quickfixengine.org/LICENSE for licensing information. + * See http://www.quickfixengine.org/LICENSE for licensing information.s * * Contact ask@quickfixengine.org if any conditions of this licensing * are not clear to you. @@ -29,15 +29,13 @@ import javax.net.ssl.SSLContext; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.common.ThreadModel; -import org.apache.mina.common.TransportType; -import org.apache.mina.filter.SSLFilter; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.service.IoAcceptor; +import org.apache.mina.core.service.IoService; +import org.apache.mina.core.buffer.SimpleBufferAllocator; +import org.apache.mina.core.service.TransportMetadata; +import org.apache.mina.filter.ssl.SslFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import quickfix.Acceptor; import quickfix.Application; @@ -69,13 +67,12 @@ private final Map sessionProviders = new HashMap(); private final SessionFactory sessionFactory; private final Map socketDescriptorForAddress = new HashMap(); - private final Map ioAcceptorForTransport = new HashMap(); + private final Map ioAcceptors = new HashMap(); protected AbstractSocketAcceptor(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError { super(settings, sessionFactory); - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - ByteBuffer.setUseDirectBuffers(false); + IoBuffer.setAllocator(new SimpleBufferAllocator()); this.sessionFactory = sessionFactory; } @@ -95,6 +92,7 @@ // TODO SYNC Does this method really need synchronization? protected synchronized void startAcceptingConnections() throws ConfigError { + SocketAddress s = null; try { createSessions(getSettings()); startSessionTimer(); @@ -102,14 +100,11 @@ Iterator descriptors = socketDescriptorForAddress.values().iterator(); while (descriptors.hasNext()) { - AcceptorSocketDescriptor socketDescriptor = descriptors - .next(); - IoAcceptor ioAcceptor = getIoAcceptor(socketDescriptor.getAddress()); - IoServiceConfig serviceConfig = ioAcceptor.getDefaultConfig(); + AcceptorSocketDescriptor socketDescriptor = descriptors.next(); + s=socketDescriptor.getAddress(); + IoAcceptor ioAcceptor = getIoAcceptor(socketDescriptor); + CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder(getIoFilterChainBuilder()); - CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder( - getIoFilterChainBuilder()); - if (socketDescriptor.isUseSSL()) { installSSL(socketDescriptor, ioFilterChainBuilder); } @@ -117,28 +112,15 @@ ioFilterChainBuilder.addLast(FIXProtocolCodecFactory.FILTER_NAME, new ProtocolCodecFilter(new FIXProtocolCodecFactory())); - serviceConfig.setFilterChainBuilder(ioFilterChainBuilder); - serviceConfig.setThreadModel(ThreadModel.MANUAL); - - AcceptorSessionProvider sessionProvider = sessionProviders - .get(socketDescriptor.getAddress()); - if (sessionProvider == null) { - sessionProvider = new DefaultAcceptorSessionProvider(socketDescriptor - .getAcceptedSessions()); - } - - if (serviceConfig instanceof SocketAcceptorConfig) { - ((SocketAcceptorConfig)serviceConfig).setDisconnectOnUnbind(false); - } - - ioAcceptor.bind(socketDescriptor.getAddress(), new AcceptorIoHandler( - sessionProvider, new NetworkingOptions(settings.getDefaultProperties()), - getEventHandlingStrategy())); + ioAcceptor.setFilterChainBuilder(ioFilterChainBuilder); + ioAcceptor.setCloseOnDeactivation(false); + ioAcceptor.bind(socketDescriptor.getAddress()); log.info("Listening for connections at " + socketDescriptor.getAddress()); } } catch (FieldConvertError e) { throw new ConfigError(e); } catch (Exception e) { + System.err.println("Cannot start accept session for "+s+" error:"+e); throw new RuntimeError(e); } } @@ -148,28 +130,47 @@ log.info("Installing SSL filter for " + descriptor.getAddress()); SSLContext sslContext = SSLContextFactory.getInstance(descriptor.getKeyStoreName(), descriptor.getKeyStorePassword().toCharArray()); - SSLFilter sslFilter = new SSLFilter(sslContext); + SslFilter sslFilter = new SslFilter(sslContext); sslFilter.setUseClientMode(false); ioFilterChainBuilder.addLast(SSLSupport.FILTER_NAME, sslFilter); } - private IoAcceptor getIoAcceptor(SocketAddress address) { - TransportType transportType = ProtocolFactory.getAddressTransportType(address); - IoAcceptor ioAcceptor = ioAcceptorForTransport.get(transportType); - if (ioAcceptor == null) { + private IoAcceptor getIoAcceptor(AcceptorSocketDescriptor socketDescriptor, boolean init) throws ConfigError { + int transportType = ProtocolFactory.getAddressTransportType(socketDescriptor.getAddress()); + + AcceptorSessionProvider sessionProvider = sessionProviders.get(socketDescriptor.getAddress()); + if (sessionProvider == null) { + sessionProvider = new DefaultAcceptorSessionProvider(socketDescriptor.getAcceptedSessions()); + sessionProviders.put(socketDescriptor.getAddress(),sessionProvider); + } + + IoAcceptor ioAcceptor = ioAcceptors.get(socketDescriptor); + if (ioAcceptor == null && init) { ioAcceptor = ProtocolFactory.createIoAcceptor(transportType); - ioAcceptorForTransport.put(transportType, ioAcceptor); + try { + SessionSettings settings = getSettings(); + ioAcceptor.setHandler(new AcceptorIoHandler(sessionProvider, new NetworkingOptions(settings.getDefaultProperties()), getEventHandlingStrategy())); + } catch (FieldConvertError e) { + throw new ConfigError(e); + } + ioAcceptors.put(socketDescriptor, ioAcceptor); } return ioAcceptor; } + + + private IoAcceptor getIoAcceptor(AcceptorSocketDescriptor socketDescriptor) throws ConfigError { + return getIoAcceptor(socketDescriptor,true); + + } private AcceptorSocketDescriptor getAcceptorSocketDescriptor(SessionSettings settings, SessionID sessionID) throws ConfigError, FieldConvertError { - TransportType acceptTransportType = TransportType.SOCKET; + int acceptTransportType = ProtocolFactory.SOCKET; + if (settings.isSetting(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL)) { try { - acceptTransportType = TransportType.getInstance(settings.getString(sessionID, - Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL)); + acceptTransportType = ProtocolFactory.getTransportType(settings.getString(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PROTOCOL)); } catch (IllegalArgumentException e) { // Unknown transport type throw new ConfigError(e); @@ -181,7 +182,7 @@ String keyStorePassword = null; if (getSettings().isSetting(sessionID, SSLSupport.SETTING_USE_SSL) && getSettings().getBool(sessionID, SSLSupport.SETTING_USE_SSL)) { - if (acceptTransportType == TransportType.SOCKET) { + if (acceptTransportType == ProtocolFactory.SOCKET) { useSSL = true; keyStoreName = SSLSupport.getKeystoreName(getSettings(), sessionID); keyStorePassword = SSLSupport.getKeystorePasswd(getSettings(), sessionID); @@ -202,8 +203,8 @@ acceptHost, acceptPort); // Check for cached descriptor - AcceptorSocketDescriptor descriptor = socketDescriptorForAddress - .get(acceptorAddress); + AcceptorSocketDescriptor descriptor = socketDescriptorForAddress.get(acceptorAddress); + if (descriptor != null) { if (descriptor.isUseSSL() && !useSSL || !equals(descriptor.getKeyStoreName(), keyStoreName) @@ -232,6 +233,7 @@ SessionFactory.SETTING_CONNECTION_TYPE); boolean isTemplate = false; + if (settings.isSetting(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE)) { isTemplate = settings.getBool(sessionID, Acceptor.SETTING_ACCEPTOR_TEMPLATE); } @@ -252,19 +254,28 @@ } } - protected void stopAcceptingConnections() { - Iterator descriptors = socketDescriptorForAddress.values().iterator(); - while (descriptors.hasNext()) { - AcceptorSocketDescriptor socketDescriptor = descriptors - .next(); - SocketAddress acceptorSocketAddress = socketDescriptor.getAddress(); + protected synchronized void stopAcceptingConnections() throws ConfigError { + Iterator ioIt = ioAcceptors.values().iterator(); + while (ioIt.hasNext()) { + IoAcceptor ioAcceptor = ioIt.next(); + SocketAddress acceptorSocketAddress = ioAcceptor.getLocalAddress(); + Iterator i = ioAcceptor.getLocalAddresses().iterator(); + while(i.hasNext()) + { + System.out.println("bind listening on "+i.next()); + } + ioAcceptor.unbind(); + while(i.hasNext()) + { + System.out.println("bind listening after unbind on "+i.next()); + } + + //acceptorSocketAddress = ioAcceptor.getLocalAddress(); + //ioAcceptor.unbind(acceptorSocketAddress); log.info("No longer accepting connections on " + acceptorSocketAddress); - IoAcceptor ioAcceptor = getIoAcceptor(acceptorSocketAddress); - if (ioAcceptor.isManaged(acceptorSocketAddress)) { - ioAcceptor.unbind(acceptorSocketAddress); - } + ioIt.remove(); } - ioAcceptorForTransport.clear(); + log.info("ioAcceptors size : "+ioAcceptors.size()); } private static class AcceptorSocketDescriptor { @@ -309,7 +320,7 @@ } public Collection getEndpoints() { - return ioAcceptorForTransport.values(); + return ioAcceptors.values(); } public Map getAcceptorAddresses() { @@ -357,12 +368,15 @@ { this.acceptorSessions = acceptorSessions; } - + public Session getSession(SessionID sessionID, SessionConnector ignored) { Session session = acceptorSessions.get(sessionID); if(session == null) - session = acceptorSessions.get(reduceSessionID(sessionID)); + { + SessionID reduced = reduceSessionID(sessionID); + session = acceptorSessions.get(reduced); + } return session; } Index: trunk/core/src/main/java/quickfix/mina/ssl/SSLSupport.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/ssl/SSLSupport.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/ssl/SSLSupport.java (working copy) @@ -27,7 +27,7 @@ public class SSLSupport { // This will be moved else when settings mechanism is refactored. - public static final String FILTER_NAME = "SSLFilter"; + public static final String FILTER_NAME = "SslFilter"; public static final String SETTING_KEY_STORE_PWD = "SocketKeyStorePassword"; public static final String SETTING_KEY_STORE_NAME = "SocketKeyStore"; public static final String SETTING_USE_SSL = "SocketUseSSL"; Index: trunk/core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java (working copy) @@ -28,9 +28,9 @@ import java.util.Map; import java.util.Set; -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.common.TransportType; +import org.apache.mina.core.buffer.IoBuffer; +import org.apache.mina.core.buffer.SimpleBufferAllocator; +import org.apache.mina.core.service.TransportMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +71,7 @@ protected AbstractSocketInitiator(SessionSettings settings, SessionFactory sessionFactory) throws ConfigError { super(settings, sessionFactory); - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - ByteBuffer.setUseDirectBuffers(false); + IoBuffer.setAllocator(new SimpleBufferAllocator()); } protected void createSessionInitiators() @@ -137,7 +136,7 @@ if (settings.isSetting(sessionID, Initiator.SETTING_SOCKET_LOCAL_PORT)) { port = (int) settings.getLong(sessionID, Initiator.SETTING_SOCKET_LOCAL_PORT); } - localAddress = ProtocolFactory.createSocketAddress(TransportType.SOCKET, host, port); + localAddress = ProtocolFactory.createSocketAddress(ProtocolFactory.SOCKET, host, port); if (log.isInfoEnabled()) { log.info("Using initiator local host: " + localAddress); } @@ -203,16 +202,16 @@ + (index == 0 ? "" : Integer.toString(index)); final String portKey = Initiator.SETTING_SOCKET_CONNECT_PORT + (index == 0 ? "" : Integer.toString(index)); - TransportType transportType = TransportType.SOCKET; + int transportType = ProtocolFactory.SOCKET; if (settings.isSetting(sessionID, protocolKey)) { try { - transportType = TransportType.getInstance(settings.getString(sessionID, - protocolKey)); + transportType = ProtocolFactory.getTransportType(settings.getString(sessionID, protocolKey)); } catch (final IllegalArgumentException e) { // Unknown transport type throw new ConfigError(e); } } + if (settings.isSetting(sessionID, portKey)) { String host; if (!isHostRequired(transportType)) { @@ -233,8 +232,8 @@ return addresses.toArray(new SocketAddress[addresses.size()]); } - private boolean isHostRequired(TransportType transportType) { - return transportType != TransportType.VM_PIPE; + private boolean isHostRequired(int transportType) { + return transportType != ProtocolFactory.VM_PIPE; } private boolean isInitiatorSession(Object sectionKey) throws ConfigError, FieldConvertError { Index: trunk/core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/initiator/InitiatorIoHandler.java (working copy) @@ -19,7 +19,7 @@ package quickfix.mina.initiator; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.session.IoSession; import quickfix.Message; import quickfix.Session; Index: trunk/core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java (working copy) @@ -28,13 +28,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.ThreadModel; -import org.apache.mina.filter.SSLFilter; +import org.apache.mina.core.future.ConnectFuture; +import org.apache.mina.core.service.IoConnector; +import org.apache.mina.core.filterchain.IoFilterChainBuilder; +import org.apache.mina.core.service.IoService; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.ssl.SslFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,22 +115,20 @@ userIoFilterChainBuilder); if (sslEnabled) { - installSSLFilter(ioFilterChainBuilder); + installSslFilter(ioFilterChainBuilder); } ioFilterChainBuilder.addLast(FIXProtocolCodecFactory.FILTER_NAME, new ProtocolCodecFilter(new FIXProtocolCodecFactory())); - IoServiceConfig serviceConfig = ioConnector.getDefaultConfig(); - serviceConfig.setFilterChainBuilder(ioFilterChainBuilder); - serviceConfig.setThreadModel(ThreadModel.MANUAL); + ioConnector.setFilterChainBuilder(ioFilterChainBuilder); ioHandler = new InitiatorIoHandler(fixSession, networkingOptions, eventHandlingStrategy); } - private void installSSLFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder) + private void installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder) throws GeneralSecurityException { - SSLFilter sslFilter = new SSLFilter(SSLContextFactory.getInstance(keyStoreName, + SslFilter sslFilter = new SslFilter(SSLContextFactory.getInstance(keyStoreName, keyStorePassword.toCharArray())); if(enableProtocole != null)sslFilter.setEnabledProtocols(enableProtocole); if(cipherSuites != null) sslFilter.setEnabledCipherSuites(cipherSuites); @@ -152,12 +149,13 @@ private void connect() { lastReconnectAttemptTime = SystemTime.currentTimeMillis(); SocketAddress nextSocketAddress = getNextSocketAddress(); + ioConnector.setHandler(ioHandler); try { if (localAddress == null) { - connectFuture = ioConnector.connect(nextSocketAddress, ioHandler); + connectFuture = ioConnector.connect(nextSocketAddress); } else { //QFJ-482 - connectFuture = ioConnector.connect(nextSocketAddress, localAddress, ioHandler); + connectFuture = ioConnector.connect(nextSocketAddress, localAddress); } pollConnectFuture(); } catch (Throwable e) { Index: trunk/core/src/main/java/quickfix/mina/AbstractIoHandler.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/AbstractIoHandler.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/AbstractIoHandler.java (working copy) @@ -23,8 +23,8 @@ import java.io.IOException; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecException; import org.apache.mina.filter.codec.ProtocolDecoderException; import org.slf4j.Logger; @@ -89,12 +89,19 @@ } public void sessionClosed(IoSession ioSession) throws Exception { - Session quickFixSession = findQFSession(ioSession); - if (quickFixSession != null) { - ioSession.removeAttribute(SessionConnector.QF_SESSION); - if (quickFixSession.hasResponder()) { - quickFixSession.disconnect("IO Session closed", false); + try { + Session quickFixSession = findQFSession(ioSession); + if (quickFixSession != null) { + ioSession.removeAttribute(SessionConnector.QF_SESSION); + if (quickFixSession.hasResponder()) { + quickFixSession.disconnect("IO Session closed", false); + } } + ioSession.close(); + } catch (Exception e) + { + ioSession.close(); + throw e; } } Index: trunk/core/src/main/java/quickfix/mina/ProtocolFactory.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/ProtocolFactory.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/ProtocolFactory.java (working copy) @@ -22,11 +22,11 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.TransportType; -import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.core.service.IoAcceptor; +import org.apache.mina.core.service.IoConnector; +import org.apache.mina.core.service.TransportMetadata; +import org.apache.mina.transport.socket.nio.NioSocketAcceptor; +import org.apache.mina.transport.socket.nio.NioSocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; @@ -40,32 +40,65 @@ */ public class ProtocolFactory { - public static SocketAddress createSocketAddress(TransportType transportType, String host, + public final static int SOCKET = 0; + public final static int VM_PIPE = 1; + public final static int PROXY = 2; + + public static String getTypeString(int type) { + switch(type) + { + case SOCKET: + return "SOCKET"; + case VM_PIPE: + return "VM_PIPE"; + case PROXY: + return "PROXY"; + default: + return "unknown"; + } + } + + + public static SocketAddress createSocketAddress(int transportType, String host, int port) throws ConfigError { - if (transportType == TransportType.SOCKET) { + if (transportType == SOCKET) { return host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(port); - } else if (transportType == TransportType.VM_PIPE) { + } else if (transportType == VM_PIPE) { return new VmPipeAddress(port); } else { throw new ConfigError("Unknown session transport type: " + transportType); } } - public static TransportType getAddressTransportType(SocketAddress address) { + public static int getAddressTransportType(SocketAddress address) { if (address instanceof InetSocketAddress) { - return TransportType.SOCKET; + return SOCKET; } else if (address instanceof VmPipeAddress) { - return TransportType.VM_PIPE; + return VM_PIPE; } else { throw new RuntimeError("Unknown address type: " + address.getClass().getName()); } } - public static IoAcceptor createIoAcceptor(TransportType transportType) { - if (transportType == TransportType.SOCKET) { - return new SocketAcceptor(); - } else if (transportType == TransportType.VM_PIPE) { + public static int getTransportType(String string) { + if (string.equalsIgnoreCase("tcp") || string.equalsIgnoreCase("SOCKET")) { + return SOCKET; + } else if (string.equalsIgnoreCase("VM_PIPE")) { + return VM_PIPE; + } else if (string.equalsIgnoreCase("PROXY")) { + return PROXY; + } else { + throw new RuntimeError("Unknown Transport Type type: " + string); + } + } + + public static IoAcceptor createIoAcceptor(int transportType) { + if (transportType == SOCKET) { + NioSocketAcceptor ret= new NioSocketAcceptor(); + ret.setReuseAddress(true); + return ret; + } else if (transportType == VM_PIPE) { return new VmPipeAcceptor(); } else { throw new RuntimeError("Unsupported transport type: " + transportType); @@ -74,7 +107,7 @@ public static IoConnector createIoConnector(SocketAddress address) throws ConfigError { if (address instanceof InetSocketAddress) { - return new SocketConnector(); + return new NioSocketConnector(); } else if (address instanceof VmPipeAddress) { return new VmPipeConnector(); } else { Index: trunk/core/src/main/java/quickfix/mina/IoSessionResponder.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/IoSessionResponder.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/IoSessionResponder.java (working copy) @@ -19,8 +19,8 @@ package quickfix.mina; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.WriteFuture; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.future.WriteFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +77,7 @@ // // Only wait for a limited time since MINA may deadlock // in some rare cases where a socket dies in a strange way. - for (int i = 0; i < 5 && ioSession.getScheduledWriteRequests() > 0; i++) { + for (int i = 0; i < 5 && ioSession.getScheduledWriteMessages() > 0; i++) { try { Thread.sleep(10L); } catch (InterruptedException e) { Index: trunk/core/src/main/java/quickfix/mina/SessionConnector.java =================================================================== --- trunk/core/src/main/java/quickfix/mina/SessionConnector.java (revision 1070) +++ trunk/core/src/main/java/quickfix/mina/SessionConnector.java (working copy) @@ -35,8 +35,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import org.apache.mina.common.IoFilterChainBuilder; -import org.apache.mina.common.IoSession; +import org.apache.mina.core.filterchain.IoFilterChainBuilder; +import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; Index: trunk/core/src/main/java/quickfix/Session.java =================================================================== --- trunk/core/src/main/java/quickfix/Session.java (revision 1070) +++ trunk/core/src/main/java/quickfix/Session.java (working copy) @@ -2422,7 +2422,7 @@ public static int numSessions() { return sessions.size(); } - + /** * Sets the timeout for waiting for a logon response. * @param seconds the timeout in seconds Index: trunk/core/src/main/java/quickfix/SocketAcceptor.java =================================================================== --- trunk/core/src/main/java/quickfix/SocketAcceptor.java (revision 1070) +++ trunk/core/src/main/java/quickfix/SocketAcceptor.java (working copy) @@ -75,7 +75,12 @@ public void stop(boolean forceDisconnect) { eventHandlingStrategy.stopHandlingMessages(); - stopAcceptingConnections(); + try { + stopAcceptingConnections(); + } catch (ConfigError e) + { + System.err.println(e); + } logoutAllSessions(forceDisconnect); stopSessionTimer(); Session.unregisterSessions(getSessions()); Index: trunk/core/src/main/java/quickfix/ThreadedSocketAcceptor.java =================================================================== --- trunk/core/src/main/java/quickfix/ThreadedSocketAcceptor.java (revision 1070) +++ trunk/core/src/main/java/quickfix/ThreadedSocketAcceptor.java (working copy) @@ -55,7 +55,10 @@ } public void stop(boolean forceDisconnect) { - stopAcceptingConnections(); + try { + stopAcceptingConnections(); + } catch (ConfigError e) + {} logoutAllSessions(forceDisconnect); stopSessionTimer(); eventHandlingStrategy.stopDispatcherThreads(); Index: trunk/core/src/main/java/quickfix/DefaultSessionFactory.java =================================================================== --- trunk/core/src/main/java/quickfix/DefaultSessionFactory.java (revision 1070) +++ trunk/core/src/main/java/quickfix/DefaultSessionFactory.java (working copy) @@ -15,7 +15,7 @@ * * Contact ask@quickfixengine.org if any conditions of this licensing * are not clear to you. - ******************************************************************************/ + **************************************************************************Æ’****/ package quickfix; Index: trunk/core/build.xml =================================================================== --- trunk/core/build.xml (revision 1070) +++ trunk/core/build.xml (working copy) @@ -2,6 +2,7 @@ + @@ -270,8 +271,10 @@ - + + + @@ -510,10 +513,10 @@ - + - +