[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
[paho-dev] Publish message from MQTT broker topics to Google cloud PubSub topics
|
Hi Team,
I am working with mqtt-paho. MQTT Version 3.1.1 on Ubuntu machine.
Project Requirement:
1. Subscribe to mqtt broker topics (there are multiple topics)
2. Publish message of each MQTT topic to Google cloud PubSub topics. Where pubsub topic and mqtt topic have 1-1 relationship.
Please have a look at the script that I have attached.
Questions:
Can I parallely send messages to different pubsub topics based on from which broker topic the message is arriving?
Thanks,
Urvashi Chaudhary
import paho.mqtt.client as mqtt
from google.cloud import pubsub_v1
project_id = "project_sample"
topic_name1 = "topic-01"
topic_name2 = "topic-02"
topic_name3 = "topic-03"
topic_name4 = "topic-04"
publisher = pubsub_v1.PublisherClient()
topic_path1 = publisher.topic_path(project_id, topic_name1)
topic_path2 = publisher.topic_path(project_id, topic_name2)
topic_path3 = publisher.topic_path(project_id, topic_name3)
topic_path4 = publisher.topic_path(project_id, topic_name4)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code" +str(rc))
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("mqtt/topic-01")
client.subscribe("mqtt/topic-02")
client.subscribe("mqtt/topic-03")
client.subscribe("mqtt/topic-04")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
# print(msg.topic+" "+str(msg.payload))
future = publisher.publish(topic_path1, data=msg.payload)
future = publisher.publish(topic_path2, data=msg.payload)
future = publisher.publish(topic_path3, data=msg.payload)
future = publisher.publish(topic_path4, data=msg.payload)
print(future.result())
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("IP_of_mqtt", Port, interval)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()