Ingesting data into mosquitto broker with QoS=1 [message #1858674] |
Fri, 14 April 2023 15:28 |
Ritabrata Saha Messages: 1 Registered: April 2023 |
Junior Member |
|
|
I am trying to ingest data into a Mosquitto broker running as a Docker image on my local machine, publishing with QoS=1. However, I can see that the subscriber gets stuck at a record count of 20. For my demo application I am ingesting 500 records every second for a duration of 20 seconds. The subscriber behaviour is consistent across runs.
Here is the data generator code.
Any help to resolve this is highly appreciated.
Here is the publisher code :
import paho.mqtt.client as paho
from paho import mqtt
from random import randrange, uniform
import time
from time import sleep
from multiprocessing import Process
from os import getpid
import random
import datetime
def task():
    """Publish 20 batches of 500 simulated sensor readings to topic "TEMP" at QoS 1.

    Each reading is a random float between 10 and 12, sent as a
    dict-like text payload tagged with the batch/record index, the
    ingestion timestamp and this process's pid (used as the machine name).

    NOTE(review): the forum paste lost all indentation; the nesting below is
    reconstructed from statement order and should be confirmed by the author.
    NOTE(review): this function never starts a paho network loop
    (loop_start()/loop()), which is the problem diagnosed in the reply in
    this thread. The logic is intentionally left as originally posted.
    """
    pid = getpid()
    machine_name = str(pid)
    topic_name = "TEMP"
    for i in range(20):
        # A new client is created and connected on every batch.
        mqttBroker = "localhost"
        client = paho.Client("Temperature_Inside")
        client.connect(mqttBroker, 1883)
        for _ in range(500):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            client.publish(topic_name, sensor_payload, qos=1)
        # One batch per second.
        sleep(1)
# entry point
if __name__ == '__main__':
    # Number of publisher processes to launch; each one runs task().
    n = 1
    for i in range(n):
        process = Process(target=task)
        process.start()
Here is the Subscriber Code :
import paho.mqtt.client as paho
from paho import mqtt
import time
def on_message(client, userdata, message):
    """Print a received message and append it to files_sub/data_recv.txt.

    Only ``message.payload`` is used; ``client`` and ``userdata`` are
    accepted but ignored.

    The console shows the payload decoded as UTF-8, while the log file gets
    ``str(message.payload)``, i.e. the repr of the raw payload
    (``b'...'`` for bytes).
    The ``files_sub`` directory must already exist; it is not created here.
    """
    print("received message: ", str(message.payload.decode("utf-8")))
    with open('files_sub/data_recv.txt', 'a+') as f:
        f.write("Message received: " + str(message.payload) + "\n")
# Subscriber script: connect to the broker on localhost:1883, listen for
# 300 seconds, then stop.
mqttBroker ="localhost"
client = paho.Client("Smartphone")
client.connect(mqttBroker,1883)
# Start the client's network loop before subscribing.
client.loop_start()
# Subscribe with the "#" topic filter at QoS 1.
client.subscribe("#",qos=1)
# NOTE(review): the callback is attached after loop_start()/subscribe();
# presumably a message arriving before this line would not reach
# on_message -- confirm, or assign the callback before connecting.
client.on_message=on_message
# Keep the script alive for 300 seconds while messages are handled.
time.sleep(300)
client.loop_stop()
|
|
|
Re: Ingesting data into mosquitto broker with QoS=1 [message #1858684 is a reply to message #1858674] |
Sun, 16 April 2023 08:00 |
Roger Light Messages: 90 Registered: September 2013 |
Member |
|
|
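Your publisher is missing a call to loop_start(), which means the QoS 1 messages are never processed and the inflight queue fills up. (The Python client allows 20 inflight messages by default, which is likely why the subscriber stalls at 20 records.) Here is a version of the publisher that starts the network loop and waits for each publish to complete: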
import paho.mqtt.client as paho
from random import randrange, uniform
import time
from time import sleep
from multiprocessing import Process
from os import getpid
import random
import datetime
def task():
    """Publish 20 batches of 500 simulated sensor readings to topic "TEMP" at QoS 1.

    A single client is connected once and its network loop is started with
    loop_start() before anything is published, so QoS 1 messages are
    processed instead of piling up in the inflight queue. Each publish is
    waited on before the next one is sent, and the client is disconnected
    once every batch has gone out.

    Each reading is a random float between 10 and 12; the payload carries the
    batch/record index, the ingestion timestamp and this process's pid
    (used as the machine name).
    """
    pid = getpid()
    machine_name = str(pid)
    topic_name = "TEMP"
    mqttBroker = "localhost"
    client = paho.Client("Temperature_Inside")
    client.connect(mqttBroker, 1883)
    # Without the network loop running, QoS 1 publishes are never completed.
    client.loop_start()
    # `i` is the batch index used in Msg_Id ("<batch>-<record>"); the payload
    # below needs it, so the batch loop has to be present.
    for i in range(20):
        for _ in range(500):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            msgid = client.publish(topic_name, sensor_payload, qos=1)
            # Block until this message has been published before sending the next.
            msgid.wait_for_publish()
        # One batch per second.
        sleep(1)
    client.disconnect()
    # Stop the background network thread started by loop_start().
    client.loop_stop()
Regards,
Roger
|
|
|
Powered by
FUDForum. Page generated in 0.02846 seconds