Spring Integration
Sample Implementation
Info |
---|
This is a slightly modified version of what we've implemented at my company. I haven't tested my modifications, so I apologise if it doesn't work exactly but it should give you an idea of what can be done. It should work with quickfixj 1.0.0/Spring 1.2.5. – - Brad Harvey |
This was all written before MINA 0.9 was being used. MINA 0.9 has its own spring integration classes, and (at least in theory in my head) it would be really nifty if quickfixj just hooked into these as it would open up some extra customisation possibilities such as SSL Filters.
Application Context
Here's the basic application context. You supply an Application implementation and quickfix will start up once the application context is loaded. Aside from your Application you also need sample.QuickFixBean which does most of the work.
...
Code Block | ||||
---|---|---|---|---|
| ||||
<!-- Poor man's session management bean --> <bean id="sessionExporter" class="org.springframework.jmx.export.MBeanExporter"> <property name="beans"> <bean id="quickfix.sessions" class="org.springframework.beans.factory.config.PropertyPathFactoryBean"/> </property> <property name="assembler"> <bean class="org.springframework.jmx.export.assembler.MethodNameBasedMBeanInfoAssembler"> <property name="managedMethods"> <value>isEnabled,getExpectedSenderNum,getExpectedTargetNum,isLoggedOn,isSessionTime,logon,logout,disconnect,enable,sentLogon,receivedLogon,sentLogout</value> </property> </bean> </property> </bean> |
sample.QuickFixBean
Code Block | ||||
---|---|---|---|---|
| ||||
package sample; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.StringUtils; import quickfix.Acceptor; import quickfix.Application; import quickfix.ConfigError; import quickfix.DefaultSessionFactory; import quickfix.Initiator; import quickfix.MemoryStoreFactory; import quickfix.MessageStoreFactory; import quickfix.SLF4JLogFactory; import quickfix.Session; import quickfix.SessionFactory; import quickfix.SessionID; import quickfix.SessionSettings; import quickfix.SocketAcceptor; import quickfix.SocketInitiator; import quickfix.ThreadedSocketAcceptor; import quickfix.ThreadedSocketInitiator; /** * The QuickFixBean gives us a convenient way to boostrap quickfix from Spring. */ public class QuickFixBean implements DisposableBean, InitializingBean { private Application application; private SessionSettings sessionSettings; private MessageStoreFactory messageStoreFactory; private quickfix.LogFactory logFactory; private boolean client = true; private boolean threaded = false; private Map sessions; private HashMap jmxSessions; private static Log log = LogFactory.getLog(QuickFixBean.class); public void afterPropertiesSet() throws ConfigError { validateProperties(); SessionFactory sessionFactory = new DefaultSessionFactory(application, messageStoreFactory, logFactory); if (client) { Initiator initiator; if (threaded) { initiator = new ThreadedSocketInitiator(sessionFactory, sessionSettings); } else { initiator = new SocketInitiator(sessionFactory, sessionSettings); } initiator.start(); } else { Acceptor acceptor; if (threaded) { acceptor = new ThreadedSocketAcceptor(sessionFactory, sessionSettings); } else { acceptor = new SocketAcceptor(sessionFactory, sessionSettings); } acceptor.start(); } lookupSessions(); visitSessions(new SessionVisitor() { public void visitSession(SessionID sessionId, Session session) { log.info("Started " + sessionId); }}); } private void validateProperties() { if (application == null) { throw new BeanCreationException("Must specify an application"); } if (sessionSettings == null) { throw new BeanCreationException("Must specify session settings"); } if (logFactory == null) { logFactory = new SLF4JLogFactory(sessionSettings); } if (messageStoreFactory == null) { messageStoreFactory = new MemoryStoreFactory(); } } public void destroy() { // ask them all to logout visitSessions(new SessionVisitor() { public void visitSession(SessionID sessionId, Session session) { session.logout(); }}); // wait for them to do so visitSessions(new SessionVisitor() { public void visitSession(SessionID sessionId, Session session) { // bit of a hack... int waitCount = 1; while (waitCount < 5 && session.isLoggedOn()) { log.info("Waiting for " + sessionId + " to logout"); try { Thread.sleep(200 * waitCount); } catch (InterruptedException e) { break; } waitCount++; } if (session.isLoggedOn()) { log.warn(sessionId + " didn't log out, shutting down anyway."); } }}); } // at the time of writing acceptor.getSessions() didn't work... private void lookupSessions() { sessions = new HashMap(); jmxSessions = new HashMap(); for (Iterator iter = sessionSettings.sectionIterator(); iter.hasNext();) { SessionID sessionId = (SessionID) iter.next(); Session session = Session.lookupSession(sessionId); if (session != null) { sessions.put(sessionId, session); // can't have : in the name but the sessionId looks like FIX4.4:LocalCompId->RemoteCompId // prefix should be configurable jmxSessions.put("quickfixj:name=" + StringUtils.replace(sessionId.toString(), ":", ""), session); } } } /** * This is just to avoid duplicating the loop code. * No it isn't really the Visitor pattern. No it probably isn't necessary. * @param sessionVisitor Does something to each session. * @see SessionVisitor#visitSession(SessionID, Session) */ private void visitSessions(SessionVisitor sessionVisitor) { for (Iterator iter = sessions.entrySet().iterator(); iter.hasNext();) { Entry entry = (Entry) iter.next(); SessionID sessionId = (SessionID) entry.getKey(); Session session = (Session) entry.getValue(); if (sessionId != null && session != null) { sessionVisitor.visitSession(sessionId, session); } } } private interface SessionVisitor { /** * Do something to the given session. * @param sessionId * @param session */ public void visitSession(SessionID sessionId, Session session); } public Map getSessions() { return jmxSessions; } public Application getApplication() { return application; } public void setApplication(Application application) { this.application = application; } public boolean isClient() { return client; } public void setClient(boolean client) { this.client = client; } public quickfix.LogFactory getLogFactory() { return logFactory; } public void setLogFactory(quickfix.LogFactory logFactory) { this.logFactory = logFactory; } public MessageStoreFactory getMessageStoreFactory() { return messageStoreFactory; } public void setMessageStoreFactory(MessageStoreFactory messageStoreFactory) { this.messageStoreFactory = messageStoreFactory; } public SessionSettings getSessionSettings() { return sessionSettings; } public void setSessionSettings(SessionSettings sessionSettings) { this.sessionSettings = sessionSettings; } public boolean isThreaded() { return threaded; } public void setThreaded(boolean threaded) { this.threaded = threaded; } } |
Starting it up
Now you just need to load your application context - put quickfixContext.xml on your classpath and do something like this:
...