TSDuck v3.45-4686
MPEG Transport Stream Toolkit
ts::Reactor Class Reference

Event-driven reactor class implementing the "event loop" pattern.

#include <tsReactor.h>


Public Member Functions

 Reactor (Report *report=nullptr)
 Constructor.
 
 Reactor (ReporterBase *delegate)
 Constructor.
 
virtual ~Reactor () override
 Destructor.
 
bool cancelAndWaitAsynchronousIO (EventId id, NonBlockingDevice::IOSB &iosb, bool silent=false)
 Cancel one specific pending asynchronous I/O.
 
bool cancelAsynchronousIO (EventId id, bool silent=false)
 Cancel all pending asynchronous I/O on a system file descriptor or handle.
 
bool cancelTimer (EventId id, bool silent=false)
 Cancel a timer.
 
bool close (bool silent=false)
 Close the Reactor.
 
bool deleteAsynchronousIO (EventId id, bool silent=false)
 Delete a notification of asynchronous I/O.
 
bool deleteEvent (EventId id, bool silent=false)
 Delete a user event.
 
bool deleteReadNotify (EventId id, bool silent=false)
 Delete a notification of read-ready or read-completion.
 
bool deleteWriteNotify (EventId id, bool silent=false)
 Delete a notification of write-ready or write-completion.
 
void exitEventLoop (bool success=true)
 Exit processEventLoop() as soon as possible.
 
bool isActiveEvent (EventId id)
 Check if an event (user-defined, timer, I/O) is still active in the reactor.
 
bool isOpen () const
 Check if the Reactor is open.
 
bool muteReport (bool mute)
 Temporarily mute the associated report.
 
EventId newAsynchronousIO (ReactorHandlerInterface *handler, SysSocketType sock)
 Add in the reactor a notification of asynchronous I/O on a system file descriptor or handle.
 
EventId newEvent (ReactorHandlerInterface *handler)
 Add a user event in the reactor.
 
EventId newReadNotify (ReactorHandlerInterface *handler, SysSocketType sock)
 Add in the reactor a notification of read-ready on a system file descriptor.
 
template<class Rep , class Period >
EventId newTimer (ReactorHandlerInterface *handler, cn::duration< Rep, Period > duration, bool repeat)
 Add a timer in the reactor.
 
EventId newWriteNotify (ReactorHandlerInterface *handler, SysSocketType sock)
 Add in the reactor a notification of write-ready or write-completion on a system file descriptor.
 
bool open ()
 Open and initialize the Reactor.
 
bool processEventLoop ()
 Process events until exit is requested.
 
Report & report () const
 Access the Report which is associated with this object.
 
Report * setReport (Report *report)
 Associate this object with another Report to log errors.
 
ReporterBase * setReport (ReporterBase *delegate)
 Associate this object with another ReporterBase to log errors.
 
bool signalEvent (EventId id)
 Signal a user event in the reactor.
 
template<class... Args>
void trace (const UChar *fmt, Args &&... args)
 Issue a low-level trace message if environment variable TS_REACTOR_TRACE is defined with a non-empty value.
 

Static Public Member Functions

static int SilentLevel (bool silent)
 Compute a log severity level from a "silent" parameter.
 
static consteval bool UseAsynchronousIO ()
 This static function returns whether the Reactor uses an asynchronous I/O model.
 
static consteval bool UseNonBlockingIO ()
 This static function returns whether the Reactor uses a non-blocking I/O model.
 

Detailed Description

Event-driven reactor class implementing the "event loop" pattern.

A reactor is a single-threaded design pattern based on an "event loop". The application classes register handlers to be called when "events" occur in the future. The application-defined handlers typically start other background tasks (timers, I/O) and register other handlers to be called when these tasks complete.

Timers and user-defined events (which can be triggered from other tasks) are directly handled by the class Reactor. Other types of features such as message queues and input/output are handled in "reactive classes" which execute on top of the Reactor.
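
The pattern described above can be illustrated with a minimal, self-contained sketch. It does not use the TSDuck API: the class MiniLoop and its methods post(), stop() and run() are hypothetical names chosen for this illustration only. It shows handlers being dispatched one at a time on a single thread, with a handler registering another handler.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a reactor; this is NOT the ts::Reactor API.
class MiniLoop {
public:
    using Handler = std::function<void(MiniLoop&)>;

    // Queue a handler to be called by run(), in registration order.
    void post(Handler h) { pending_.push_back(std::move(h)); }

    // Ask run() to return once the current handler returns.
    void stop() { stopped_ = true; }

    // Dispatch queued handlers one at a time, on the calling thread.
    void run() {
        while (!stopped_ && !pending_.empty()) {
            Handler h = std::move(pending_.front());
            pending_.pop_front();
            h(*this);  // a handler may post() further handlers
        }
    }

private:
    std::deque<Handler> pending_;
    bool stopped_ = false;
};

// Usage: the first handler registers another one, which stops the loop.
inline std::vector<std::string> runMiniLoopDemo() {
    std::vector<std::string> log;
    MiniLoop loop;
    loop.post([&log](MiniLoop& l) {
        log.push_back("first");
        l.post([&log](MiniLoop& inner) {
            log.push_back("second");
            inner.stop();
        });
    });
    loop.post([&log](MiniLoop&) { log.push_back("third"); });
    loop.run();
    return log;
}
```

Because everything runs on one thread, the handlers need no locking. The price is that a handler which blocks delays every other handler.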

I/O multiplexing

There are two distinct reactive I/O models, with different implementations:

  • Non-blocking I/O (UNIX systems)
    • kqueue (macOS, FreeBSD)
    • epoll (Linux)
  • Asynchronous I/O (Windows)
    • I/O completion ports (aka IOCP)

See the documentation of class NonBlockingDevice for a detailed explanation of the differences.

The class Reactor encapsulates the various implementations and proposes a portable interface. However, while it is possible to unify the various types of non-blocking I/O (kqueue and epoll) in one single interface, it is impossible to unify non-blocking I/O and asynchronous I/O into the same interface. The way they shall be used, as well as the way the data buffers are managed, are too different. Therefore, the class Reactor exposes interfaces for both models. The application shall check the current I/O model using the "consteval" static methods UseNonBlockingIO() and UseAsynchronousIO() and then adopt the correct strategy.
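
As a rough sketch of this compile-time selection, the following self-contained code uses a hypothetical MockReactor in place of ts::Reactor. The platform test on _WIN32 and the function ioStrategy() are assumptions made for the illustration. The predicates are declared "constexpr" rather than "consteval" so that the sketch also compiles as C++17.

```cpp
#include <string>

// Hypothetical stand-in for the two compile-time predicates; NOT ts::Reactor.
// The real methods are documented as "consteval"; "constexpr" is used here
// so that the sketch also compiles as C++17.
struct MockReactor {
#if defined(_WIN32)
    static constexpr bool UseAsynchronousIO() { return true; }
#else
    static constexpr bool UseAsynchronousIO() { return false; }
#endif
    static constexpr bool UseNonBlockingIO() { return !UseAsynchronousIO(); }
};

// Pick the I/O strategy at compile time. Since this is a template and the
// condition depends on R, the discarded branch is not instantiated.
template <class R>
std::string ioStrategy() {
    if constexpr (R::UseAsynchronousIO()) {
        return "start the I/O, then wait for its completion";
    } else {
        return "retry the I/O until it would block, then wait for readiness";
    }
}
```

Exactly one of the two predicates is true on a given platform, so code written this way keeps both strategies visible in the source while compiling only one of them.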

In practice, the I/O multiplexing features of the class Reactor are not used by applications. They are used in a few specialized "reactive I/O" classes such as ReactiveUDPSocket or ReactiveTCPConnection. These classes are implemented on top of Reactor and have fully portable and homogeneous interfaces.

Synchronisation

The Reactor shall be used from one single thread. Unless specified otherwise, all methods shall be invoked from the thread of the reactor, where the event loop is run. All handlers are invoked in the context of the thread which invoked processEventLoop(), except worker handlers which are invoked in the context of a worker thread. Therefore, worker handlers are not allowed to call methods of the reactor.

Constructor & Destructor Documentation

◆ Reactor() [1/2]

ts::Reactor::Reactor ( Report * report = nullptr)

Constructor.

Parameters
[in] report  Where to report errors. The report object must remain valid as long as this object exists or until setReport() is used with another Report object. If report is null, log messages are discarded.

◆ Reactor() [2/2]

ts::Reactor::Reactor ( ReporterBase * delegate)

Constructor.

Parameters
[in] delegate  Use the report of another ReporterBase. If delegate is null, log messages are discarded.

◆ ~Reactor()

virtual ts::Reactor::~Reactor ( )
override virtual

Destructor.

All resources, such as pending timers or user events, are deleted.

Member Function Documentation

◆ UseAsynchronousIO()

static consteval bool ts::Reactor::UseAsynchronousIO ( )
inline static

This static function returns whether the Reactor uses an asynchronous I/O model.

Returns
True when asynchronous I/O is used, false when non-blocking I/O is used.

Reactive classes which manage I/O shall start I/O operations and, if the operation completes with a "pending" status, the reactive class shall request the Reactor to be notified when the I/O completes. In the meantime, the reactive class shall ensure that the I/O buffers remain valid, as they are used in the background by the I/O.

This method is typically used in "if constexpr" structures, which are preferred to conditional compilation using the macro TS_USE_ASYNCHRONOUS_IO.

Example:

if constexpr (Reactor::UseAsynchronousIO()) {
....
}
See also
UseNonBlockingIO()
NonBlockingDevice

◆ UseNonBlockingIO()

static consteval bool ts::Reactor::UseNonBlockingIO ( )
inline static

This static function returns whether the Reactor uses a non-blocking I/O model.

Returns
True when non-blocking I/O is used, false when asynchronous I/O is used.

Reactive classes which manage I/O shall repeatedly attempt I/O operations as long as they succeed. When they fail with a "would block" status, the reactive class shall request the Reactor to be notified when the I/O becomes possible.

This method is typically used in "if constexpr" structures, which are preferred to conditional compilation using the macro TS_USE_NON_BLOCKING_IO.

Example:

if constexpr (Reactor::UseNonBlockingIO()) {
....
}
See also
UseAsynchronousIO()
NonBlockingDevice
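
The non-blocking strategy described for UseNonBlockingIO() (attempt the I/O as long as it succeeds, then ask for a notification on "would block") can be sketched with plain POSIX calls. This sketch does not use TSDuck: the functions drainNonBlocking() and demoDrain() are hypothetical names, and the pipe merely stands in for a socket. It applies to UNIX systems only.

```cpp
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Drain a non-blocking descriptor: read as long as reads succeed.
// Returns true when the caller should now wait for a read-ready
// notification ("would block"), false on end of file or on a real error.
inline bool drainNonBlocking(int fd, std::string& out) {
    char buf[256];
    for (;;) {
        const ssize_t n = ::read(fd, buf, sizeof(buf));
        if (n > 0) {
            out.append(buf, static_cast<std::size_t>(n));
        } else if (n == 0) {
            return false;  // end of file
        } else if (errno == EINTR) {
            continue;      // interrupted by a signal, retry
        } else {
            return errno == EAGAIN || errno == EWOULDBLOCK;
        }
    }
}

// Usage on a pipe whose read end is switched to non-blocking mode.
inline bool demoDrain(std::string& out) {
    int fds[2];
    if (::pipe(fds) != 0) {
        return false;
    }
    const int flags = ::fcntl(fds[0], F_GETFL, 0);
    ::fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
    const bool written = ::write(fds[1], "hello", 5) == 5;
    // The write end is still open, so the read after "hello" would block.
    const bool wouldBlock = drainNonBlocking(fds[0], out);
    ::close(fds[0]);
    ::close(fds[1]);
    return written && wouldBlock;
}
```

In a reactive class, the point where drainNonBlocking() returns true is where a read-ready notification would be requested from the reactor.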

◆ open()

bool ts::Reactor::open ( )

Open and initialize the Reactor.

Must be invoked before running the event loop or registering events.

Returns
True on success, false on error.

◆ close()

bool ts::Reactor::close ( bool  silent = false)

Close the Reactor.

Must not be called from within the event loop. Automatically done in the destructor.

Parameters
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ isOpen()

bool ts::Reactor::isOpen ( ) const
inline

Check if the Reactor is open.

Returns
True if the Reactor is open, false otherwise.

◆ isActiveEvent()

bool ts::Reactor::isActiveEvent ( EventId  id)

Check if an event (user-defined, timer, I/O) is still active in the reactor.

Parameters
[in] id  Event id to check.
Returns
True if id is still an active event, false if it has been deleted, completed, or canceled.

◆ processEventLoop()

bool ts::Reactor::processEventLoop ( )

Process events until exit is requested.

Returns
The status from exitEventLoop(). If exitEventLoop() is invoked several times, the returned value is false if exitEventLoop() has been called at least once with success being false.
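
The rule above, where a single failure report makes the final status false, can be modeled in a few lines. The class ExitStatus and its methods are hypothetical and only illustrate the documented behavior; they are not taken from the TSDuck source.

```cpp
// Hypothetical model of the exit status only; NOT the ts::Reactor code.
class ExitStatus {
public:
    // Mirrors exitEventLoop(success): request exit, remember any failure.
    void requestExit(bool success = true) {
        exitRequested_ = true;
        success_ = success_ && success;
    }

    // True once at least one exit request was made.
    bool exitRequested() const { return exitRequested_; }

    // The value the loop would return: false if any request reported failure.
    bool result() const { return success_; }

private:
    bool exitRequested_ = false;
    bool success_ = true;
};
```

With this model, the sequence requestExit(true), requestExit(false), requestExit(true) yields a final result of false: a later success does not erase an earlier failure.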

◆ exitEventLoop()

void ts::Reactor::exitEventLoop ( bool  success = true)

Exit processEventLoop() as soon as possible.

This method is typically invoked from a handler.

Parameters
[in] success  The value that processEventLoop() shall return.

◆ newTimer()

template<class Rep , class Period >
EventId ts::Reactor::newTimer ( ReactorHandlerInterface * handler,
cn::duration< Rep, Period >  duration,
bool  repeat 
)
inline

Add a timer in the reactor.

If the timer is one-shot (the default), the timer is automatically deleted after its expiration, after the handler is invoked. If the timer is repeated, it shall be explicitly canceled if needed.

Parameters
[in] handler  Address of a handler to call when the timer expires. An error is returned if it is nullptr.
[in] duration  Duration of the timer. Must be strictly positive.
[in] repeat  If true, the timer is repeated at regular intervals, every duration, until canceled.
Returns
The identity of the timer. Invalid in case of error.
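
The difference between one-shot and repeated timers can be sketched with a small timer table driven by a virtual clock. Everything here is hypothetical (the class MockTimers, the use of 0 as the invalid identity, the millisecond resolution) and is not the TSDuck implementation. It only mirrors the documented rules: a null handler or a non-positive duration is an error, a one-shot timer is deleted after its handler is invoked, and a repeated timer stays active until canceled.

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Hypothetical timer table with a virtual clock; NOT the ts::Reactor code.
class MockTimers {
public:
    using TimerId = std::uint64_t;  // 0 is used as the invalid identity
    using Handler = std::function<void()>;
    using Millis = std::chrono::milliseconds;

    template <class Rep, class Period>
    TimerId add(Handler handler, std::chrono::duration<Rep, Period> duration, bool repeat) {
        const Millis period = std::chrono::duration_cast<Millis>(duration);
        if (!handler || period.count() <= 0) {
            return 0;  // null handler or non-positive duration
        }
        const TimerId id = ++last_;
        timers_[id] = Timer{std::move(handler), period, now_ + period, repeat};
        return id;
    }

    bool cancel(TimerId id) { return timers_.erase(id) > 0; }
    bool active(TimerId id) const { return timers_.count(id) > 0; }

    // Move the virtual clock forward, firing expired timers in time order.
    void advance(Millis delta) {
        const Millis end = now_ + delta;
        for (;;) {
            auto next = timers_.end();
            for (auto it = timers_.begin(); it != timers_.end(); ++it) {
                if (it->second.due <= end &&
                    (next == timers_.end() || it->second.due < next->second.due)) {
                    next = it;
                }
            }
            if (next == timers_.end()) {
                break;  // nothing else expires before 'end'
            }
            const TimerId id = next->first;
            now_ = next->second.due;
            const Handler handler = next->second.handler;  // copy: may cancel itself
            handler();
            auto it = timers_.find(id);
            if (it != timers_.end()) {
                if (it->second.repeat) {
                    it->second.due += it->second.period;  // re-arm
                } else {
                    timers_.erase(it);  // one-shot: deleted after the handler
                }
            }
        }
        now_ = end;
    }

private:
    struct Timer {
        Handler handler;
        Millis period;
        Millis due;
        bool repeat;
    };
    std::map<TimerId, Timer> timers_;
    Millis now_{0};
    TimerId last_ = 0;
};
```

For instance, after advancing the clock by 2 seconds, a one-shot timer of 1 second has fired once and is no longer active, while a repeated timer of 400 milliseconds has fired five times and remains active until cancel() is called.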

◆ cancelTimer()

bool ts::Reactor::cancelTimer ( EventId  id,
bool  silent = false 
)

Cancel a timer.

Parameters
[in] id  Timer to cancel.
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ newEvent()

EventId ts::Reactor::newEvent ( ReactorHandlerInterface * handler)

Add a user event in the reactor.

Parameters
[in] handler  Address of a handler to call when the event is signalled. An error is returned if it is nullptr.
Returns
The identity of the user event. Invalid in case of error.

◆ signalEvent()

bool ts::Reactor::signalEvent ( EventId  id)

Signal a user event in the reactor.

As an exception to the single-thread-reactor rule, this method can be invoked from any thread.

Parameters
[in] id  Event to signal.
Returns
True on success, false on error.
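
To illustrate why a user event is the natural bridge from other threads into the loop thread, here is a minimal sketch using only the C++ standard library. The class MockUserEvent is hypothetical and is not how TSDuck implements user events. In particular, coalescing several pending signals into one wake-up is a choice made for this sketch.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical model of a cross-thread user event; NOT the ts::Reactor code.
class MockUserEvent {
public:
    // May be called from any thread.
    void signal() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++pending_;
        }
        cond_.notify_one();
    }

    // Called from the loop thread: block until at least one signal is
    // pending, then consume all of them and return how many there were.
    int wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return pending_ > 0; });
        const int count = pending_;
        pending_ = 0;
        return count;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int pending_ = 0;
};

// Usage: a worker thread signals once, the calling thread receives it.
inline int demoCrossThreadSignal() {
    MockUserEvent event;
    std::thread worker([&event] { event.signal(); });
    const int count = event.wait();
    worker.join();
    return count;
}
```

The worker thread touches only the signalling primitive, never the rest of the loop state, which is consistent with the rule that other reactor methods are reserved for the reactor thread.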

◆ deleteEvent()

bool ts::Reactor::deleteEvent ( EventId  id,
bool  silent = false 
)

Delete a user event.

Parameters
[in] id  Event to delete.
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ newAsynchronousIO()

EventId ts::Reactor::newAsynchronousIO ( ReactorHandlerInterface * handler,
SysSocketType  sock 
)

Add in the reactor a notification of asynchronous I/O on a system file descriptor or handle.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the asynchronous I/O model.

Parameters
[in] handler  Address of a handler to call when any asynchronous I/O completes on the specified system handle. An error is returned if it is nullptr.
[in] sock  A system-specific file descriptor or handle. This can be a socket or something else.
Returns
The identity of the asynchronous I/O. Invalid in case of error.

◆ cancelAsynchronousIO()

bool ts::Reactor::cancelAsynchronousIO ( EventId  id,
bool  silent = false 
)

Cancel all pending asynchronous I/O on a system file descriptor or handle.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the asynchronous I/O model.

Parameters
[in] id  Asynchronous I/O id, as previously returned by newAsynchronousIO().
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

IMPORTANT: After canceling all asynchronous I/O, the application shall wait for the reception of all I/O completions (presumably with an error status) before releasing the data buffers.

See also
NonBlockingDevice

◆ cancelAndWaitAsynchronousIO()

bool ts::Reactor::cancelAndWaitAsynchronousIO ( EventId  id,
NonBlockingDevice::IOSB & iosb,
bool  silent = false 
)

Cancel one specific pending asynchronous I/O.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the asynchronous I/O model. Warning: This is a blocking call. It shall be used in case of trouble only.

Parameters
[in] id  Asynchronous I/O id, as previously returned by newAsynchronousIO().
[in,out] iosb  The asynchronous I/O status block.
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ deleteAsynchronousIO()

bool ts::Reactor::deleteAsynchronousIO ( EventId  id,
bool  silent = false 
)

Delete a notification of asynchronous I/O.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the asynchronous I/O model.

Parameters
[in] id  Asynchronous I/O id, as previously returned by newAsynchronousIO().
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ newReadNotify()

EventId ts::Reactor::newReadNotify ( ReactorHandlerInterface * handler,
SysSocketType  sock 
)

Add in the reactor a notification of read-ready on a system file descriptor.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the non-blocking I/O model.

Parameters
[in] handler  Address of a handler to call when the operation is ready. An error is returned if it is nullptr.
[in] sock  A system-specific file descriptor or handle. This can be a socket or something else.
Returns
The identity of the user event. Invalid in case of error.

◆ deleteReadNotify()

bool ts::Reactor::deleteReadNotify ( EventId  id,
bool  silent = false 
)

Delete a notification of read-ready or read-completion.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the non-blocking I/O model.

Parameters
[in] id  Event to delete.
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ newWriteNotify()

EventId ts::Reactor::newWriteNotify ( ReactorHandlerInterface * handler,
SysSocketType  sock 
)

Add in the reactor a notification of write-ready or write-completion on a system file descriptor.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the non-blocking I/O model.

Parameters
[in] handler  Address of a handler to call when the operation is ready. An error is returned if it is nullptr.
[in] sock  A system-specific file descriptor or handle. This can be a socket or something else.
Returns
The identity of the user event. Invalid in case of error.

◆ deleteWriteNotify()

bool ts::Reactor::deleteWriteNotify ( EventId  id,
bool  silent = false 
)

Delete a notification of write-ready or write-completion.

This method is normally never used in applications. It is used only by "reactive I/O classes", in the non-blocking I/O model.

Parameters
[in] id  Event to delete.
[in] silent  If true, do not report errors through the logger.
Returns
True on success, false on error.

◆ trace()

template<class... Args>
void ts::Reactor::trace ( const UChar * fmt,
Args &&...  args 
)
inline

Issue a low-level trace message if environment variable TS_REACTOR_TRACE is defined with a non-empty value.

This is typically used to troubleshoot the internals of a Reactor on a given platform.

Parameters
[in] fmt  Format string with embedded '%' sequences.
[in] args  List of arguments to substitute in the format string.
See also
UString::format()

◆ report()

Report & ts::ReporterBase::report ( ) const
inherited

Access the Report which is associated with this object.

Can be called from another thread only if the Report object is thread-safe.

Returns
A reference to the associated report.

◆ setReport() [1/2]

Report * ts::ReporterBase::setReport ( Report * report)
inherited

Associate this object with another Report to log errors.

Parameters
[in] report  Where to report errors. The report object must remain valid as long as this object exists or until setReport() is used with another Report object. If report is null, log messages are discarded.
Returns
The address of the previous Report object or a null pointer if there was none.

◆ setReport() [2/2]

ReporterBase * ts::ReporterBase::setReport ( ReporterBase * delegate)
inherited

Associate this object with another ReporterBase to log errors.

Parameters
[in] delegate  Use the report of another ReporterBase. If delegate is null, the previous explicit Report is used.
Returns
The address of the previous ReporterBase object or a null pointer if there was none.

◆ muteReport()

bool ts::ReporterBase::muteReport ( bool  mute)
inherited

Temporarily mute the associated report.

Parameters
[in] mute  If true, report() will return a null report (log messages are discarded), until muteReport() is invoked again with mute set to false.
Returns
Previous state of the mute field.

◆ SilentLevel()

static int ts::ReporterBase::SilentLevel ( bool  silent)
inline static inherited

Compute a log severity level from a "silent" parameter.

Some subclass methods have a "silent" parameter to avoid reporting errors which may be insignificant, typically when closing a device after an error, in which case the close operation may produce other errors if the previous error left the device in an inconsistent state. While those errors should not be displayed as errors, we still display them at debug level.

Parameters
[in] silent  If true, do not report errors, report debug messages instead.
Returns
Error when silent is false, Debug otherwise.
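
The mapping performed by SilentLevel() is simple enough to sketch in one line. The namespace sketch and the numeric values of Error and Debug below are placeholders chosen for the illustration. The actual severity constants are defined by TSDuck and are not reproduced here.

```cpp
// Hypothetical severity values; the real constants are defined by TSDuck.
namespace sketch {
    constexpr int Error = -3;
    constexpr int Debug = 1;

    // A "silent" caller still logs, but at debug level instead of error level.
    constexpr int silentLevel(bool silent) {
        return silent ? Debug : Error;
    }
}
```

A method with a "silent" parameter would typically compute the level once and use it for every message it logs, so that errors from a cleanup after a failure remain visible when debugging.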
