Protocol interface

Each aspect of the mKTL protocol is split into two components: a client-facing half and a server-facing half. The interfaces described here are generally not intended for direct use; they support the end-user functionality exposed in the Client interface and Daemon interface code.

Payload class

class mktl.protocol.message.Payload(value, time=None, error=None, bulk=None, shape=None, dtype=None, refresh=None, **kwargs)

This is a lightweight class that encapsulates a Python-native value for later inclusion in a Message instance. All attributes of this class, except for the bulk attribute and any attributes set to None, will be added to a JSON dictionary via encapsulate(), which is called when a Message instance finalizes itself before generating its on-the-wire representation.

No interpretation of the Payload contents is performed; any interpretation (such as converting a numpy array to a form suitable for encapsulation) must occur before the Payload is instantiated.

Variables:
  • bulk – A bulk data value, in bytes, or None

  • omit – A set of fields to omit from encapsulation

encapsulate()

Add all non-omitted local attributes to a dictionary, and return the JSON encoding of that dictionary. For example, if the .value and .time attributes of a Payload are assigned, the caller will receive a value like:

b'{"value": 12, "time": 1761100609.234571}'
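The encapsulation rule described above can be sketched with the standard json module. This is a minimal illustration, not the mKTL implementation: SketchPayload is a hypothetical stand-in, it models only the value, time, error and bulk attributes, and it assumes the omit set contains just the bulk field.

```python
import json


class SketchPayload:
    """Hypothetical stand-in for Payload; not the mKTL implementation."""

    # Assumption: only the bulk field is excluded from the JSON dictionary.
    omit = frozenset({'bulk'})

    def __init__(self, value, time=None, error=None, bulk=None):
        self.value = value
        self.time = time
        self.error = error
        self.bulk = bulk

    def encapsulate(self):
        # Keep every instance attribute that is set and not omitted.
        fields = {
            name: attr
            for name, attr in vars(self).items()
            if name not in self.omit and attr is not None
        }
        # The encoded dictionary is returned as bytes.
        return json.dumps(fields).encode()


encoded = SketchPayload(12, time=1761100609.234571, bulk=b'\x00\x01').encapsulate()
print(encoded)
```

The bulk bytes never reach the JSON dictionary in this sketch, which matches the statement that bulk data is handled separately from the encapsulated fields.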

Message classes

class mktl.protocol.message.Message(type, target=None, payload=None, id=None)

The Message provides a very thin encapsulation of what it means to be a message in an mKTL context. This class will be used to represent mKTL messages that do not result in a response.

The fields are largely in order of how they are represented on the wire: the message type, the key/store target for the request, the payload of the message (a Payload instance), and an identification number unique to this correspondence. The identification number is the one field that is out-of-order compared to the multipart sequence on the wire; this is because some message types (publish messages, in particular) do not have an identification number, and it is automatically generated for request messages. Rather than force the caller to pass an explicit None, the id is left as the last field, so that the arguments for all Message instances can have a similar structure.

Variables:
  • payload – The Payload, if any, for the message.

  • valid_types – A set of valid strings for the message type.

  • timestamp – A UNIX epoch timestamp for the message send time.

class mktl.protocol.message.Broadcast(type, target=None, payload=None, id=None)

A Broadcast is a minor variant of a Message, with a change to format the multipart tuple in a PUB/SUB specific fashion.

class mktl.protocol.message.Request(type, target=None, payload=None, id=None)

A Request has a little extra functionality, focusing on local caching of response values and signaling that a request is complete. This is the class that will be used on the client side when a server is expected to provide a response, such as returning a requested value, or signaling that a set operation is complete.

Variables:
  • response – The response (as a Message) to this request

poll()

Return True if the request is complete, otherwise return False.

wait(timeout=60)

Block until the request has been handled or the timeout expires. The response attribute is always returned; it will be None if the original request is still pending when the timeout expires.

wait_ack(timeout)

Block until the request has been acknowledged. This is a wrapper to a threading.Event instance; if the event has occurred it will return True, otherwise it returns False after the requested timeout. If the timeout argument is None it will block indefinitely.
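The relationship between poll(), wait() and wait_ack() can be sketched with two threading.Event instances. This is an illustration under assumptions: SketchRequest is a hypothetical stand-in, and the private _acknowledge() and _complete() hooks are invented here to play the role of whatever code delivers the ACK and the full response.

```python
import threading


class SketchRequest:
    """Hypothetical stand-in for Request; not the mKTL implementation."""

    def __init__(self):
        self.response = None
        self._acked = threading.Event()
        self._done = threading.Event()

    def _acknowledge(self):
        # Hypothetical hook: called when the ACK arrives.
        self._acked.set()

    def _complete(self, response):
        # Hypothetical hook: called when the full response arrives.
        self.response = response
        self._acked.set()
        self._done.set()

    def poll(self):
        # True once the full response has been received.
        return self._done.is_set()

    def wait(self, timeout=60):
        # Always return the response; it is None if still pending.
        self._done.wait(timeout)
        return self.response

    def wait_ack(self, timeout):
        # Event.wait() returns True if set, False on timeout.
        return self._acked.wait(timeout)
```

A caller that cannot afford to block would check poll() periodically; a caller that can block would call wait() and test the return value against None.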

Request client

mktl.protocol.request.client(address, port)

Factory function for a Client instance. Use of this method is encouraged to streamline re-use of established connections.
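One plausible way for such a factory to re-use connections is a cache keyed on the address and port. The sketch below assumes that keying scheme; SketchClient and sketch_client() are hypothetical names used only for illustration and do not reflect the actual mKTL code.

```python
import threading


class SketchClient:
    """Hypothetical stand-in for a connection-holding Client."""

    def __init__(self, address, port):
        self.address = address
        self.port = port


_clients = {}
_clients_lock = threading.Lock()


def sketch_client(address, port):
    """Return the cached client for (address, port), creating it once."""
    key = (address, int(port))
    with _clients_lock:
        try:
            return _clients[key]
        except KeyError:
            instance = _clients[key] = SketchClient(address, port)
            return instance
```

Repeated calls with the same address and port return the same object, so only one connection per server is ever established.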

mktl.protocol.request.send(address, port, request)

Use client() to connect to the specified address and port, and send the specified mktl.protocol.message.Request instance. This method blocks until the completion of the request.

class mktl.protocol.request.Client(address, port)

Issue requests via a ZeroMQ DEALER socket and receive responses. Maintains a persistent connection to a single server; the address and port number must be specified.

run()

All send() and recv() calls are sequestered to this thread in order to satisfy ZeroMQ. Even with a lock around the socket, ZeroMQ will sometimes segfault when multiple threads act on a single socket, in particular when other threads (such as client operations) call send() while a background thread (such as this one) is running poll() and recv(). Thus, all incoming local requests are filtered through a queue, with notification happening on a PAIR socket so that a single poll() call can wake up the thread for either type of event.

Example reference:

https://github.com/zeromq/libzmq/issues/1108
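The single-owner pattern described for run() can be sketched without ZeroMQ. In this illustration a socket.socketpair() stands in for the PAIR notification socket, a selectors selector stands in for poll(), and an ordinary stream socket stands in for the DEALER socket; SingleOwnerWorker and its method names are hypothetical.

```python
import queue
import selectors
import socket
import threading


class SingleOwnerWorker(threading.Thread):
    """Only run() ever touches self.remote; other threads use submit()."""

    def __init__(self, remote):
        super().__init__(daemon=True)
        self.remote = remote
        self.outbound = queue.Queue()
        self.received = queue.Queue()
        # Stand-in for the PAIR socket: one end wakes the other.
        self._wake_rx, self._wake_tx = socket.socketpair()

    def submit(self, data):
        """Thread-safe: queue the data, then wake the worker thread."""
        self.outbound.put(data)
        self._wake_tx.send(b'\x00')

    def stop(self):
        self.submit(None)   # None is the shutdown sentinel in this sketch.

    def run(self):
        selector = selectors.DefaultSelector()
        selector.register(self.remote, selectors.EVENT_READ)
        selector.register(self._wake_rx, selectors.EVENT_READ)
        while True:
            # One blocking call covers both kinds of event.
            for key, _ in selector.select():
                if key.fileobj is self._wake_rx:
                    self._wake_rx.recv(1)
                    item = self.outbound.get()
                    if item is None:
                        selector.close()
                        return
                    self.remote.sendall(item)
                else:
                    data = self.remote.recv(4096)
                    if data:
                        self.received.put(data)
                    else:
                        # Peer closed the connection; stop watching it.
                        selector.unregister(self.remote)
```

Callers on any thread only ever touch the queue and the wake-up socket, so the remote socket is never shared between threads.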

send(message)

A message is a fully populated mktl.protocol.message.Request instance, which normalizes the arguments that will be sent via this method as a multipart message. The message instance will also be used for notification of any/all responses from the remote end. This method blocks while waiting for the ACK, but never blocks waiting for the full response; the caller is free to decide whether to block on or poll for the full response, using the wait() and poll() methods of the mktl.protocol.message.Request instance.

Request server

class mktl.protocol.request.Server(hostname=None, port=None, avoid={})

Receive requests via a ZeroMQ ROUTER socket, and respond to them. The default behavior is to listen for incoming requests on our locally known fully qualified domain name, on the first available automatically assigned port. The avoid set enumerates port numbers that should not be automatically assigned; this is ignored if a fixed port is specified.

The hostname and port variables associated with a Server instance are key pieces of the provenance for an mKTL daemon.

Variables:
  • hostname – The hostname on which this server can be contacted.

  • port – The port on which this server is listening for connections.
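The interaction between a fixed port, automatic assignment and the avoid set can be sketched as a small selection function. The candidate range of 10100 to 10199, the function names and the injectable is_free probe are all assumptions made for illustration; the actual range and probing strategy used by mKTL are not stated here.

```python
import socket


def port_is_free(port, host='127.0.0.1'):
    """Return True if a TCP socket can currently bind to the port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            return False
    return True


def choose_port(port=None, avoid=frozenset(),
                candidates=range(10100, 10200), is_free=port_is_free):
    """Pick a listening port; the candidate range is hypothetical."""
    if port is not None:
        # A fixed port wins outright; the avoid set is ignored.
        return port
    for candidate in candidates:
        if candidate not in avoid and is_free(candidate):
            return candidate
    raise RuntimeError('no automatically assignable port is available')
```

Passing a custom is_free callable makes the selection logic easy to exercise without binding real sockets.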

req_ack(request)

Acknowledge the incoming request. The client is expecting an immediate ACK for all request types, including errors; this is how a client knows whether a daemon is online to respond to its request.

req_handler(request)

The default request handler is for debug purposes only, and is effectively a no-op. mktl.Daemon leverages a custom subclass of Server that properly handles specific types of requests, since it needs to be aware of the actual structure of what’s happening in the daemon code.

req_incoming(parts)

All inbound requests are filtered through this method. It will parse the request as JSON into a Python dictionary, and hand it off to req_handler() for further processing. Error handling is managed here; if req_handler() raises an exception it will be packaged up and returned to the client as an error.

req_handler() is expected to call req_ack() to acknowledge the incoming request; if req_handler() returns a simple payload, it will be packaged into a REP response. No response will be issued if req_handler() returns None.
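The control flow of req_incoming() can be sketched as a plain function: parse, dispatch, and convert exceptions into an error reply. The dictionary layout of the replies, including the error fields, is hypothetical, and the ACK step is left out for brevity; only the three outcomes (payload, error, no response) come from the description above.

```python
import json


def sketch_req_incoming(raw, handler):
    """Return the dictionary to send back, or None for no response."""
    try:
        request = json.loads(raw)
        payload = handler(request)
    except Exception as exception:
        # Hypothetical error layout; the real fields are not specified here.
        return {'type': 'REP',
                'error': {'type': type(exception).__name__,
                          'text': str(exception)}}
    if payload is None:
        # The handler chose to respond on its own, or not at all.
        return None
    return {'type': 'REP', 'payload': payload}
```

Note that malformed JSON takes the same path as a handler failure in this sketch, so the client still receives an error rather than silence.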

send(response)

Queue a response to be sent back to the original requestor.

Publish client

mktl.protocol.publish.client(address, port)

Factory function for a Client instance. Use of this method is encouraged to streamline re-use of established connections.

class mktl.protocol.publish.Client(address, port)

Establish a ZeroMQ SUB connection to a ZeroMQ PUB socket and receive broadcasts.

propagate(topic, message)

Invoke any/all callbacks registered via register() for a newly arrived message.

register(callback, topic=None)

Register a callback that will be invoked every time a new broadcast message arrives. If no topic is specified the callback will be invoked for all broadcast messages. The topic is case-sensitive and must be an exact match. Any callbacks registered in this fashion should be as lightweight as possible, as there is a single thread processing all arriving broadcast messages.

subscribe() will be invoked for any/all topics registered with a callback; it does not need to be called separately.
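The interplay of register(), subscribe() and propagate() can be sketched with a small in-memory registry. SketchSubscriber is a hypothetical stand-in; it assumes callbacks receive the message as their only argument and that registering without a topic subscribes to the empty string.

```python
class SketchSubscriber:
    """Hypothetical stand-in for the publish Client callback handling."""

    def __init__(self):
        self.callbacks_all = []
        self.callbacks_by_topic = {}
        self.subscribed = set()

    def subscribe(self, topic):
        # Stand-in for the ZeroMQ subscription request.
        self.subscribed.add(topic)

    def register(self, callback, topic=None):
        if topic is None:
            self.callbacks_all.append(callback)
            self.subscribe('')      # Assumption: '' means every topic.
        else:
            self.callbacks_by_topic.setdefault(topic, []).append(callback)
            self.subscribe(topic)   # Registration implies subscription.

    def propagate(self, topic, message):
        # Catch-all callbacks first, then exact, case-sensitive matches.
        for callback in self.callbacks_all:
            callback(message)
        for callback in self.callbacks_by_topic.get(topic, ()):
            callback(message)
```

Because every callback runs inline within propagate(), a slow callback delays all the others, which is why registered callbacks should stay lightweight.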

subscribe(topic)

ZeroMQ subscriptions are based on a topic. Filtering of messages happens on the server side, depending on what a client is subscribed to. A client can subscribe to all messages by providing the empty string as the topic.
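At the ZeroMQ level, a subscription filter is matched as a byte prefix of the message, which is why the empty string matches everything. The helper below is a plain-Python model of that rule for illustration only; passes_filter() is a hypothetical name and no ZeroMQ calls are involved.

```python
def passes_filter(first_frame, subscriptions):
    """Model of ZeroMQ prefix matching on the leading bytes of a message."""
    return any(first_frame.startswith(prefix) for prefix in subscriptions)


print(passes_filter(b'store.KEY', {b''}))         # empty prefix matches all
print(passes_filter(b'store.KEY', {b'store.'}))   # prefix match
print(passes_filter(b'store.KEY', {b'other.'}))   # no matching prefix
```

Prefix matching at this layer is looser than the exact topic match applied to registered callbacks, so a callback may see fewer messages than the underlying subscription delivers.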

Publish server

class mktl.protocol.publish.Server(port=None, avoid={})

Send broadcasts via a ZeroMQ PUB socket. The default behavior is to set up a listener on all available network interfaces on the first available automatically assigned port. The avoid set enumerates port numbers that should not be automatically assigned; this is ignored if a fixed port is specified.

The port variable associated with a Server instance is a key piece of the provenance for an mKTL daemon.

Variables:
  • port – The port on which this server is listening for connections.

publish(message)

A message is a mktl.protocol.message.Broadcast instance intended for broadcast to any/all subscribers.