Pubsub Subsystem
VOLTTRON core provides a ‘pubsub’ subsystem to publish and subscribe to topics on the message bus. The agent’s VIP connection can be used to access the pubsub subsystem. The message published and topic to publish to are determined by the agent implementation. For example, the platform driver agent publishes scraped device data in a pre-defined format to topic names that start with the prefix “devices”.
Publish/Subscribe based on topic name
Publish to a topic
The following example shows an agent publishing to the topic ‘some/topic’, with a JSON string as the message and a dictionary as the headers.
self.vip.pubsub.publish('pubsub',
                        'some/topic',
                        message=f'{{"value": "{self.publish_value}"}}',
                        headers={"datetime": "2015-12-02T00:00:00"})
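Hand-writing JSON inside an f-string is easy to get wrong, because literal braces must be doubled. As a minimal sketch of one alternative, assuming the message should be a JSON string with a single "value" field, the payload can be built with the standard `json` module. The helper name `build_publish_args` and its parameters are hypothetical and are not part of the VOLTTRON API.

```python
import json
from datetime import datetime, timezone


def build_publish_args(value, when=None):
    """Return (message, headers) for a publish call.

    Hypothetical helper for illustration; not part of the VOLTTRON API.
    """
    when = when or datetime.now(timezone.utc)
    # json.dumps handles quoting and escaping, so the result is valid JSON.
    message = json.dumps({"value": value})
    # Same timestamp layout as the header in the example above.
    headers = {"datetime": when.strftime("%Y-%m-%dT%H:%M:%S")}
    return message, headers


message, headers = build_publish_args(72.5, datetime(2015, 12, 2))
print(message)   # {"value": 72.5}
print(headers)   # {'datetime': '2015-12-02T00:00:00'}
```

Inside an agent, the two values could then be passed as `message=message, headers=headers` to `self.vip.pubsub.publish`.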
Subscribe to a topic
The following example shows an agent subscribing to the topic “this/topic” and registering the method onmessage() as the callback that is invoked when a message is published to a topic starting with that prefix. More than one callback method can be registered for the same topic.
self.vip.pubsub.subscribe('pubsub', "this/topic", self.onmessage)
When a message is published to any topic that starts with the prefix “this/topic”, the agent’s onmessage() method is called with the peer, the actual sender, the message bus name, the actual topic name, the headers, and the message that was published.
def onmessage(self, peer, sender, bus, topic, headers, message):
    print("received: peer=%r, sender=%r, bus=%r, topic=%r, headers=%r, message=%r" %
          (peer, sender, bus, topic, headers, message))
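The prefix matching described above can be illustrated with a small in-memory model. This is a toy sketch, not VOLTTRON's implementation: the `ToyBus` class, its methods, and the `audit` callback are hypothetical, and only the callback signature is taken from the example above. It assumes plain string-prefix matching and shows that a subscription to "this/topic" also receives messages published to a longer topic such as "this/topic/sub", and that several callbacks can share one prefix.

```python
from collections import defaultdict


class ToyBus:
    """Toy in-memory stand-in for a prefix-based pubsub bus (illustration only)."""

    def __init__(self):
        # Maps a topic prefix to the list of callbacks registered for it.
        self._subscriptions = defaultdict(list)

    def subscribe(self, prefix, callback):
        self._subscriptions[prefix].append(callback)

    def publish(self, sender, topic, headers=None, message=None):
        # Assumption: plain string-prefix matching, as the text describes.
        for prefix, callbacks in self._subscriptions.items():
            if topic.startswith(prefix):
                for callback in callbacks:
                    callback("pubsub", sender, "", topic, headers or {}, message)


received = []


def onmessage(peer, sender, bus, topic, headers, message):
    received.append(("onmessage", topic, message))


def audit(peer, sender, bus, topic, headers, message):
    received.append(("audit", topic, message))


toy = ToyBus()
toy.subscribe("this/topic", onmessage)
toy.subscribe("this/topic", audit)  # second callback on the same prefix

toy.publish("agent-a", "this/topic/sub", message="hello")  # matches the prefix
toy.publish("agent-a", "other/topic", message="ignored")   # does not match

print(received)
# [('onmessage', 'this/topic/sub', 'hello'), ('audit', 'this/topic/sub', 'hello')]
```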
Unsubscribe from a topic
The following example shows an agent unsubscribing from the topic “devices/campus1/building1/device1”. If a callback method is provided, only that method is removed from the list of methods called when a message is sent to this topic prefix. If no callback method is provided, all registered callback methods stop being called when messages are sent to this topic.
self.vip.pubsub.unsubscribe('pubsub',
                            "devices/campus1/building1/device1",
                            self.callback_method).get(timeout=5)
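The two unsubscribe behaviours described above (remove one callback, or remove all of them) can be sketched with a plain dictionary. This is an illustrative model only: the `unsubscribe` function below and the callbacks `log_reading` and `store_reading` are hypothetical and do not reflect how VOLTTRON stores subscriptions internally.

```python
def unsubscribe(subscriptions, prefix, callback=None):
    """Remove one callback for a prefix, or all of them when callback is None."""
    callbacks = subscriptions.get(prefix, [])
    if callback is None:
        callbacks.clear()            # no callback given: drop every callback
    elif callback in callbacks:
        callbacks.remove(callback)   # drop only the named callback
    return callbacks


def log_reading(peer, sender, bus, topic, headers, message):
    pass


def store_reading(peer, sender, bus, topic, headers, message):
    pass


topic = "devices/campus1/building1/device1"
subscriptions = {topic: [log_reading, store_reading]}

# Removing a specific callback leaves the other one registered.
unsubscribe(subscriptions, topic, log_reading)
after_one = [cb.__name__ for cb in subscriptions[topic]]
print(after_one)             # ['store_reading']

# Omitting the callback removes everything registered for the prefix.
unsubscribe(subscriptions, topic)
print(subscriptions[topic])  # []
```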