Initiator.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "Initiator.h"
27 #include "Utility.h"
28 #include "Session.h"
29 #include "SessionFactory.h"
30 #include "HttpServer.h"
31 #include <algorithm>
32 #include <fstream>
33 
34 namespace FIX
35 {
37  MessageStoreFactory& messageStoreFactory,
38  const SessionSettings& settings ) throw( ConfigError )
39 : m_threadid( 0 ),
40  m_application( application ),
41  m_messageStoreFactory( messageStoreFactory ),
42  m_settings( settings ),
43  m_pLogFactory( 0 ),
44  m_pLog( 0 ),
45  m_firstPoll( true ),
46  m_stop( true )
47 { initialize(); }
48 
50  MessageStoreFactory& messageStoreFactory,
51  const SessionSettings& settings,
52  LogFactory& logFactory ) throw( ConfigError )
53 : m_threadid( 0 ),
54  m_application( application ),
55  m_messageStoreFactory( messageStoreFactory ),
56  m_settings( settings ),
57  m_pLogFactory( &logFactory ),
58  m_pLog( logFactory.create() ),
59  m_firstPoll( true ),
60  m_stop( true )
61 { initialize(); }
62 
64 {
65  std::set < SessionID > sessions = m_settings.getSessions();
66  std::set < SessionID > ::iterator i;
67 
68  if ( !sessions.size() )
69  throw ConfigError( "No sessions defined" );
70 
72  m_pLogFactory );
73 
74  for ( i = sessions.begin(); i != sessions.end(); ++i )
75  {
76  if ( m_settings.get( *i ).getString( "ConnectionType" ) == "initiator" )
77  {
78  m_sessionIDs.insert( *i );
79  m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
80  setDisconnected( *i );
81  }
82  }
83 
84  if ( !m_sessions.size() )
85  throw ConfigError( "No sessions defined for initiator" );
86 }
87 
89 {
90  Sessions::iterator i;
91  for ( i = m_sessions.begin(); i != m_sessions.end(); ++i )
92  delete i->second;
93 
94  if( m_pLogFactory && m_pLog )
96 }
97 
99  Responder& responder )
100 {
101  Sessions::iterator i = m_sessions.find( sessionID );
102  if ( i != m_sessions.end() )
103  {
104  i->second->setResponder( &responder );
105  return i->second;
106  }
107  return 0;
108 }
109 
110 Session* Initiator::getSession( const SessionID& sessionID ) const
111 {
112  Sessions::const_iterator i = m_sessions.find( sessionID );
113  if( i != m_sessions.end() )
114  return i->second;
115  else
116  return 0;
117 }
118 
119 const Dictionary* const Initiator::getSessionSettings( const SessionID& sessionID ) const
120 {
121  try
122  {
123  return &m_settings.get( sessionID );
124  }
125  catch( ConfigError& )
126  {
127  return 0;
128  }
129 }
130 
132 {
133  Locker l(m_mutex);
134 
135  SessionIDs disconnected = m_disconnected;
136  SessionIDs::iterator i = disconnected.begin();
137  for ( ; i != disconnected.end(); ++i )
138  {
139  Session* pSession = Session::lookupSession( *i );
140  if ( pSession->isEnabled() && pSession->isSessionTime(UtcTimeStamp()) )
141  doConnect( *i, m_settings.get( *i ));
142  }
143 }
144 
145 void Initiator::setPending( const SessionID& sessionID )
146 {
147  Locker l(m_mutex);
148 
149  m_pending.insert( sessionID );
150  m_connected.erase( sessionID );
151  m_disconnected.erase( sessionID );
152 }
153 
154 void Initiator::setConnected( const SessionID& sessionID )
155 {
156  Locker l(m_mutex);
157 
158  m_pending.erase( sessionID );
159  m_connected.insert( sessionID );
160  m_disconnected.erase( sessionID );
161 }
162 
163 void Initiator::setDisconnected( const SessionID& sessionID )
164 {
165  Locker l(m_mutex);
166 
167  m_pending.erase( sessionID );
168  m_connected.erase( sessionID );
169  m_disconnected.insert( sessionID );
170 }
171 
172 bool Initiator::isPending( const SessionID& sessionID )
173 {
174  Locker l(m_mutex);
175  return m_pending.find( sessionID ) != m_pending.end();
176 }
177 
178 bool Initiator::isConnected( const SessionID& sessionID )
179 {
180  Locker l(m_mutex);
181  return m_connected.find( sessionID ) != m_connected.end();
182 }
183 
184 bool Initiator::isDisconnected( const SessionID& sessionID )
185 {
186  Locker l(m_mutex);
187  return m_disconnected.find( sessionID ) != m_disconnected.end();
188 }
189 
191 {
192  m_stop = false;
195 
197 
198  if( !thread_spawn( &startThread, this, m_threadid ) )
199  throw RuntimeError("Unable to spawn thread");
200 }
201 
202 
204 {
205  m_stop = false;
208 
209  startThread(this);
210 }
211 
212 bool Initiator::poll( double timeout ) throw ( ConfigError, RuntimeError )
213 {
214  if( m_firstPoll )
215  {
216  m_stop = false;
217  onConfigure( m_settings );
218  onInitialize( m_settings );
219  connect();
220  m_firstPoll = false;
221  }
222 
223  return onPoll( timeout );
224 }
225 
226 void Initiator::stop( bool force )
227 {
228  if( isStopped() ) return;
229 
231 
232  std::vector<Session*> enabledSessions;
233 
234  SessionIDs connected = m_connected;
235  SessionIDs::iterator i = connected.begin();
236  for ( ; i != connected.end(); ++i )
237  {
238  Session* pSession = Session::lookupSession(*i);
239  if( pSession && pSession->isEnabled() )
240  {
241  enabledSessions.push_back( pSession );
242  pSession->logout();
243  }
244  }
245 
246  if( !force )
247  {
248  for ( int second = 1; second <= 10 && isLoggedOn(); ++second )
249  process_sleep( 1 );
250  }
251 
252  {
253  Locker l(m_mutex);
254  for ( i = connected.begin(); i != connected.end(); ++i )
256  }
257 
258  m_stop = true;
259  onStop();
260  if( m_threadid )
262  m_threadid = 0;
263 
264  std::vector<Session*>::iterator session = enabledSessions.begin();
265  for( ; session != enabledSessions.end(); ++session )
266  (*session)->logon();
267 }
268 
270 {
271  Locker l(m_mutex);
272 
273  SessionIDs connected = m_connected;
274  SessionIDs::iterator i = connected.begin();
275  for ( ; i != connected.end(); ++i )
276  {
278  return true;
279  }
280  return false;
281 }
282 
284 {
285  Initiator * pInitiator = static_cast < Initiator* > ( p );
286  pInitiator->onStart();
287  return 0;
288 }
289 }
static THREAD_PROC startThread(void *p)
Definition: Initiator.cpp:283
bool isEnabled()
Definition: Session.h:59
static void startGlobal(const SessionSettings &)
Definition: HttpServer.cpp:37
void thread_join(thread_id thread)
Definition: Utility.cpp:414
void initialize()
Definition: Initiator.cpp:63
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
virtual void onConfigure(const SessionSettings &)
Implemented to configure initiator.
Definition: Initiator.h:110
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:45
Session * getSession(const SessionID &sessionID, Responder &)
Definition: Initiator.cpp:98
Interface implements sending on and disconnecting a transport.
Definition: Responder.h:34
bool isDisconnected(const SessionID &)
Definition: Initiator.cpp:184
SessionIDs m_connected
Definition: Initiator.h:131
bool isPending(const SessionID &)
Definition: Initiator.cpp:172
Sessions m_sessions
Definition: Initiator.h:128
thread_id m_threadid
Definition: Initiator.h:135
void block()
Block on the initiator.
Definition: Initiator.cpp:203
void process_sleep(double s)
Definition: Utility.cpp:443
Initiator(Application &, MessageStoreFactory &, const SessionSettings &)
Definition: Initiator.cpp:36
virtual void doConnect(const SessionID &, const Dictionary &)=0
Implemented to connect a session to its target.
bool thread_spawn(THREAD_START_ROUTINE func, void *var, thread_id &thread)
Definition: Utility.cpp:393
LogFactory * m_pLogFactory
Definition: Initiator.h:139
virtual void destroy(Log *)=0
Application & m_application
Definition: Initiator.h:136
const Dictionary & get(const SessionID &) const
Get a dictionary for a session.
std::set< SessionID > SessionIDs
Definition: Initiator.h:124
virtual ~Initiator()
Definition: Initiator.cpp:88
This interface must be implemented to define what your FIX application does.
Definition: Application.h:43
const Dictionary *const getSessionSettings(const SessionID &sessionID) const
Definition: Initiator.cpp:119
Responsible for creating Session objects.
Application encountered serious error during runtime
Definition: Exceptions.h:94
Application is not configured correctly
Definition: Exceptions.h:87
Locks/Unlocks a mutex using RAII.
Definition: Mutex.h:95
Container for setting dictionaries mapped to sessions.
SessionIDs m_sessionIDs
Definition: Initiator.h:129
SessionIDs m_pending
Definition: Initiator.h:130
const SessionID & getSessionID() const
Definition: Session.h:75
void start()
Start initiator.
Definition: Initiator.cpp:190
This interface must be implemented to create a Log.
Definition: Log.h:42
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Definition: Dictionary.cpp:32
virtual void onStart()=0
Implemented to start connecting to targets.
This interface must be implemented to create a MessageStore.
Definition: MessageStore.h:41
MessageStoreFactory & m_messageStoreFactory
Definition: Initiator.h:137
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:269
virtual void onStop()=0
Implemented to stop a running initiator.
static void stopGlobal()
Definition: HttpServer.cpp:53
void logout(const std::string &reason="")
Definition: Session.h:57
Session * create(const SessionID &sessionID, const Dictionary &settings)
void setConnected(const SessionID &)
Definition: Initiator.cpp:154
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:36
SessionSettings m_settings
Definition: Initiator.h:138
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:30
virtual void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
Definition: Initiator.h:112
bool isSessionTime(const UtcTimeStamp &time)
Definition: Session.h:108
#define THREAD_PROC
Definition: Utility.h:147
bool isStopped()
Definition: Initiator.h:83
SessionIDs m_disconnected
Definition: Initiator.h:132
std::set< SessionID > getSessions() const
void stop(bool force=false)
Stop initiator.
Definition: Initiator.cpp:226
bool poll(double timeout=0.0)
Poll the initiator.
Definition: Initiator.cpp:212
Base for classes which act as an initiator for establishing connections.
Definition: Initiator.h:51
void setPending(const SessionID &)
Definition: Initiator.cpp:145
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:163
bool isLoggedOn()
Definition: Session.h:65
bool isConnected(const SessionID &)
Definition: Initiator.cpp:178

Generated on Sat Mar 29 2014 15:13:32 for QuickFIX by doxygen 1.8.5 written by Dimitri van Heesch, © 1997-2001