Client: register a callback

This example will use the ‘oven’ store and the item ‘TEMP’. The objective is to handle all broadcasts for ‘oven.TEMP’; a callback method will be defined that performs any/all desired operations whenever the value changes. This callback method will be invoked asynchronously whenever a new broadcast arrives. Callbacks arriving via these mechanisms will not be serialized across items, though within an item they will be invoked in the order they were originally registered. If you have the same callback method registered with multiple items there is no inherent mKTL-based guarantee to prevent that method from being called multiple times simultaneously.

The toy example here will calculate and print an exponentially weighted average.

Getting started

See the Getting started section of the Client: get a value example for more details. We’ll get right to it:

import mktl
temp = mktl.get('oven.TEMP')

Defining a callback

The expected signature of a callback method is:

def my_callback(item, new_value, new_timestamp):

These arguments are always passed to the callback; that doesn’t necessarily mean the callback has to use them, a common pattern is to define a callback that ignores the arguments provided, an approach made easier by the use of get(). First, a callback that uses the arguments:

def average(item, temp, time):

    factor = 0.01
    new_weight = factor
    old_weight = 1 - factor

    if average.computed is None:
        average.computed = temp
    else:
        new = new_weight * temp
        old = old_weight * average.computed
        average.computed = new + old

    print("%.3f %s average: %.1f" % (time, item.key, average.computed)

average.computed = None

…and an alternate version, ignoring the arguments:

def average(*args, **kwargs):

    temp = mktl.get('oven', 'TEMP')

    factor = 0.01
    new_weight = factor
    old_weight = 1 - factor

    if average.computed is None:
        average.computed = float(temp)
    else:
        new = new_weight * temp
        old = old_weight * average.computed
        average.computed = new + old

    timestamp = temp.cached_timestamp
    print("%.3f %s average: %.1f" % (timestamp, temp.key, average.computed)

average.computed = None

These two approaches are functionally identical for this simple example. The second approach, relying on get(), becomes appealing when multiple items need to be inspected in a given callback; for example, if the current temperature were being compared to the current setpoint. There is no provision for calling a callback with arguments from multiple items, a single invocation of a callback is only ever triggered by a broadcast event associated with a single item.

It is possible to block up a queue of events if the events arrive more rapidly than their callbacks can be processed. Each different Item has its own processing queue on the client side, and events are processed sequentially on a per-item basis. Callbacks should avoid delays in order to prevent the local queue from backing up.

Calling Item.register()

Once the callback method is defined it needs to be associated with the Item instance, so that the callback is invoked every time the value of that item changes. This is accomplished via Item.register():

temp.register(average)

Item.register() will invoke Item.subscribe() if necessary, though all client-facing Item instances generally invoke Item.subscribe() when they are first instantiated.

Full example

Putting it all together:

import mktl
import time
temp = mktl.get('oven.TEMP')

def just_print(*args, **kwargs):

    temp = mktl.get('oven.TEMP')
    value = float(temp)
    time = temp.cached_timestamp
    print ("%.3f oven.TEMP: %.1f" % (time, value))

def average(*args, **kwargs):

    temp = mktl.get('oven', 'TEMP')

    factor = 0.01
    new_weight = factor
    old_weight = 1 - factor

    if average.computed is None:
        average.computed = float(temp)
    else:
        new = new_weight * temp
        old = old_weight * average.computed
        average.computed = new + old

    timestamp = temp.cached_timestamp
    print("%.3f %s average: %.1f" % (timestamp, temp.full_key, average.computed)

average.computed = None


temp.register(just_print)
temp.register(average)
time.sleep(30)