|
|
|
Re: "double free or corruption" after publish message [message #1837149 is a reply to message #1837088] |
Fri, 22 January 2021 18:27 |
Xiao B Messages: 3 Registered: January 2021 |
Junior Member |
|
|
Here is some of the code I am using.
The MQTT thread:
/*
 * Thread entry point that keeps the MQTT client running.
 *
 * Calls _initMqtt() once, then loops forever: start the client, wait for the
 * connection, pump mosquitto_loop() until it reports an error or _bConnected
 * goes false, tear everything down with _stopMqtt(), and retry after a
 * 10 second pause. A failed start is only logged before the same pause.
 *
 * arg is not used. The function never leaves the outer loop, so the final
 * return statement is not reached.
 */
void * kbMqttThread(void *arg)
{
    _initMqtt();

    for (;;)
    {
        if (_startMqtt() != true)
        {
            LOG_WITHOUT_MASK("\nMQTT ---- Connot start MQTT!\n");
        }
        else
        {
            _wait4Connected();

            /* Pump the network loop once per pass, pausing one second in between. */
            for (;;)
            {
                if (mosquitto_loop(mosq, -1, 1) != 0)
                {
                    break; /* the loop call reported an error */
                }
                if (!_bConnected)
                {
                    break; /* connection flag was cleared */
                }
                sleep(1);
            }

            /* Something went wrong: clean everything up so MQTT can be restarted. */
            _stopMqtt();
        }

        /* Wait 10 seconds before the next start attempt. */
        sleep(10);
    }

    /* Not reached: the loop above has no exit. */
    return NULL;
}
The publish function. The error happens randomly after publishing:
/*
 * Publish one JSON message with QoS 0 through the global mosquitto client.
 *
 * p: message to send; p->Topic is the topic and p->Message the
 *    NUL-terminated payload.
 *
 * Returns true when mosquitto_publish() reports MOSQ_ERR_SUCCESS.
 * Returns false when p is NULL, when kbMqttReady2Publish() says the client is
 * not ready, when the payload length is outside
 * [5, JSON_EVENT_BUFFER_SIZE), or when mosquitto_publish() fails.
 */
uint8_t kbMqttPublish(SJsonMessage * p)
{
    enum { MIN_PAYLOAD_LEN = 5 };
    size_t payloadLen;
    int rc;

    if (p == NULL)
    {
        return false; /* nothing to publish */
    }

    if (kbMqttReady2Publish() == false)
    {
        return false; /* It is not ready to publish */
    }

    payloadLen = strlen(p->Message);
    if ((payloadLen < (size_t)MIN_PAYLOAD_LEN) ||
        (payloadLen >= (size_t)JSON_EVENT_BUFFER_SIZE))
    {
        /*
         * Reject the message instead of spinning forever: the former debug
         * trap here hung the calling thread in a busy loop.
         */
        FAST_LOG_WITHOUT_MASK("\nMQTT ERROR: Message length WRONG!");
        return false;
    }

    /*
     * The cast is safe once the length is known to be below
     * JSON_EVENT_BUFFER_SIZE (assumed to fit in an int -- TODO confirm).
     */
    rc = mosquitto_publish(mosq, NULL, p->Topic, (int)payloadLen, p->Message, MQTT_QOS_0, false);
    if (rc != MOSQ_ERR_SUCCESS)
    {
        fprintf(stderr, "Error publishing: %s\n", mosquitto_strerror(rc));
        return false;
    }

    return true;
}
The MQTT start function:
/*
 * Create a fresh mosquitto client in the global `mosq`, register the
 * callbacks and call mosquitto_connect() with the settings in _mqttConfig.
 *
 * Clears _bConnected before doing anything else.
 *
 * Returns true when mosquitto_connect() reports MOSQ_ERR_SUCCESS.
 * Returns false when the client could not be created or the connect call
 * failed; in both cases `mosq` is NULL on return, so no other code can reach
 * a destroyed client through the global.
 *
 * NOTE(review): mosquitto_lib_init() runs on every attempt, while the only
 * visible mosquitto_lib_cleanup() is in _stopMqtt(), which the visible caller
 * skips when this function fails -- confirm that this imbalance is acceptable.
 */
static uint8_t _startMqtt(void)
{
    int rc;

    _bConnected = false;

    // Required before calling other mosquitto functions
    mosquitto_lib_init();

    mosq = mosquitto_new(NULL, true, NULL);
    if (mosq == NULL)
    {
        /* Stop here: every call below would otherwise receive a NULL client. */
        fprintf(stderr, "\nError: Out of memory.\n");
        return false;
    }

    /* Configure callbacks. This should be done before connecting ideally. */
    mosquitto_connect_callback_set(mosq, _onConnect);
    mosquitto_publish_callback_set(mosq, _onPublish);
    mosquitto_subscribe_callback_set(mosq, _onSubscribe);
    mosquitto_message_callback_set(mosq, _onMessage);
    mosquitto_disconnect_callback_set(mosq, _onDisconnect);

    rc = mosquitto_connect(mosq, _mqttConfig.brokerIP, _mqttConfig.Port, _mqttConfig.keepLiveSeconds);
    if (rc != MOSQ_ERR_SUCCESS)
    {
        fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
        mosquitto_destroy(mosq);
        /* Do not leave the global pointing at the destroyed client. */
        mosq = NULL;
        return false;
    }

    return true;
}
The MQTT stop function:
/*
 * Tear down the global mosquitto client so that MQTT can be started again.
 *
 * When `mosq` is set: disconnect, stop the loop, detach all callbacks,
 * destroy the client and reset `mosq` to NULL. mosquitto_lib_cleanup() and
 * the "STOPPED" log line run whether or not a client existed.
 *
 * NOTE(review): the client is destroyed here without any lock visible in this
 * section, while kbMqttPublish() passes the same global `mosq` to
 * mosquitto_publish() -- confirm the two cannot run concurrently.
 */
static void _stopMqtt(void)
{
    if (mosq != NULL)
    {
        mosquitto_disconnect(mosq);
        mosquitto_loop_stop(mosq, false);

        /* Detach every callback before the client goes away. */
        mosquitto_connect_callback_set(mosq, NULL);
        mosquitto_publish_callback_set(mosq, NULL);
        mosquitto_subscribe_callback_set(mosq, NULL);
        mosquitto_message_callback_set(mosq, NULL);
        mosquitto_disconnect_callback_set(mosq, NULL);

        /* Free the client and clear the global handle. */
        mosquitto_destroy(mosq);
        mosq = NULL;
    }

    /* Release the library itself, then report that MQTT is down. */
    mosquitto_lib_cleanup();
    LOG_WITHOUT_MASK("\nMQTT ---- STOPPED!");
}
Regards,
Xiao
|
|
|
Powered by
FUDForum. Page generated in 5.03798 seconds