[QFJ-510] ThreadPerSessionEventHandlingStrategy.MessageDispatchingThread threads live after disconnect Created: 03/Mar/10 Updated: 15/Nov/12 Resolved: 05/Apr/10 |
|
Status: | Closed |
Project: | QuickFIX/J |
Component/s: | Engine |
Affects Version/s: | 1.4.0 |
Fix Version/s: | None |
Type: | Improvement | Priority: | Default |
Reporter: | Andre Mermegas | Assignee: | Unassigned |
Resolution: | Duplicate | Votes: | 0 |
Labels: | None |
Attachments: | ThreadPerSessionEventHandlingStrategy.java ThreadPerSessionEventHandlingStrategy.java ThreadPerSessionEventHandlingStrategy.java | ||||||||
Issue Links: |
|
Description |
This patch ends the dispatcher thread after logout. /*******************************************************************************
package quickfix.mina; import quickfix.LogUtil; import java.util.Collection; /**
public void onMessage(Session quickfixSession, Message message) { dispatcher.enqueue(message); /** There is no such thing as a SessionConnector for thread-per-session handler - we don't multiplex
protected void startDispatcherThread(MessageDispatchingThread dispatcher) { dispatcher.start(); } public void stopDispatcherThreads() { } private final class MessageDispatchingThread extends Thread { private MessageDispatchingThread(Session session) { public void onDisconnect() { public void onLogon() { public void onLogout() { stopped = true; interrupt(); dispatchers.remove(quickfixSession.getSessionID()); } public void onReset() { public void onRefresh() { public void onMissedHeartBeat() { public void onHeartBeatTimeout() { public void enqueue(Message message) { catch (InterruptedException e) { quickfixSession.getLog().onErrorEvent(e.toString()); }} public int getQueueSize() { return messages.size(); } public void run() { } catch (InterruptedException e) { } catch (Throwable e) { LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e); } } public void stopDispatcher() { stopped = true; }} private BlockingQueue<Message> getMessages(SessionID sessionID) { MessageDispatchingThread dispatcher = getDispatcher(sessionID); return dispatcher.messages; }private MessageDispatchingThread getDispatcher(SessionID sessionID) { return dispatchers.get(sessionID); }private Message getNextMessage(BlockingQueue<Message> messages) throws InterruptedException { return messages.take(); } public int getQueueSize() { return ret; } |
Comments |
Comment by Andre Mermegas [ 03/Mar/10 ] |
Alternate implementation. |
Comment by Andre Mermegas [ 03/Mar/10 ] |
To clarify: the attachments are two different implementations of the same functionality. |