[QFJ-410] ThreadPerSessionEventHandlingStrategy still leaks threads Created: 04/Mar/09  Updated: 15/Nov/12  Resolved: 05/Apr/10

Status: Closed
Project: QuickFIX/J
Component/s: Engine
Affects Version/s: 1.4.0
Fix Version/s: 1.5.0

Type: Bug Priority: Default
Reporter: Jerry Shea Assignee: Unassigned
Resolution: Fixed Votes: 1
Labels: None

Attachments: Text File 410-2.patch     Text File 410.patch     Text File ThreadPerSessionEventHandlingStrategy.java    
Issue Links:
Duplicate
is duplicated by QFJ-491 ThreadPerSessionEventHandlingStrategy... Closed
is duplicated by QFJ-475 ThreadPerSessionEventHandlingStrategy... Closed
is duplicated by QFJ-510 ThreadPerSessionEventHandlingStrategy... Closed
Relates
relates to QFJ-491 ThreadPerSessionEventHandlingStrategy... Closed

 Description   

See http://www.quickfixj.org/jira/browse/QFJ-326

I'd like this to be reopened, as threads still leak:
1. In ThreadPerSessionEventHandlingStrategy.stopDispatcherThreads, dispatchers.clear() is called before looping through the collection, so stopDispatcher is never called on any thread.
2. ThreadPerSessionEventHandlingStrategy.MessageDispatchingThread.run never terminates: it calls getNextMessage in a loop, and getNextMessage calls messages.take, which blocks indefinitely.
3. Also, stopDispatcherThreads should wait for the threads to terminate before it returns, so that everything is cleaned up properly and in order.
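
Point 1 comes down to Map.values() returning a live view of the map rather than a copy. The following is a minimal sketch of that behavior only; the class LiveViewDemo, the method visitedAfterClear, and the strings standing in for dispatcher threads are hypothetical and not part of QuickFIX/J.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LiveViewDemo {
    // Returns how many entries a shutdown loop would visit once the map has been cleared.
    public static int visitedAfterClear(boolean snapshotFirst) {
        // Strings stand in for the dispatcher threads (assumption for illustration).
        Map<String, String> dispatchers = new ConcurrentHashMap<String, String>();
        dispatchers.put("SESSION-A", "dispatcher-a");
        dispatchers.put("SESSION-B", "dispatcher-b");

        // values() is backed by the map; copying it into a list detaches it.
        Collection<String> toShutdown = snapshotFirst
                ? new ArrayList<String>(dispatchers.values())
                : dispatchers.values();

        dispatchers.clear();

        int visited = 0;
        for (String ignored : toShutdown) {
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("live view visited: " + visitedAfterClear(false));
        System.out.println("snapshot visited:  " + visitedAfterClear(true));
    }
}
```

With the live view the loop body never runs, which matches the symptom described in point 1. Either taking a copy first or clearing after the loop avoids it.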

I've got a patch which fixes these by:
1. reordering the statements in stopDispatcherThreads
2. using messages.poll, which times out (I have set a constant timeout of 10 seconds)
3. waiting for the threads to finish
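
As a rough illustration of fixes 2 and 3 above (this is not the attached patch), a worker can poll with a timeout so that it re-checks its stop flag, and the caller can join the thread after setting the flag. The class PollingDispatcherDemo, its nested Dispatcher, and the methods stopAndWait and startAndStop are hypothetical names; messages are plain strings rather than quickfix.Message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingDispatcherDemo {
    // Hypothetical stand-in for MessageDispatchingThread; it only counts messages.
    static class Dispatcher extends Thread {
        private final BlockingQueue<String> messages = new LinkedBlockingQueue<String>();
        private final AtomicInteger processed = new AtomicInteger();
        private final long pollTimeoutMillis;
        private volatile boolean stopped;

        Dispatcher(long pollTimeoutMillis) {
            super("demo dispatcher");
            this.pollTimeoutMillis = pollTimeoutMillis;
        }

        void enqueue(String message) {
            messages.offer(message); // the queue is unbounded, so offer always succeeds
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    // poll returns null on timeout, so the flag is re-checked regularly
                    String message = messages.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }

        // Sets the stop flag, waits up to joinTimeoutMillis, and reports whether the thread ended.
        boolean stopAndWait(long joinTimeoutMillis) {
            stopped = true;
            try {
                join(joinTimeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !isAlive();
        }
    }

    // Starts a dispatcher, hands it one message, then shuts it down.
    public static boolean startAndStop(long pollTimeoutMillis) {
        Dispatcher dispatcher = new Dispatcher(pollTimeoutMillis);
        dispatcher.start();
        dispatcher.enqueue("hello");
        return dispatcher.stopAndWait(pollTimeoutMillis * 4 + 1000L);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + startAndStop(250L));
    }
}
```

In this sketch the worst-case shutdown delay is roughly one poll timeout, which is why the timeout value matters for how responsive shutdown feels.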

What do people think about the timeout value? We could make the timeout configurable, or raise it.
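
For comparison, one alternative that avoids choosing a timeout at all, and which is not what the patch on this issue does, is to keep the blocking take() and interrupt the thread on shutdown. The sketch below assumes that interrupting the dispatcher is acceptable; whether that is safe while a real session is processing a message is not established here. All names (InterruptibleDispatcherDemo, Dispatcher, stopAndWait, startAndStop) are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptibleDispatcherDemo {
    // Hypothetical stand-in for a dispatcher thread; messages are plain strings.
    static class Dispatcher extends Thread {
        private final BlockingQueue<String> messages = new LinkedBlockingQueue<String>();

        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    // Blocks until a message arrives or the thread is interrupted.
                    String message = messages.take();
                    // A real dispatcher would process the message here.
                }
            } catch (InterruptedException e) {
                // take() was interrupted: fall through and let the thread end.
            }
        }

        // Interrupts the thread, waits up to joinTimeoutMillis, and reports whether it ended.
        boolean stopAndWait(long joinTimeoutMillis) {
            interrupt();
            try {
                join(joinTimeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return !isAlive();
        }
    }

    public static boolean startAndStop() {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.start();
        return dispatcher.stopAndWait(2000L);
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + startAndStop());
    }
}
```

The trade-off is that shutdown no longer depends on a timeout, but any code running on the dispatcher thread must tolerate being interrupted.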



 Comments   
Comment by Jerry Shea [ 04/Mar/09 ]

Here's the patch. I changed the timeout to 2 seconds in my testing.

Comment by Jerry Shea [ 15/Jun/09 ]

I have reduced the wait timeout to 250 ms to make shutdown more responsive.

Comment by Jerry Shea [ 15/Jun/09 ]

Guys, is there any chance we could get this change assigned to the 1.4.1 release? Let me know if there's anything else I can do to help this along.

Comment by Andre Mermegas [ 18/Dec/09 ]

I have a similar bugfix, but I can't attach it for some reason.

/*******************************************************************************
 * Copyright (c) quickfixengine.org All rights reserved.
 *
 * This file is part of the QuickFIX FIX Engine
 *
 * This file may be distributed under the terms of the quickfixengine.org
 * license as defined by quickfixengine.org and appearing in the file
 * LICENSE included in the packaging of this file.
 *
 * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE.
 *
 * See http://www.quickfixengine.org/LICENSE for licensing information.
 *
 * Contact [email protected] if any conditions of this licensing
 * are not clear to you.
 ******************************************************************************/

package quickfix.mina;

import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Processes messages in a session-specific thread.
 */
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
    private final Map<SessionID, MessageDispatchingThread> dispatchers = new ConcurrentHashMap<SessionID, MessageDispatchingThread>();

    public void onMessage(Session quickfixSession, Message message) {
        MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID());
        if (dispatcher == null) {
            dispatcher = new MessageDispatchingThread(quickfixSession);
            dispatchers.put(quickfixSession.getSessionID(), dispatcher);
            startDispatcherThread(dispatcher);
        }
        dispatcher.enqueue(message);
    }

    /**
     * There is no such thing as a SessionConnector for the thread-per-session handler: we don't
     * multiplex between multiple sessions here, so this is null.
     */
    public SessionConnector getSessionConnector() {
        return null;
    }

    protected void startDispatcherThread(MessageDispatchingThread dispatcher) {
        dispatcher.start();
    }

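    public void stopDispatcherThreads() {
        // values() is a live view of the map, so stop every dispatcher before
        // clearing; clearing first would leave nothing to iterate over.
        Collection<MessageDispatchingThread> dispatchersToShutdown = dispatchers.values();
        for (MessageDispatchingThread dispatcher : dispatchersToShutdown) {
            dispatcher.stopDispatcher();
        }
        dispatchers.clear();
    }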

    private class MessageDispatchingThread extends Thread {
        private final Session quickfixSession;
        private final BlockingQueue<Message> messages = new LinkedBlockingQueue<Message>();
        private volatile boolean stopped;

        MessageDispatchingThread(Session session) {
            super("QF/J Session dispatcher: " + session.getSessionID());
            quickfixSession = session;
        }

        public void enqueue(Message message) {
            try {
                messages.put(message);
            } catch (InterruptedException e) {
                quickfixSession.getLog().onEvent(e.getMessage());
            }
        }

        public Message getNextMessage(BlockingQueue<Message> messages) throws InterruptedException {
            // poll returns null after the timeout so that run() can re-check the stop flag
            return messages.poll(1000L, TimeUnit.MILLISECONDS);
        }

        public int getQueueSize() {
            return messages.size();
        }

        public void run() {
            while (!stopped) {
                try {
                    Message message = getNextMessage(messages);
                    if (message != null && quickfixSession.hasResponder()) {
                        quickfixSession.next(message);
                    }
                } catch (InterruptedException e) {
                    LogUtil.logThrowable(quickfixSession.getSessionID(), "Message dispatcher interrupted", e);
                    return;
                } catch (Throwable e) {
                    LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e);
                }
                // Only ever set the flag here, never reset it, so that a concurrent
                // stopDispatcher() call is not overwritten.
                if (!quickfixSession.hasResponder()) {
                    stopped = true;
                }
            }
            dispatchers.remove(quickfixSession.getSessionID());
        }

        public void stopDispatcher() {
            stopped = true;
        }
    }
    public int getQueueSize() {
        int ret = 0;
        for (MessageDispatchingThread mdt : dispatchers.values()) {
            ret += mdt.getQueueSize();
        }
        return ret;
    }
}

Comment by Andre Mermegas [ 06/Apr/10 ]

That patch didn't work for me; threads were not ending.

I've attached my fix.

Comment by Andre Mermegas [ 06/Apr/10 ]

getNextMessage(...) should probably be made private too.
