Details
-
Type: Improvement
-
Status: Closed
-
Priority: Default
-
Resolution: Duplicate
-
Affects Version/s: 1.4.0
-
Fix Version/s: None
-
Component/s: Engine
-
Labels:None
Description
This patch ends the dispatcher thread after logout.
/*******************************************************************************
 * Copyright (c) quickfixengine.org  All rights reserved.
 *
 * This file is part of the QuickFIX FIX Engine
 *
 * This file may be distributed under the terms of the quickfixengine.org
 * license as defined by quickfixengine.org and appearing in the file
 * LICENSE included in the packaging of this file.
 *
 * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE.
 *
 * See http://www.quickfixengine.org/LICENSE for licensing information.
 *
 * Contact [email protected] if any conditions of this licensing
 * are not clear to you.
 ******************************************************************************/
package quickfix.mina;
import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionStateListener;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
- Processes messages in a session-specific thread.
*/
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
private final Map<SessionID, MessageDispatchingThread> dispatchers = new ConcurrentHashMap<SessionID, MessageDispatchingThread>();
public void onMessage(Session quickfixSession, Message message) {
MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID());
if (dispatcher == null)
dispatcher.enqueue(message);
}
/** There is no such thing as a SesionConnector for thread-per-session handler - we don't multiplex
- between multiple sessions here so this is null
*/
public SessionConnector getSessionConnector() { return null; }
protected void startDispatcherThread(MessageDispatchingThread dispatcher)
{ dispatcher.start(); } public void stopDispatcherThreads() {
Collection<MessageDispatchingThread> dispatchersToShutdown = dispatchers.values();
dispatchers.clear();
for (MessageDispatchingThread dispatcher : dispatchersToShutdown)
}
private final class MessageDispatchingThread extends Thread {
private final Session quickfixSession;
private final BlockingQueue<Message> messages = new LinkedBlockingQueue<Message>();
private volatile boolean stopped;
private MessageDispatchingThread(Session session) {
super("QF/J Session dispatcher: " + session.getSessionID());
quickfixSession = session;
quickfixSession.addStateListener(new SessionStateListener() {
public void onConnect() {
}
public void onDisconnect() {
}
public void onLogon() {
}
public void onLogout()
{ stopped = true; interrupt(); dispatchers.remove(quickfixSession.getSessionID()); } public void onReset() {
}
public void onRefresh() {
}
public void onMissedHeartBeat() {
}
public void onHeartBeatTimeout() {
}
});
}
public void enqueue(Message message) {
try
catch (InterruptedException e)
{ quickfixSession.getLog().onErrorEvent(e.toString()); }}
public int getQueueSize()
{ return messages.size(); } public void run() {
while (!stopped) {
try {
Message message = getNextMessage(messages);
if (quickfixSession.hasResponder())
} catch (InterruptedException e) {
if (!stopped)
} catch (Throwable e)
{ LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e); } }
}
public void stopDispatcher()
{ stopped = true; }}
private BlockingQueue<Message> getMessages(SessionID sessionID)
{ MessageDispatchingThread dispatcher = getDispatcher(sessionID); return dispatcher.messages; }private MessageDispatchingThread getDispatcher(SessionID sessionID)
{ return dispatchers.get(sessionID); }private Message getNextMessage(BlockingQueue<Message> messages) throws InterruptedException
{ return messages.take(); } public int getQueueSize() {
int ret = 0;
for(MessageDispatchingThread mdt : dispatchers.values())
return ret;
}
}
Attachments
Issue Links
- duplicates
-
QFJ-410 ThreadPerSessionEventHandlingStrategy still leaks threads
- Closed