Eclipse Community Forums
Ingesting data into mosquitto broker with QoS=1 [message #1858674] Fri, 14 April 2023 15:28
Ritabrata Saha
I am trying to ingest data into a Mosquitto broker running from the Docker image on my local machine, with QoS=1. However, the subscriber gets stuck at a record count of 20. My demo application publishes 500 records every second for 20 seconds, and the subscriber behaves the same way on every run.

Any help resolving this is highly appreciated.

Here is the publisher (data generator) code:

import paho.mqtt.client as paho
from paho import mqtt
from random import randrange, uniform
import time
from time import sleep
from multiprocessing import Process
from os import getpid
import random
import datetime


def task():
    pid = getpid()
    machine_name = str(pid)
    topic_name = "TEMP"

    for i in range(20):
        mqttBroker = "localhost"
        client = paho.Client("Temperature_Inside")
        client.connect(mqttBroker, 1883)

        for _ in range(500):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            client.publish(topic_name, sensor_payload, qos=1)

        sleep(1)


# entry point
if __name__ == '__main__':
    n = 1
    for i in range(n):
        process = Process(target=task)
        process.start()


Here is the subscriber code:

import paho.mqtt.client as paho
from paho import mqtt
import time


def on_message(client, userdata, message):
    print("received message: ", str(message.payload.decode("utf-8")))
    with open('files_sub/data_recv.txt', 'a+') as f:
        f.write("Message received: " + str(message.payload) + "\n")


mqttBroker = "localhost"
client = paho.Client("Smartphone")
client.connect(mqttBroker, 1883)

client.loop_start()

client.subscribe("#", qos=1)
client.on_message = on_message

time.sleep(300)
client.loop_stop()
Re: Ingesting data into mosquitto broker with QoS=1 [message #1858684 is a reply to message #1858674] Sun, 16 April 2023 08:00
Roger Light
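Your publisher is missing a call to loop_start(). Without it the client's network loop never runs, so the PUBACKs for your QoS 1 messages are never processed and the in-flight queue fills up. paho's default in-flight limit is 20 messages, which matches the count at which your subscriber stalls.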

import datetime
import random
from multiprocessing import Process
from os import getpid
from time import sleep

import paho.mqtt.client as paho


def task():
    pid = getpid()
    machine_name = str(pid)
    topic_name = "TEMP"

    mqttBroker = "localhost"
    # paho-mqtt 1.x constructor; 2.x also expects a CallbackAPIVersion as the first argument
    client = paho.Client("Temperature_Inside")
    client.connect(mqttBroker, 1883)
    # Start the background network thread so PUBACKs get processed
    client.loop_start()

    for i in range(20):  # 20 batches, one per second, as in the original publisher
        for _ in range(500):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            msgid = client.publish(topic_name, sensor_payload, qos=1)
        sleep(1)

    # Block until the last message has been acknowledged, then shut down cleanly
    msgid.wait_for_publish()
    client.disconnect()
    client.loop_stop()


if __name__ == '__main__':
    Process(target=task).start()
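
To see why the count stops at exactly 20, here is a rough toy model of a QoS 1 sender with a bounded in-flight window. It is only a sketch for illustration and not paho's actual implementation: the names ToyInflightWindow, process_acks and run are made up, and the limit of 20 is assumed to mirror paho's default. It needs no broker to run.

```python
from collections import deque


class ToyInflightWindow:
    """Toy model of a QoS 1 sender with a bounded in-flight window (not paho's real code)."""

    def __init__(self, max_inflight=20):
        self.max_inflight = max_inflight
        self.inflight = set()   # message ids sent but not yet acknowledged
        self.queued = deque()   # message ids waiting for a free in-flight slot
        self.delivered = []     # message ids that reached the imaginary broker

    def publish(self, mid):
        # Send immediately if a slot is free, otherwise hold the message back
        if len(self.inflight) < self.max_inflight:
            self.inflight.add(mid)
            self.delivered.append(mid)
        else:
            self.queued.append(mid)

    def process_acks(self):
        # Stand-in for the network loop: acknowledge everything in flight,
        # then send as many queued messages as the window allows
        self.inflight.clear()
        while self.queued and len(self.inflight) < self.max_inflight:
            mid = self.queued.popleft()
            self.inflight.add(mid)
            self.delivered.append(mid)


def run(total, with_loop):
    """Publish `total` messages and return how many were delivered."""
    window = ToyInflightWindow()
    for mid in range(total):
        window.publish(mid)
        if with_loop:
            window.process_acks()
    return len(window.delivered)


print(run(500, with_loop=False))  # 20: the window fills and nothing frees it
print(run(500, with_loop=True))   # 500: acknowledgements keep freeing slots
```

In the real client, loop_start() plays the role of process_acks() by handling incoming PUBACKs in a background thread.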


Regards,

Roger