SocketInitiator.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 ** Copyright (c) 2001-2014
3 **
4 ** This file is part of the QuickFIX FIX Engine
5 **
6 ** This file may be distributed under the terms of the quickfixengine.org
7 ** license as defined by quickfixengine.org and appearing in the file
8 ** LICENSE included in the packaging of this file.
9 **
10 ** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11 ** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12 **
13 ** See http://www.quickfixengine.org/LICENSE for licensing information.
14 **
15 ** Contact ask@quickfixengine.org if any conditions of this licensing are
16 ** not clear to you.
17 **
18 ****************************************************************************/
19 
20 #ifdef _MSC_VER
21 #include "stdafx.h"
22 #else
23 #include "config.h"
24 #endif
25 
26 #include "SocketInitiator.h"
27 #include "Session.h"
28 #include "Settings.h"
29 
30 namespace FIX
31 {
33  MessageStoreFactory& factory,
34  const SessionSettings& settings )
35 throw( ConfigError )
36 : Initiator( application, factory, settings ),
37  m_connector( 1 ), m_lastConnect( 0 ),
38  m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
39  m_rcvBufSize( 0 )
40 {
41 }
42 
44  MessageStoreFactory& factory,
45  const SessionSettings& settings,
46  LogFactory& logFactory )
47 throw( ConfigError )
48 : Initiator( application, factory, settings, logFactory ),
49  m_connector( 1 ), m_lastConnect( 0 ),
50  m_reconnectInterval( 30 ), m_noDelay( false ), m_sendBufSize( 0 ),
51  m_rcvBufSize( 0 )
52 {
53 }
54 
56 {
57  SocketConnections::iterator i;
58  for (i = m_connections.begin();
59  i != m_connections.end(); ++i)
60  delete i->second;
61 
62  for (i = m_pendingConnections.begin();
63  i != m_pendingConnections.end(); ++i)
64  delete i->second;
65 }
66 
68 throw ( ConfigError )
69 {
70  const Dictionary& dict = s.get();
71 
72  if( dict.has( RECONNECT_INTERVAL ) )
73  m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
74  if( dict.has( SOCKET_NODELAY ) )
75  m_noDelay = dict.getBool( SOCKET_NODELAY );
76  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
77  m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
78  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
79  m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
80 }
81 
83 throw ( RuntimeError )
84 {
85 }
86 
88 {
89  connect();
90 
91  while ( !isStopped() ) {
92  m_connector.block( *this, false, 1.0 );
94  }
95 
96  time_t start = 0;
97  time_t now = 0;
98 
99  ::time( &start );
100  while ( isLoggedOn() )
101  {
102  m_connector.block( *this );
103  if( ::time(&now) -5 >= start )
104  break;
105  }
106 }
107 
108 bool SocketInitiator::onPoll( double timeout )
109 {
110  time_t start = 0;
111  time_t now = 0;
112 
113  if( isStopped() )
114  {
115  if( start == 0 )
116  ::time( &start );
117  if( !isLoggedOn() )
118  return false;
119  if( ::time(&now) - 5 >= start )
120  return false;
121  }
122 
123  m_connector.block( *this, true, timeout );
124  return true;
125 }
126 
128 {
129 }
130 
132 {
133  try
134  {
135  std::string address;
136  short port = 0;
137  Session* session = Session::lookupSession( s );
138  if( !session->isSessionTime(UtcTimeStamp()) ) return;
139 
140  Log* log = session->getLog();
141 
142  getHost( s, d, address, port );
143 
144  log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
145  int result = m_connector.connect( address, port, m_noDelay, m_sendBufSize, m_rcvBufSize );
146  setPending( s );
147 
148  m_pendingConnections[ result ]
149  = new SocketConnection( *this, s, result, &m_connector.getMonitor() );
150  }
151  catch ( std::exception& ) {}
152 }
153 
155 {
156  SocketConnections::iterator i = m_pendingConnections.find( s );
157  if( i == m_pendingConnections.end() ) return;
158  SocketConnection* pSocketConnection = i->second;
159 
160  m_connections[s] = pSocketConnection;
161  m_pendingConnections.erase( i );
162  setConnected( pSocketConnection->getSession()->getSessionID() );
163  pSocketConnection->onTimeout();
164 }
165 
166 void SocketInitiator::onWrite( SocketConnector& connector, int s )
167 {
168  SocketConnections::iterator i = m_connections.find( s );
169  if ( i == m_connections.end() ) return ;
170  SocketConnection* pSocketConnection = i->second;
171  if( pSocketConnection->processQueue() )
172  pSocketConnection->unsignal();
173 }
174 
175 bool SocketInitiator::onData( SocketConnector& connector, int s )
176 {
177  SocketConnections::iterator i = m_connections.find( s );
178  if ( i == m_connections.end() ) return false;
179  SocketConnection* pSocketConnection = i->second;
180  return pSocketConnection->read( connector );
181 }
182 
184 {
185  SocketConnections::iterator i = m_connections.find( s );
186  SocketConnections::iterator j = m_pendingConnections.find( s );
187 
188  SocketConnection* pSocketConnection = 0;
189  if( i != m_connections.end() )
190  pSocketConnection = i->second;
191  if( j != m_pendingConnections.end() )
192  pSocketConnection = j->second;
193  if( !pSocketConnection )
194  return;
195 
196  setDisconnected( pSocketConnection->getSession()->getSessionID() );
197 
198  Session* pSession = pSocketConnection->getSession();
199  if ( pSession )
200  {
201  pSession->disconnect();
202  setDisconnected( pSession->getSessionID() );
203  }
204 
205  delete pSocketConnection;
206  m_connections.erase( s );
207  m_pendingConnections.erase( s );
208 }
209 
211 {
212  onTimeout( connector );
213 }
214 
216 {
217  time_t now;
218  ::time( &now );
219 
220  if ( (now - m_lastConnect) >= m_reconnectInterval )
221  {
222  connect();
223  m_lastConnect = now;
224  }
225 
226  SocketConnections::iterator i;
227  for ( i = m_connections.begin(); i != m_connections.end(); ++i )
228  i->second->onTimeout();
229 }
230 
232  std::string& address, short& port )
233 {
234  int num = 0;
235  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
236  if ( i != m_sessionToHostNum.end() ) num = i->second;
237 
238  std::stringstream hostStream;
239  hostStream << SOCKET_CONNECT_HOST << num;
240  std::string hostString = hostStream.str();
241 
242  std::stringstream portStream;
243  portStream << SOCKET_CONNECT_PORT << num;
244  std::string portString = portStream.str();
245 
246  if( d.has(hostString) && d.has(portString) )
247  {
248  address = d.getString( hostString );
249  port = ( short ) d.getInt( portString );
250  }
251  else
252  {
253  num = 0;
254  address = d.getString( SOCKET_CONNECT_HOST );
255  port = ( short ) d.getInt( SOCKET_CONNECT_PORT );
256  }
257 
258  m_sessionToHostNum[ s ] = ++num;
259 }
260 }
void onConnect(SocketConnector &, int)
void onStart()
Implemented to start connecting to targets.
static Session * lookupSession(const SessionID &)
Definition: Session.cpp:1403
Maintains the state and implements the logic of a FIX session.
Definition: Session.h:45
SocketConnections m_pendingConnections
SocketConnector m_connector
const char RECONNECT_INTERVAL[]
void doConnect(const SessionID &, const Dictionary &d)
Implemented to connect a session to its target.
Connects sockets to remote ports and addresses.
const char SOCKET_NODELAY[]
SessionToHostNum m_sessionToHostNum
const char SOCKET_CONNECT_HOST[]
const char SOCKET_SEND_BUFFER_SIZE[]
void onConfigure(const SessionSettings &)
Implemented to configure initiator.
void onStop()
Implemented to stop a running initiator.
bool read(SocketConnector &s)
int getInt(const std::string &) const
Get a value as an int.
Definition: Dictionary.cpp:45
const char SOCKET_CONNECT_PORT[]
static std::string convert(signed_int value)
const char SOCKET_RECEIVE_BUFFER_SIZE[]
This interface must be implemented to log messages and events.
Definition: Log.h:81
SocketMonitor & getMonitor()
SocketInitiator(Application &, MessageStoreFactory &, const SessionSettings &)
virtual void onEvent(const std::string &)=0
Log * getLog()
Definition: Session.h:214
This interface must be implemented to define what your FIX application does.
Definition: Application.h:43
void block(Strategy &strategy, bool poll=0, double timeout=0.0)
Application encountered serious error during runtime
Definition: Exceptions.h:94
Application is not configured correctly
Definition: Exceptions.h:87
Container for setting dictionaries mapped to sessions.
void disconnect()
Definition: Session.cpp:542
bool onPoll(double timeout)
Implemented to connect and poll for events.
bool has(const std::string &) const
Check if the dictionary contains a value for key.
Definition: Dictionary.cpp:150
const SessionID & getSessionID() const
Definition: Session.h:75
void start()
Start initiator.
Definition: Initiator.cpp:190
This interface must be implemented to create a Log.
Definition: Log.h:42
std::string getString(const std::string &, bool capitalize=false) const
Get a value as a string.
Definition: Dictionary.cpp:32
This interface must be implemented to create a MessageStore.
Definition: MessageStore.h:41
void getHost(const SessionID &, const Dictionary &, std::string &, short &)
bool getBool(const std::string &) const
Get a value as a bool.
Definition: Dictionary.cpp:71
SocketConnections m_connections
bool isLoggedOn()
Check to see if any sessions are currently logged on.
Definition: Initiator.cpp:269
void onDisconnect(SocketConnector &, int)
bool onData(SocketConnector &, int)
void setConnected(const SessionID &)
Definition: Initiator.cpp:154
Date and Time represented in UTC.
Definition: FieldTypes.h:399
For storage and retrieval of key/value pairs.
Definition: Dictionary.h:36
Encapsulates a socket file descriptor (single-threaded).
Session * getSession() const
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition: SessionID.h:30
void onInitialize(const SessionSettings &)
Implemented to initialize initiator.
bool isSessionTime(const UtcTimeStamp &time)
Definition: Session.h:108
void onWrite(SocketConnector &, int)
bool isStopped()
Definition: Initiator.h:83
int connect(const std::string &address, int port, bool noDelay, int sendBufSize, int rcvBufSize)
void onError(SocketConnector &)
Base for classes which act as an initiator for establishing connections.
Definition: Initiator.h:51
void setPending(const SessionID &)
Definition: Initiator.cpp:145
void onTimeout(SocketConnector &)
void setDisconnected(const SessionID &)
Definition: Initiator.cpp:163

Generated on Sat Mar 29 2014 15:13:32 for QuickFIX by doxygen 1.8.5 written by Dimitri van Heesch, © 1997-2001