Client: register a callback
This example will use the ‘oven’ store and the item ‘TEMP’. The objective is to handle all broadcasts for ‘oven.TEMP’; a callback method will be defined that performs any/all desired operations whenever the value changes. This callback method will be invoked asynchronously whenever a new broadcast arrives.
The toy example here will calculate and print an exponentially weighted average.
Getting started
See the Getting started section of the Client: get a value example for more details. We’ll get right to it:
import mktl
temp = mktl.get('oven.TEMP')
Defining a callback
The expected signature of a callback method is:
def my_callback(item, new_value, new_timestamp):
These arguments are always passed to the callback; that doesn’t necessarily
mean the callback has to use them, a common pattern is to define a callback
that ignores the arguments provided, an approach made easier by the use of
mktl.get()
. First, a callback that uses the arguments:
def average(item, temp, time):
factor = 0.01
new_weight = factor
old_weight = 1 - factor
if average.computed is None:
average.computed = temp
else:
average.computed = new_weight * temp + old_weight * average.computed
print("%.3f %s average: %.1f" % (time, item.key, average.computed)
average.computed = None
…and an alternate version, using mktl.get()
:
def average(*args, **kwargs):
temp = mktl.get('oven', 'TEMP')
factor = 0.01
new_weight = factor
old_weight = 1 - factor
if average.computed is None:
average.computed = float(temp)
else:
average.computed = new_weight * temp + old_weight * average.computed
timestamp = temp.cached_timestamp
print("%.3f %s average: %.1f" % (timestamp, temp.key, average.computed)
average.computed = None
These two approaches are functionally identical for this simple example.
The second approach, relying on mktl.get()
, becomes appealing
when multiple items need to be inspected in a given callback; for example,
if the current temperature were being compared to the current setpoint.
It is possible to block up a queue of events if the events arrive more
rapidly than their callbacks can be processed. Each different Item
has its own processing queue, and events are processed sequentially on
a per-item basis.
Calling Item.register()
Once the callback method is defined it needs to be associated with the
Item
instance, so that the callback is invoked every time the value
of that item changes. This is accomplished via Item.register()
:
temp.register(average)
Item.register()
will invoke Item.subscribe()
if the caller did
not already do so in some other context.
Full example
Putting it all together:
import mktl
temp = mktl.get('oven.TEMP')
def callback(*args, **kwargs):
temp = mktl.get('oven.TEMP')
value = float(temp)
time = temp.cached_timestamp
print ("%.3f oven.TEMP: %.1f" % (time, value))
temp.register(callback)