Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [mosquitto-dev] Mosquitto broker consumes too much memory and never release it.

Hi Roger,

As suggested, I just tried it with 1.4 and it's still reproducible. The difference is on 1.3.1 memory consumption goes up even when I try to publish 25000 messages, but it takes 50000 messages to reproduce same issue with 1.4.

And it does not releases memory in any of the two versions.

Attached is my source code for my sample test case.

On Wed, Jan 28, 2015 at 6:21 PM, Roger Light <roger@xxxxxxxxxx> wrote:

Hi Prashant,

Do you have some code you can share that helps.reproduce this?

I notice you're using mosquitto 1.3.1, could you try again using the 1.4 branch please?

Thanks,

Roger

On Jan 28, 2015 12:09 PM, "Prashant Kedia" <prashantkedia22@xxxxxxxxx> wrote:
Hi All,

Recently I observed that, on my test environment (AWS micro instance) some of the mosquitto brokers are consuming about more than 250 MB of memory.

I have done some analysis and found that it consumes more memory while publishing QoS 0 messages with log_type information.

Below are the results of analysis done on my local machine with a simple test case.

Platform: Linux Mint 17 Qiana
Mosquitto version: 1.3.1
Mqtt Client used: Paho APIs 1.0.1
Number of message published: 25000 X 5 (5 threads simultaneously publishing 25000 messages each, on the same broker, on the same topic)
Number of clients subscribed: 4

QoSLog_TypeMemoryCPU %Approx time for each thread to publish (millis)
0Info42 MB845500
0Debug8.6 MB323300
1Info11.7 MB4460000
1Debug6.2 MB40105000
2Info1.6 MB8430000
2Debug2.8 MB8218000
 
It does not even release consumed memory after all published messages are received by the client.

So, I want to know if there is any way, I can publish QoS 0 messages with log level info, without consuming so much memory.

And is there any way to make broker, to release the consumed memory once all messages are received.

Note: retained parameter is false while publishing messages.

--
Thanks and Regards,

Prashant Kedia
Co-Founder and Developer
Bizlers Technologies Pvt. Ltd.

_______________________________________________
mosquitto-dev mailing list
mosquitto-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/mosquitto-dev

_______________________________________________
mosquitto-dev mailing list
mosquitto-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/mosquitto-dev



--
Thanks and Regards,

Prashant Kedia
Co-Founder and Developer
Bizlers Technologies Pvt. Ltd.
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;

public class MqttOperations {

	private MqttClient client;

	public MqttOperations(String clientId) {
		try {
			client = new MqttClient("tcp://192.168.0.129:1883", clientId, null);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	public void connect() {
		try {
			client.setCallback(new TestMqttCallback());
			MqttConnectOptions options = new MqttConnectOptions();
			options.setKeepAliveInterval(100000);
			options.setConnectionTimeout(10);
			options.setCleanSession(true);
			client.setTimeToWait(10 * 1000);
			client.connect(options);
			if (client.isConnected()) {
				System.out.println("Connected: " + client.isConnected());
				client.subscribe("abc");
			}
		} catch (MqttSecurityException e) {
			e.printStackTrace();
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	public void subscribe(String topic) {
		try {
			client.subscribe(topic);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	public void publish(String message) {
		try {
			// client.publish("abc", message.getBytes(), 0, false);
			client.getTopic("abc").publish(message.getBytes(), 0, false);
		} catch (MqttException e) {
			e.printStackTrace();
		}
	}

	private class TestMqttCallback implements MqttCallback {

		@Override
		public void connectionLost(Throwable arg0) {
			System.out.println("Connection Lost");
		}

		@Override
		public void deliveryComplete(IMqttDeliveryToken arg0) {
		}

		@Override
		public void messageArrived(String topic, MqttMessage message)
				throws Exception {
			System.out.println(topic + " : " + message);
		}
	}
}
public class TestMosquitto {

	public static void main(String args[]) {
		TestMosquitto testMosquitto = new TestMosquitto();
		new Thread(testMosquitto.new MyThread("client1")).start();
		new Thread(testMosquitto.new MyThread("client2")).start();
		new Thread(testMosquitto.new MyThread("client3")).start();
		new Thread(testMosquitto.new MyThread("client4")).start();
		new Thread(testMosquitto.new MyThread("client5")).start();
	}

	class MyThread implements Runnable {

		private String clientId;

		public MyThread(String clientId) {
			this.clientId = clientId;
		}

		@Override
		public void run() {
			MqttOperations mqtt = new MqttOperations(clientId);
			mqtt.connect();
			mqtt.subscribe("abc");
			long startTime = System.currentTimeMillis();
			for (int i = 0; i < 50000; i++) {
				mqtt.publish("Client: " + clientId + ",Message: " + i);
			}
			long endTime = System.currentTimeMillis();
			System.out.println("Time To Execute:  " + (endTime - startTime));
		}
	}
}

Back to the top