diff --git a/quickfixj-core/src/.DS_Store b/quickfixj-core/src/.DS_Store new file mode 100644 index 0000000..254fc3a --- /dev/null +++ b/quickfixj-core/src/.DS_Store Binary files differ diff --git a/quickfixj-core/src/main/.DS_Store b/quickfixj-core/src/main/.DS_Store new file mode 100644 index 0000000..d5581d0 --- /dev/null +++ b/quickfixj-core/src/main/.DS_Store Binary files differ diff --git a/quickfixj-core/src/main/java/quickfix/Initiator.java b/quickfixj-core/src/main/java/quickfix/Initiator.java index 21899c6..4b1aa0b 100644 --- a/quickfixj-core/src/main/java/quickfix/Initiator.java +++ b/quickfixj-core/src/main/java/quickfix/Initiator.java @@ -68,4 +68,52 @@ * @see quickfix.SessionFactory#SETTING_CONNECTION_TYPE */ public static final String SETTING_SOCKET_LOCAL_PORT = "SocketLocalPort"; + + /** + * Initiator setting for proxy type. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_TYPE = "ProxyType"; + + /** + * Initiator setting for proxy version. Only valid when session connection + * type is "initiator". - http 1.0 / 1.1 + */ + public static final String SETTING_PROXY_VERSION = "ProxyVersion"; + + /** + * Initiator setting for proxy host. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_HOST = "ProxyHost"; + + /** + * Initiator setting for proxy port. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_PORT = "ProxyPort"; + + /** + * Initiator setting for proxy port. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_USER = "ProxyUser"; + + /** + * Initiator setting for proxy port. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_PASSWORD = "ProxyPassword"; + + /** + * Initiator setting for proxy domain. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_DOMAIN = "ProxyDomain"; + + /** + * Initiator setting for proxy workstation. Only valid when session connection + * type is "initiator". + */ + public static final String SETTING_PROXY_WORKSTATION = "ProxyWorkstation"; } diff --git a/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java b/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java index 640247b..22bd854 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java +++ b/quickfixj-core/src/main/java/quickfix/mina/ProtocolFactory.java @@ -21,6 +21,11 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.URL; +import java.util.ArrayList; +import java.util.List;; +import java.util.HashMap;; + import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoConnector; @@ -29,6 +34,15 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; +import org.apache.mina.proxy.ProxyConnector; +import org.apache.mina.proxy.handlers.ProxyRequest; +import org.apache.mina.proxy.handlers.http.HttpAuthenticationMethods; +import org.apache.mina.proxy.handlers.http.HttpProxyConstants; +import org.apache.mina.proxy.handlers.http.HttpProxyRequest; +import org.apache.mina.proxy.handlers.socks.SocksProxyConstants; +import org.apache.mina.proxy.handlers.socks.SocksProxyRequest; +import org.apache.mina.proxy.session.ProxyIoSession; + import quickfix.ConfigError; import quickfix.RuntimeError; @@ -58,7 +72,7 @@ public static SocketAddress createSocketAddress(int transportType, String host, int port) throws ConfigError { - if (transportType == SOCKET) { + if (transportType == SOCKET || transportType == PROXY) { return host != null ? new InetSocketAddress(host, port) : new InetSocketAddress(port); } else if (transportType == VM_PIPE) { return new VmPipeAddress(port); @@ -102,14 +116,119 @@ } } + + + + public static IoConnector createIoProxyConnector(InetSocketAddress address, InetSocketAddress proxyAddress, String proxyType, String proxyVersion, String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation ) + throws ConfigError { + + // Create proxy connector. + ProxyRequest req = null; + NioSocketConnector socketConnector = new NioSocketConnector(); + ProxyConnector connector = new ProxyConnector(socketConnector); + connector.setConnectTimeoutMillis(5000); + + String uri = "http://"+address.getAddress().getHostAddress()+":"+address.getPort(); + HashMap props = new HashMap(); + if (proxyUser != null) { + props.put(HttpProxyConstants.USER_PROPERTY, proxyUser); + props.put(HttpProxyConstants.PWD_PROPERTY, proxyUser); + } + if (proxyDomain != null && proxyWorkstation != null) { + props.put(HttpProxyConstants.DOMAIN_PROPERTY, proxyDomain); + props.put(HttpProxyConstants.WORKSTATION_PROPERTY, proxyWorkstation); + } + + if (proxyType.equalsIgnoreCase("http")) + { + req=createHttpProxyRequest(address, proxyType, proxyVersion, proxyUser, proxyPassword, proxyDomain, proxyWorkstation ); + } else if (proxyType.equalsIgnoreCase("socks")) { + req=createSocksProxyRequest(address, proxyType, proxyVersion, proxyUser, proxyPassword, proxyDomain, proxyWorkstation ); + } else { + throw new ConfigError("Proxy type must be http or socks"); + } + + ProxyIoSession proxyIoSession = new ProxyIoSession(proxyAddress,req); + + List l = new ArrayList(); + l.add(HttpAuthenticationMethods.DIGEST); + l.add(HttpAuthenticationMethods.BASIC); + + proxyIoSession.setPreferedOrder(l); + connector.setProxyIoSession(proxyIoSession); + return connector; + } + + + private static ProxyRequest createHttpProxyRequest(InetSocketAddress address, String proxyType, String proxyVersion, String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation ) + { + String uri = "http://"+address.getAddress().getHostAddress()+":"+address.getPort(); + HashMap props = new HashMap(); + props.put(HttpProxyConstants.USER_PROPERTY, proxyUser); + props.put(HttpProxyConstants.PWD_PROPERTY, proxyUser); + if (proxyDomain != null && proxyWorkstation != null) { + props.put(HttpProxyConstants.DOMAIN_PROPERTY, proxyDomain); + props.put(HttpProxyConstants.WORKSTATION_PROPERTY, proxyWorkstation); + } + + HttpProxyRequest req = new HttpProxyRequest(uri); + req.setProperties(props); + if (proxyVersion != null && proxyVersion.equalsIgnoreCase("1.1")) { + req.setHttpVersion(HttpProxyConstants.HTTP_1_1); + } else { + req.setHttpVersion(HttpProxyConstants.HTTP_1_0); + } + return req; + } + + + private static ProxyRequest createSocksProxyRequest(InetSocketAddress address, String proxyType, String proxyVersion, + String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation ) throws ConfigError + { + SocksProxyRequest req = null; + if (proxyVersion.equalsIgnoreCase("4")) + { + + req=new SocksProxyRequest( + SocksProxyConstants.SOCKS_VERSION_4, + SocksProxyConstants.ESTABLISH_TCPIP_STREAM, + address, + proxyUser); + } else + if (proxyVersion.equalsIgnoreCase("4a")) + { + req= new SocksProxyRequest( + SocksProxyConstants.ESTABLISH_TCPIP_STREAM, + address.getAddress().getHostAddress(), + address.getPort(), + proxyUser); + + } else + if (proxyVersion.equalsIgnoreCase("5")) + { + req = new SocksProxyRequest( + SocksProxyConstants.SOCKS_VERSION_5, + SocksProxyConstants.ESTABLISH_TCPIP_STREAM, + address, + proxyUser); + // ((SocksProxyRequest) req).setServiceKerberosName(Socks5GSSAPITestServer.SERVICE_NAME); + } else + throw new ConfigError("SOCKS ProxyType must be 4,4a or 5"); + + if (req != null && proxyPassword != null) + req.setPassword(proxyPassword); + return req; + } + + public static IoConnector createIoConnector(SocketAddress address) throws ConfigError { if (address instanceof InetSocketAddress) { return new NioSocketConnector(); - } else if (address instanceof VmPipeAddress) { + } + else if (address instanceof VmPipeAddress) { return new VmPipeConnector(); } else { - throw new ConfigError("Unknown session acceptor address type: " - + address.getClass().getName()); + throw new ConfigError("Unknown session acceptor address type: "); } } } diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java index 3d05edb..381e102 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/AbstractSocketInitiator.java @@ -74,8 +74,7 @@ IoBuffer.setUseDirectBuffer(false); } - protected void createSessionInitiators() - throws ConfigError { + protected void createSessionInitiators() throws ConfigError { try { // QFJ698: clear() is needed on restart, otherwise the set gets filled up with // more and more initiators which are not equal because the local port differs @@ -113,10 +112,48 @@ ? strCipherSuites.split(",") : null; + String proxyUser = null; + String proxyPassword = null; + String proxyHost = null; + + String proxyType = null; + String proxyVersion = null; + + String proxyWorkstation = null; + String proxyDomain = null; + + int proxyPort = -1; + + if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_TYPE)) { + proxyType = settings.getString(sessionID, Initiator.SETTING_PROXY_TYPE); + if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_VERSION)) { + proxyVersion = settings.getString(sessionID, + Initiator.SETTING_PROXY_VERSION); + } + + if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_USER)) { + proxyUser = settings.getString(sessionID, Initiator.SETTING_PROXY_USER); + proxyPassword = settings.getString(sessionID, + Initiator.SETTING_PROXY_PASSWORD); + } + if (getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_WORKSTATION) + && getSettings().isSetting(sessionID, Initiator.SETTING_PROXY_DOMAIN)) { + proxyWorkstation = settings.getString(sessionID, + Initiator.SETTING_PROXY_WORKSTATION); + proxyDomain = settings.getString(sessionID, Initiator.SETTING_PROXY_DOMAIN); + } + + proxyHost = settings.getString(sessionID, Initiator.SETTING_PROXY_HOST); + proxyPort = (int) settings.getLong(sessionID, Initiator.SETTING_PROXY_PORT); + } + final IoSessionInitiator ioSessionInitiator = new IoSessionInitiator(session, - socketAddresses, localAddress, reconnectingIntervals, getScheduledExecutorService(), - networkingOptions, getEventHandlingStrategy(), getIoFilterChainBuilder(), - sslEnabled, keyStoreName, keyStorePassword, enableProtocole, cipherSuites); + socketAddresses, localAddress, reconnectingIntervals, + getScheduledExecutorService(), networkingOptions, + getEventHandlingStrategy(), getIoFilterChainBuilder(), sslEnabled, + keyStoreName, keyStorePassword, enableProtocole, cipherSuites, proxyType, + proxyVersion, proxyHost, proxyPort, proxyUser, proxyPassword, proxyDomain, + proxyWorkstation); initiators.add(ioSessionInitiator); } @@ -133,7 +170,8 @@ if (settings.isSetting(sessionID, Initiator.SETTING_SOCKET_LOCAL_HOST)) { String host = settings.getString(sessionID, Initiator.SETTING_SOCKET_LOCAL_HOST); if ("localhost".equals(host)) { - throw new ConfigError(Initiator.SETTING_SOCKET_LOCAL_HOST + " cannot be \"localhost\"!"); + throw new ConfigError(Initiator.SETTING_SOCKET_LOCAL_HOST + + " cannot be \"localhost\"!"); } int port = 0; if (settings.isSetting(sessionID, Initiator.SETTING_SOCKET_LOCAL_PORT)) { @@ -208,7 +246,8 @@ int transportType = ProtocolFactory.SOCKET; if (settings.isSetting(sessionID, protocolKey)) { try { - transportType = ProtocolFactory.getTransportType(settings.getString(sessionID, protocolKey)); + transportType = ProtocolFactory.getTransportType(settings.getString( + sessionID, protocolKey)); } catch (final IllegalArgumentException e) { // Unknown transport type throw new ConfigError(e); diff --git a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java index b831110..6357135 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/mina/initiator/IoSessionInitiator.java @@ -34,6 +34,8 @@ import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.ssl.SslFilter; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.proxy.ProxyConnector; +import org.apache.mina.proxy.session.ProxyIoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,22 +57,29 @@ private final ConnectTask reconnectTask; private Future reconnectFuture; - protected final static Logger log = LoggerFactory.getLogger("display." + IoSessionInitiator.class.getName()); + protected final static Logger log = LoggerFactory.getLogger("display." + + IoSessionInitiator.class.getName()); - public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, SocketAddress localAddress, - int[] reconnectIntervalInSeconds, ScheduledExecutorService executor, - NetworkingOptions networkingOptions, EventHandlingStrategy eventHandlingStrategy, + public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, + SocketAddress localAddress, int[] reconnectIntervalInSeconds, + ScheduledExecutorService executor, NetworkingOptions networkingOptions, + EventHandlingStrategy eventHandlingStrategy, IoFilterChainBuilder userIoFilterChainBuilder, boolean sslEnabled, String keyStoreName, - String keyStorePassword, String[] enableProtocole, String[] cipherSuites) throws ConfigError { + String keyStorePassword, String[] enableProtocole, String[] cipherSuites, + String proxyType, String proxyVersion, String proxyHost, int proxyPort, + String proxyUser, String proxyPassword, String proxyDomain, String proxyWorkstation) + throws ConfigError { this.executor = executor; final long[] reconnectIntervalInMillis = new long[reconnectIntervalInSeconds.length]; for (int ii = 0; ii != reconnectIntervalInSeconds.length; ++ii) { reconnectIntervalInMillis[ii] = reconnectIntervalInSeconds[ii] * 1000L; } try { - reconnectTask = new ConnectTask(sslEnabled, socketAddresses, localAddress, userIoFilterChainBuilder, - fixSession, reconnectIntervalInMillis, networkingOptions, - eventHandlingStrategy, keyStoreName, keyStorePassword, enableProtocole, cipherSuites); + reconnectTask = new ConnectTask(sslEnabled, socketAddresses, localAddress, + userIoFilterChainBuilder, fixSession, reconnectIntervalInMillis, + networkingOptions, eventHandlingStrategy, keyStoreName, keyStorePassword, + enableProtocole, cipherSuites, proxyType, proxyVersion, proxyHost, proxyPort, + proxyUser, proxyPassword, proxyDomain, proxyWorkstation); } catch (GeneralSecurityException e) { throw new ConfigError(e); } @@ -96,11 +105,23 @@ private int connectionFailureCount; private ConnectFuture connectFuture; + private final String proxyType; + private final String proxyVersion; + private final String proxyHost; + private final int proxyPort; + private final String proxyUser; + private final String proxyPassword; + private final String proxyDomain; + private final String proxyWorkstation; + public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses, - SocketAddress localAddress, IoFilterChainBuilder userIoFilterChainBuilder, Session fixSession, - long[] reconnectIntervalInMillis, NetworkingOptions networkingOptions, - EventHandlingStrategy eventHandlingStrategy, String keyStoreName, - String keyStorePassword, String[] enableProtocole, String[] cipherSuites) throws ConfigError, GeneralSecurityException { + SocketAddress localAddress, IoFilterChainBuilder userIoFilterChainBuilder, + Session fixSession, long[] reconnectIntervalInMillis, + NetworkingOptions networkingOptions, EventHandlingStrategy eventHandlingStrategy, + String keyStoreName, String keyStorePassword, String[] enableProtocole, + String[] cipherSuites, String proxyType, String proxyVersion, String proxyHost, + int proxyPort, String proxyUser, String proxyPassword, String proxyDomain, + String proxyWorkstation) throws ConfigError, GeneralSecurityException { this.socketAddresses = socketAddresses; this.localAddress = localAddress; this.fixSession = fixSession; @@ -109,7 +130,26 @@ this.keyStorePassword = keyStorePassword; this.enableProtocole = enableProtocole; this.cipherSuites = cipherSuites; - ioConnector = ProtocolFactory.createIoConnector(socketAddresses[0]); + + this.proxyType = proxyType; + this.proxyVersion = proxyVersion; + this.proxyHost = proxyHost; + this.proxyPort = proxyPort; + this.proxyUser = proxyUser; + this.proxyPassword = proxyPassword; + this.proxyDomain = proxyDomain; + this.proxyWorkstation = proxyWorkstation; + + if (proxyType != null && proxyPort > 0 + && socketAddresses[0] instanceof InetSocketAddress) { + ioConnector = ProtocolFactory.createIoProxyConnector( + (InetSocketAddress) socketAddresses[0], new InetSocketAddress(proxyHost, + proxyPort), proxyType, proxyVersion, proxyUser, proxyPassword, + proxyDomain, proxyWorkstation); + } else { + ioConnector = ProtocolFactory.createIoConnector(socketAddresses[0]); + } + CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder( userIoFilterChainBuilder); @@ -121,8 +161,7 @@ new ProtocolCodecFilter(new FIXProtocolCodecFactory())); ioConnector.setFilterChainBuilder(ioFilterChainBuilder); - ioHandler = new InitiatorIoHandler(fixSession, networkingOptions, - eventHandlingStrategy); + ioHandler = new InitiatorIoHandler(fixSession, networkingOptions, eventHandlingStrategy); } private void installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder) @@ -188,11 +227,13 @@ while (e.getCause() != null) { e = e.getCause(); } - final String nextRetryMsg = " (Next retry in " + computeNextRetryConnectDelay() + " milliseconds)"; + final String nextRetryMsg = " (Next retry in " + computeNextRetryConnectDelay() + + " milliseconds)"; if (e instanceof IOException) { fixSession.getLog().onErrorEvent(e.getClass().getName() + ": " + e + nextRetryMsg); } else { - LogUtil.logThrowable(fixSession.getLog(), "Exception during connection" + nextRetryMsg, e); + LogUtil.logThrowable(fixSession.getLog(), "Exception during connection" + + nextRetryMsg, e); } connectFuture = null; } @@ -204,8 +245,8 @@ if (socketAddress instanceof InetSocketAddress) { InetSocketAddress inetAddr = (InetSocketAddress) socketAddress; if (inetAddr.isUnresolved()) { - socketAddress = new InetSocketAddress(inetAddr.getHostName(), inetAddr - .getPort()); + socketAddress = new InetSocketAddress(inetAddr.getHostName(), + inetAddr.getPort()); socketAddresses[nextSocketAddressIndex] = socketAddress; } } @@ -237,17 +278,20 @@ // TODO JMX Expose reconnect property - @SuppressWarnings("unused") // exposed via JMX + @SuppressWarnings("unused") + // exposed via JMX public synchronized int getConnectionFailureCount() { return connectionFailureCount; } - @SuppressWarnings("unused") // exposed via JMX + @SuppressWarnings("unused") + // exposed via JMX public synchronized long getLastReconnectAttemptTime() { return lastReconnectAttemptTime; } - @SuppressWarnings("unused") // exposed via JMX + @SuppressWarnings("unused") + // exposed via JMX public synchronized long getLastConnectTime() { return lastConnectTime; }