Monitoring Streams

When consuming a stream of signals, it's possible that the source of those signals may malfunction, become unreachable, or otherwise stop supplying valid signals. This condition, a timeout, often requires action such as alerts or alternative logic, and for this purpose SignalTimeout blocks can be used to generate new signals when input signals have stopped.


For every signal processed, the SignalTimeout resets an internal timer for every configured Interval, which if allowed to expire will emit the last signal received, with a timeout attribute added to it. If the incoming signal already has a timeout attribute, it will be overwritten with this new value. As long as signals are received at least as often as the configured interval(s), no signals are emitted.

For example, 1 minute after the important_data stream stops (perhaps the publisher has stopped, or a network error has ocurred) a signal will be published to alerts, and then repeat every minute until the stream is restored. While important_data is working normally, no signals are published to alerts.

+----------------------------------+
| Subscriber                       |
|   Topic: important_data          |
|                                  |
|                                  |
+-----------------O----------------+
                  |
                  |
                  V
+-----------------O----------------+
| SignalTimeout                    |
|   Intervals:                     |
|     Minutes: 1                   |
|     Repeatable: True             |
+-----------------O----------------+
                  |
                  |
                  V
+-----------------O----------------+
| Publisher                        |
|   Topic: alerts                  |
|                                  |
|                                  |
+----------------------------------+

Every signal emitted from a SignalTimeout has a new attribute, timeout, added to it, which contains a datetime.timedelta object. The value of this object is equal to the configured Interval, and does not increment if configured Repeatable. That is to say, the timeout attribute represents the configured Interval that was triggered, not the cumulative time elapsed.

In this example we will use a StateChange to evaluate if a stream is dry or not, and build an alert message based on that. We will also use signal grouping to monitor multiple streams identified by the value of an id attribute. The defined State for each Group is the value of the incoming signals' timeout attribute, which is False if the signal was emitted by the subscriber into the Modifier. Any value of timeout that isn't explicitly False, such as a datetime.timedelta assigned by the SignalTimeout, will be evaluated as True and provide a change of state. This Truth Value Test is also used in the ConditionalModifier to determine which message to build. In this configuration a single, customized alert can be sent when a stream goes dry, and another distinct message when it resumes.

                     +----------------------------------+
                     | Subscriber                       |
                     |   Topic: important_data          |
                     |                                  |
                     |                                  |
                     +----------------O-----------------+
                                      |
                  +-------------------+--------------------+
                  |                                        |
                  V                                        V
+-----------------O----------------+     +-----------------O----------------+
| Modifier                         |     | SignalTimeout                    |
|   Fields:                        |     |   Group By: {{ $id }}            |
|     Attribute Name: timeout      |     |   Intervals:                     |
|     Attribute Value: {{ False }} |     |     Seconds: 10                  |
+-----------------O----------------+     +-----------------O----------------+
                  |                                        |
                  +-------------------+--------------------+
                                      |
                                      V
                     +----------------O-----------------+
                     | StateChange                      |
                     |   Group By: {{ $id }}            |
                     |   State: {{ $timeout }}          |
                     |                                  |
                     +----------------O-----------------+
                                      |
                                      |
                                      V
                +---------------------O----------------------+
                | ConditionalModifier                        |
                |   Fields:                                  |
                |     Attribute Name: message                |
                |     Lookup:                                |
                |       Formula: {{ $state }}                |
                |       Attribute Value: Alert! {{ $group }} |
                |                                            |
                |       Formula: {{ not $state }}            |
                |       Attribute Value": {{ $group }} OK    |
                +---------------------O----------------------+
                                      |
                                      |
                                      V
                                     ...

The More You Know:

A datetime.timedelta object is represented as three integers in the service logs, such as {timeout: datetime.timedelta(<days>, <seconds>, <microseconds>)}, and can be easily converted into seconds: {{ $timeout.total_seconds() }}. For more information see the Python Standard Library.

results matching ""

    No results matching ""