FIX::ThreadedSocketAcceptor Class Reference

Threaded Socket implementation of Acceptor.

#include <ThreadedSocketAcceptor.h>

Inheritance diagram for FIX::ThreadedSocketAcceptor: [graph not reproduced]
Collaboration diagram for FIX::ThreadedSocketAcceptor: [graph not reproduced]

Classes

struct  AcceptorThreadInfo
 
struct  ConnectionThreadInfo
 

Public Member Functions

 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 ThreadedSocketAcceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~ThreadedSocketAcceptor ()
 
Public Member Functions inherited from FIX::Acceptor
 Acceptor (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 Acceptor (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~Acceptor ()
 
Log * getLog ()
 
void start () throw ( ConfigError, RuntimeError )
 Start acceptor.
 
void block () throw ( ConfigError, RuntimeError )
 Block on the acceptor.
 
bool poll (double timeout=0.0) throw ( ConfigError, RuntimeError )
 Poll the acceptor.
 
void stop (bool force=false)
 Stop acceptor.
 
bool isLoggedOn ()
 Check to see if any sessions are currently logged on.
 
Session * getSession (const std::string &msg, Responder &)
 
const std::set< SessionID > & getSessions () const
 
Session * getSession (const SessionID &sessionID) const
 
const Dictionary *const getSessionSettings (const SessionID &sessionID) const
 
bool has (const SessionID &id)
 
bool isStopped ()
 
Application & getApplication ()
 
MessageStoreFactory & getMessageStoreFactory ()
 

Private Types

typedef std::set< int > Sockets
 
typedef std::set< SessionID > Sessions
 
typedef std::map< int, Sessions > PortToSessions
 
typedef std::map< int, int > SocketToPort
 
typedef std::map< int, thread_id > SocketToThread
 

Private Member Functions

bool readSettings (const SessionSettings &)
 
void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure acceptor.
 
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize acceptor.
 
void onStart ()
 Implemented to start listening for connections.
 
bool onPoll (double timeout)
 Implemented to connect and poll for events.
 
void onStop ()
 Implemented to stop a running acceptor.
 
void addThread (int s, thread_id t)
 
void removeThread (int s)
 

Static Private Member Functions

static THREAD_PROC socketAcceptorThread (void *p)
 
static THREAD_PROC socketConnectionThread (void *p)
 

Private Attributes

Sockets m_sockets
 
PortToSessions m_portToSessions
 
SocketToPort m_socketToPort
 
SocketToThread m_threads
 
Mutex m_mutex
 

Friends

class SocketConnection
 

Detailed Description

Threaded Socket implementation of Acceptor.

Definition at line 36 of file ThreadedSocketAcceptor.h.

Member Typedef Documentation

typedef std::map< int, Sessions > FIX::ThreadedSocketAcceptor::PortToSessions
private

Definition at line 73 of file ThreadedSocketAcceptor.h.

typedef std::set< SessionID > FIX::ThreadedSocketAcceptor::Sessions
private

Definition at line 72 of file ThreadedSocketAcceptor.h.

typedef std::set< int > FIX::ThreadedSocketAcceptor::Sockets
private

Definition at line 71 of file ThreadedSocketAcceptor.h.

typedef std::map< int, int > FIX::ThreadedSocketAcceptor::SocketToPort
private

Definition at line 74 of file ThreadedSocketAcceptor.h.

typedef std::map< int, thread_id > FIX::ThreadedSocketAcceptor::SocketToThread
private

Definition at line 75 of file ThreadedSocketAcceptor.h.

Constructor & Destructor Documentation

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application & application, MessageStoreFactory & factory, const SessionSettings & settings ) throw ( ConfigError )

Definition at line 32 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_init().

: Acceptor( application, factory, settings )
{ socket_init(); }

FIX::ThreadedSocketAcceptor::ThreadedSocketAcceptor ( Application & application, MessageStoreFactory & factory, const SessionSettings & settings, LogFactory & logFactory ) throw ( ConfigError )

Definition at line 39 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_init().

: Acceptor( application, factory, settings, logFactory )
{
  socket_init();
}

FIX::ThreadedSocketAcceptor::~ThreadedSocketAcceptor ( )
virtual

Definition at line 49 of file ThreadedSocketAcceptor.cpp.

References FIX::socket_term().

{
  socket_term();
}

Member Function Documentation

void FIX::ThreadedSocketAcceptor::addThread ( int s, thread_id t )
private

Definition at line 168 of file ThreadedSocketAcceptor.cpp.

References m_mutex, and m_threads.

Referenced by onStart().

{
  Locker l(m_mutex);

  m_threads[ s ] = t;
}
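
The `Locker l(m_mutex)` line above appears to be a scoped lock: the mutex is held for the rest of the enclosing block and released when `l` goes out of scope. The following is a minimal sketch of that idiom using the standard library only. `ScopedLock` and `Registry` are hypothetical names invented for this illustration, `std::mutex` stands in for the library's `Mutex`, and a `long` stands in for `thread_id`.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>

// Hypothetical scoped lock in the spirit of Locker: lock on construction,
// unlock on destruction, including when the scope exits via an exception.
class ScopedLock
{
public:
  explicit ScopedLock( std::mutex& mutex ) : m_mutex( mutex ) { m_mutex.lock(); }
  ~ScopedLock() { m_mutex.unlock(); }

  ScopedLock( const ScopedLock& ) = delete;
  ScopedLock& operator=( const ScopedLock& ) = delete;

private:
  std::mutex& m_mutex;
};

// Hypothetical registry mapping a socket to an opaque worker id.
class Registry
{
public:
  // Registers (or overwrites) the worker for a socket.
  void add( int socket, long worker )
  {
    assert( socket >= 0 );
    ScopedLock l( m_mutex );
    m_workers[ socket ] = worker;
  }

  // Removes the entry for a socket; returns false if there was none.
  bool remove( int socket )
  {
    ScopedLock l( m_mutex );
    return m_workers.erase( socket ) == 1;
  }

  std::size_t size()
  {
    ScopedLock l( m_mutex );
    return m_workers.size();
  }

private:
  std::mutex m_mutex;
  std::map<int, long> m_workers;
};
```

Because every accessor takes the lock itself, callers never touch the map without synchronization. Note that `std::mutex` is not recursive, so in this sketch one locked method must not call another.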

void FIX::ThreadedSocketAcceptor::onConfigure ( const SessionSettings & s ) throw ( ConfigError )
private virtual

Implemented to configure acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 54 of file ThreadedSocketAcceptor.cpp.

References FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), FIX::SOCKET_ACCEPT_PORT, FIX::SOCKET_NODELAY, and FIX::SOCKET_REUSE_ADDRESS.

{
  std::set<SessionID> sessions = s.getSessions();
  std::set<SessionID>::iterator i;
  for( i = sessions.begin(); i != sessions.end(); ++i )
  {
    const Dictionary& settings = s.get( *i );
    settings.getInt( SOCKET_ACCEPT_PORT );
    if( settings.has(SOCKET_REUSE_ADDRESS) )
      settings.getBool( SOCKET_REUSE_ADDRESS );
    if( settings.has(SOCKET_NODELAY) )
      settings.getBool( SOCKET_NODELAY );
  }
}
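
The listing above reads each setting only for its side effect: the required port must be present and numeric, and optional flags are checked only when present. A minimal sketch of that validation pass, assuming nothing from QuickFIX, might look as follows. `Settings`, `requireInt`, `requireBool` and `validateSession` are hypothetical names, the key strings are placeholders, `std::runtime_error` stands in for `ConfigError`, and the `Y`/`N` spelling of booleans is an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for FIX::Dictionary: plain string key/value pairs.
typedef std::map<std::string, std::string> Settings;

// Required integer key: throws if the key is missing or not a whole number.
int requireInt( const Settings& settings, const std::string& key )
{
  assert( !key.empty() );
  Settings::const_iterator i = settings.find( key );
  if( i == settings.end() )
    throw std::runtime_error( key + " not defined" );

  std::size_t used = 0;
  int value = 0;
  try
  {
    value = std::stoi( i->second, &used );
  }
  catch( const std::logic_error& )
  {
    // std::stoi reports bad or out-of-range input via logic_error subclasses.
    used = std::string::npos;
  }
  if( used != i->second.size() )
    throw std::runtime_error( key + " is not an integer" );
  return value;
}

// Boolean key spelled Y or N (an assumption of this sketch).
bool requireBool( const Settings& settings, const std::string& key )
{
  Settings::const_iterator i = settings.find( key );
  if( i == settings.end() )
    throw std::runtime_error( key + " not defined" );
  if( i->second == "Y" ) return true;
  if( i->second == "N" ) return false;
  throw std::runtime_error( key + " is not Y or N" );
}

// Validation only: values are parsed and discarded, errors propagate.
void validateSession( const Settings& settings )
{
  requireInt( settings, "AcceptPort" );
  if( settings.count( "ReuseAddress" ) )
    requireBool( settings, "ReuseAddress" );
  if( settings.count( "NoDelay" ) )
    requireBool( settings, "NoDelay" );
}
```

Validating up front means a malformed session is reported before any socket is opened.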

void FIX::ThreadedSocketAcceptor::onInitialize ( const SessionSettings & s ) throw ( RuntimeError )
private virtual

Implemented to initialize acceptor.

Reimplemented from FIX::Acceptor.

Definition at line 70 of file ThreadedSocketAcceptor.cpp.

References FIX::IntConvertor::convert(), FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), FIX::SOCKET_ACCEPT_PORT, FIX::socket_close(), FIX::socket_createAcceptor(), FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, FIX::SOCKET_REUSE_ADDRESS, FIX::SOCKET_SEND_BUFFER_SIZE, and FIX::socket_setsockopt().

{
  short port = 0;
  std::set<int> ports;

  std::set<SessionID> sessions = s.getSessions();
  std::set<SessionID>::iterator i = sessions.begin();
  for( ; i != sessions.end(); ++i )
  {
    const Dictionary& settings = s.get( *i );
    port = (short)settings.getInt( SOCKET_ACCEPT_PORT );

    m_portToSessions[port].insert( *i );

    if( ports.find(port) != ports.end() )
      continue;
    ports.insert( port );

    const bool reuseAddress = settings.has( SOCKET_REUSE_ADDRESS ) ?
      settings.getBool( SOCKET_REUSE_ADDRESS ) : true;

    const bool noDelay = settings.has( SOCKET_NODELAY ) ?
      settings.getBool( SOCKET_NODELAY ) : false;

    const int sendBufSize = settings.has( SOCKET_SEND_BUFFER_SIZE ) ?
      settings.getInt( SOCKET_SEND_BUFFER_SIZE ) : 0;

    const int rcvBufSize = settings.has( SOCKET_RECEIVE_BUFFER_SIZE ) ?
      settings.getInt( SOCKET_RECEIVE_BUFFER_SIZE ) : 0;

    int socket = socket_createAcceptor( port, reuseAddress );
    if( socket < 0 )
    {
      SocketException e;
      socket_close( socket );
      throw RuntimeError( "Unable to create, bind, or listen to port "
                         + IntConvertor::convert( (unsigned short)port ) + " (" + e.what() + ")" );
    }
    if( noDelay )
      socket_setsockopt( socket, TCP_NODELAY );
    if( sendBufSize )
      socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
    if( rcvBufSize )
      socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );

    m_socketToPort[socket] = port;
    m_sockets.insert( socket );
  }
}
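
In the listing above, every session is recorded under its accept port, but a listening socket is created only for the first session seen on each port, so that session's socket options apply to the shared listener. The grouping step can be sketched on its own. This is an illustration only: `groupByPort` is a hypothetical helper, sessions are plain strings rather than `SessionID` objects, and no sockets are opened.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the class's private typedefs.
typedef std::set<std::string> Sessions;
typedef std::map<int, Sessions> PortToSessions;

// Groups sessions by port and returns the ports that need a listening
// socket, each exactly once, in the order they were first seen.
std::vector<int> groupByPort(
  const std::vector< std::pair<std::string, int> >& sessionPorts,
  PortToSessions& portToSessions )
{
  std::set<int> seen;
  std::vector<int> toOpen;

  for( std::size_t i = 0; i < sessionPorts.size(); ++i )
  {
    const std::string& session = sessionPorts[i].first;
    const int port = sessionPorts[i].second;
    assert( port > 0 && port <= 65535 );

    // Every session is recorded under its port...
    portToSessions[port].insert( session );

    // ...but only the first session on a port triggers socket creation.
    if( !seen.insert( port ).second )
      continue;
    toOpen.push_back( port );
  }
  return toOpen;
}
```

A caller would then open one acceptor per entry in the returned vector and look up `portToSessions[port]` whenever a connection arrives on that port.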

bool FIX::ThreadedSocketAcceptor::onPoll ( double second )
private virtual

Implemented to connect and poll for events.

Implements FIX::Acceptor.

Definition at line 135 of file ThreadedSocketAcceptor.cpp.

{
  return false;
}

void FIX::ThreadedSocketAcceptor::onStart ( )
private virtual

Implemented to start listening for connections.

Implements FIX::Acceptor.

Definition at line 121 of file ThreadedSocketAcceptor.cpp.

References addThread(), m_mutex, m_sockets, m_socketToPort, socketAcceptorThread(), and FIX::thread_spawn().

{
  Sockets::iterator i;
  for( i = m_sockets.begin(); i != m_sockets.end(); ++i )
  {
    Locker l( m_mutex );
    int port = m_socketToPort[*i];
    AcceptorThreadInfo* info = new AcceptorThreadInfo( this, *i, port );
    thread_id thread;
    thread_spawn( &socketAcceptorThread, info, thread );
    addThread( *i, thread );
  }
}

void FIX::ThreadedSocketAcceptor::onStop ( )
private virtual

Implemented to stop a running acceptor.

Implements FIX::Acceptor.

Definition at line 140 of file ThreadedSocketAcceptor.cpp.

References FIX::Acceptor::isLoggedOn(), m_mutex, m_threads, FIX::socket_close(), FIX::Acceptor::start(), and FIX::thread_join().

{
  SocketToThread threads;
  SocketToThread::iterator i;

  {
    Locker l(m_mutex);

    time_t start = 0;
    time_t now = 0;

    ::time( &start );
    while ( isLoggedOn() )
    {
      if( ::time(&now) -5 >= start )
        break;
    }

    threads = m_threads;
    m_threads.clear();
  }

  for ( i = threads.begin(); i != threads.end(); ++i )
    socket_close( i->first );
  for ( i = threads.begin(); i != threads.end(); ++i )
    thread_join( i->second );
}
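
The shutdown sequence above copies the thread map and clears the member while holding the mutex, then joins the threads after the lock is released, so a worker that needs the same mutex on its way out is not blocked by the joiner. A minimal sketch of that take-then-join pattern with `std::thread` follows. `Workers` is a hypothetical class, and the sketch leaves out the socket closing that the original uses to wake threads blocked in accept or read.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical registry of worker threads keyed by socket.
class Workers
{
public:
  void add( int socket, std::thread thread )
  {
    std::lock_guard<std::mutex> lock( m_mutex );
    // Overwriting a joinable std::thread would call std::terminate.
    assert( m_threads.count( socket ) == 0 );
    m_threads.emplace( socket, std::move( thread ) );
  }

  // Takes the whole map under the lock, then joins outside it.
  // Returns the number of threads joined.
  std::size_t stopAll()
  {
    std::map<int, std::thread> threads;
    {
      std::lock_guard<std::mutex> lock( m_mutex );
      threads.swap( m_threads );
    }

    std::size_t joined = 0;
    std::map<int, std::thread>::iterator i;
    for( i = threads.begin(); i != threads.end(); ++i )
    {
      if( i->second.joinable() )
      {
        i->second.join();
        ++joined;
      }
    }
    return joined;
  }

  std::size_t size()
  {
    std::lock_guard<std::mutex> lock( m_mutex );
    return m_threads.size();
  }

private:
  std::mutex m_mutex;
  std::map<int, std::thread> m_threads;
};
```

`stopAll()` must be called before a `Workers` object is destroyed, because destroying a joinable `std::thread` terminates the program.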

bool FIX::ThreadedSocketAcceptor::readSettings ( const SessionSettings & )
private

void FIX::ThreadedSocketAcceptor::removeThread ( int s )
private

Definition at line 175 of file ThreadedSocketAcceptor.cpp.

References m_mutex, m_threads, and FIX::thread_detach().

{
  Locker l(m_mutex);
  SocketToThread::iterator i = m_threads.find( s );
  if ( i != m_threads.end() )
  {
    thread_detach( i->second );
    m_threads.erase( i );
  }
}

THREAD_PROC FIX::ThreadedSocketAcceptor::socketAcceptorThread ( void * p )
static private

Definition at line 186 of file ThreadedSocketAcceptor.cpp.

References FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_pAcceptor, FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_port, FIX::ThreadedSocketAcceptor::AcceptorThreadInfo::m_socket, FIX::socket_accept(), FIX::socket_getsockopt(), FIX::socket_peername(), FIX::socket_setsockopt(), socketConnectionThread(), and FIX::thread_spawn().

Referenced by onStart().

{
  AcceptorThreadInfo * info = reinterpret_cast < AcceptorThreadInfo* > ( p );

  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
  int s = info->m_socket;
  int port = info->m_port;
  delete info;

  int noDelay = 0;
  int sendBufSize = 0;
  int rcvBufSize = 0;
  socket_getsockopt( s, TCP_NODELAY, noDelay );
  socket_getsockopt( s, SO_SNDBUF, sendBufSize );
  socket_getsockopt( s, SO_RCVBUF, rcvBufSize );

  int socket = 0;
  while ( ( !pAcceptor->isStopped() && ( socket = socket_accept( s ) ) >= 0 ) )
  {
    if( noDelay )
      socket_setsockopt( socket, TCP_NODELAY );
    if( sendBufSize )
      socket_setsockopt( socket, SO_SNDBUF, sendBufSize );
    if( rcvBufSize )
      socket_setsockopt( socket, SO_RCVBUF, rcvBufSize );

    Sessions sessions = pAcceptor->m_portToSessions[port];

    ThreadedSocketConnection * pConnection =
      new ThreadedSocketConnection
        ( socket, sessions, pAcceptor->getLog() );

    ConnectionThreadInfo* info = new ConnectionThreadInfo( pAcceptor, pConnection );

    {
      Locker l( pAcceptor->m_mutex );

      std::stringstream stream;
      stream << "Accepted connection from " << socket_peername( socket ) << " on port " << port;

      if( pAcceptor->getLog() )
        pAcceptor->getLog()->onEvent( stream.str() );

      thread_id thread;
      if ( !thread_spawn( &socketConnectionThread, info, thread ) )
        delete info;
      pAcceptor->addThread( socket, thread );
    }
  }

  if( !pAcceptor->isStopped() )
    pAcceptor->removeThread( s );

  return 0;
}
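
Both thread procedures receive their arguments through a `void *` that points at a heap-allocated info struct, copy the fields they need, and delete the struct straight away. The spawner frees it only when the spawn fails. The sketch below illustrates that ownership handoff under stated assumptions: `WorkerInfo`, `workerProc` and `spawnWorker` are hypothetical names, `std::thread` replaces `thread_spawn`, and the worker body is a placeholder for the real accept loop.

```cpp
#include <cassert>
#include <system_error>
#include <thread>

// Hypothetical payload, modelled loosely on AcceptorThreadInfo.
struct WorkerInfo
{
  WorkerInfo( int* result, int socket, int port )
  : m_result( result ), m_socket( socket ), m_port( port ) {}

  int* m_result;
  int m_socket;
  int m_port;
};

// Thread entry point with a C-style signature. It takes ownership of the
// payload, copies what it needs, and frees it before doing any work.
void* workerProc( void* p )
{
  WorkerInfo* info = static_cast<WorkerInfo*>( p );
  int* result = info->m_result;
  const int socket = info->m_socket;
  const int port = info->m_port;
  delete info;

  *result = socket + port;  // placeholder for the real work
  return 0;
}

// Spawns the worker. Ownership of the payload passes to the new thread
// only if the spawn succeeds; on failure the spawner frees it.
bool spawnWorker( int* result, int socket, int port, std::thread& thread )
{
  assert( !thread.joinable() );
  WorkerInfo* info = new WorkerInfo( result, socket, port );
  try
  {
    thread = std::thread( workerProc, static_cast<void*>( info ) );
  }
  catch( const std::system_error& )
  {
    delete info;
    return false;
  }
  return true;
}
```

Deleting the payload at the top of the procedure keeps its lifetime independent of how the loop that follows exits.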

THREAD_PROC FIX::ThreadedSocketAcceptor::socketConnectionThread ( void * p )
static private

Definition at line 242 of file ThreadedSocketAcceptor.cpp.

References FIX::ThreadedSocketConnection::getSocket(), FIX::ThreadedSocketAcceptor::ConnectionThreadInfo::m_pAcceptor, FIX::ThreadedSocketAcceptor::ConnectionThreadInfo::m_pConnection, and FIX::ThreadedSocketConnection::read().

Referenced by socketAcceptorThread().

{
  ConnectionThreadInfo * info = reinterpret_cast < ConnectionThreadInfo* > ( p );

  ThreadedSocketAcceptor* pAcceptor = info->m_pAcceptor;
  ThreadedSocketConnection* pConnection = info->m_pConnection;
  delete info;

  int socket = pConnection->getSocket();

  while ( pConnection->read() ) {}
  delete pConnection;
  if( !pAcceptor->isStopped() )
    pAcceptor->removeThread( socket );
  return 0;
}

Friends And Related Function Documentation

friend class SocketConnection
friend

Definition at line 38 of file ThreadedSocketAcceptor.h.

Member Data Documentation

Mutex FIX::ThreadedSocketAcceptor::m_mutex
private

Definition at line 93 of file ThreadedSocketAcceptor.h.

Referenced by addThread(), onStart(), onStop(), and removeThread().

PortToSessions FIX::ThreadedSocketAcceptor::m_portToSessions
private

Definition at line 90 of file ThreadedSocketAcceptor.h.

Sockets FIX::ThreadedSocketAcceptor::m_sockets
private

Definition at line 89 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

SocketToPort FIX::ThreadedSocketAcceptor::m_socketToPort
private

Definition at line 91 of file ThreadedSocketAcceptor.h.

Referenced by onStart().

SocketToThread FIX::ThreadedSocketAcceptor::m_threads
private

Definition at line 92 of file ThreadedSocketAcceptor.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

ThreadedSocketAcceptor.h
ThreadedSocketAcceptor.cpp

Generated on Sat Mar 29 2014 15:13:35 for QuickFIX by doxygen 1.8.5 written by Dimitri van Heesch, © 1997-2001