[QFJ-510] ThreadPerSessionEventHandlingStrategy.MessageDispatchingThread threads live after disconnect Created: 03/Mar/10  Updated: 15/Nov/12  Resolved: 05/Apr/10

Status: Closed
Project: QuickFIX/J
Component/s: Engine
Affects Version/s: 1.4.0
Fix Version/s: None

Type: Improvement Priority: Default
Reporter: Andre Mermegas Assignee: Unassigned
Resolution: Duplicate Votes: 0
Labels: None

Attachments: Text File ThreadPerSessionEventHandlingStrategy.java     Text File ThreadPerSessionEventHandlingStrategy.java     Text File ThreadPerSessionEventHandlingStrategy.java    
Issue Links:
Duplicate
duplicates QFJ-410 ThreadPerSessionEventHandlingStrategy... Closed

 Description   

This patch ends the session's dispatcher thread after logout.

/*******************************************************************************
 * Copyright (c) quickfixengine.org  All rights reserved.
 *
 * This file is part of the QuickFIX FIX Engine
 *
 * This file may be distributed under the terms of the quickfixengine.org
 * license as defined by quickfixengine.org and appearing in the file
 * LICENSE included in the packaging of this file.
 *
 * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING
 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A
 * PARTICULAR PURPOSE.
 *
 * See http://www.quickfixengine.org/LICENSE for licensing information.
 *
 * Contact ask@quickfixengine.org if any conditions of this licensing
 * are not clear to you.
 ******************************************************************************/

package quickfix.mina;

import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.SessionStateListener;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Processes messages in a session-specific thread.
 *
 * <p>A {@link MessageDispatchingThread} is created lazily for a session the
 * first time a message arrives for it. The dispatcher is stopped and removed
 * from the registry when its session reports a logout, or when
 * {@link #stopDispatcherThreads()} is called.
 */
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {

    /**
     * Active dispatchers keyed by session ID. Declared as a {@link ConcurrentMap}
     * so that a dispatcher can remove itself with the atomic
     * {@code remove(key, value)} without evicting a newer dispatcher that was
     * registered for the same session.
     */
    private final ConcurrentMap<SessionID, MessageDispatchingThread> dispatchers =
            new ConcurrentHashMap<SessionID, MessageDispatchingThread>();

    /**
     * Queues a message for processing on the session's dispatcher thread,
     * creating and starting that dispatcher if none is registered yet.
     *
     * @param quickfixSession the session the message belongs to
     * @param message the message to hand to the session
     */
    public void onMessage(Session quickfixSession, Message message) {
        MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID());
        if (dispatcher == null) {
            // NOTE(review): check-then-act is not atomic; this presumably relies on
            // messages for one session arriving on a single thread - confirm.
            dispatcher = new MessageDispatchingThread(quickfixSession);
            dispatchers.put(quickfixSession.getSessionID(), dispatcher);
            startDispatcherThread(dispatcher);
        }
        dispatcher.enqueue(message);
    }

    /**
     * There is no such thing as a SessionConnector for the thread-per-session
     * handler - we don't multiplex between multiple sessions here, so this is
     * null.
     *
     * @return always {@code null}
     */
    public SessionConnector getSessionConnector() {
        return null;
    }

    /**
     * Starts the given dispatcher thread. Protected so subclasses can override
     * how (or whether) the thread is started.
     *
     * @param dispatcher the newly created dispatcher
     */
    protected void startDispatcherThread(MessageDispatchingThread dispatcher) {
        dispatcher.start();
    }

    /**
     * Stops every registered dispatcher and empties the registry.
     */
    public void stopDispatcherThreads() {
        // values() is a live view of the map: it must be copied BEFORE clear(),
        // otherwise the loop below iterates an empty collection and no
        // dispatcher is ever stopped.
        Collection<MessageDispatchingThread> dispatchersToShutdown =
                new ArrayList<MessageDispatchingThread>(dispatchers.values());
        dispatchers.clear();
        for (MessageDispatchingThread dispatcher : dispatchersToShutdown) {
            dispatcher.stopDispatcher();
        }
    }

    /**
     * Thread that takes queued messages for one session and passes them to
     * {@link Session#next(Message)}.
     */
    private final class MessageDispatchingThread extends Thread {
        private final Session quickfixSession;
        private final BlockingQueue<Message> messages = new LinkedBlockingQueue<Message>();
        private volatile boolean stopped;

        private MessageDispatchingThread(Session session) {
            super("QF/J Session dispatcher: " + session.getSessionID());
            quickfixSession = session;
            // NOTE(review): this listener is never unregistered from the session.
            // The conditional remove in onLogout keeps a stale listener from
            // evicting a newer dispatcher, but the listener object itself stays
            // attached to the session.
            quickfixSession.addStateListener(new SessionStateListener() {
                public void onConnect() {
                }

                public void onDisconnect() {
                }

                public void onLogon() {
                }

                public void onLogout() {
                    stopDispatcher();
                    // Remove only this dispatcher; a replacement registered for
                    // the same session ID must be left in place.
                    dispatchers.remove(quickfixSession.getSessionID(), MessageDispatchingThread.this);
                }

                public void onReset() {
                }

                public void onRefresh() {
                }

                public void onMissedHeartBeat() {
                }

                public void onHeartBeatTimeout() {
                }
            });
        }

        /**
         * Adds a message to this dispatcher's queue.
         *
         * @param message the message to queue
         */
        public void enqueue(Message message) {
            try {
                messages.put(message);
            } catch (InterruptedException e) {
                quickfixSession.getLog().onErrorEvent(e.toString());
                // Restore the interrupt status so the caller can still observe it.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * @return the number of messages currently waiting in this dispatcher's queue
         */
        public int getQueueSize() {
            return messages.size();
        }

        /**
         * Takes messages from the queue and passes them to the session until the
         * dispatcher is stopped or unexpectedly interrupted. Messages are dropped
         * when the session has no responder.
         */
        @Override
        public void run() {
            try {
                while (!stopped) {
                    try {
                        Message message = getNextMessage(messages);
                        if (quickfixSession.hasResponder()) {
                            quickfixSession.next(message);
                        }
                    } catch (InterruptedException e) {
                        // An interrupt after stopDispatcher() is the normal wake-up;
                        // the loop condition then ends the thread quietly.
                        if (!stopped) {
                            LogUtil.logThrowable(quickfixSession.getSessionID(),
                                    "Message dispatcher interrupted", e);
                            return;
                        }
                    } catch (Throwable e) {
                        LogUtil.logThrowable(quickfixSession.getSessionID(),
                                "Error during message processing", e);
                    }
                }
            } finally {
                // Never leave a finished dispatcher registered: later messages for
                // the session would otherwise be queued and never processed.
                dispatchers.remove(quickfixSession.getSessionID(), this);
            }
        }

        /**
         * Marks this dispatcher as stopped and wakes its thread if it is blocked
         * waiting for a message.
         */
        public void stopDispatcher() {
            stopped = true;
            // Setting the flag alone does not end a thread parked in take().
            // When called on the dispatcher thread itself (for example from a
            // listener callback during message processing) the loop re-checks
            // the flag after the current message, so no self-interrupt is needed.
            if (Thread.currentThread() != this) {
                interrupt();
            }
        }
    }

    /**
     * Returns the message queue of the dispatcher registered for a session.
     *
     * @param sessionID the session to look up
     * @return that dispatcher's queue
     * @throws NullPointerException if no dispatcher is registered for the session
     */
    private BlockingQueue<Message> getMessages(SessionID sessionID) {
        MessageDispatchingThread dispatcher = getDispatcher(sessionID);
        return dispatcher.messages;
    }

    /**
     * @param sessionID the session to look up
     * @return the dispatcher registered for the session, or {@code null} if none
     */
    private MessageDispatchingThread getDispatcher(SessionID sessionID) {
        return dispatchers.get(sessionID);
    }

    /**
     * Blocks until a message is available on the given queue.
     *
     * @param messages the queue to take from
     * @return the next queued message
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private Message getNextMessage(BlockingQueue<Message> messages) throws InterruptedException {
        return messages.take();
    }

    /**
     * @return the total number of messages waiting across all registered dispatchers
     */
    public int getQueueSize() {
        int ret = 0;
        for (MessageDispatchingThread mdt : dispatchers.values()) {
            ret += mdt.getQueueSize();
        }
        return ret;
    }
}



 Comments   
Comment by Andre Mermegas [ 03/Mar/10 ]

alternate impl

Comment by Andre Mermegas [ 03/Mar/10 ]

To clarify: the attachments are two different implementations of the same functionality.

Generated at Sat Nov 23 12:29:12 UTC 2024 using JIRA 7.5.2#75007-sha1:9f5725bb824792b3230a5d8716f0c13e296a3cae.