[QFJ-854] Race discovered in SingleThreadedEventHandlingStrategyTest Created: 09/Jul/15  Updated: 21/Apr/16  Resolved: 23/Dec/15

Status: Closed
Project: QuickFIX/J
Component/s: Engine
Affects Version/s: 1.6.0
Fix Version/s: 1.6.2

Type: Bug Priority: Default
Reporter: Nathan Tippy Assignee: Marcin L
Resolution: Fixed Votes: 1
Labels: None

Attachments: Java Source File IsAliveTest.java     File start.bat    

 Description   

I have run into a problem with the unit tests on my quad-core Haswell i7. It looks like a race condition.

Failed tests:
SingleThreadedEventHandlingStrategyTest.testDoubleStart:55->checkThreads:94 Exactly one 'QFJ Message Processor' thread expected expected:<1> but was:<2>

Tests run: 1294, Failures: 1, Errors: 0, Skipped: 0

I can work around the problem by adding this sleep on line 54 of SingleThreadedEventHandlingStrategyTest. It works, but it is not the right solution.

ehs.blockInThread();
Thread.sleep(200); //need delay
ehs.blockInThread();
checkThreads(bean);



 Comments   
Comment by Alexey Serdyuk [ 23/Jul/15 ]

I could reproduce failing test on Intel Xeon (6 cores).

I believe that the problem is caused not by a bug in the tested class, but by an error in the unit test itself: a test method does not properly clean up, so the second test method may fail.

When the test SingleThreadedEventHandlingStrategyTest.testStartStop() is executed, it calls the method SingleThreadedEventHandlingStrategy.blockInThread(), which starts a message processor thread. When testStartStop() finishes, the message processor thread keeps running for some milliseconds. If the second test, SingleThreadedEventHandlingStrategyTest.testDoubleStart(), is executed quickly enough, it also calls SingleThreadedEventHandlingStrategy.blockInThread() and starts a second message processor thread. The subsequent assertion that there is only one message processor thread then fails.

How to prove that this is the case: add the following main() method to SingleThreadedEventHandlingStrategyTest and run it.

    public static void main(String []args) throws Exception {
        SingleThreadedEventHandlingStrategyTest test_1 = new SingleThreadedEventHandlingStrategyTest();
        test_1.testStartStop();
        test_1.checkThreads(ManagementFactory.getThreadMXBean());
        
        SingleThreadedEventHandlingStrategyTest test_2 = new SingleThreadedEventHandlingStrategyTest();
        test_2.testDoubleStart();
    }

The method checkThreads() throws an exception if the number of message processor threads differs from 1. In my test it does not throw an exception, so the message processor thread still runs after the unit test is executed. When the second unit test is executed, it finds 2 message processor threads and fails.
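
For illustration, a check like checkThreads() can be sketched with ThreadMXBean as below. This is only a minimal sketch and not the actual test code: the class ThreadCountSketch, its helper methods and the thread name "Sketch Processor" are hypothetical names made up for this example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

// Hypothetical helper, not part of QuickFIX/J.
final class ThreadCountSketch {

    /** Counts live threads whose name equals the given name. */
    static int countThreadsNamed(ThreadMXBean bean, String name) {
        int count = 0;
        for (ThreadInfo info : bean.getThreadInfo(bean.getAllThreadIds())) {
            // An entry is null if the thread died between the two calls.
            if (info != null && name.equals(info.getThreadName())) {
                count++;
            }
        }
        return count;
    }

    /** Starts one parked worker with the given name and counts it while it runs. */
    static int countWhileWorkerRuns(String name) {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                release.await(); // stay alive until the caller releases us
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        worker.setDaemon(true);
        worker.start();
        try {
            started.await();
            return countThreadsNamed(ManagementFactory.getThreadMXBean(), name);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown(); // let the worker exit
        }
    }

    public static void main(String[] args) {
        System.out.println(countWhileWorkerRuns("Sketch Processor"));
    }
}
```

Because the count is taken over all live threads of the JVM, a thread left behind by an earlier test is counted as well, which is the effect described above.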

Solution: in the "finally" block, each unit test must somehow ensure that the message processing thread has finished. Maybe the method SingleThreadedEventHandlingStrategy.stopHandlingMessages() must be modified to wait until the message processing thread finishes, or a new method is required. I do not know the QuickFIX/J internals well enough to judge.

A quick workaround is to add a sleep in the "finally" block of each unit test after stopHandlingMessages() is called, to give the message processing thread some time to exit, but I do not consider this to be the right solution.

Comment by Christoph John [ 23/Jul/15 ]

Hi Alexey Serdyuk, thanks for the analysis!

Comment by Michael Whidden [ 11/Oct/15 ]

Also reproducible on a 4-core machine with JDK 8. My solution was to add this method to SingleThreadedEventHandlingStrategy.java:

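    /**
     * Stops message handling and optionally waits for the processing thread to exit.
     *
     * @param join if true, don't return until the thread has terminated.
     */
    public void stopHandlingMessages(boolean join) throws InterruptedException {
        stopHandlingMessages();
        // The thread is null if blockInThread() was never called.
        if (join && messageProcessingThread != null) {
            // join() returns immediately if the thread is not alive.
            messageProcessingThread.join();
        }
    }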

and call ehs.stopHandlingMessages(true); in the two finally blocks in SingleThreadedEventHandlingStrategyTest.
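
The stop-and-join idea can be tried in isolation with a small stand-in class. This is a hedged sketch only: StopAndJoinSketch, its fields and its methods are hypothetical and merely mimic the shape of the strategy class; the sleep stands in for polling the event queue.

```java
// Hypothetical stand-in, not the QuickFIX/J class.
final class StopAndJoinSketch {
    private volatile boolean stopped;
    private Thread worker; // null until start() is called

    void start() {
        stopped = false;
        worker = new Thread(() -> {
            while (!stopped) {
                try {
                    Thread.sleep(10); // stand-in for polling the event queue
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "Sketch Processor");
        worker.setDaemon(true);
        worker.start();
    }

    /** Signals the worker to stop; if join is true, waits until it has exited. */
    void stop(boolean join) throws InterruptedException {
        stopped = true;
        if (join && worker != null) {
            worker.join();
        }
    }

    boolean isWorkerAlive() {
        return worker != null && worker.isAlive();
    }

    /** Mimics a test body with cleanup in finally; returns whether the worker survived. */
    static boolean workerAliveAfterCleanup() {
        StopAndJoinSketch strategy = new StopAndJoinSketch();
        try {
            try {
                strategy.start();
                // ... test assertions would go here ...
            } finally {
                strategy.stop(true); // does not return before the worker has exited
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return strategy.isWorkerAlive();
    }

    public static void main(String[] args) {
        System.out.println(workerAliveAfterCleanup());
    }
}
```

With the join in place, the next test method cannot observe a worker left over from the previous one, so no sleep is needed.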

Comment by Michael Whidden [ 11/Oct/15 ]

Sorry about the abysmal formatting of my previous comment. Hopefully you get the drift...

Comment by Christoph John [ 12/Oct/15 ]

Thanks for your comment. I have fixed the formatting.

Comment by Christoph John [ 05/Jan/16 ]

I have also merged this into the 1.6.x branch, and afterwards the build failed two or three times consecutively because of this test. Now it has succeeded a few times in a row. Strange indeed...
On trunk this test did not fail at all during the last builds. Maybe there is also a relation to the JDK version being used: trunk is built with JDK 7, the branch with JDK 6.

Comment by Marcin L [ 05/Jan/16 ]

I re-ran the test class multiple times on different Java versions, but I could not get it to fail. I can see that you added some extra logging. Let's wait and see.

Comment by Christoph John [ 06/Jan/16 ]

For me it only occurred when running the complete build (or at least when running all tests from that module).
E.g. http://www.quickfixj.org:8085/browse/QFJ-GIT16X-22
http://www.quickfixj.org:8085/browse/QFJ-GIT16X-23
http://www.quickfixj.org:8085/browse/QFJ-GIT16X-24

Comment by Marcin L [ 06/Jan/16 ]

I found out that the problem is caused by the Thread#isAlive() method. It is not actually guaranteed to return "true" after Thread#start() has been called. It seems like there is a window of inconsistency between these two methods. I prepared a simple test class and a batch script to illustrate the problem. It runs in a loop, because it fails only occasionally and it takes some time to prove it. The same problem exists on Java 7 and Java 8, although it seems to happen more often on Java 6 (that might not be true in practice).

I noticed that Thread#getState() method is better, because it never returns "NEW" after calling Thread#start().

Looking at QFJ-GIT16X-JOB1-32 log I can see that test case testDoubleStart() bypassed if (messageProcessingThread != null && messageProcessingThread.isAlive()) statement.
In the stack trace we can see two threads being started. We see "INFO: Started QFJ Message Processor" only once, because the second thread was executing line 123 which is this log line.

build 06-Jan-2016 13:32:05 Running quickfix.mina.SingleThreadedEventHandlingStrategyTest
error 06-Jan-2016 13:32:05 Jan 6, 2016 1:32:05 PM quickfix.mina.SingleThreadedEventHandlingStrategy$1 run
error 06-Jan-2016 13:32:05 INFO: Started QFJ Message Processor
build 06-Jan-2016 13:32:05 QFJ Message Processor RUNNABLE
build 06-Jan-2016 13:32:05 java.lang.Object.getClass(Native Method)
build 06-Jan-2016 13:32:05 java.util.logging.LogRecord.<init>(LogRecord.java:126)
build 06-Jan-2016 13:32:05 org.slf4j.impl.JDK14LoggerAdapter.log(JDK14LoggerAdapter.java:574)
build 06-Jan-2016 13:32:05 org.slf4j.impl.JDK14LoggerAdapter.info(JDK14LoggerAdapter.java:275)
build 06-Jan-2016 13:32:05 quickfix.mina.SingleThreadedEventHandlingStrategy$1.run(SingleThreadedEventHandlingStrategy.java:123)
build 06-Jan-2016 13:32:05 java.lang.Thread.run(Thread.java:662)
build 06-Jan-2016 13:32:05 QFJ Message Processor TIMED_WAITING
build 06-Jan-2016 13:32:05 sun.misc.Unsafe.park(Native Method)
build 06-Jan-2016 13:32:05 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:196)
build 06-Jan-2016 13:32:05 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
build 06-Jan-2016 13:32:05 java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424)
build 06-Jan-2016 13:32:05 quickfix.mina.SingleThreadedEventHandlingStrategy.getMessage(SingleThreadedEventHandlingStrategy.java:99)
build 06-Jan-2016 13:32:05 quickfix.mina.SingleThreadedEventHandlingStrategy.block(SingleThreadedEventHandlingStrategy.java:88)
build 06-Jan-2016 13:32:05 quickfix.mina.SingleThreadedEventHandlingStrategy$1.run(SingleThreadedEventHandlingStrategy.java:124)
build 06-Jan-2016 13:32:05 java.lang.Thread.run(Thread.java:662)

I will provide a fix as soon as I can.

Comment by Alexey Serdyuk [ 07/Jan/16 ]

In IsAliveTest.java you start a new thread with an empty run() method, so it is possible that it has already finished by the time the original thread calls isAlive(). If you want to test whether the second thread reports isAlive() = true after the start, you have to wait inside its run() method so that it does not finish too early, for example like this:

public class IsAliveTest1 {
    public static void main(final String[] args) {
        long startMillis = System.currentTimeMillis();

        while (true) {
            final java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
            Thread thread = new Thread("FooBar") {
                @Override
                public void run() {
                    // Wait until the main thread commands this thread to finish.
                    try {
                        latch.await();
                    } catch (InterruptedException ex) {
                        // No-op.
                    }
                }
            };

            thread.setDaemon(true);
            thread.start();
            
            // in theory 'isAlive' should always return true at this point
            if (!thread.isAlive()) {
                System.out.println("BOOM!");
                System.exit(0);
            }
            
            // Command the second thread to finish.
            latch.countDown();
            
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            if (System.currentTimeMillis() - startMillis > 60000) {
                System.exit(0);
            }
        }
    }
}

With the original IsAliveTest I have observed the "BOOM!" again and again, but with the modified version I can't reproduce it.

Comment by Marcin L [ 07/Jan/16 ]

You are absolutely right, Alexey. I was fiddling with the test for too long and arrived at ridiculous assumptions.

Comment by Marcin L [ 08/Jan/16 ]

SingleThreadedEventHandlingStrategyTest fails because there is a stale 'QFJ Message Processor' thread left over from a different test case.
I found two test classes which do not clean up the 'QFJ Message Processor' thread correctly: ResynchTest and TimerTest. The thread keeps running until the Java process is restarted.

Looking at the build logs I can see that the ResynchTestServer class (SingleThreadedEventHandlingStrategy, actually) will not clean up resources correctly if the thread which makes the call to SocketAcceptor.stop() is in the interrupted state (which is triggered in the tearDown code). All failing tests have the same exception in common, and there is always one more 'INFO: Started QFJ Message Processor' log line than 'INFO: Stopped QFJ Message Processor'.

QFJ-GIT16X-JOB1-32.log:
error 06-Jan-2016 13:31:48 SEVERE: Error in TimerTestServer server:
error 06-Jan-2016 13:31:48 java.lang.RuntimeException: java.lang.InterruptedException
error 06-Jan-2016 13:31:48 at quickfix.mina.SingleThreadedEventHandlingStrategy.onMessage(SingleThreadedEventHandlingStrategy.java:56)
error 06-Jan-2016 13:31:48 at quickfix.mina.SingleThreadedEventHandlingStrategy.stopHandlingMessages(SingleThreadedEventHandlingStrategy.java:156)
error 06-Jan-2016 13:31:48 at quickfix.SocketAcceptor.stop(SocketAcceptor.java:108)
error 06-Jan-2016 13:31:48 at quickfix.SocketAcceptor.stop(SocketAcceptor.java:101)
error 06-Jan-2016 13:31:48 at quickfix.test.acceptance.resynch.ResynchTestServer.run(ResynchTestServer.java:136)
error 06-Jan-2016 13:31:48 at java.lang.Thread.run(Thread.java:662)
error 06-Jan-2016 13:31:48 Caused by: java.lang.InterruptedException
error 06-Jan-2016 13:31:48 at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1199)
error 06-Jan-2016 13:31:48 at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
error 06-Jan-2016 13:31:48 at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:294)
error 06-Jan-2016 13:31:48 at quickfix.mina.SingleThreadedEventHandlingStrategy.onMessage(SingleThreadedEventHandlingStrategy.java:54)
error 06-Jan-2016 13:31:48 ... 5 more

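The SingleThreadedEventHandlingStrategy#isStopped field is never updated when stop is called from a thread that is in the interrupted state, because the RuntimeException thrown from onMessage() (see the stack trace above) aborts stopHandlingMessages().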
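
The underlying queue behaviour can be reproduced without QuickFIX/J. The following is a minimal sketch, assuming only the JDK: LinkedBlockingQueue.put() checks the interrupt flag of the calling thread before it enqueues anything. The class name InterruptedPutSketch and the string used as the message are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of QuickFIX/J.
final class InterruptedPutSketch {

    /** Returns true if put() threw InterruptedException and enqueued nothing. */
    static boolean putFailsWhenInterrupted() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread.currentThread().interrupt(); // simulate a caller that was interrupted earlier
        try {
            queue.put("END_OF_STREAM"); // stand-in for the real sentinel message
            return false; // only reached if put() ignored the interrupt
        } catch (InterruptedException e) {
            // The message never reached the queue, so a consumer would not see it.
            return queue.isEmpty();
        } finally {
            Thread.interrupted(); // leave the calling thread with a clear flag
        }
    }

    public static void main(String[] args) {
        System.out.println(putFailsWhenInterrupted());
    }
}
```

This matches the stack trace above, where the InterruptedException originates in ReentrantLock.lockInterruptibly() inside LinkedBlockingQueue.put().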

I wrote four failing test cases:
https://github.com/the-thing/quickfixj/commit/5f85381f93af17281dbcc5acbe41cf3587cb1eed

shouldCleanUpAcceptorQFJMessageProcessorThreadAfterInterrupt
shouldCleanUpInitiatorQFJMessageProcessorThreadAfterInterrupt

To fix these two we need to make a small change to SingleThreadedEventHandlingStrategy.onMessage():

@Override
public void onMessage(Session quickfixSession, Message message) {
    if (message == END_OF_STREAM && isStopped) {
        return;
    }
    try {
        eventQueue.put(new SessionMessageEvent(quickfixSession, message));
    } catch (InterruptedException e) {
        // Thread.currentThread().interrupt();
        // OR
        // isStopped = true;
        // throw new RuntimeException(e);
    }
}

shouldCleanUpAcceptorQFJMessageProcessorThreadAfterStop
shouldCleanUpInitiatorQFJMessageProcessorThreadAfterStop

I'm not sure about these two, but it looks like it might be a separate issue. It seems that it's not possible to stop a blocking SocketAcceptor or SocketInitiator either by calling stop from another thread or by interrupting the thread which called the block() method, unless I'm doing it incorrectly. Can somebody take a look at the last two?

Comment by Christoph John [ 13/Jan/16 ]

Thanks for the test. I only get failures in the test shouldCleanUpInitiatorQFJMessageProcessorThreadAfterStop. I get the same failures as you if I change the port of the acceptor on each call to createAcceptor(). Otherwise the second acceptor just reports a BindException since the port is already in use. Also see another comment here: https://github.com/the-thing/quickfixj/commit/5f85381f93af17281dbcc5acbe41cf3587cb1eed#commitcomment-15420123
I think the problem with these two tests is that the poll() call to the blocking queue cannot be interrupted. I tried moving the call to

                eventHandlingStrategy.stopHandlingMessages();

out of the synchronized block in the stop() methods of SocketAcceptor/Initiator, and that resolves the issue by placing the END_OF_STREAM message into the blocking queue. But I need to think about whether this will cause synchronization problems of some sort when concurrently starting/stopping the same Initiator/Acceptor.
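
The effect of placing a sentinel message into the queue can be sketched on its own. This is an illustration under stated assumptions, not the QuickFIX/J implementation: the class SentinelShutdownSketch, its END_OF_STREAM object, the one-second poll timeout and the helper runAndStop() are all hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of QuickFIX/J.
final class SentinelShutdownSketch {
    private static final Object END_OF_STREAM = new Object(); // hypothetical sentinel

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Object> handled = new CopyOnWriteArrayList<>();

    /** Consumer loop: handles messages until the sentinel arrives. */
    void consume() throws InterruptedException {
        while (true) {
            Object message = queue.poll(1, TimeUnit.SECONDS);
            if (message == END_OF_STREAM) {
                return; // leave the loop as soon as the sentinel is seen
            }
            if (message != null) {
                handled.add(message);
            }
        }
    }

    /** Runs a consumer, stops it via the sentinel; returns the handled count, or -1 on timeout. */
    static int runAndStop() {
        SentinelShutdownSketch sketch = new SentinelShutdownSketch();
        Thread consumer = new Thread(() -> {
            try {
                sketch.consume();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Sketch Processor");
        consumer.setDaemon(true);
        consumer.start();
        try {
            sketch.queue.put("message-1");
            sketch.queue.put(END_OF_STREAM); // wakes the consumer without waiting for the poll timeout
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.isAlive() ? -1 : sketch.handled.size();
    }

    public static void main(String[] args) {
        System.out.println(runAndStop());
    }
}
```

The sketch only works if the sentinel actually reaches the queue, which is why the producer side must not be blocked or interrupted when it puts the sentinel.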

Comment by Marcin L [ 13/Jan/16 ]

I forgot to mention that the tests should be executed individually with a delay.
