Protocol interface
Each aspect of the mKTL protocol is split into two components: a client-facing half and a server-facing half. The interfaces described here are generally not intended for direct use; they support the end-user functionality exposed in the Client interface and Daemon interface code.
Message classes
- class mktl.protocol.message.Message(type, target=None, payload=None, bulk=None, id=None)
The Message class provides a very thin encapsulation of what it means to be a message in an mKTL context. This class will be used to represent mKTL messages that do not result in a response.
The fields are largely in order of how they are represented on the wire: the message type, the key/store target for the request, the payload of the message (expected to be a Python dictionary), any bulk as-bytes component of the message, and an identification number unique to this correspondence. The identification number is the one field that is out of order compared to the multipart sequence on the wire; this is because some message types (publish messages, in particular) do not have an identification number, and it is automatically generated for request messages. Rather than force the caller to pass an explicit None, the id is left as the last field, so that the arguments for all Message instances can have a similar structure.
- Variables:
payload – The item-specific data, if any, for the message.
bulk – The item-specific bulk data, if any, for the message.
valid_types – A set of valid strings for the message type.
timestamp – A UNIX epoch timestamp for the message send time.
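The reasoning behind the argument order can be illustrated with a small stand-in class. This is only a sketch: SketchMessage is a hypothetical name, the type strings and the 'store.KEY' target are placeholders, and setting the timestamp at construction is a simplification. None of it is taken from the mKTL implementation; it only mirrors the constructor signature documented above.

```python
import time


class SketchMessage:
    """Hypothetical stand-in mirroring the documented argument order."""

    # Placeholder type strings for illustration only; the real set is
    # whatever Message.valid_types contains.
    valid_types = {'REQ_EXAMPLE', 'PUB_EXAMPLE'}

    def __init__(self, type, target=None, payload=None, bulk=None, id=None):
        if type not in self.valid_types:
            raise ValueError('invalid message type: ' + repr(type))
        self.type = type
        self.target = target
        self.payload = payload
        self.bulk = bulk
        self.id = id
        # Simplification: stamped at construction rather than at send time.
        self.timestamp = time.time()


# A publish-style message has no identification number, so the trailing
# id argument is simply left off instead of being passed as None.
broadcast = SketchMessage('PUB_EXAMPLE', 'store.KEY', {'value': 1.5})

# A request-style message may carry an explicit id as the final argument.
request = SketchMessage('REQ_EXAMPLE', 'store.KEY', None, None, 42)
```

Because id comes last, both calls share the same leading arguments (type, target, payload, bulk), which is the structural similarity the description refers to.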
- class mktl.protocol.message.Request(type, target=None, payload=None, bulk=None, id=None)
A Request has a little extra functionality, focusing on local caching of response values and signaling that a request is complete. This is the class that will be used on the client side when a server is expected to provide a response, such as returning a requested value, or signaling that a set operation is complete.
- Variables:
response – The final response to a request (also a Message).
- poll()
Return True if the request is complete, otherwise return False.
- wait(timeout=60)
Block until the request has been handled or the timeout expires. The response to the request is always returned; it will be None if the original request is still pending when the timeout expires.
- wait_ack(timeout)
Block until the request has been acknowledged. This is a wrapper around a threading.Event instance; it returns True if the event has occurred, otherwise it returns False after the requested timeout. If the timeout argument is None it will block indefinitely.
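The poll(), wait(), and wait_ack() semantics can be modeled with two threading.Event objects. The sketch below is a stand-in under stated assumptions, not the mKTL implementation: SketchRequest and its _acknowledge() and _complete() hooks are hypothetical names, and a timer thread plays the role of whatever thread would deliver the server's replies.

```python
import threading


class SketchRequest:
    """Hypothetical stand-in for the documented Request behavior."""

    def __init__(self):
        self.response = None
        self._ack_event = threading.Event()
        self._done_event = threading.Event()

    def _acknowledge(self):
        # Hypothetical hook: invoked when the ACK arrives.
        self._ack_event.set()

    def _complete(self, response):
        # Hypothetical hook: invoked when the final response arrives.
        self.response = response
        self._done_event.set()

    def poll(self):
        # True once the final response has been stored.
        return self._done_event.is_set()

    def wait(self, timeout=60):
        # Returns the response, or None if it is still pending.
        self._done_event.wait(timeout)
        return self.response

    def wait_ack(self, timeout):
        # Event.wait() returns True if set, False on timeout.
        return self._ack_event.wait(timeout)


def deliver(request):
    # Simulated receiving thread: ACK first, then the full response.
    request._acknowledge()
    request._complete({'value': 7})


req = SketchRequest()
pending = req.wait(timeout=0.01)   # nothing has arrived yet

worker = threading.Timer(0.05, deliver, args=(req,))
worker.start()

acked = req.wait_ack(5)            # blocks until the simulated ACK
result = req.wait(5)               # blocks until the simulated response
worker.join()
```

A caller that only needs to know the daemon is alive can stop after wait_ack(); a caller that needs the value continues on to wait().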
Request client
- mktl.protocol.request.client(address, port)
Factory function for a Client instance. Use of this function is encouraged to streamline re-use of established connections.
- mktl.protocol.request.send(address, port, message)
Use client() to connect to the specified address and port, and send the specified message.Request instance. This function blocks until the completion of the request.
- class mktl.protocol.request.Client(address, port)
Issue requests via a ZeroMQ DEALER socket and receive responses. Maintains a persistent connection to a single server; the address and port number must be specified.
- send(message)
A message is a fully populated message.Request instance, which normalizes the arguments that will be sent via this method as a multi-part message. The message will also be used for notification of any/all responses from the remote end; this method will block while waiting for the ACK, but the caller is free to decide whether to block while waiting for the full response.
Request server
- class mktl.protocol.request.Server(hostname=None, port=None, avoid={})
Receive requests via a ZeroMQ ROUTER socket, and respond to them. The default behavior is to listen for incoming requests on our locally known fully qualified domain name, on the first available automatically assigned port. The avoid set enumerates port numbers that should not be automatically assigned; this is ignored if a fixed port is specified.
The hostname and port variables associated with a Server instance are key pieces of the provenance for an mKTL daemon.
- Variables:
hostname – The hostname on which this server can be contacted.
port – The port on which this server is listening for connections.
- req_ack(socket, lock, ident, request)
Acknowledge the incoming request. The client is expecting an immediate ACK for all request types, including errors; this is how a client knows whether a daemon is online to respond to its request.
- req_handler(socket, lock, ident, request)
The default request handler is for debug purposes only, and is effectively a no-op. mktl.Daemon.Store leverages a custom subclass of Server that properly handles specific types of requests, since it needs to be aware of the actual structure of what’s happening in the daemon code.
- req_incoming(socket, lock, parts)
All inbound requests are filtered through this method. It will parse the request as JSON into a Python dictionary, and hand it off to req_handler() for further processing. Error handling is managed here; if req_handler() raises an exception it will be packaged up and returned to the client as an error. req_handler() is expected to call req_ack() to acknowledge the incoming request; if req_handler() returns a simple payload it will be packaged into a REP response. No response will be issued if req_handler() returns None.
- send(ident, message)
Convenience method for subclasses to fire off a message response. Subclasses that use this method go beyond the req_incoming() and req_handler() background thread machinery defined here: they handle asynchronous responses that need to be relayed back to the original caller.
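The three outcomes described for req_incoming() (a payload becomes a REP response, None produces no response, an exception becomes an error) can be sketched as a plain function. This is an illustration only: sketch_incoming, the handler functions, and the layout of the reply dictionaries are all assumptions, and the socket, lock, identity, and ACK steps are left out.

```python
import json


def sketch_incoming(raw, handler):
    """Return the reply dictionary to send, or None for no reply."""
    request = json.loads(raw)

    try:
        payload = handler(request)
    except Exception as exception:
        # Package the failure so the client receives an error instead
        # of silence. The dictionary layout here is an assumption.
        error = {'type': type(exception).__name__, 'text': str(exception)}
        return {'type': 'REP', 'error': error}

    if payload is None:
        # The handler chose not to respond (or will respond later).
        return None

    return {'type': 'REP', 'payload': payload}


def echo_handler(request):
    return {'echo': request['target']}


def silent_handler(request):
    return None


def broken_handler(request):
    raise ValueError('unsupported request')


raw = b'{"target": "store.KEY"}'
reply = sketch_incoming(raw, echo_handler)
nothing = sketch_incoming(raw, silent_handler)
failure = sketch_incoming(raw, broken_handler)
```

A handler that returns None and later relays its result through send() corresponds to the asynchronous case described for subclasses.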
Publish client
- mktl.protocol.publish.client(address, port)
Factory function for a Client instance. Use of this function is encouraged to streamline re-use of established connections.
- class mktl.protocol.publish.Client(address, port)
Establish a ZeroMQ SUB connection to a ZeroMQ PUB socket and receive broadcasts.
- propagate(topic, message)
Invoke any/all callbacks registered via register() for a newly arrived message.
- register(callback, topic=None)
Register a callback that will be invoked every time a new broadcast message arrives. If no topic is specified the callback will be invoked for all broadcast messages. The topic is case-sensitive and must be an exact match. Any callbacks registered in this fashion should be as lightweight as possible, as there is a single thread processing all arriving broadcast messages. subscribe() will be invoked for any/all topics registered with a callback; it does not need to be called separately.
- subscribe(topic)
ZeroMQ subscriptions are based on a topic. Filtering of messages happens on the server side, depending on what a client is subscribed to. A client can subscribe to all messages by providing the empty string as the topic.
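The relationship between register() and propagate() can be modeled without any network code. The sketch below assumes that callbacks receive only the message and that topic-less callbacks are kept in a separate list; SketchSubscriber and the topic strings are hypothetical, and the ZeroMQ subscription step is omitted.

```python
class SketchSubscriber:
    """Hypothetical stand-in for the callback bookkeeping."""

    def __init__(self):
        self.callbacks_all = []
        self.callbacks_by_topic = {}

    def register(self, callback, topic=None):
        if topic is None:
            # No topic: invoke for every broadcast message.
            self.callbacks_all.append(callback)
        else:
            self.callbacks_by_topic.setdefault(topic, []).append(callback)

    def propagate(self, topic, message):
        for callback in self.callbacks_all:
            callback(message)
        # Exact, case-sensitive dictionary lookup on the topic.
        for callback in self.callbacks_by_topic.get(topic, ()):
            callback(message)


seen_all = []
seen_key = []

subscriber = SketchSubscriber()
subscriber.register(seen_all.append)
subscriber.register(seen_key.append, topic='store.KEY')

subscriber.propagate('store.KEY', 'first')
subscriber.propagate('store.key', 'second')   # different case: no match
```

Both callbacks run in the caller's thread here, which mirrors why slow callbacks would delay every other subscriber when a single thread processes all arriving messages.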
Publish server
- class mktl.protocol.publish.Server(port=None, avoid={})
Send broadcasts via a ZeroMQ PUB socket. The default behavior is to set up a listener on all available network interfaces on the first available automatically assigned port. The avoid set enumerates port numbers that should not be automatically assigned; this is ignored if a fixed port is specified.
The port variable associated with a Server instance is a key piece of the provenance for an mKTL daemon.
- Variables:
port – The port on which this server is listening for connections.
- publish(message)
A message is a message.Broadcast instance intended for broadcast to any subscribers.
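The interaction between a fixed port and the avoid set, as described for the Server classes, can be sketched as a small selection function. Everything here beyond that rule is an assumption made for illustration: the function name choose_port, the candidate port range, and the is_free predicate (which stands in for an actual bind attempt) are hypothetical.

```python
def choose_port(port=None, avoid=frozenset(),
                candidates=range(10079, 10090),
                is_free=lambda candidate: True):
    """Pick a listening port; the candidate range is a placeholder."""
    if port is not None:
        # A fixed port was requested, so the avoid set is ignored.
        return port

    for candidate in candidates:
        if candidate in avoid:
            continue
        if is_free(candidate):
            return candidate

    raise RuntimeError('no port available for automatic assignment')


automatic = choose_port(avoid={10079})
fixed = choose_port(port=10079, avoid={10079})
skipped = choose_port(avoid={10079}, is_free=lambda p: p != 10080)
```

A daemon that runs both a request server and a publish server could pass the first server's port in the avoid set of the second, so that the two are never assigned the same number.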