Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[volttron-dev] Christmas Present from ChatGPT 🎅

Hi all,

I've been playing around with ChatGPT and I wanted to see if it could generate an MQTT subclass for the Volttron 10 message bus since most of our potential customers want MQTT and not ZMQ or RabbitMQ. The abstract superclass for the message bus seems to have disappeared since the last time I looked at the Volttron 10 repo and the repo seems to have become very ZMQ specific, so I used the PubSubService instead. I told it to write the class using the  paho-mqtt 1.6.1 library which is also from the Eclipse Foundation. Then I told it to write unit tests for the class.

It told me that the MQTT bus would basically bypass the Volttron platform message bus. Also, the unit tests look rather thin. As I am not an expert Python programmer so I can't  tell how good the code is and of course the ultimate test is if it works and I don't have Vottron 10 up and running yet.

Merry Christmas and enjoy!

           jak
import paho.mqtt.client as mqtt
from volttron.platform.agent import utils
from volttron.platform.vip.agent import PubSub

class MqttPubSub(PubSub):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = mqtt.Client()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        """Callback for when the client connects to the broker"""
        self.client.subscribe(self.core.identity)

    def _on_message(self, client, userdata, message):
        """Callback for when a message is received from the broker"""
        topic = message.topic
        payload = message.payload.decode()
        headers = {}
        headers[headers_mod.CONTENT_TYPE] = headers_mod.CONTENT_TYPE.PLAIN_TEXT
        self.vip.pubsub.publish(peer='pubsub', topic=topic, message=payload, headers=headers)

    def _on_disconnect(self, client, userdata, rc):
        """Callback for when the client disconnects from the broker"""
        if rc != 0:
            self.client.reconnect()

    def publish(self, topic, message, headers=None):
        """Publish a message to the MQTT broker"""
        self.client.publish(topic, message)

def main(argv=sys.argv):
    """Main method called by the platform."""
    utils.vip_main(MqttPubSub, identity='mqtt.pubsub')

if __name__ == '__main__':
    # Entry point for script
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        pass
Here is an example of a Python unit test suite for the MqttPubSubService class that I provided in the previous response:

import unittest
import mock

from mqtt_pubsub_service import MqttPubSubService

class TestMqttPubSubService(unittest.TestCase):
    def setUp(self):
        self.service = MqttPubSubService(identity='test-service')

    @mock.patch('mqtt_pubsub_service.mqtt.Client')
    def test_on_connect(self, mock_client):
        """Test that the _on_connect method subscribes to the service's identity"""
        self.service._on_connect(mock_client, None, None, 0)
        mock_client.subscribe.assert_called_with(self.service.core.identity)

    @mock.patch('mqtt_pubsub_service.mqtt.Client')
    def test_on_message(self, mock_client):
        """Test that the _on_message method publishes the message to the platform pubsub"""
        message = mock.Mock()
        message.topic = 'test-topic'
        message.payload = b'test-message'
        self.service._on_message(mock_client, None, message)
        self.service.vip.pubsub.publish.assert_called_with(peer='pubsub', topic='test-topic', message='test-message', headers={'Content-Type': 'text/plain'})

    @mock.patch('mqtt_pubsub_service.mqtt.Client')
    def test_on_disconnect(self, mock_client):
        """Test that the _on_disconnect method reconnects if the disconnection was not intentional"""
        self.service._on_disconnect(mock_client, None, 1)
        mock_client.reconnect.assert_called_once()

        self.service._on_disconnect(mock_client, None, 0)
        mock_client.reconnect.assert_called_once()

    @mock.patch('mqtt_pubsub_service.mqtt.Client')
    def test_publish_to_platform(self, mock_client):
        """Test that the publish_to_platform method publishes the message to the MQTT broker"""
        self.service.publish_to_platform('test-topic', 'test-message', {})
        mock_client.publish.assert_called_with('pubsub', 'test-message')

if __name__ == '__main__':
    unittest.main()


This test suite includes four test cases that test the _on_connect, _on_message, _on_disconnect, and publish_to_platform methods of the MqttPubSubService class. It uses the mock library to mock the mqtt.Client class and the vip.pubsub attribute of the PubSubService class, allowing the test cases to focus on the behavior of the methods under test.

To run the test suite, you can use the unittest module's main method:


Back to the top