TSDuck v3.45-4686
MPEG Transport Stream Toolkit
ts::ReactiveTCPConnection Class Reference

TCP connected socket for use in a Reactor environment.

#include <tsReactiveTCPConnection.h>


Public Member Functions

 ReactiveTCPConnection (Reactor &reactor, TCPConnection &socket, Object *owner=nullptr)
 Constructor.
 
virtual ~ReactiveTCPConnection () override
 Destructor.
 
void cancelSendReceive (bool silent=false)
 Cancel any pending send or receive operation on this socket.
 
bool isOpen () const
 Check if the reactive socket is open.
 
bool isOwned ()
 Check if the object is owned.
 
template<class OBJECT >
requires std::derived_from<OBJECT, ts::Object>
bool isOwned ()
 Check if the object is owned by an object of a given type.
 
Object * owner ()
 Get the address of the optional "owner" object which was specified in the constructor.
 
template<class OBJECT >
requires std::derived_from<OBJECT, ts::Object>
OBJECT * owner ()
 Get the address of the "owner" object which was specified in the constructor.
 
Reactor & reactor ()
 Get a reference to the associated reactor.
 
Report & report ()
 Get a reference to the associated report.
 
TCPConnection & socket ()
 Get a reference to the associated socket.
 
bool startClose (ReactiveTCPConnectionHandlerInterface *handler, bool silent=false, const ObjectPtr &user_data=ObjectPtr())
 Start closing the socket.
 
bool startCloseWriter (ReactiveTCPConnectionHandlerInterface *handler, bool silent=false, const ObjectPtr &user_data=ObjectPtr())
 Start closing the send direction of the socket.
 
bool startConnect (ReactiveTCPConnectionHandlerInterface *handler, const IPSocketAddress &addr, const ObjectPtr &user_data=ObjectPtr())
 Start the operation of connecting to a TCP server.
 
bool startReceive (ReactiveTCPConnectionHandlerInterface *handler, size_t buffer_size=DEFAULT_RECEIVE_BUFFER_SIZE, const ObjectPtr &user_data=ObjectPtr())
 Start the operation of receiving messages from the socket.
 
bool startSend (ReactiveTCPConnectionHandlerInterface *handler, const void *data, size_t size, const ObjectPtr &user_data=ObjectPtr())
 Start the operation of sending data over the TCP connection.
 
void whenAccepted (ReactiveTCPConnectionHandlerInterface *handler, const ObjectPtr &user_data=ObjectPtr())
 Define the handler to call when accepted as a client session by a TCP server.
 

Static Public Attributes

static constexpr size_t DEFAULT_RECEIVE_BUFFER_SIZE = 4096
 Default buffer size for receive operations.
 

Protected Types

using IOQueue = std::list< std::shared_ptr< IOSB > >
 Queues of I/O requests are queues of shared_ptr to IOSB.
 
using IOSB = NonBlockingDevice::IOSB
 IOSB shortcut for subclasses.
 
using IOSet = std::set< std::shared_ptr< IOSB > >
 Unordered set of I/O requests, set of shared_ptr to IOSB.
 

Protected Member Functions

bool activateAsynchronousIO ()
 Activate notification for asynchronous I/O.
 
bool activateReadReady ()
 Activate read-ready notification for non-blocking I/O.
 
bool activateWriteReady ()
 Activate write-ready notification for non-blocking I/O.
 
bool cancelAndWaitAsynchronousIO (NonBlockingDevice::IOSB &iosb, bool silent)
 Cancel one specific pending asynchronous I/O and wait for its completion.
 
void cancelAsynchronousIO (bool silent)
 Cancel all asynchronous I/O in progress.
 
template<class REQUEST >
requires std::derived_from<REQUEST, ts::Object>
void cancelQueue (IOQueue &inqueue, IOQueue &outqueue)
 Transfer all requests from one queue to another and mark all I/O as canceled.
 
void deactivateAll (bool silent)
 Deactivate all registrations for non-blocking and asynchronous I/O.
 
void deactivateAsynchronousIO (bool silent)
 Deactivate notification for asynchronous I/O.
 
void deactivateCompletedIO (bool silent)
 Deactivate the execution of processCompletedIO() in the context of a Reactor handler.
 
void deactivateReadReady (bool silent)
 Deactivate read-ready notification for non-blocking I/O.
 
void deactivateWriteReady (bool silent)
 Deactivate write-ready notification for non-blocking I/O.
 
virtual void handleTimer (Reactor &reactor, EventId id)
 Handle a timer in a Reactor.
 
virtual void handleUserEvent (Reactor &, EventId) override
 Handle a user-defined event in a Reactor.
 
std::shared_ptr< IOSB > removeFromQueue (IOQueue &queue, IOSB *iosb)
 Search and remove a shared_ptr to IOSB, based on an IOSB address.
 
bool signalCompletedIO ()
 Trigger the execution of processCompletedIO() in the context of a Reactor handler.
 

Detailed Description

TCP connected socket for use in a Reactor environment.

The class ReactiveTCPConnection is a wrapper around TCPConnection to handle reactive I/O.

The actual socket is a separate object. It is initialized and configured by the application. The application must not call connect(), send(), receive(), closeWriter(), or close() directly on this socket. It must instead delegate these operations to startConnect(), startSend(), startReceive(), startCloseWriter() and startClose() in class ReactiveTCPConnection.

Member Typedef Documentation

◆ IOQueue

using ts::ReactiveBase::IOQueue = std::list<std::shared_ptr<IOSB> >
protected inherited

Queues of I/O requests are queues of shared_ptr to IOSB.

This is typically used with non-blocking I/O where we must process requests in order. Send and receive requests are structures which are stored in the react_data of the IOSB.

◆ IOSet

using ts::ReactiveBase::IOSet = std::set<std::shared_ptr<IOSB> >
protected inherited

Unordered set of I/O requests, set of shared_ptr to IOSB.

This is typically used with asynchronous I/O. The start order is enforced because I/O operations are started in the order of the calls from applications. Completions are likely to occur in the same order, but they are driven by the system I/O Completion Ports, so no order must be assumed. Send and receive requests are structures which are stored in the react_data of the IOSB.

Constructor & Destructor Documentation

◆ ReactiveTCPConnection()

ts::ReactiveTCPConnection::ReactiveTCPConnection ( Reactor & reactor,
TCPConnection & socket,
Object * owner = nullptr
)

Constructor.

Parameters
[in,out] reactor: Associated reactor. The reactor object must remain valid as long as this object is valid.
[in,out] socket: Associated socket. The socket object must remain valid as long as this object is valid. The ReactiveTCPConnection must be initialized before the socket is opened.
[in] owner: Optional address of an "owner" object, typically an instance of the class containing this object.

Member Function Documentation

◆ socket()

TCPConnection & ts::ReactiveTCPConnection::socket ( )
inline

Get a reference to the associated socket.

Returns
A reference to the associated socket.

◆ isOpen()

bool ts::ReactiveTCPConnection::isOpen ( ) const
inline

Check if the reactive socket is open.

This is different from Socket::isOpen() during the closing phase, after startClose() has been called but before the underlying socket is fully closed.

Returns
True if the reactive socket is open, false if the underlying socket is closed or if startClose() has been called.

◆ startConnect()

bool ts::ReactiveTCPConnection::startConnect ( ReactiveTCPConnectionHandlerInterface * handler,
const IPSocketAddress & addr,
const ObjectPtr & user_data = ObjectPtr()
)

Start the operation of connecting to a TCP server.

Parameters
[in] handler: Handler class to call when the connect operation completes. The method handleTCPConnected() will be called. If nullptr, no handler is called.
[in] addr: IP address and port of the server to connect to.
[in] user_data: A shared pointer which will be passed unmodified to handler.
Returns
True on success, false on error. Success means that the connection was successfully started. The final status of the I/O will be transmitted in the handler.

◆ whenAccepted()

void ts::ReactiveTCPConnection::whenAccepted ( ReactiveTCPConnectionHandlerInterface * handler,
const ObjectPtr & user_data = ObjectPtr()
)

Define the handler to call when accepted as a client session by a TCP server.

Typically called from the constructor of an enclosing object which is used as client session context.

Parameters
[in] handler: Handler class to call when this object is connected to a remote client by a server. The method handleTCPAccepted() will be called. If nullptr, no handler is called.
[in] user_data: A shared pointer which will be passed unmodified to handler.

◆ startSend()

bool ts::ReactiveTCPConnection::startSend ( ReactiveTCPConnectionHandlerInterface * handler,
const void * data,
size_t size,
const ObjectPtr & user_data = ObjectPtr()
)

Start the operation of sending data over the TCP connection.

Parameters
[in] handler: Handler class to call when the send operation completes. The method handleTCPSend() will be called. If nullptr, no handler is called.
[in] data: Address of the data to send. The corresponding memory area must remain valid until the completion or cancelation of the send operation.
[in] size: Size in bytes of the data to send.
[in] user_data: A shared pointer which will be passed unmodified to handler.
Returns
True on success, false on error. Success means that the I/O was successfully started. The final status of the I/O will be transmitted in the handler.
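
One way to satisfy the lifetime rule on data is to let the user_data object own the bytes. The following is a minimal sketch of that pattern and is not TSDuck code: MockSender, SendBuffer and sendMessage are hypothetical stand-ins, the completion callback is a std::function instead of a ReactiveTCPConnectionHandlerInterface, and ObjectPtr is assumed to behave like std::shared_ptr<Object>. It also assumes that the pending operation keeps its copy of user_data until the handler runs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for ts::Object and ts::ObjectPtr.
struct Object {
    virtual ~Object() = default;
};
using ObjectPtr = std::shared_ptr<Object>;

// Owns the bytes of one outgoing message.
struct SendBuffer : Object {
    std::string bytes;
    explicit SendBuffer(std::string s) : bytes(std::move(s)) {}
};

// Hypothetical mock of an asynchronous sender. It only records requests
// and "completes" them later, when completeAll() is called.
class MockSender {
public:
    using Completion = std::function<void(const void*, std::size_t, const ObjectPtr&)>;

    bool startSend(Completion handler, const void* data, std::size_t size, const ObjectPtr& user_data)
    {
        _pending.push_back(Pending{std::move(handler), data, size, user_data});
        return true;
    }

    // Simulate the completion of all pending send operations.
    void completeAll()
    {
        std::vector<Pending> done;
        done.swap(_pending);
        for (auto& p : done) {
            if (p.handler) {
                p.handler(p.data, p.size, p.user_data);
            }
        }
    }

private:
    struct Pending {
        Completion handler;
        const void* data;
        std::size_t size;
        ObjectPtr user_data;  // keeps the SendBuffer alive while the request is pending
    };
    std::vector<Pending> _pending;
};

// Queue a message. The buffer is owned by the user_data object, so the
// memory passed as data stays valid after this function returns.
// The caller must keep 'echoed' alive until the completion runs.
bool sendMessage(MockSender& sender, std::string text, std::string& echoed)
{
    auto buf = std::make_shared<SendBuffer>(std::move(text));
    return sender.startSend(
        [&echoed](const void* data, std::size_t size, const ObjectPtr&) {
            echoed.assign(static_cast<const char*>(data), size);
        },
        buf->bytes.data(), buf->bytes.size(), buf);
}
```

The local shared pointer in sendMessage goes out of scope immediately, but the copy stored with the pending request keeps the bytes valid until the completion callback has read them.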

◆ startCloseWriter()

bool ts::ReactiveTCPConnection::startCloseWriter ( ReactiveTCPConnectionHandlerInterface * handler,
bool silent = false,
const ObjectPtr & user_data = ObjectPtr()
)

Start closing the send direction of the socket.

The peer will receive an end-of-file condition. All pending send operations are guaranteed to complete before that end-of-file is sent.

Parameters
[in] handler: Handler class to call when the close-writer operation completes. The method handleTCPSend() will be called with its parameter error_code containing SYS_EOF. If nullptr, no handler is called.
[in] silent: If true, do not report errors through the logger.
[in] user_data: A shared pointer which will be passed unmodified to handler.
Returns
True on success, false on error.

◆ startReceive()

bool ts::ReactiveTCPConnection::startReceive ( ReactiveTCPConnectionHandlerInterface * handler,
size_t buffer_size = DEFAULT_RECEIVE_BUFFER_SIZE,
const ObjectPtr & user_data = ObjectPtr()
)

Start the operation of receiving messages from the socket.

Parameters
[in] handler: Handler class to call each time data are received. The method handleTCPReceive() will be called for each new message. Cannot be null. If a previous receive handler was registered, it is replaced.
[in] buffer_size: Size of input buffers to receive data.
[in] user_data: A shared pointer which will be passed unmodified to handler.
Returns
True on success, false on error. Success means that the I/O was successfully started. The final status of the I/O will be transmitted in the handler.

◆ cancelSendReceive()

void ts::ReactiveTCPConnection::cancelSendReceive ( bool  silent = false)

Cancel any pending send or receive operation on this socket.

If a repeated reception operation is in progress, the repetition is canceled as well.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ startClose()

bool ts::ReactiveTCPConnection::startClose ( ReactiveTCPConnectionHandlerInterface * handler,
bool silent = false,
const ObjectPtr & user_data = ObjectPtr()
)

Start closing the socket.

Pending asynchronous operations are canceled. The actual cancelation will take place later. In the meantime, the user's data buffers for these pending operations are busy and shall not be destroyed or deallocated by the application. The close operation terminates when the handler handleTCPClosed() is invoked. At this point, no operation is pending any longer and the application may release the data buffers.

Note that the application, usually inside a handler, can call disconnect() on the TCPConnection object. Only closeWriter() and close() must not be called directly. Use startCloseWriter() and startClose() on the ReactiveTCPConnection object instead.

Parameters
[in] handler: Handler class to call when the close operation completes. The method handleTCPClosed() will be called. If nullptr, no handler is called.
[in] silent: If true, do not report errors through the logger.
[in] user_data: A shared pointer which will be passed unmodified to handler.
Returns
True on success, false on error.

◆ reactor()

Reactor & ts::ReactiveBase::reactor ( )
inline inherited

Get a reference to the associated reactor.

Returns
A reference to the associated reactor.

◆ report()

Report & ts::ReactiveBase::report ( )
inline inherited

Get a reference to the associated report.

Returns
A reference to the associated report.

◆ removeFromQueue()

std::shared_ptr< IOSB > ts::ReactiveBase::removeFromQueue ( IOQueue & queue,
IOSB * iosb
)
protected inherited

Search and remove a shared_ptr to IOSB, based on an IOSB address.

Search from the front end of the queue, since a completed I/O is likely to be near the front.

Parameters
[in,out] queue: The queue from which to remove iosb.
[in] iosb: Standard pointer to an IOSB to search and remove.
Returns
The removed shared_ptr to IOSB, or a null pointer if iosb is not found.
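
The search-and-remove step described above can be sketched on a plain std::list. This is an illustration under stated assumptions, not the TSDuck implementation: the IOSB structure below is a hypothetical stand-in for NonBlockingDevice::IOSB and the free function only mirrors the documented behavior (front-to-back search, null result when nothing matches).

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <utility>

// Hypothetical stand-in for NonBlockingDevice::IOSB.
struct IOSB {
    int id = 0;
};

using IOQueue = std::list<std::shared_ptr<IOSB>>;

// Search the queue from the front and remove the entry whose managed
// object is located at address 'iosb'. Return the removed shared_ptr,
// or a null shared_ptr if no entry matches.
std::shared_ptr<IOSB> removeFromQueue(IOQueue& queue, const IOSB* iosb)
{
    for (auto it = queue.begin(); it != queue.end(); ++it) {
        if (it->get() == iosb) {
            std::shared_ptr<IOSB> found = std::move(*it);
            queue.erase(it);
            return found;
        }
    }
    return nullptr;
}
```

Returning the shared_ptr rather than discarding it lets the caller keep the request alive while it finishes processing the completed I/O.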

◆ cancelQueue()

template<class REQUEST >
requires std::derived_from<REQUEST, ts::Object>
void ts::ReactiveBase::cancelQueue ( IOQueue & inqueue,
IOQueue & outqueue
)
protected inherited

Transfer all requests from one queue to another and mark all I/O as canceled.

Template Parameters
REQUEST: The subclass of Object which is set in react_data of all requests in inqueue.
Parameters
[in,out] inqueue: The queue from which all requests are removed.
[in,out] outqueue: The queue which receives all canceled requests.
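
A transfer of this kind can be sketched with std::list::splice. The code below is an assumption-laden illustration, not the TSDuck implementation: Object, IOSB, SendRequest and the canceled flag are hypothetical stand-ins, and a static_assert replaces the requires clause so that the sketch also compiles before C++20.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <type_traits>

// Hypothetical stand-ins for ts::Object and NonBlockingDevice::IOSB.
struct Object {
    virtual ~Object() = default;
};
struct IOSB {
    std::shared_ptr<Object> react_data;  // request attached to this I/O
};

// Hypothetical request type, with a flag used to mark cancelation.
struct SendRequest : Object {
    bool canceled = false;
};

using IOQueue = std::list<std::shared_ptr<IOSB>>;

// Mark every request of type REQUEST in 'inqueue' as canceled, then move
// all entries, in order, to the end of 'outqueue'.
template <class REQUEST>
void cancelQueue(IOQueue& inqueue, IOQueue& outqueue)
{
    static_assert(std::is_base_of<Object, REQUEST>::value, "REQUEST must derive from Object");
    for (auto& iosb : inqueue) {
        if (iosb) {
            if (auto req = std::dynamic_pointer_cast<REQUEST>(iosb->react_data)) {
                req->canceled = true;
            }
        }
    }
    // splice() relinks the list nodes: nothing is copied and 'inqueue' ends up empty.
    outqueue.splice(outqueue.end(), inqueue);
}
```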

◆ signalCompletedIO()

bool ts::ReactiveBase::signalCompletedIO ( )
protected inherited

Trigger the execution of processCompletedIO() in the context of a Reactor handler.

Create if necessary and then signal a dedicated user event.

Returns
True on success, false on error.

◆ deactivateCompletedIO()

void ts::ReactiveBase::deactivateCompletedIO ( bool  silent)
protected inherited

Deactivate the execution of processCompletedIO() in the context of a Reactor handler.

Deactivate and delete the dedicated user event.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ activateReadReady()

bool ts::ReactiveBase::activateReadReady ( )
protected inherited

Activate read-ready notification for non-blocking I/O.

Returns
True on success, false on error.

◆ deactivateReadReady()

void ts::ReactiveBase::deactivateReadReady ( bool  silent)
protected inherited

Deactivate read-ready notification for non-blocking I/O.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ activateWriteReady()

bool ts::ReactiveBase::activateWriteReady ( )
protected inherited

Activate write-ready notification for non-blocking I/O.

Returns
True on success, false on error.

◆ deactivateWriteReady()

void ts::ReactiveBase::deactivateWriteReady ( bool  silent)
protected inherited

Deactivate write-ready notification for non-blocking I/O.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ activateAsynchronousIO()

bool ts::ReactiveBase::activateAsynchronousIO ( )
protected inherited

Activate notification for asynchronous I/O.

Returns
True on success, false on error.

◆ deactivateAsynchronousIO()

void ts::ReactiveBase::deactivateAsynchronousIO ( bool  silent)
protected inherited

Deactivate notification for asynchronous I/O.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ cancelAsynchronousIO()

void ts::ReactiveBase::cancelAsynchronousIO ( bool  silent)
protected inherited

Cancel all asynchronous I/O in progress.

The cancelation occurs in the background and the end of each canceled asynchronous I/O will be notified.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ cancelAndWaitAsynchronousIO()

bool ts::ReactiveBase::cancelAndWaitAsynchronousIO ( NonBlockingDevice::IOSB & iosb,
bool silent
)
protected inherited

Cancel one specific pending asynchronous I/O and wait for its completion.

Warning: This is a blocking call. It shall be used in case of trouble only.

Parameters
[in,out] iosb: The asynchronous I/O status block.
[in] silent: If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ deactivateAll()

void ts::ReactiveBase::deactivateAll ( bool  silent)
protected inherited

Deactivate all registrations for non-blocking and asynchronous I/O.

Parameters
[in] silent: If true, do not report errors through the logger.

◆ handleUserEvent()

virtual void ts::ReactiveBase::handleUserEvent ( Reactor & reactor,
EventId id
)
override protected virtual inherited

Handle a user-defined event in a Reactor.

Parameters
[in,out] reactor: Reactor into which the handler is invoked.
[in] id: Id of the event which was signaled.

Reimplemented from ts::ReactorHandlerInterface.

◆ owner() [1/2]

Object * ts::OwnedObject::owner ( )
inline inherited

Get the address of the optional "owner" object which was specified in the constructor.

Returns
Address of the "owner" object or a null pointer if there was none.

◆ owner() [2/2]

template<class OBJECT >
requires std::derived_from<OBJECT, ts::Object>
OBJECT * ts::OwnedObject::owner ( )
inherited

Get the address of the "owner" object which was specified in the constructor.

This template version requires that the owner object is set and of type OBJECT, or some subclass of it. If there is no owner object or if it is not compatible with the template class OBJECT, this is a fatal error and the application is terminated.

Template Parameters
OBJECT: A subclass of Object
Returns
Address of the "owner" object. As stated above, a missing or incompatible owner is a fatal error, not a null return.

◆ isOwned() [1/2]

bool ts::OwnedObject::isOwned ( )
inline inherited

Check if the object is owned.

Returns
True if this object has an owner, false otherwise.

◆ isOwned() [2/2]

template<class OBJECT >
requires std::derived_from<OBJECT, ts::Object>
bool ts::OwnedObject::isOwned ( )
inline inherited

Check if the object is owned by an object of a given type.

Template Parameters
OBJECT: A subclass of Object
Returns
True if this object is owned by an object of type OBJECT, false otherwise.
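
One plausible way to implement such a type-checked query is a dynamic_cast on the stored owner pointer. The sketch below assumes exactly that. It is not taken from ts::OwnedObject: OwnerSketch, Session and Logger are hypothetical names, and a static_assert stands in for the requires clause.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-in for ts::Object (polymorphic, so dynamic_cast works).
struct Object {
    virtual ~Object() = default;
};

// Hypothetical class holding an optional, non-owning pointer to its owner.
class OwnerSketch {
public:
    explicit OwnerSketch(Object* owner = nullptr) : _owner(owner) {}

    // True if any owner was specified.
    bool isOwned() const { return _owner != nullptr; }

    // True if an owner was specified and it is an OBJECT or a subclass of it.
    template <class OBJECT>
    bool isOwned() const
    {
        static_assert(std::is_base_of<Object, OBJECT>::value, "OBJECT must derive from Object");
        return dynamic_cast<const OBJECT*>(_owner) != nullptr;
    }

private:
    Object* _owner;
};

// Two unrelated hypothetical owner types.
struct Session : Object {};
struct Logger : Object {};
```

With this shape, a caller can test isOwned<Session>() before requesting the typed owner, which avoids the fatal error documented for the typed owner() accessor.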

◆ handleTimer()

virtual void ts::ReactorHandlerInterface::handleTimer ( Reactor & reactor,
EventId id
)
virtual inherited

Handle a timer in a Reactor.

Parameters
[in,out] reactor: Reactor into which the handler is invoked.
[in] id: Id of the timer which expires.

Member Data Documentation

◆ DEFAULT_RECEIVE_BUFFER_SIZE

constexpr size_t ts::ReactiveTCPConnection::DEFAULT_RECEIVE_BUFFER_SIZE = 4096
static constexpr

Default buffer size for receive operations.

See also
startReceive()

The documentation for this class was generated from the following file: