[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
Re: [paho-dev] pthread_mutex_lock
|
Tom,
do you think that the program might be getting disconnected from
time to time, and that might be happening on a publish?
If so, I have a theory why that would cause this problem. You
could try replacing MQTTAsync.c and MQTTAsync.h with the attached,
to see if they help.
Ian
On 12/09/13 17:19, Thomas Bitsky Jr wrote:
Ian,
I think it's happening randomly when I publish on a topic.
I'll try and run a trace on it, as well.
Thanks!
(Sent from my mobile device.)
Hi Tom,
I've seen this occasionally, but not had chance to track it
down. Do you have any idea at what point it's happening
in your program?
I'll try to track it down. If it's possible for you to get
a trace of the time just before it fails, that would be
great (but I realize how hard that might be). The best way
would be to set an environment variable
export MQTT_C_CLIENT_TRACE=ON
and let it print to stdout. You can have it write to a
file, but if the program is crashing then the last few
entries are likely to be lost.
Ian
On 12/09/13 16:24, Thomas Bitsky Jr wrote:
Hello,
I’ve integrated the paho.c library
into one of my programs on Ubuntu 10.04LTS kernel 2.6
with the preempt patches, using it to connect to a
mosquito broker running on the same machine. I am using
the asynchronous API. To build the paho.c library, I
cloned the git repository and built from source. I use
the library to both subscribe and publish on topics with
the broker.
The paho library program frequently
crashes with the error:
pthread_mutex_lock.c
: 62 : __pthread_mutex_lock: Assertion `mutex-
__data.__owner = 0` Failed
Does anyone have any experience with
this error? My program has no threads or mutexes, so the
error must be coming from Paho. I’ve tried various
solutions to no avail. Sometime the program runs for 6
shours, sometimes 20 seconds. The relevant source code
is below, which largely replicates the async examples.
Any insight is appreciated. Thank you
in advance.
Tom
#include <MQTTAsync.h>
#define ADDRESS
"tcp://localhost:1883"
#define CLIENTID "MyClientId"
#define QOS 1
#define TIMEOUT 10000L
static MQTTAsync client_;
static BOOL isConnected_ = FALSE;
void
mqTraceCallback(enum
MQTTASYNC_TRACE_LEVELS level, char *message)
{
printf("TRACE:
%s\n", message);
}
//
// Start the client connection to the
MQTT broker.
//
BOOL
mqCoreLink_start(void)
{
int rc;
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
MQTTAsync_create(
&client_,
ADDRESS,
CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE,
NULL
);
MQTTAsync_setCallbacks(
client_, // client handle
NULL,
// context (will be passed to every
callback function)
connlost, // pointer to connection lost
callback
messageArrived, // pointer
to message-received callback
NULL
); // pointer to delivery-complete
callback
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts._onSuccess_ =
onConnect;
conn_opts._onFailure_ =
onConnectFailure;
conn_opts.context =
client_;
rc =
MQTTAsync_connect(client_, &conn_opts);
if ( rc !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
return FALSE;
}
//
// Create the default
command interface.
//
mqCoreLink_registerSignal("_command", NULL);
//
// Enable tracing
// If no callback is
set (NULL), the trace values
// are not broadcast.
//
MQTTAsync_setTraceCallback (&mqTraceCallback);
// The default is
minimum.
MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
return TRUE;
}
void
mqCoreLink_stop(void)
{
MQTTAsync_destroy(&client_);
}
BOOL
mqCoreLink_sendMessage(char* topic,
char* msg, int msgLen)
{
int rc;
BOOL ret = FALSE;
MQTTAsync_responseOptions opts =
MQTTAsync_responseOptions_initializer;
MQTTAsync_message
pubmsg = MQTTAsync_message_initializer;
opts._onSuccess_ =
onSend;
opts.context =
client_;
pubmsg.payload = msg;
pubmsg.payloadlen =
msgLen;
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
rc =
MQTTAsync_sendMessage(client_, topic, &pubmsg,
&opts);
if ( rc !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, \n"
"\t%s\n"
"\treturn code %d\n", topic, rc);
goto
quit;
}
ret = TRUE;
quit:
return ret;
}
void
connlost(void *context, char *cause)
{
MQTTAsync client =
(MQTTAsync)context;
MQTTAsync_connectOptions conn_opts =
MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection
lost\n");
printf(" cause:
%s\n", cause);
printf("Reconnecting\n");
isConnected_ = FALSE;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc =
MQTTAsync_connect(client, &conn_opts)) !=
MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
}
}
void
onDisconnect(void* context,
MQTTAsync_successData* response)
{
printf("Successful
disconnection\n");
isConnected_ = FALSE;
}
void
onSend(void* context,
MQTTAsync_successData* response)
{
// printf("Message
with token value %d delivery confirmed\n",
response->token);
}
////////////////////////////////////////////////////////////
// Subscription callbacks.
////////////////////////////////////////////////////////////
void
onSubscribe(void* context,
MQTTAsync_successData* response)
{
printf("Subscribe
succeeded\n");
}
void
onSubscribeFailure(void* context,
MQTTAsync_failureData* response)
{
printf("Subscribe
failed, rc %d\n", response ? response->code : 0);
}
////////////////////////////////////////////////////////////
// Connection Callbacks
////////////////////////////////////////////////////////////
void
onConnectFailure(void* context,
MQTTAsync_failureData* response)
{
printf("Connect
failed, rc %d\n", response ? response->code : 0);
isConnected_ = FALSE;
}
//
// Called back from the paho mqtt
library upon successful connection
// to the MQTT broker. We subscribe
all registered topic names at this
// point.
//
void
onConnect(void* context,
MQTTAsync_successData* response)
{
int
topicCount;
int tmpCount;
MQTTAsync client =
(MQTTAsync)context;
MQTTAsync_responseOptions opts =
MQTTAsync_responseOptions_initializer;
int rc;
int i;
CoreLinkEvent*
cle_elt;
isConnected_ = TRUE;
topicCount = 0;
DL_FOREACH(coreLinkEvents_head_,cle_elt)
{
if (
cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
topicCount += 1;
}
topicNames_ =
alloca(sizeof *topicNames_ * topicCount);
qos_ = alloca(
(topicCount + 1) * sizeof(int) );
for (i = 0; i <
topicCount; ++i)
{
topicNames_[i] = alloca(CORELINK_MAXTOPICLEN);
}
//
// Copy in all
registered topic names
//
tmpCount = 0;
DL_FOREACH(coreLinkEvents_head_,cle_elt)
{
if (
cle_elt->eventType == CORELINKEVENTTYPE_SIGNAL )
{
strncpy(
topicNames_[tmpCount],
cle_elt->topic,
CORELINK_MAXTOPICLEN );
tmpCount += 1;
}
}
//
// Make a quality of
service array
//
for
(i=0;i<topicCount;++i)
{
qos_[i] = 1;
printf("subscribe: %s\n", &(*topicNames_[i]) );
}
//
// Subscribe to all
topics
//
printf("Successful
connection\n");
printf("Subscribing
to many topics..\n");
opts._onSuccess_ =
onSubscribe;
opts._onFailure_ =
onSubscribeFailure;
opts.context =
client;
rc =
MQTTAsync_subscribeMany(
client_,
topicCount,
topicNames_,
&(qos_[0]),
&opts);
if ( rc !=
MQTTASYNC_SUCCESS )
{
printf("Failed to subscribe to topics! %d\n", rc);
}
}
//////////////////////////////////////////////////////////////////////////
// Handle receiving messages from the
MQTT broker
// Generally, this means that a user
interface running in a web-browser
// has published to a topic listed in
the cmdTopics[] array.
//////////////////////////////////////////////////////////////////////////
int
messageArrived(
void *context, char
*topicName, int topicLen, MQTTAsync_message *message)
{
char*
payloadptr;
int i;
printf("Message
arrived\n");
printf("
topic: %s\n", topicName);
printf("
message: ");
payloadptr =
message->payload;
for(i=0;
i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
…
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev
|
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial implementation and documentation
* Ian Craggs, Allan Stockdill-Mander - SSL support
* Ian Craggs - multiple server connection support
* Ian Craggs - fix for bug 413429 - connectionLost not called
* Ian Craggs - fix for bug# 415042 - using already freed structure
*******************************************************************************/
/**
* @file
* \brief Asynchronous API implementation
*
*/
#include <stdlib.h>
#if !defined(WIN32)
#include <sys/time.h>
#endif
#if !defined(NO_PERSISTENCE)
#include "MQTTPersistence.h"
#endif
#include "MQTTAsync.h"
#include "utf-8.h"
#include "MQTTProtocol.h"
#include "MQTTProtocolOut.h"
#include "Thread.h"
#include "SocketBuffer.h"
#include "StackTrace.h"
#include "Heap.h"
#define URI_TCP "tcp://"
#define BUILD_TIMESTAMP "201309121734"
#define CLIENT_VERSION "1.0.0.2"
char* client_timestamp_eye = "MQTTAsyncV3_Timestamp " BUILD_TIMESTAMP;
char* client_version_eye = "MQTTAsyncV3_Version " CLIENT_VERSION;
extern Sockets s;
static ClientStates ClientState =
{
CLIENT_VERSION, /* version */
NULL /* client list */
};
ClientStates* bstate = &ClientState;
MQTTProtocol state;
enum MQTTAsync_threadStates
{
STOPPED, STARTING, RUNNING, STOPPING
};
enum MQTTAsync_threadStates sendThread_state = STOPPED;
enum MQTTAsync_threadStates receiveThread_state = STOPPED;
#if defined(WIN32)
static mutex_type mqttasync_mutex = NULL;
static mutex_type mqttcommand_mutex = NULL;
static sem_type send_sem = NULL;
extern mutex_type stack_mutex;
extern mutex_type heap_mutex;
extern mutex_type log_mutex;
BOOL APIENTRY DllMain(HANDLE hModule,
DWORD ul_reason_for_call,
LPVOID lpReserved)
{
switch (ul_reason_for_call)
{
case DLL_PROCESS_ATTACH:
Log(TRACE_MAX, -1, "DLL process attach");
if (mqttasync_mutex == NULL)
{
mqttasync_mutex = CreateMutex(NULL, 0, NULL);
mqttcommand_mutex = CreateMutex(NULL, 0, NULL);
send_sem = CreateEvent(
NULL, // default security attributes
FALSE, // manual-reset event?
FALSE, // initial state is nonsignaled
NULL // object name
);
stack_mutex = CreateMutex(NULL, 0, NULL);
heap_mutex = CreateMutex(NULL, 0, NULL);
log_mutex = CreateMutex(NULL, 0, NULL);
}
case DLL_THREAD_ATTACH:
Log(TRACE_MAX, -1, "DLL thread attach");
case DLL_THREAD_DETACH:
Log(TRACE_MAX, -1, "DLL thread detach");
case DLL_PROCESS_DETACH:
Log(TRACE_MAX, -1, "DLL process detach");
}
return TRUE;
}
#else
static pthread_mutex_t mqttasync_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttasync_mutex = &mqttasync_mutex_store;
static pthread_mutex_t mqttcommand_mutex_store = PTHREAD_MUTEX_INITIALIZER;
static mutex_type mqttcommand_mutex = &mqttcommand_mutex_store;
static cond_type_struct send_cond_store = { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER };
static cond_type send_cond = &send_cond_store;
#define WINAPI
#endif
static volatile int initialized = 0;
static List* handles = NULL;
static int tostop = 0;
static List* commands = NULL;
MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc);
int MQTTAsync_cleanSession(Clients* client);
void MQTTAsync_stop();
int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout);
void MQTTAsync_closeOnly(Clients* client);
void MQTTAsync_closeSession(Clients* client);
void MQTTProtocol_closeSession(Clients* client, int sendwill);
void MQTTAsync_writeComplete(int socket);
#if defined(WIN32)
#define START_TIME_TYPE DWORD
START_TIME_TYPE MQTTAsync_start_clock(void)
{
return GetTickCount();
}
#elif defined(AIX)
#define START_TIME_TYPE struct timespec
START_TIME_TYPE MQTTAsync_start_clock(void)
{
static struct timespec start;
clock_gettime(CLOCK_REALTIME, &start);
return start;
}
#else
#define START_TIME_TYPE struct timeval
START_TIME_TYPE MQTTAsync_start_clock(void)
{
static struct timeval start;
gettimeofday(&start, NULL);
return start;
}
#endif
#if defined(WIN32)
long MQTTAsync_elapsed(DWORD milliseconds)
{
return GetTickCount() - milliseconds;
}
#elif defined(AIX)
#define assert(a)
long MQTTAsync_elapsed(struct timespec start)
{
struct timespec now, res;
clock_gettime(CLOCK_REALTIME, &now);
ntimersub(now, start, res);
return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
}
#else
long MQTTAsync_elapsed(struct timeval start)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&now, &start, &res);
return (res.tv_sec)*1000 + (res.tv_usec)/1000;
}
#endif
typedef struct
{
MQTTAsync_message* msg;
char* topicName;
int topicLen;
unsigned int seqno; /* only used on restore */
} qEntry;
typedef struct
{
int type;
MQTTAsync_onSuccess* onSuccess;
MQTTAsync_onFailure* onFailure;
MQTTAsync_token token;
void* context;
START_TIME_TYPE start_time;
union
{
struct
{
int count;
char** topics;
int* qoss;
} sub;
struct
{
int count;
char** topics;
} unsub;
struct
{
char* destinationName;
int payloadlen;
void* payload;
int qos;
int retained;
} pub;
struct
{
int internal;
int timeout;
} dis;
struct
{
int timeout;
int serverURIcount;
char** serverURIs;
int currentURI;
} conn;
} details;
} MQTTAsync_command;
typedef struct MQTTAsync_struct
{
char* serverURI;
int ssl;
Clients* c;
/* "Global", to the client, callback definitions */
MQTTAsync_connectionLost* cl;
MQTTAsync_messageArrived* ma;
MQTTAsync_deliveryComplete* dc;
void* context; /* the context to be associated with the main callbacks*/
MQTTAsync_command connect; /* Connect operation properties */
MQTTAsync_command disconnect; /* Disconnect operation properties */
MQTTAsync_command* pending_write; /* Is there a socket write pending? */
List* responses;
unsigned int command_seqno;
MQTTPacket* pack;
} MQTTAsyncs;
typedef struct
{
MQTTAsync_command command;
MQTTAsyncs* client;
unsigned int seqno; /* only used on restore */
} MQTTAsync_queuedCommand;
void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command);
void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command);
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm);
#if !defined(NO_PERSISTENCE)
int MQTTAsync_restoreCommands(MQTTAsyncs* client);
int MQTTAsync_unpersistQueueEntry(Clients*, qEntry*);
int MQTTAsync_restoreMessageQueue(MQTTAsyncs* client);
#endif
void MQTTAsync_sleep(long milliseconds)
{
FUNC_ENTRY;
#if defined(WIN32)
Sleep(milliseconds);
#else
usleep(milliseconds*1000);
#endif
FUNC_EXIT;
}
/**
* List callback function for comparing clients by socket
* @param a first integer value
* @param b second integer value
* @return boolean indicating whether a and b are equal
*/
int clientSockCompare(void* a, void* b)
{
MQTTAsyncs* m = (MQTTAsyncs*)a;
return m->c->net.socket == *(int*)b;
}
int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
int persistence_type, void* persistence_context)
{
int rc = 0;
MQTTAsyncs *m = NULL;
FUNC_ENTRY;
rc = Thread_lock_mutex(mqttasync_mutex);
if (serverURI == NULL || clientId == NULL)
{
rc = MQTTASYNC_NULL_PARAMETER;
goto exit;
}
if (!UTF8_validateString(clientId))
{
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
}
if (!initialized)
{
#if defined(HEAP_H)
Heap_initialize();
#endif
Log_initialize((Log_nameValue*)MQTTAsync_getVersionInfo());
bstate->clients = ListInitialize();
Socket_outInitialize();
Socket_setWriteCompleteCallback(MQTTAsync_writeComplete);
handles = ListInitialize();
commands = ListInitialize();
#if defined(OPENSSL)
SSLSocket_initialize();
#endif
#if !defined(WIN32)
//send_cond = Thread_create_cond();
#else
//send_sem = Thread_create_sem();
#endif
initialized = 1;
}
m = malloc(sizeof(MQTTAsyncs));
*handle = m;
memset(m, '\0', sizeof(MQTTAsyncs));
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
#if defined(OPENSSL)
else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
{
serverURI += strlen(URI_SSL);
m->ssl = 1;
}
#endif
m->serverURI = malloc(strlen(serverURI)+1);
strcpy(m->serverURI, serverURI);
m->responses = ListInitialize();
ListAppend(handles, m, sizeof(MQTTAsyncs));
m->c = malloc(sizeof(Clients));
memset(m->c, '\0', sizeof(Clients));
m->c->context = m;
m->c->outboundMsgs = ListInitialize();
m->c->inboundMsgs = ListInitialize();
m->c->messageQueue = ListInitialize();
m->c->clientID = malloc(strlen(clientId)+1);
strcpy(m->c->clientID, clientId);
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_create(&(m->c->persistence), persistence_type, persistence_context);
if (rc == 0)
{
rc = MQTTPersistence_initialize(m->c, m->serverURI);
if (rc == 0)
{
MQTTAsync_restoreCommands(m);
MQTTAsync_restoreMessageQueue(m);
}
}
#endif
ListAppend(bstate->clients, m->c, sizeof(Clients) + 3*sizeof(List));
exit:
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_terminate(void)
{
FUNC_ENTRY;
MQTTAsync_stop();
if (initialized)
{
ListElement* elem = NULL;
#if !defined(WIN32)
//Thread_destroy_cond(send_cond);
#else
//Thread_destroy_sem(send_sem);
#endif
ListFree(bstate->clients);
ListFree(handles);
while (ListNextElement(commands, &elem))
MQTTAsync_freeCommand1((MQTTAsync_queuedCommand*)(elem->content));
ListFree(commands);
handles = NULL;
Socket_outTerminate();
#if defined(OPENSSL)
SSLSocket_terminate();
#endif
#if defined(HEAP_H)
Heap_terminate();
#endif
Log_terminate();
initialized = 0;
}
FUNC_EXIT;
}
#if !defined(NO_PERSISTENCE)
int MQTTAsync_unpersistCommand(MQTTAsync_queuedCommand* qcmd)
{
int rc = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY;
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, qcmd->seqno);
if ((rc = qcmd->client->c->persistence->premove(qcmd->client->c->phandle, key)) != 0)
Log(LOG_ERROR, 0, "Error %d removing command from persistence", rc);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_persistCommand(MQTTAsync_queuedCommand* qcmd)
{
int rc = 0;
MQTTAsyncs* aclient = qcmd->client;
MQTTAsync_command* command = &qcmd->command;
int* lens = NULL;
void** bufs = NULL;
int bufindex = 0, i, nbufs = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY;
switch (command->type)
{
case SUBSCRIBE:
nbufs = 2 + (command->details.sub.count * 2);
lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->details.sub.count;
lens[bufindex++] = sizeof(command->details.sub.count);
for (i = 0; i < command->details.sub.count; ++i)
{
bufs[bufindex] = command->details.sub.topics[i];
lens[bufindex++] = strlen(command->details.sub.topics[i]) + 1;
bufs[bufindex] = &command->details.sub.qoss[i];
lens[bufindex++] = sizeof(command->details.sub.qoss[i]);
}
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
break;
case UNSUBSCRIBE:
nbufs = 2 + command->details.unsub.count;
lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = &command->details.unsub.count;
lens[bufindex++] = sizeof(command->details.unsub.count);
for (i = 0; i < command->details.unsub.count; ++i)
{
bufs[bufindex] = command->details.unsub.topics[i];
lens[bufindex++] = strlen(command->details.unsub.topics[i]) + 1;
}
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
break;
case PUBLISH:
nbufs = 6;
lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &command->type;
lens[bufindex++] = sizeof(command->type);
bufs[bufindex] = command->details.pub.destinationName;
lens[bufindex++] = strlen(command->details.pub.destinationName) + 1;
bufs[bufindex] = &command->details.pub.payloadlen;
lens[bufindex++] = sizeof(command->details.pub.payloadlen);
bufs[bufindex] = command->details.pub.payload;
lens[bufindex++] = command->details.pub.payloadlen;
bufs[bufindex] = &command->details.pub.qos;
lens[bufindex++] = sizeof(command->details.pub.qos);
bufs[bufindex] = &command->details.pub.retained;
lens[bufindex++] = sizeof(command->details.pub.retained);
sprintf(key, "%s%d", PERSISTENCE_COMMAND_KEY, ++aclient->command_seqno);
break;
}
if (nbufs > 0)
{
if ((rc = aclient->c->persistence->pput(aclient->c->phandle, key, nbufs, (char**)bufs, lens)) != 0)
Log(LOG_ERROR, 0, "Error persisting command, rc %d", rc);
qcmd->seqno = aclient->command_seqno;
}
if (lens)
free(lens);
if (bufs)
free(bufs);
FUNC_EXIT_RC(rc);
return rc;
}
MQTTAsync_queuedCommand* MQTTAsync_restoreCommand(char* buffer, int buflen)
{
MQTTAsync_command* command = NULL;
MQTTAsync_queuedCommand* qcommand = NULL;
char* ptr = buffer;
int i, data_size;
FUNC_ENTRY;
qcommand = malloc(sizeof(MQTTAsync_queuedCommand));
memset(qcommand, '\0', sizeof(MQTTAsync_queuedCommand));
command = &qcommand->command;
command->type = *(int*)ptr;
ptr += sizeof(int);
switch (command->type)
{
case SUBSCRIBE:
command->details.sub.count = *(int*)ptr;
ptr += sizeof(int);
for (i = 0; i < command->details.sub.count; ++i)
{
data_size = strlen(ptr) + 1;
command->details.sub.topics[i] = malloc(data_size);
strcpy(command->details.sub.topics[i], ptr);
ptr += data_size;
command->details.sub.qoss[i] = *(int*)ptr;
ptr += sizeof(int);
}
break;
case UNSUBSCRIBE:
command->details.sub.count = *(int*)ptr;
ptr += sizeof(int);
for (i = 0; i < command->details.unsub.count; ++i)
{
int data_size = strlen(ptr) + 1;
command->details.unsub.topics[i] = malloc(data_size);
strcpy(command->details.unsub.topics[i], ptr);
ptr += data_size;
}
break;
case PUBLISH:
data_size = strlen(ptr) + 1;
command->details.pub.destinationName = malloc(data_size);
strcpy(command->details.pub.destinationName, ptr);
ptr += data_size;
command->details.pub.payloadlen = *(int*)ptr;
ptr += sizeof(int);
data_size = command->details.pub.payloadlen;
command->details.pub.payload = malloc(data_size);
memcpy(command->details.pub.payload, ptr, data_size);
ptr += data_size;
command->details.pub.qos = *(int*)ptr;
ptr += sizeof(int);
command->details.pub.retained = *(int*)ptr;
ptr += sizeof(int);
break;
default:
free(qcommand);
qcommand = NULL;
}
FUNC_EXIT;
return qcommand;
}
void MQTTAsync_insertInOrder(List* list, void* content, int size)
{
ListElement* index = NULL;
ListElement* current = NULL;
FUNC_ENTRY;
while (ListNextElement(list, ¤t) != NULL && index == NULL)
{
if (((MQTTAsync_queuedCommand*)content)->seqno < ((MQTTAsync_queuedCommand*)current->content)->seqno)
index = current;
}
ListInsert(list, content, size, index);
FUNC_EXIT;
}
int MQTTAsync_restoreCommands(MQTTAsyncs* client)
{
int rc = 0;
char **msgkeys;
int nkeys;
int i = 0;
Clients* c = client->c;
int commands_restored = 0;
FUNC_ENTRY;
if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
{
while (rc == 0 && i < nkeys)
{
char *buffer = NULL;
int buflen;
if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) != 0)
;
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
{
MQTTAsync_queuedCommand* cmd = MQTTAsync_restoreCommand(buffer, buflen);
if (cmd)
{
cmd->client = client;
cmd->seqno = atoi(msgkeys[i]+2);
MQTTPersistence_insertInOrder(commands, cmd, sizeof(MQTTAsync_queuedCommand));
free(buffer);
client->command_seqno = max(client->command_seqno, cmd->seqno);
commands_restored++;
}
}
if (msgkeys[i])
free(msgkeys[i]);
i++;
}
if (msgkeys != NULL)
free(msgkeys);
}
Log(TRACE_MINIMUM, -1, "%d commands restored for client %s", commands_restored, c->clientID);
FUNC_EXIT_RC(rc);
return rc;
}
#endif
int MQTTAsync_addCommand(MQTTAsync_queuedCommand* command, int command_size)
{
int rc;
FUNC_ENTRY;
rc = Thread_lock_mutex(mqttcommand_mutex);
command->command.start_time = MQTTAsync_start_clock();
if (command->command.type == CONNECT ||
(command->command.type == DISCONNECT && command->command.details.dis.internal))
{
MQTTAsync_queuedCommand* head = NULL;
if (commands->first)
head = (MQTTAsync_queuedCommand*)(commands->first->content);
if (head != NULL && head->client == command->client && head->command.type == command->command.type)
MQTTAsync_freeCommand(command); /* ignore duplicate connect or disconnect command */
else
ListInsert(commands, command, command_size, commands->first); /* add to the head of the list */
}
else
{
ListAppend(commands, command, command_size);
#if !defined(NO_PERSISTENCE)
if (command->client->c->persistence)
MQTTAsync_persistCommand(command);
#endif
}
rc = Thread_unlock_mutex(mqttcommand_mutex);
#if !defined(WIN32)
Thread_signal_cond(send_cond);
#else
if (!Thread_check_sem(send_sem))
Thread_post_sem(send_sem);
#endif
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_checkDisconnect(MQTTAsync handle, MQTTAsync_command* command)
{
MQTTAsyncs* m = handle;
FUNC_ENTRY;
/* wait for all inflight message flows to finish, up to timeout */;
if (m->c->outboundMsgs->count == 0 || MQTTAsync_elapsed(command->start_time) >= command->details.dis.timeout)
{
int was_connected = m->c->connected;
MQTTAsync_closeSession(m->c);
if (command->details.dis.internal && m->cl && was_connected)
{
Log(TRACE_MIN, -1, "Calling connectionLost for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->cl))(m->context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
else if (!command->details.dis.internal && command->onSuccess)
{
Log(TRACE_MIN, -1, "Calling disconnect complete for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(command->onSuccess))(command->context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
}
FUNC_EXIT;
}
/**
* See if any pending writes have been completed, and cleanup if so.
* Cleaning up means removing any publication data that was stored because the write did
* not originally complete.
*/
void MQTTProtocol_checkPendingWrites()
{
FUNC_ENTRY;
if (state.pending_writes.count > 0)
{
ListElement* le = state.pending_writes.first;
while (le)
{
if (Socket_noPendingWrites(((pending_write*)(le->content))->socket))
{
MQTTProtocol_removePublication(((pending_write*)(le->content))->p);
state.pending_writes.current = le;
ListRemove(&(state.pending_writes), le->content); /* does NextElement itself */
le = state.pending_writes.current;
}
else
ListNextElement(&(state.pending_writes), &le);
}
}
FUNC_EXIT;
}
void MQTTAsync_freeConnect(MQTTAsync_command command)
{
if (command.type == CONNECT)
{
int i;
for (i = 0; i < command.details.conn.serverURIcount; ++i)
free(command.details.conn.serverURIs[i]);
if (command.details.conn.serverURIs)
free(command.details.conn.serverURIs);
}
}
void MQTTAsync_freeCommand1(MQTTAsync_queuedCommand *command)
{
if (command->command.type == SUBSCRIBE)
{
int i;
for (i = 0; i < command->command.details.sub.count; i++)
{
free(command->command.details.sub.topics[i]);
free(command->command.details.sub.topics);
free(command->command.details.sub.qoss);
}
}
else if (command->command.type == UNSUBSCRIBE)
{
int i;
for (i = 0; i < command->command.details.unsub.count; i++)
{
free(command->command.details.unsub.topics[i]);
free(command->command.details.unsub.topics);
}
}
else if (command->command.type == PUBLISH)
{
/* qos 1 and 2 topics are freed in the protocol code when the flows are completed */
if (command->command.details.pub.destinationName)
free(command->command.details.pub.destinationName);
free(command->command.details.pub.payload);
}
}
void MQTTAsync_freeCommand(MQTTAsync_queuedCommand *command)
{
MQTTAsync_freeCommand1(command);
free(command);
}
void MQTTAsync_writeComplete(int socket)
{
ListElement* found = NULL;
FUNC_ENTRY;
/* a partial write is now complete for a socket - this will be on a publish*/
MQTTProtocol_checkPendingWrites();
/* find the client using this socket */
if ((found = ListFindItem(handles, &socket, clientSockCompare)) != NULL)
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
time(&(m->c->net.lastContact));
/* see if there is a pending write flagged */
if (m->pending_write)
{
ListElement* cur_response = NULL;
MQTTAsync_command* command = m->pending_write;
MQTTAsync_queuedCommand* com = NULL;
while (ListNextElement(m->responses, &cur_response))
{
com = (MQTTAsync_queuedCommand*)(cur_response->content);
if (com->client->pending_write == m->pending_write)
break;
}
if (cur_response && command->onSuccess)
{
MQTTAsync_successData data;
data.token = command->token;
data.alt.pub.destinationName = command->details.pub.destinationName;
data.alt.pub.message.payload = command->details.pub.payload;
data.alt.pub.message.payloadlen = command->details.pub.payloadlen;
data.alt.pub.message.qos = command->details.pub.qos;
data.alt.pub.message.retained = command->details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
(*(command->onSuccess))(command->context, &data);
}
m->pending_write = NULL;
ListDetach(m->responses, com);
MQTTAsync_freeCommand(com);
}
}
FUNC_EXIT;
}
void MQTTAsync_processCommand()
{
int rc = 0;
MQTTAsync_queuedCommand* command = NULL;
ListElement* cur_command = NULL;
List* ignored_clients = NULL;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
Thread_lock_mutex(mqttcommand_mutex);
/* only the first command in the list must be processed for any particular client, so if we skip
a command for a client, we must skip all following commands for that client. Use a list of
ignored clients to keep track
*/
ignored_clients = ListInitialize();
/* don't try a command until there isn't a pending write for that client, and we are not connecting */
while (ListNextElement(commands, &cur_command))
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(cur_command->content);
if (ListFind(ignored_clients, cmd->client))
continue;
if (cmd->command.type == CONNECT || cmd->command.type == DISCONNECT || (cmd->client->c->connected &&
cmd->client->c->connect_state == 0 && Socket_noPendingWrites(cmd->client->c->net.socket)))
{
if ((cmd->command.type == PUBLISH || cmd->command.type == SUBSCRIBE || cmd->command.type == UNSUBSCRIBE) &&
cmd->client->c->outboundMsgs->count >= MAX_MSG_ID - 1)
; /* no more message ids available */
else
{
command = cmd;
break;
}
}
ListAppend(ignored_clients, cmd->client, sizeof(cmd->client));
}
ListFreeNoContent(ignored_clients);
if (command)
{
ListDetach(commands, command);
#if !defined(NO_PERSISTENCE)
if (command->client->c->persistence)
MQTTAsync_unpersistCommand(command);
#endif
}
Thread_unlock_mutex(mqttcommand_mutex);
if (!command)
goto exit; /* nothing to do */
if (command->command.type == CONNECT)
{
if (command->client->c->connect_state != 0 || command->client->c->connected)
rc = 0;
else
{
char* serverURI = command->client->serverURI;
if (command->command.details.conn.serverURIcount > 0)
{
serverURI = command->command.details.conn.serverURIs[command->command.details.conn.currentURI++];
if (strncmp(URI_TCP, serverURI, strlen(URI_TCP)) == 0)
serverURI += strlen(URI_TCP);
#if defined(OPENSSL)
else if (strncmp(URI_SSL, serverURI, strlen(URI_SSL)) == 0)
{
serverURI += strlen(URI_SSL);
command->client->ssl = 1;
}
#endif
}
Log(TRACE_MIN, -1, "Connecting to serverURI %s", serverURI);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl);
#else
rc = MQTTProtocol_connect(serverURI, command->client->c);
#endif
if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR;
/* if the TCP connect is pending, then we must call select to determine when the connect has completed,
which is indicated by the socket being ready *either* for reading *or* writing. The next couple of lines
make sure we check for writeability as well as readability, otherwise we wait around longer than we need to
in Socket_getReadySocket() */
if (rc == EINPROGRESS)
Socket_addPendingWrite(command->client->c->net.socket);
}
}
else if (command->command.type == SUBSCRIBE)
{
List* topics = ListInitialize();
List* qoss = ListInitialize();
int i;
for (i = 0; i < command->command.details.sub.count; i++)
{
ListAppend(topics, command->command.details.sub.topics[i], strlen(command->command.details.sub.topics[i]));
ListAppend(qoss, &command->command.details.sub.qoss[i], sizeof(int));
}
rc = MQTTProtocol_subscribe(command->client->c, topics, qoss);
ListFreeNoContent(topics);
ListFreeNoContent(qoss);
}
else if (command->command.type == UNSUBSCRIBE)
{
List* topics = ListInitialize();
int i;
for (i = 0; i < command->command.details.unsub.count; i++)
ListAppend(topics, command->command.details.unsub.topics[i], strlen(command->command.details.unsub.topics[i]));
rc = MQTTProtocol_unsubscribe(command->client->c, topics);
ListFreeNoContent(topics);
}
else if (command->command.type == PUBLISH)
{
Messages* msg = NULL;
Publish* p = NULL;
p = malloc(sizeof(Publish));
p->payload = command->command.details.pub.payload;
p->payloadlen = command->command.details.pub.payloadlen;
p->topic = command->command.details.pub.destinationName;
p->msgId = -1;
rc = MQTTProtocol_startPublish(command->client->c, p, command->command.details.pub.qos, command->command.details.pub.retained, &msg);
if (command->command.details.pub.qos == 0)
{
if (rc == TCPSOCKET_COMPLETE)
{
if (command->command.onSuccess)
{
MQTTAsync_successData data;
data.token = command->command.token;
data.alt.pub.destinationName = command->command.details.pub.destinationName;
data.alt.pub.message.payload = command->command.details.pub.payload;
data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", command->client->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
Thread_lock_mutex(mqttasync_mutex);
}
}
else
{
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
command->client->pending_write = &command->command;
}
}
else
command->command.details.pub.destinationName = NULL; /* this will be freed by the protocol code */
free(p); /* should this be done if the write isn't complete? */
}
else if (command->command.type == DISCONNECT)
{
if (command->client->c->connect_state != 0 || command->client->c->connected != 0)
{
command->client->c->connect_state = -2;
MQTTAsync_checkDisconnect(command->client, &command->command);
}
}
if (command->command.type == CONNECT && rc != SOCKET_ERROR && rc != MQTTASYNC_PERSISTENCE_ERROR)
{
command->client->connect = command->command;
MQTTAsync_freeCommand(command);
}
else if (command->command.type == DISCONNECT)
{
command->client->disconnect = command->command;
MQTTAsync_freeCommand(command);
}
else if (command->command.type == PUBLISH && command->command.details.pub.qos == 0)
{
if (rc == TCPSOCKET_INTERRUPTED)
ListAppend(command->client->responses, command, sizeof(command));
else
MQTTAsync_freeCommand(command);
}
else if (rc == SOCKET_ERROR || rc == MQTTASYNC_PERSISTENCE_ERROR)
{
if (command->command.type == CONNECT)
{
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_disconnect(command->client, &opts); /* not "internal" because we don't want to call connection lost */
}
else
MQTTAsync_disconnect_internal(command->client, 0);
if (command->command.type == CONNECT &&
command->command.details.conn.currentURI < command->command.details.conn.serverURIcount)
{
/* put the connect command back to the head of the command queue, using the next serverURI */
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
command->command.details.conn.serverURIs[command->command.details.conn.currentURI]);
rc = MQTTAsync_addCommand(command, sizeof(command->command.details.conn));
}
else
{
if (command->command.onFailure)
{
Log(TRACE_MIN, -1, "Calling command failure for client %s", command->client->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(command->command.onFailure))(command->command.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeConnect(command->command);
MQTTAsync_freeCommand(command); /* free up the command if necessary */
}
}
else
{
/* put the command into a waiting for response queue for each client, indexed by msgid */
command->command.token = command->client->c->msgID;
ListAppend(command->client->responses, command, sizeof(command));
}
exit:
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
}
void MQTTAsync_checkTimeouts()
{
ListElement* current = NULL;
static time_t last = 0L;
time_t now;
FUNC_ENTRY;
time(&(now));
if (difftime(now, last) < 3)
goto exit;
Thread_lock_mutex(mqttasync_mutex);
last = now;
while (ListNextElement(handles, ¤t)) /* for each client */
{
ListElement* cur_response = NULL;
int i = 0,
timed_out_count = 0;
MQTTAsyncs* m = (MQTTAsyncs*)(current->content);
/* check connect timeout */
if (m->c->connect_state != 0 && MQTTAsync_elapsed(m->connect.start_time) > (m->connect.details.conn.timeout * 1000))
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
}
continue;
}
/* check disconnect timeout */
if (m->c->connect_state == -2)
MQTTAsync_checkDisconnect(m, &m->disconnect);
timed_out_count = 0;
/* check response timeouts */
while (ListNextElement(m->responses, &cur_response))
{
MQTTAsync_queuedCommand* com = (MQTTAsync_queuedCommand*)(cur_response->content);
if (MQTTAsync_elapsed(com->command.start_time) < 30000)
break; /* command has not timed out */
else
{
if (com->command.onFailure)
{
Log(TRACE_MIN, -1, "Calling %s failure for client %s",
MQTTPacket_name(com->command.type), m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(com->command.onFailure))(com->command.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
timed_out_count++;
}
}
for (i = 0; i < timed_out_count; ++i)
ListRemoveHead(m->responses); /* remove the first response in the list */
}
exit:
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
}
thread_return_type WINAPI MQTTAsync_sendThread(void* n)
{
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
sendThread_state = RUNNING;
Thread_unlock_mutex(mqttasync_mutex);
while (!tostop)
{
/*int rc;*/
while (commands->count > 0)
MQTTAsync_processCommand();
#if !defined(WIN32)
/*rc =*/ Thread_wait_cond_timeout(send_cond, 1);
#else
/*rc =*/ Thread_wait_sem_timeout(send_sem, 1);
#endif
MQTTAsync_checkTimeouts();
}
sendThread_state = STOPPING;
Thread_lock_mutex(mqttasync_mutex);
sendThread_state = STOPPED;
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
return 0;
}
void MQTTAsync_emptyMessageQueue(Clients* client)
{
FUNC_ENTRY;
/* empty message queue */
if (client->messageQueue->count > 0)
{
ListElement* current = NULL;
while (ListNextElement(client->messageQueue, ¤t))
{
qEntry* qe = (qEntry*)(current->content);
free(qe->topicName);
free(qe->msg->payload);
free(qe->msg);
}
ListEmpty(client->messageQueue);
}
FUNC_EXIT;
}
void MQTTAsync_removeResponsesAndCommands(MQTTAsyncs* m)
{
int count = 0;
ListElement* current = NULL;
ListElement *next = NULL;
FUNC_ENTRY;
if (m->responses)
{
ListElement* elem = NULL;
while (ListNextElement(m->responses, &elem))
{
MQTTAsync_freeCommand1((MQTTAsync_queuedCommand*) (elem->content));
count++;
}
}
Log(TRACE_MINIMUM, -1, "%d responses removed for client %s", count, m->c->clientID);
/* remove commands in the command queue relating to this client */
count = 0;
current = ListNextElement(commands, &next);
ListNextElement(commands, &next);
while (current)
{
MQTTAsync_queuedCommand* cmd = (MQTTAsync_queuedCommand*)(current->content);
if (cmd->client == m)
{
ListDetach(commands, cmd);
MQTTAsync_freeCommand(cmd);
count++;
}
current = next;
ListNextElement(commands, &next);
}
Log(TRACE_MINIMUM, -1, "%d commands removed for client %s", count, m->c->clientID);
FUNC_EXIT;
}
void MQTTAsync_destroy(MQTTAsync* handle)
{
MQTTAsyncs* m = *handle;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
if (m == NULL)
goto exit;
MQTTAsync_removeResponsesAndCommands(m);
ListFree(m->responses);
if (m->c)
{
int saved_socket = m->c->net.socket;
char* saved_clientid = malloc(strlen(m->c->clientID)+1);
strcpy(saved_clientid, m->c->clientID);
#if !defined(NO_PERSISTENCE)
MQTTPersistence_close(m->c);
#endif
MQTTAsync_emptyMessageQueue(m->c);
MQTTProtocol_freeClient(m->c);
if (!ListRemove(bstate->clients, m->c))
Log(LOG_ERROR, 0, NULL);
else
Log(TRACE_MIN, 1, NULL, saved_clientid, saved_socket);
free(saved_clientid);
}
if (m->serverURI)
free(m->serverURI);
if (!ListRemove(handles, m))
Log(LOG_ERROR, -1, "free error");
*handle = NULL;
if (bstate->clients->count == 0)
MQTTAsync_terminate();
exit:
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT;
}
void MQTTAsync_freeMessage(MQTTAsync_message** message)
{
FUNC_ENTRY;
free((*message)->payload);
free(*message);
*message = NULL;
FUNC_EXIT;
}
void MQTTAsync_free(void* memory)
{
FUNC_ENTRY;
free(memory);
FUNC_EXIT;
}
int MQTTAsync_completeConnection(MQTTAsyncs* m, MQTTPacket* pack)
{
int rc = MQTTASYNC_FAILURE;
FUNC_ENTRY;
if (m->c->connect_state == 3) /* MQTT connect sent - wait for CONNACK */
{
Connack* connack = (Connack*)pack;
Log(LOG_PROTOCOL, 1, NULL, m->c->net.socket, m->c->clientID, connack->rc);
if ((rc = connack->rc) == MQTTASYNC_SUCCESS)
{
m->c->connected = 1;
m->c->good = 1;
m->c->connect_state = 0;
if (m->c->cleansession)
rc = MQTTAsync_cleanSession(m->c);
if (m->c->outboundMsgs->count > 0)
{
ListElement* outcurrent = NULL;
while (ListNextElement(m->c->outboundMsgs, &outcurrent))
{
Messages* m = (Messages*)(outcurrent->content);
m->lastTouch = 0;
}
MQTTProtocol_retry(m->c->net.lastContact, 1);
if (m->c->connected != 1)
rc = MQTTASYNC_DISCONNECTED;
}
}
free(connack);
m->pack = NULL;
}
FUNC_EXIT_RC(rc);
return rc;
}
/* This is the thread function that handles the calling of callback functions if set */
thread_return_type WINAPI MQTTAsync_receiveThread(void* n)
{
long timeout = 10L; /* first time in we have a small timeout. Gets things started more quickly */
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
receiveThread_state = RUNNING;
while (!tostop)
{
int rc = SOCKET_ERROR;
int sock = -1;
MQTTAsyncs* m = NULL;
MQTTPacket* pack = NULL;
Thread_unlock_mutex(mqttasync_mutex);
pack = MQTTAsync_cycle(&sock, timeout, &rc);
Thread_lock_mutex(mqttasync_mutex);
if (tostop)
break;
timeout = 1000L;
/* find client corresponding to socket */
if (ListFindItem(handles, &sock, clientSockCompare) == NULL)
{
/* assert: should not happen */
continue;
}
m = (MQTTAsyncs*)(handles->current->content);
if (m == NULL)
{
/* assert: should not happen */
continue;
}
if (rc == SOCKET_ERROR)
{
Thread_unlock_mutex(mqttasync_mutex);
MQTTAsync_disconnect_internal(m, 0);
Thread_lock_mutex(mqttasync_mutex);
}
else
{
if (m->c->messageQueue->count > 0)
{
qEntry* qe = (qEntry*)(m->c->messageQueue->first->content);
int topicLen = qe->topicLen;
if (strlen(qe->topicName) == topicLen)
topicLen = 0;
if (m->ma)
rc = MQTTAsync_deliverMessage(m, qe->topicName, topicLen, qe->msg);
else
rc = 1;
if (rc)
{
ListRemove(m->c->messageQueue, qe);
#if !defined(NO_PERSISTENCE)
if (m->c->persistence)
MQTTAsync_unpersistQueueEntry(m->c, qe);
#endif
}
else
Log(TRACE_MIN, -1, "False returned from messageArrived for client %s, message remains on queue",
m->c->clientID);
}
if (pack)
{
if (pack->header.bits.type == CONNACK)
{
int rc = MQTTAsync_completeConnection(m, pack);
if (rc == MQTTASYNC_SUCCESS)
{
if (m->connect.details.conn.serverURIcount > 0)
Log(TRACE_MIN, -1, "Connect succeeded to %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI - 1]);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onSuccess)
{
Log(TRACE_MIN, -1, "Calling connect success for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onSuccess))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
}
else
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
MQTTAsync_failureData data;
data.token = 0;
data.code = rc;
data.message = "CONNACK return code";
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, &data);
Thread_lock_mutex(mqttasync_mutex);
}
}
}
}
else if (pack->header.bits.type == SUBACK)
{
ListElement* current = NULL;
int handleCalled = 0;
/* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, ¤t))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == ((Suback*)pack)->msgId)
{
if (!ListDetach(m->responses, command)) /* remove the response from the list */
Log(LOG_ERROR, -1, "Subscribe command not removed from command list");
if (command->command.onSuccess)
{
MQTTAsync_successData data;
Suback* sub = (Suback*)pack;
int* array = NULL;
if (sub->qoss->count == 1)
data.alt.qos = *(int*)(sub->qoss->first->content);
else if (sub->qoss->count > 1)
{
ListElement* cur_qos = NULL;
int* element = array = data.alt.qosList = malloc(sub->qoss->count * sizeof(int));
while (ListNextElement(sub->qoss, &cur_qos))
*element++ = *(int*)(cur_qos->content);
}
data.token = command->command.token;
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
handleCalled = 1;
Log(TRACE_MIN, -1, "Calling subscribe success for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
Thread_lock_mutex(mqttasync_mutex);
if (array)
free(array);
}
MQTTAsync_freeCommand(command);
break;
}
}
if (!handleCalled)
rc = MQTTProtocol_handleSubacks(pack, m->c->net.socket);
}
else if (pack->header.bits.type == UNSUBACK)
{
ListElement* current = NULL;
int handleCalled = 0;
/* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, ¤t))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == ((Unsuback*)pack)->msgId)
{
if (!ListDetach(m->responses, command)) /* remove the response from the list */
Log(LOG_ERROR, -1, "Unsubscribe command not removed from command list");
if (command->command.onSuccess)
{
rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
handleCalled = 1;
Log(TRACE_MIN, -1, "Calling unsubscribe success for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeCommand(command);
break;
}
}
if (!handleCalled)
rc = MQTTProtocol_handleUnsubacks(pack, m->c->net.socket);
}
}
}
}
receiveThread_state = STOPPED;
Thread_unlock_mutex(mqttasync_mutex);
#if !defined(WIN32)
if (sendThread_state != STOPPED)
Thread_signal_cond(send_cond);
#else
if (sendThread_state != STOPPED && !Thread_check_sem(send_sem))
Thread_post_sem(send_sem);
#endif
FUNC_EXIT;
return 0;
}
void MQTTAsync_stop()
{
int rc = 0;
FUNC_ENTRY;
if (sendThread_state != STOPPED || receiveThread_state != STOPPED)
{
int conn_count = 0;
ListElement* current = NULL;
if (handles != NULL)
{
/* find out how many handles are still connected */
while (ListNextElement(handles, ¤t))
{
if (((MQTTAsyncs*)(current->content))->c->connect_state > 0 ||
((MQTTAsyncs*)(current->content))->c->connected)
++conn_count;
}
}
Log(TRACE_MIN, -1, "Conn_count is %d", conn_count);
/* stop the background thread, if we are the last one to be using it */
if (conn_count == 0)
{
int count = 0;
tostop = 1;
while ((sendThread_state != STOPPED || receiveThread_state != STOPPED) && ++count < 100)
{
Thread_unlock_mutex(mqttasync_mutex);
Log(TRACE_MIN, -1, "sleeping");
MQTTAsync_sleep(100L);
Thread_lock_mutex(mqttasync_mutex);
}
rc = 1;
tostop = 0;
}
}
FUNC_EXIT_RC(rc);
}
int MQTTAsync_setCallbacks(MQTTAsync handle, void* context,
MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma,
MQTTAsync_deliveryComplete* dc)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
if (m == NULL || ma == NULL || m->c->connect_state != 0)
rc = MQTTASYNC_FAILURE;
else
{
m->context = context;
m->cl = cl;
m->ma = ma;
m->dc = dc;
}
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_closeOnly(Clients* client)
{
FUNC_ENTRY;
client->good = 0;
client->ping_outstanding = 0;
if (client->net.socket > 0)
{
if (client->connected || client->connect_state)
MQTTPacket_send_disconnect(&client->net, client->clientID);
#if defined(OPENSSL)
SSLSocket_close(&client->net);
#endif
Socket_close(client->net.socket);
client->net.socket = 0;
#if defined(OPENSSL)
client->net.ssl = NULL;
#endif
}
client->connected = 0;
client->connect_state = 0;
FUNC_EXIT;
}
void MQTTAsync_closeSession(Clients* client)
{
FUNC_ENTRY;
MQTTAsync_closeOnly(client);
if (client->cleansession)
MQTTAsync_cleanSession(client);
FUNC_EXIT;
}
/**
* List callback function for comparing clients by client structure
* @param a Async structure
* @param b Client structure
* @return boolean indicating whether a and b are equal
*/
int clientStructCompare(void* a, void* b)
{
MQTTAsyncs* m = (MQTTAsyncs*)a;
return m->c == (Clients*)b;
}
int MQTTAsync_cleanSession(Clients* client)
{
int rc = 0;
ListElement* found = NULL;
FUNC_ENTRY;
#if !defined(NO_PERSISTENCE)
rc = MQTTPersistence_clear(client);
#endif
MQTTProtocol_emptyMessageList(client->inboundMsgs);
MQTTProtocol_emptyMessageList(client->outboundMsgs);
MQTTAsync_emptyMessageQueue(client);
client->msgID = 0;
if ((found = ListFindItem(handles, client, clientStructCompare)) != NULL)
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
MQTTAsync_removeResponsesAndCommands(m);
}
else
Log(LOG_ERROR, -1, "cleanSession: did not find client structure in handles list");
FUNC_EXIT_RC(rc);
return rc;
}
#if !defined(NO_PERSISTENCE)
int MQTTAsync_unpersistQueueEntry(Clients* client, qEntry* qe)
{
int rc = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
FUNC_ENTRY;
sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, qe->seqno);
if ((rc = client->persistence->premove(client->phandle, key)) != 0)
Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_persistQueueEntry(Clients* aclient, qEntry* qe)
{
int rc = 0;
int nbufs = 8;
int bufindex = 0;
char key[PERSISTENCE_MAX_KEY_LENGTH + 1];
int* lens = NULL;
void** bufs = NULL;
FUNC_ENTRY;
lens = (int*)malloc(nbufs * sizeof(int));
bufs = malloc(nbufs * sizeof(char *));
bufs[bufindex] = &qe->msg->payloadlen;
lens[bufindex++] = sizeof(qe->msg->payloadlen);
bufs[bufindex] = qe->msg->payload;
lens[bufindex++] = qe->msg->payloadlen;
bufs[bufindex] = &qe->msg->qos;
lens[bufindex++] = sizeof(qe->msg->qos);
bufs[bufindex] = &qe->msg->retained;
lens[bufindex++] = sizeof(qe->msg->retained);
bufs[bufindex] = &qe->msg->dup;
lens[bufindex++] = sizeof(qe->msg->dup);
bufs[bufindex] = &qe->msg->msgid;
lens[bufindex++] = sizeof(qe->msg->msgid);
bufs[bufindex] = qe->topicName;
lens[bufindex++] = strlen(qe->topicName) + 1;
bufs[bufindex] = &qe->topicLen;
lens[bufindex++] = sizeof(qe->topicLen);
sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno);
qe->seqno = aclient->qentry_seqno;
if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0)
Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc);
free(lens);
free(bufs);
FUNC_EXIT_RC(rc);
return rc;
}
qEntry* MQTTAsync_restoreQueueEntry(char* buffer, int buflen)
{
qEntry* qe = NULL;
char* ptr = buffer;
int data_size;
FUNC_ENTRY;
qe = malloc(sizeof(qEntry));
memset(qe, '\0', sizeof(qEntry));
qe->msg = malloc(sizeof(MQTTAsync_message));
memset(qe->msg, '\0', sizeof(MQTTAsync_message));
qe->msg->payloadlen = *(int*)ptr;
ptr += sizeof(int);
data_size = qe->msg->payloadlen;
qe->msg->payload = malloc(data_size);
memcpy(qe->msg->payload, ptr, data_size);
ptr += data_size;
qe->msg->qos = *(int*)ptr;
ptr += sizeof(int);
qe->msg->retained = *(int*)ptr;
ptr += sizeof(int);
qe->msg->dup = *(int*)ptr;
ptr += sizeof(int);
qe->msg->msgid = *(int*)ptr;
ptr += sizeof(int);
data_size = strlen(ptr) + 1;
qe->topicName = malloc(data_size);
strcpy(qe->topicName, ptr);
ptr += data_size;
qe->topicLen = *(int*)ptr;
ptr += sizeof(int);
FUNC_EXIT;
return qe;
}
int MQTTAsync_restoreMessageQueue(MQTTAsyncs* client)
{
int rc = 0;
char **msgkeys;
int nkeys;
int i = 0;
Clients* c = client->c;
int entries_restored = 0;
FUNC_ENTRY;
if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0)
{
while (rc == 0 && i < nkeys)
{
char *buffer = NULL;
int buflen;
if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0)
;
else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0)
{
qEntry* qe = MQTTAsync_restoreQueueEntry(buffer, buflen);
if (qe)
{
qe->seqno = atoi(msgkeys[i]+2);
MQTTPersistence_insertInOrder(c->messageQueue, qe, sizeof(qEntry));
free(buffer);
c->qentry_seqno = max(c->qentry_seqno, qe->seqno);
entries_restored++;
}
}
if (msgkeys[i])
free(msgkeys[i]);
i++;
}
if (msgkeys != NULL)
free(msgkeys);
}
Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID);
FUNC_EXIT_RC(rc);
return rc;
}
#endif
int MQTTAsync_deliverMessage(MQTTAsyncs* m, char* topicName, int topicLen, MQTTAsync_message* mm)
{
int rc;
Log(TRACE_MIN, -1, "Calling messageArrived for client %s, queue depth %d",
m->c->clientID, m->c->messageQueue->count);
Thread_unlock_mutex(mqttasync_mutex);
rc = (*(m->ma))(m->context, topicName, topicLen, mm);
Thread_lock_mutex(mqttasync_mutex);
/* if 0 (false) is returned by the callback then it failed, so we don't remove the message from
* the queue, and it will be retried later. If 1 is returned then the message data may have been freed,
* so we must be careful how we use it.
*/
return rc;
}
void Protocol_processPublication(Publish* publish, Clients* client)
{
MQTTAsync_message* mm = NULL;
int rc = 0;
FUNC_ENTRY;
mm = malloc(sizeof(MQTTAsync_message));
/* If the message is QoS 2, then we have already stored the incoming payload
* in an allocated buffer, so we don't need to copy again.
*/
if (publish->header.bits.qos == 2)
mm->payload = publish->payload;
else
{
mm->payload = malloc(publish->payloadlen);
memcpy(mm->payload, publish->payload, publish->payloadlen);
}
mm->payloadlen = publish->payloadlen;
mm->qos = publish->header.bits.qos;
mm->retained = publish->header.bits.retain;
if (publish->header.bits.qos == 2)
mm->dup = 0; /* ensure that a QoS2 message is not passed to the application with dup = 1 */
else
mm->dup = publish->header.bits.dup;
mm->msgid = publish->msgId;
if (client->messageQueue->count == 0 && client->connected)
{
ListElement* found = NULL;
if ((found = ListFindItem(handles, client, clientStructCompare)) == NULL)
Log(LOG_ERROR, -1, "processPublication: did not find client structure in handles list");
else
{
MQTTAsyncs* m = (MQTTAsyncs*)(found->content);
if (m->ma)
rc = MQTTAsync_deliverMessage(m, publish->topic, publish->topiclen, mm);
}
}
if (rc == 0) /* if message was not delivered, queue it up */
{
qEntry* qe = malloc(sizeof(qEntry));
qe->msg = mm;
qe->topicName = publish->topic;
qe->topicLen = publish->topiclen;
ListAppend(client->messageQueue, qe, sizeof(qe) + sizeof(mm) + mm->payloadlen + strlen(qe->topicName)+1);
#if !defined(NO_PERSISTENCE)
if (client->persistence)
MQTTAsync_persistQueueEntry(client, qe);
#endif
}
publish->topic = NULL;
FUNC_EXIT;
}
int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options)
{
MQTTAsyncs* m = handle;
int rc = MQTTASYNC_SUCCESS;
MQTTAsync_queuedCommand* conn;
FUNC_ENTRY;
if (options == NULL)
{
rc = MQTTASYNC_NULL_PARAMETER;
goto exit;
}
if (strncmp(options->struct_id, "MQTC", 4) != 0 ||
(options->struct_version != 0 && options->struct_version != 1 && options->struct_version != 2))
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
if (options->will) /* check validity of will options structure */
{
if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || options->will->struct_version != 0)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
if (options->will->qos < 0 || options->will->qos > 2)
{
rc = MQTTASYNC_BAD_QOS;
goto exit;
}
}
if (options->struct_version != 0 && options->ssl) /* check validity of SSL options structure */
{
if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version != 0)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
}
if ((options->username && !UTF8_validateString(options->username)) ||
(options->password && !UTF8_validateString(options->password)))
{
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
}
m->connect.onSuccess = options->onSuccess;
m->connect.onFailure = options->onFailure;
m->connect.context = options->context;
tostop = 0;
if (sendThread_state != STARTING && sendThread_state != RUNNING)
{
Thread_lock_mutex(mqttasync_mutex);
sendThread_state = STARTING;
Thread_start(MQTTAsync_sendThread, NULL);
Thread_unlock_mutex(mqttasync_mutex);
}
if (receiveThread_state != STARTING && receiveThread_state != RUNNING)
{
Thread_lock_mutex(mqttasync_mutex);
receiveThread_state = STARTING;
Thread_start(MQTTAsync_receiveThread, handle);
Thread_unlock_mutex(mqttasync_mutex);
}
m->c->keepAliveInterval = options->keepAliveInterval;
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = options->maxInflight;
if (m->c->will)
{
free(m->c->will);
m->c->will = NULL;
}
if (options->will && options->will->struct_version == 0)
{
m->c->will = malloc(sizeof(willMessages));
m->c->will->msg = malloc(strlen(options->will->message) + 1);
strcpy(m->c->will->msg, options->will->message);
m->c->will->qos = options->will->qos;
m->c->will->retained = options->will->retained;
m->c->will->topic = malloc(strlen(options->will->topicName) + 1);
strcpy(m->c->will->topic, options->will->topicName);
}
#if defined(OPENSSL)
if (options->struct_version != 0 && options->ssl)
{
m->c->sslopts = malloc(sizeof(MQTTClient_SSLOptions));
m->c->sslopts->trustStore = options->ssl->trustStore;
m->c->sslopts->keyStore = options->ssl->keyStore;
m->c->sslopts->privateKey = options->ssl->privateKey;
m->c->sslopts->privateKeyPassword = options->ssl->privateKeyPassword;
m->c->sslopts->enabledCipherSuites = options->ssl->enabledCipherSuites;
m->c->sslopts->enableServerCertAuth = options->ssl->enableServerCertAuth;
}
#endif
m->c->username = options->username;
m->c->password = options->password;
m->c->retryInterval = options->retryInterval;
/* Add connect request to operation queue */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
if (options)
{
conn->command.onSuccess = options->onSuccess;
conn->command.onFailure = options->onFailure;
conn->command.context = options->context;
conn->command.details.conn.timeout = options->connectTimeout;
if (options->struct_version >= 2 && options->serverURIcount > 0)
{
int i;
conn->command.details.conn.serverURIcount = options->serverURIcount;
conn->command.details.conn.serverURIs = malloc(options->serverURIcount * sizeof(char*));
for (i = 0; i < options->serverURIcount; ++i)
{
conn->command.details.conn.serverURIs[i] = malloc(strlen(options->serverURIs[i]) + 1);
strcpy(conn->command.details.conn.serverURIs[i], options->serverURIs[i]);
}
conn->command.details.conn.currentURI = 0;
}
}
conn->command.type = CONNECT;
rc = MQTTAsync_addCommand(conn, sizeof(conn));
exit:
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_disconnect1(MQTTAsync handle, MQTTAsync_disconnectOptions* options, int internal)
{
MQTTAsyncs* m = handle;
int rc = MQTTASYNC_SUCCESS;
MQTTAsync_queuedCommand* dis;
FUNC_ENTRY;
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
/* Add disconnect request to operation queue */
dis = malloc(sizeof(MQTTAsync_queuedCommand));
memset(dis, '\0', sizeof(MQTTAsync_queuedCommand));
dis->client = m;
if (options)
{
dis->command.onSuccess = options->onSuccess;
dis->command.onFailure = options->onFailure;
dis->command.context = options->context;
dis->command.details.dis.timeout = options->timeout;
}
dis->command.type = DISCONNECT;
dis->command.details.dis.internal = internal;
rc = MQTTAsync_addCommand(dis, sizeof(dis));
exit:
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_disconnect_internal(MQTTAsync handle, int timeout)
{
MQTTAsync_disconnectOptions options = MQTTAsync_disconnectOptions_initializer;
options.timeout = timeout;
return MQTTAsync_disconnect1(handle, &options, 1);
}
void MQTTProtocol_closeSession(Clients* c, int sendwill)
{
MQTTAsync_disconnect_internal((MQTTAsync)c->context, 0);
}
int MQTTAsync_disconnect(MQTTAsync handle, MQTTAsync_disconnectOptions* options)
{
return MQTTAsync_disconnect1(handle, options, 0);
}
int MQTTAsync_isConnected(MQTTAsync handle)
{
MQTTAsyncs* m = handle;
int rc = 0;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
if (m && m->c)
rc = m->c->connected;
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char** topic, int* qos, MQTTAsync_responseOptions* response)
{
MQTTAsyncs* m = handle;
int i = 0;
int rc = MQTTASYNC_FAILURE;
MQTTAsync_queuedCommand* sub;
FUNC_ENTRY;
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
for (i = 0; i < count; i++)
{
if (!UTF8_validateString(topic[i]))
{
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
}
if (qos[i] < 0 || qos[i] > 2)
{
rc = MQTTASYNC_BAD_QOS;
goto exit;
}
}
/* Add subscribe request to operation queue */
sub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(sub, '\0', sizeof(MQTTAsync_queuedCommand));
sub->client = m;
if (response)
{
sub->command.onSuccess = response->onSuccess;
sub->command.onFailure = response->onFailure;
sub->command.context = response->context;
}
sub->command.type = SUBSCRIBE;
sub->command.details.sub.count = count;
sub->command.details.sub.topics = malloc(sizeof(char*) * count);
sub->command.details.sub.qoss = malloc(sizeof(int) * count);
for (i = 0; i < count; ++i)
{
sub->command.details.sub.topics[i] = malloc(strlen(topic[i]) + 1);
strcpy(sub->command.details.sub.topics[i], topic[i]);
sub->command.details.sub.qoss[i] = qos[i];
}
rc = MQTTAsync_addCommand(sub, sizeof(sub));
exit:
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_subscribe(MQTTAsync handle, char* topic, int qos, MQTTAsync_responseOptions* response)
{
int rc = 0;
FUNC_ENTRY;
rc = MQTTAsync_subscribeMany(handle, 1, &topic, &qos, response);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char** topic, MQTTAsync_responseOptions* response)
{
MQTTAsyncs* m = handle;
int i = 0;
int rc = SOCKET_ERROR;
MQTTAsync_queuedCommand* unsub;
FUNC_ENTRY;
if (m == NULL || m->c == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c->connected == 0)
{
rc = MQTTASYNC_DISCONNECTED;
goto exit;
}
if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1)
{
rc = MQTTASYNC_NO_MORE_MSGIDS;
goto exit;
}
for (i = 0; i < count; i++)
{
if (!UTF8_validateString(topic[i]))
{
rc = MQTTASYNC_BAD_UTF8_STRING;
goto exit;
}
}
/* Add unsubscribe request to operation queue */
unsub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(unsub, '\0', sizeof(MQTTAsync_queuedCommand));
unsub->client = m;
unsub->command.type = UNSUBSCRIBE;
if (response)
{
unsub->command.onSuccess = response->onSuccess;
unsub->command.onFailure = response->onFailure;
unsub->command.context = response->context;
}
unsub->command.details.unsub.count = count;
unsub->command.details.unsub.topics = malloc(sizeof(char*) * count);
for (i = 0; i < count; ++i)
{
unsub->command.details.unsub.topics[i] = malloc(strlen(topic[i]) + 1);
strcpy(unsub->command.details.unsub.topics[i], topic[i]);
}
rc = MQTTAsync_addCommand(unsub, sizeof(unsub));
exit:
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_unsubscribe(MQTTAsync handle, char* topic, MQTTAsync_responseOptions* response)
{
int rc = 0;
FUNC_ENTRY;
rc = MQTTAsync_unsubscribeMany(handle, 1, &topic, response);
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_send(MQTTAsync handle, char* destinationName, int payloadlen, void* payload,
int qos, int retained, MQTTAsync_responseOptions* response)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
MQTTAsync_queuedCommand* pub;
FUNC_ENTRY;
if (m == NULL || m->c == NULL)
rc = MQTTASYNC_FAILURE;
else if (m->c->connected == 0)
rc = MQTTASYNC_DISCONNECTED;
else if (!UTF8_validateString(destinationName))
rc = MQTTASYNC_BAD_UTF8_STRING;
else if (qos < 0 || qos > 2)
rc = MQTTASYNC_BAD_QOS;
else if (m->c->outboundMsgs->count >= MAX_MSG_ID - 1)
rc = MQTTASYNC_NO_MORE_MSGIDS;
if (rc != MQTTASYNC_SUCCESS)
goto exit;
/* Add publish request to operation queue */
pub = malloc(sizeof(MQTTAsync_queuedCommand));
memset(pub, '\0', sizeof(MQTTAsync_queuedCommand));
pub->client = m;
pub->command.type = PUBLISH;
if (response)
{
pub->command.onSuccess = response->onSuccess;
pub->command.onFailure = response->onFailure;
pub->command.context = response->context;
}
pub->command.details.pub.destinationName = malloc(strlen(destinationName) + 1);
strcpy(pub->command.details.pub.destinationName, destinationName);
pub->command.details.pub.payloadlen = payloadlen;
pub->command.details.pub.payload = malloc(payloadlen);
memcpy(pub->command.details.pub.payload, payload, payloadlen);
pub->command.details.pub.qos = qos;
pub->command.details.pub.retained = retained;
rc = MQTTAsync_addCommand(pub, sizeof(pub));
exit:
FUNC_EXIT_RC(rc);
return rc;
}
int MQTTAsync_sendMessage(MQTTAsync handle, char* destinationName, MQTTAsync_message* message,
MQTTAsync_responseOptions* response)
{
int rc = MQTTASYNC_SUCCESS;
FUNC_ENTRY;
if (message == NULL)
{
rc = MQTTASYNC_NULL_PARAMETER;
goto exit;
}
if (strncmp(message->struct_id, "MQTM", 4) != 0 || message->struct_version != 0)
{
rc = MQTTASYNC_BAD_STRUCTURE;
goto exit;
}
rc = MQTTAsync_send(handle, destinationName, message->payloadlen, message->payload,
message->qos, message->retained, response);
exit:
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_retry(void)
{
static time_t last = 0L;
time_t now;
FUNC_ENTRY;
time(&(now));
if (difftime(now, last) > 5)
{
time(&(last));
MQTTProtocol_keepalive(now);
MQTTProtocol_retry(now, 1);
}
else
MQTTProtocol_retry(now, 0);
FUNC_EXIT;
}
int MQTTAsync_connecting(MQTTAsyncs* m)
{
int rc = -1;
FUNC_ENTRY;
if (m->c->connect_state == 1) /* TCP connect started - check for completion */
{
int error;
socklen_t len = sizeof(error);
if ((rc = getsockopt(m->c->net.socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len)) == 0)
rc = error;
if (rc != 0)
goto exit;
Socket_clearPendingWrite(m->c->net.socket);
#if defined(OPENSSL)
if (m->ssl)
{
if (SSLSocket_setSocketForSSL(&m->c->net, m->c->sslopts) != MQTTASYNC_SUCCESS)
{
if (m->c->session != NULL)
if ((rc = SSL_set_session(m->c->net.ssl, m->c->session)) != 1)
Log(TRACE_MIN, -1, "Failed to set SSL session with stored data, non critical");
rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket);
if (rc == -1)
m->c->connect_state = 2;
else if (rc == SSL_FATAL)
{
rc = SOCKET_ERROR;
goto exit;
}
else if (rc == 1) {
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
}
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
}
}
else
{
rc = SOCKET_ERROR;
goto exit;
}
}
else
{
#endif
m->c->connect_state = 3; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c)) == SOCKET_ERROR)
goto exit;
#if defined(OPENSSL)
}
#endif
}
#if defined(OPENSSL)
else if (m->c->connect_state == 2) /* SSL connect sent - wait for completion */
{
if ((rc = SSLSocket_connect(m->c->net.ssl, m->c->net.socket)) != 1)
goto exit;
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c)) == SOCKET_ERROR)
goto exit;
}
#endif
exit:
if ((rc != 0 && m->c->connect_state != 2) || (rc == SSL_FATAL))
{
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
}
}
FUNC_EXIT_RC(rc);
return rc;
}
MQTTPacket* MQTTAsync_cycle(int* sock, unsigned long timeout, int* rc)
{
struct timeval tp = {0L, 0L};
static Ack ack;
MQTTPacket* pack = NULL;
static int nosockets_count = 0;
FUNC_ENTRY;
if (timeout > 0L)
{
tp.tv_sec = timeout / 1000;
tp.tv_usec = (timeout % 1000) * 1000; /* this field is microseconds! */
}
#if defined(OPENSSL)
if ((*sock = SSLSocket_getPendingRead()) == -1)
{
#endif
/* 0 from getReadySocket indicates no work to do, -1 == error, but can happen normally */
*sock = Socket_getReadySocket(0, &tp);
if (!tostop && *sock == 0 && (tp.tv_sec > 0L || tp.tv_usec > 0L))
{
MQTTAsync_sleep(100L);
if (s.clientsds->count == 0)
{
if (++nosockets_count == 50) /* 5 seconds with no sockets */
tostop = 1;
}
}
else
nosockets_count = 0;
#if defined(OPENSSL)
}
#endif
Thread_lock_mutex(mqttasync_mutex);
if (*sock > 0)
{
MQTTAsyncs* m = NULL;
if (ListFindItem(handles, sock, clientSockCompare) != NULL)
m = (MQTTAsync)(handles->current->content);
if (m != NULL)
{
if (m->c->connect_state == 1 || m->c->connect_state == 2)
*rc = MQTTAsync_connecting(m);
else
pack = MQTTPacket_Factory(&m->c->net, rc);
if ((m->c->connect_state == 3) && (*rc == SOCKET_ERROR))
{
Log(TRACE_MINIMUM, -1, "CONNECT sent but MQTTPacket_Factory has returned SOCKET_ERROR");
if (m->connect.details.conn.currentURI < m->connect.details.conn.serverURIcount)
{
MQTTAsync_queuedCommand* conn;
MQTTAsync_closeOnly(m->c);
/* put the connect command back to the head of the command queue, using the next serverURI */
conn = malloc(sizeof(MQTTAsync_queuedCommand));
memset(conn, '\0', sizeof(MQTTAsync_queuedCommand));
conn->client = m;
conn->command = m->connect;
Log(TRACE_MIN, -1, "Connect failed, now trying %s",
m->connect.details.conn.serverURIs[m->connect.details.conn.currentURI]);
MQTTAsync_addCommand(conn, sizeof(m->connect));
}
else
{
MQTTAsync_closeSession(m->c);
MQTTAsync_freeConnect(m->connect);
if (m->connect.onFailure)
{
Log(TRACE_MIN, -1, "Calling connect failure for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(m->connect.onFailure))(m->connect.context, NULL);
Thread_lock_mutex(mqttasync_mutex);
}
}
}
}
if (pack)
{
int freed = 1;
/* Note that these handle... functions free the packet structure that they are dealing with */
if (pack->header.bits.type == PUBLISH)
*rc = MQTTProtocol_handlePublishes(pack, *sock);
else if (pack->header.bits.type == PUBACK || pack->header.bits.type == PUBCOMP)
{
int msgid;
ack = (pack->header.bits.type == PUBCOMP) ? *(Pubcomp*)pack : *(Puback*)pack;
msgid = ack.msgId;
*rc = (pack->header.bits.type == PUBCOMP) ?
MQTTProtocol_handlePubcomps(pack, *sock) : MQTTProtocol_handlePubacks(pack, *sock);
if (m)
{
ListElement* current = NULL;
if (m->dc)
{
Log(TRACE_MIN, -1, "Calling deliveryComplete for client %s, msgid %d", m->c->clientID, msgid);
(*(m->dc))(m->context, msgid);
}
/* use the msgid to find the callback to be called */
while (ListNextElement(m->responses, ¤t))
{
MQTTAsync_queuedCommand* command = (MQTTAsync_queuedCommand*)(current->content);
if (command->command.token == msgid)
{
if (!ListDetach(m->responses, command)) /* then remove the response from the list */
Log(LOG_ERROR, -1, "Publish command not removed from command list");
if (command->command.onSuccess)
{
MQTTAsync_successData data;
data.token = command->command.token;
data.alt.pub.destinationName = command->command.details.pub.destinationName;
data.alt.pub.message.payload = command->command.details.pub.payload;
data.alt.pub.message.payloadlen = command->command.details.pub.payloadlen;
data.alt.pub.message.qos = command->command.details.pub.qos;
data.alt.pub.message.retained = command->command.details.pub.retained;
Log(TRACE_MIN, -1, "Calling publish success for client %s", m->c->clientID);
Thread_unlock_mutex(mqttasync_mutex);
(*(command->command.onSuccess))(command->command.context, &data);
Thread_lock_mutex(mqttasync_mutex);
}
MQTTAsync_freeCommand(command);
break;
}
}
}
}
else if (pack->header.bits.type == PUBREC)
*rc = MQTTProtocol_handlePubrecs(pack, *sock);
else if (pack->header.bits.type == PUBREL)
*rc = MQTTProtocol_handlePubrels(pack, *sock);
else if (pack->header.bits.type == PINGRESP)
*rc = MQTTProtocol_handlePingresps(pack, *sock);
else
freed = 0;
if (freed)
pack = NULL;
}
}
MQTTAsync_retry();
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(*rc);
return pack;
}
int pubCompare(void* a, void* b)
{
Messages* msg = (Messages*)a;
return msg->publish == (Publications*)b;
}
int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens)
{
int rc = MQTTASYNC_SUCCESS;
MQTTAsyncs* m = handle;
*tokens = NULL;
FUNC_ENTRY;
Thread_lock_mutex(mqttasync_mutex);
if (m == NULL)
{
rc = MQTTASYNC_FAILURE;
goto exit;
}
if (m->c && m->c->outboundMsgs->count > 0)
{
ListElement* current = NULL;
int count = 0;
*tokens = malloc(sizeof(MQTTAsync_token) * (m->c->outboundMsgs->count + 1));
while (ListNextElement(m->c->outboundMsgs, ¤t))
{
Messages* m = (Messages*)(current->content);
(*tokens)[count++] = m->msgid;
}
(*tokens)[count] = -1;
}
exit:
Thread_unlock_mutex(mqttasync_mutex);
FUNC_EXIT_RC(rc);
return rc;
}
void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level)
{
Log_setTraceLevel(level);
}
void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback* callback)
{
Log_setTraceCallback((Log_traceCallback*)callback);
}
MQTTAsync_nameValue* MQTTAsync_getVersionInfo()
{
#define MAX_INFO_STRINGS 8
static MQTTAsync_nameValue libinfo[MAX_INFO_STRINGS + 1];
int i = 0;
libinfo[i].name = "Product name";
libinfo[i++].value = "Paho Asynchronous MQTT C Client Library";
libinfo[i].name = "Version";
libinfo[i++].value = CLIENT_VERSION;
libinfo[i].name = "Build level";
libinfo[i++].value = BUILD_TIMESTAMP;
#if defined(OPENSSL)
libinfo[i].name = "OpenSSL version";
libinfo[i++].value = SSLeay_version(SSLEAY_VERSION);
libinfo[i].name = "OpenSSL flags";
libinfo[i++].value = SSLeay_version(SSLEAY_CFLAGS);
libinfo[i].name = "OpenSSL build timestamp";
libinfo[i++].value = SSLeay_version(SSLEAY_BUILT_ON);
libinfo[i].name = "OpenSSL platform";
libinfo[i++].value = SSLeay_version(SSLEAY_PLATFORM);
libinfo[i].name = "OpenSSL directory";
libinfo[i++].value = SSLeay_version(SSLEAY_DIR);
#endif
libinfo[i].name = NULL;
libinfo[i].value = NULL;
return libinfo;
}
/*******************************************************************************
* Copyright (c) 2009, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation
* Ian Craggs, Allan Stockdill-Mander - SSL connections
* Ian Craggs - multiple server connection support
*******************************************************************************/
/********************************************************************/
/**
* @cond MQTTAsync_main
* @mainpage Asynchronous MQTT client library for C
*
* © Copyright IBM Corp. 2009, 2013
*
* @brief An Asynchronous MQTT client library for C.
*
* An MQTT client application connects to MQTT-capable servers.
* A typical client is responsible for collecting information from a telemetry
* device and publishing the information to the server. It can also subscribe
* to topics, receive messages, and use this information to control the
* telemetry device.
*
* MQTT clients implement the published MQTT v3 protocol. You can write your own
* API to the MQTT protocol using the programming language and platform of your
* choice. This can be time-consuming and error-prone.
*
* To simplify writing MQTT client applications, this library encapsulates
* the MQTT v3 protocol for you. Using this library enables a fully functional
* MQTT client application to be written in a few lines of code.
* The information presented here documents the API provided
* by the Asynchronous MQTT Client library for C.
*
* <b>Using the client</b><br>
* Applications that use the client library typically use a similar structure:
* <ul>
* <li>Create a client object</li>
* <li>Set the options to connect to an MQTT server</li>
* <li>Set up callback functions</li>
* <li>Connect the client to an MQTT server</li>
* <li>Subscribe to any topics the client needs to receive</li>
* <li>Repeat until finished:</li>
* <ul>
* <li>Publish any messages the client needs to</li>
* <li>Handle any incoming messages</li>
* </ul>
* <li>Disconnect the client</li>
* <li>Free any memory being used by the client</li>
* </ul>
* Some simple examples are shown here:
* <ul>
* <li>@ref publish</li>
* <li>@ref subscribe</li>
* </ul>
* Additional information about important concepts is provided here:
* <ul>
* <li>@ref async</li>
* <li>@ref wildcard</li>
* <li>@ref qos</li>
* <li>@ref tracing</li>
* </ul>
* @endcond
*/
/// @cond EXCLUDE
#if !defined(MQTTASYNC_H)
#define MQTTASYNC_H
#if defined(WIN32)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#else
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#endif
#include <stdio.h>
/// @endcond
#if !defined(NO_PERSISTENCE)
#include "MQTTClientPersistence.h"
#endif
/**
* Return code: No error. Indicates successful completion of an MQTT client
* operation.
*/
#define MQTTASYNC_SUCCESS 0
/**
* Return code: A generic error code indicating the failure of an MQTT client
* operation.
*/
#define MQTTASYNC_FAILURE -1
/* error code -2 is MQTTAsync_PERSISTENCE_ERROR */
#define MQTTASYNC_PERSISTENCE_ERROR -2
/**
* Return code: The client is disconnected.
*/
#define MQTTASYNC_DISCONNECTED -3
/**
* Return code: The maximum number of messages allowed to be simultaneously
* in-flight has been reached.
*/
#define MQTTASYNC_MAX_MESSAGES_INFLIGHT -4
/**
* Return code: An invalid UTF-8 string has been detected.
*/
#define MQTTASYNC_BAD_UTF8_STRING -5
/**
* Return code: A NULL parameter has been supplied when this is invalid.
*/
#define MQTTASYNC_NULL_PARAMETER -6
/**
* Return code: The topic has been truncated (the topic string includes
* embedded NULL characters). String functions will not access the full topic.
* Use the topic length value to access the full topic.
*/
#define MQTTASYNC_TOPICNAME_TRUNCATED -7
/**
* Return code: A structure parameter does not have the correct eyecatcher
* and version number.
*/
#define MQTTASYNC_BAD_STRUCTURE -8
/**
* Return code: A qos parameter is not 0, 1 or 2
*/
#define MQTTASYNC_BAD_QOS -9
/**
* Return code: All 65535 MQTT msgids are being used
*/
#define MQTTASYNC_NO_MORE_MSGIDS -10
/**
* A handle representing an MQTT client. A valid client handle is available
* following a successful call to MQTTAsync_create().
*/
typedef void* MQTTAsync;
/**
* A value representing an MQTT message. A token is returned to the
* client application when a message is published. The token can then be used to
* check that the message was successfully delivered to its destination (see
* MQTTAsync_publish(),
* MQTTAsync_publishMessage(),
* MQTTAsync_deliveryComplete(), and
* MQTTAsync_getPendingTokens()).
*/
typedef int MQTTAsync_token;
/**
* A structure representing the payload and attributes of an MQTT message. The
* message topic is not part of this structure (see MQTTAsync_publishMessage(),
* MQTTAsync_publish(), MQTTAsync_receive(), MQTTAsync_freeMessage()
* and MQTTAsync_messageArrived()).
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTM. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** The length of the MQTT message payload in bytes. */
int payloadlen;
/** A pointer to the payload of the MQTT message. */
void* payload;
/**
* The quality of service (QoS) assigned to the message.
* There are three levels of QoS:
* <DL>
* <DT><B>QoS0</B></DT>
* <DD>Fire and forget - the message may not be delivered</DD>
* <DT><B>QoS1</B></DT>
* <DD>At least once - the message will be delivered, but may be
* delivered more than once in some circumstances.</DD>
* <DT><B>QoS2</B></DT>
* <DD>Once and one only - the message will be delivered exactly once.</DD>
* </DL>
*/
int qos;
/**
* The retained flag serves two purposes depending on whether the message
* it is associated with is being published or received.
*
* <b>retained = true</b><br>
* For messages being published, a true setting indicates that the MQTT
* server should retain a copy of the message. The message will then be
* transmitted to new subscribers to a topic that matches the message topic.
* For subscribers registering a new subscription, the flag being true
* indicates that the received message is not a new one, but one that has
* been retained by the MQTT server.
*
* <b>retained = false</b> <br>
* For publishers, this ndicates that this message should not be retained
* by the MQTT server. For subscribers, a false setting indicates this is
* a normal message, received as a result of it being published to the
* server.
*/
int retained;
/**
* The dup flag indicates whether or not this message is a duplicate.
* It is only meaningful when receiving QoS1 messages. When true, the
* client application should take appropriate action to deal with the
* duplicate message.
*/
int dup;
/** The message identifier is normally reserved for internal use by the
* MQTT client and server.
*/
int msgid;
} MQTTAsync_message;
#define MQTTAsync_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
/**
* This is a callback function. The client application
* must provide an implementation of this function to enable asynchronous
* receipt of messages. The function is registered with the client library by
* passing it as an argument to MQTTAsync_setCallbacks(). It is
* called by the client library when a new message that matches a client
* subscription has been received from the server. This function is executed on
* a separate thread to the one on which the client application is running.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setCallbacks(), which contains any application-specific context.
* @param topicName The topic associated with the received message.
* @param topicLen The length of the topic if there are one
* more NULL characters embedded in <i>topicName</i>, otherwise <i>topicLen</i>
* is 0. If <i>topicLen</i> is 0, the value returned by <i>strlen(topicName)</i>
* can be trusted. If <i>topicLen</i> is greater than 0, the full topic name
* can be retrieved by accessing <i>topicName</i> as a byte array of length
* <i>topicLen</i>.
* @param message The MQTTAsync_message structure for the received message.
* This structure contains the message payload and attributes.
* @return This function must return a boolean value indicating whether or not
* the message has been safely received by the client application. Returning
* true indicates that the message has been successfully handled.
* Returning false indicates that there was a problem. In this
* case, the client library will reinvoke MQTTAsync_messageArrived() to
* attempt to deliver the message to the application again.
*/
typedef int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
/**
* This is a callback function. The client application
* must provide an implementation of this function to enable asynchronous
* notification of delivery of messages to the server. The function is
* registered with the client library by passing it as an argument to MQTTAsync_setCallbacks().
* It is called by the client library after the client application has
* published a message to the server. It indicates that the necessary
* handshaking and acknowledgements for the requested quality of service (see
* MQTTAsync_message.qos) have been completed. This function is executed on a
* separate thread to the one on which the client application is running.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setCallbacks(), which contains any application-specific context.
* @param token The ::MQTTAsync_token associated with
* the published message. Applications can check that all messages have been
* correctly published by matching the tokens returned from calls to
* MQTTAsync_send() and MQTTAsync_sendMessage() with the tokens passed
* to this callback.
*/
typedef void MQTTAsync_deliveryComplete(void* context, MQTTAsync_token token);
/**
* This is a callback function. The client application
* must provide an implementation of this function to enable asynchronous
* notification of the loss of connection to the server. The function is
* registered with the client library by passing it as an argument to
* MQTTAsync_setCallbacks(). It is called by the client library if the client
* loses its connection to the server. The client application must take
* appropriate action, such as trying to reconnect or reporting the problem.
* This function is executed on a separate thread to the one on which the
* client application is running.
* @param context A pointer to the <i>context</i> value originally passed to
* MQTTAsync_setCallbacks(), which contains any application-specific context.
* @param cause The reason for the disconnection.
* Currently, <i>cause</i> is always set to NULL.
*/
typedef void MQTTAsync_connectionLost(void* context, char* cause);
/** The data returned on completion of an unsuccessful API call in the response callback onFailure. */
typedef struct
{
/** A token identifying the failed request. */
MQTTAsync_token token;
/** A numeric code identifying the error. */
int code;
/** Optional text explaining the error. Can be NULL. */
char* message;
} MQTTAsync_failureData;
/** The data returned on completion of a successful API call in the response callback onSuccess. */
typedef struct
{
/** A token identifying the successful request. Can be used to refer to the request later. */
MQTTAsync_token token;
/** A union of the different values that can be returned for subscribe, unsubscribe and publish. */
union
{
/** For subscribe, the granted QoS of the subscription returned by the server. */
int qos;
/** For subscribeMany, the list of granted QoSs of the subscriptions returned by the server. */
int* qosList;
/** For publish, the message being sent to the server. */
struct
{
MQTTAsync_message message;
char* destinationName;
} pub;
} alt;
} MQTTAsync_successData;
/**
* This is a callback function. The client application
* must provide an implementation of this function to enable asynchronous
* notification of the successful completion of an API call. The function is
* registered with the client library by passing it as an argument in
* ::MQTTAsync_responseOptions.
* @param context A pointer to the <i>context</i> value originally passed to
* ::MQTTAsync_responseOptions, which contains any application-specific context.
* @param response Any success data associated with the API completion.
*/
typedef void MQTTAsync_onSuccess(void* context, MQTTAsync_successData* response);
/**
* This is a callback function. The client application
* must provide an implementation of this function to enable asynchronous
* notification of the unsuccessful completion of an API call. The function is
* registered with the client library by passing it as an argument in
* ::MQTTAsync_responseOptions.
* @param context A pointer to the <i>context</i> value originally passed to
* ::MQTTAsync_responseOptions, which contains any application-specific context.
* @param response Any failure data associated with the API completion.
*/
typedef void MQTTAsync_onFailure(void* context, MQTTAsync_failureData* response);
typedef struct
{
/** The eyecatcher for this structure. Must be MQTR */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/**
* A pointer to a callback function to be called if the API call successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the API call fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
MQTTAsync_token token; /* output */
} MQTTAsync_responseOptions;
#define MQTTAsync_responseOptions_initializer { {'M', 'Q', 'T', 'R'}, 0, NULL, NULL, 0, 0 }
/**
* This function sets the global callback functions for a specific client.
* If your client application doesn't use a particular callback, set the
* relevant parameter to NULL. Any necessary message acknowledgements and
* status communications are handled in the background without any intervention
* from the client application. If you do not set a messageArrived callback
* function, you will not be notified of the receipt of any messages as a
* result of a subscription.
*
* <b>Note:</b> The MQTT client must be disconnected when this function is
* called.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param context A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to each of the callback functions to
* provide access to the context information in the callback.
* @param cl A pointer to an MQTTAsync_connectionLost() callback
* function. You can set this to NULL if your application doesn't handle
* disconnections.
* @param ma A pointer to an MQTTAsync_messageArrived() callback
* function. You can set this to NULL if your application doesn't handle
* receipt of messages.
* @param dc A pointer to an MQTTAsync_deliveryComplete() callback
* function. You can set this to NULL if you do not want to check
* for successful delivery.
* @return ::MQTTASYNC_SUCCESS if the callbacks were correctly set,
* ::MQTTASYNC_FAILURE if an error occurred.
*/
DLLExport int MQTTAsync_setCallbacks(MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,
MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
/**
* This function creates an MQTT client ready for connection to the
* specified server and using the specified persistent storage (see
* MQTTAsync_persistence). See also MQTTAsync_destroy().
* @param handle A pointer to an ::MQTTAsync handle. The handle is
* populated with a valid client reference following a successful return from
* this function.
* @param serverURI A null-terminated string specifying the server to
* which the client will connect. It takes the form <i>protocol://host:port</i>.
* <i>protocol</i> must be <i>tcp</i> or <i>ssl</i>. For <i>host</i>, you can
* specify either an IP address or a domain name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
* @param clientId The client identifier passed to the server when the
* client connects to it. It is a null-terminated UTF-8 encoded string.
* ClientIDs must be no longer than 23 characters according to the MQTT
* specification.
* @param persistence_type The type of persistence to be used by the client:
* <br>
* ::MQTTCLIENT_PERSISTENCE_NONE: Use in-memory persistence. If the device or
* system on which the client is running fails or is switched off, the current
* state of any in-flight messages is lost and some messages may not be
* delivered even at QoS1 and QoS2.
* <br>
* ::MQTTCLIENT_PERSISTENCE_DEFAULT: Use the default (file system-based)
* persistence mechanism. Status about in-flight messages is held in persistent
* storage and provides some protection against message loss in the case of
* unexpected failure.
* <br>
* ::MQTTCLIENT_PERSISTENCE_USER: Use an application-specific persistence
* implementation. Using this type of persistence gives control of the
* persistence mechanism to the application. The application has to implement
* the MQTTClient_persistence interface.
* @param persistence_context If the application uses
* ::MQTTCLIENT_PERSISTENCE_NONE persistence, this argument is unused and should
* be set to NULL. For ::MQTTCLIENT_PERSISTENCE_DEFAULT persistence, it
* should be set to the location of the persistence directory (if set
* to NULL, the persistence directory used is the working directory).
* Applications that use ::MQTTCLIENT_PERSISTENCE_USER persistence set this
* argument to point to a valid MQTTClient_persistence structure.
* @return ::MQTTASYNC_SUCCESS if the client is successfully created, otherwise
* an error code is returned.
*/
DLLExport int MQTTAsync_create(MQTTAsync* handle, char* serverURI, char* clientId,
int persistence_type, void* persistence_context);
/**
* MQTTAsync_willOptions defines the MQTT "Last Will and Testament" (LWT) settings for
* the client. In the event that a client unexpectedly loses its connection to
* the server, the server publishes the LWT message to the LWT topic on
* behalf of the client. This allows other clients (subscribed to the LWT topic)
* to be made aware that the client has disconnected. To enable the LWT
* function for a specific client, a valid pointer to an MQTTAsync_willOptions
* structure is passed in the MQTTAsync_connectOptions structure used in the
* MQTTAsync_connect() call that connects the client to the server. The pointer
* to MQTTAsync_willOptions can be set to NULL if the LWT function is not
* required.
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTW. */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** The LWT topic to which the LWT message will be published. */
char* topicName;
/** The LWT payload. */
char* message;
/**
* The retained flag for the LWT message (see MQTTAsync_message.retained).
*/
int retained;
/**
* The quality of service setting for the LWT message (see
* MQTTAsync_message.qos and @ref qos).
*/
int qos;
} MQTTAsync_willOptions;
#define MQTTAsync_willOptions_initializer { {'M', 'Q', 'T', 'W'}, 0, NULL, NULL, 0, 0 }
/**
* MQTTAsync_sslProperties defines the settings to establish an SSL/TLS connection using the
* OpenSSL library. It covers the following scenarios:
* - Server authentication: The client needs the digital certificate of the server. It is included
* in a store containting trusted material (also known as "trust store").
* - Mutual authentication: Both client and server are authenticated during the SSL handshake. In
* addition to the digital certificate of the server in a trust store, the client will need its own
* digital certificate and the private key used to sign its digital certificate stored in a "key store".
* - Anonymous connection: Both client and server do not get authenticated and no credentials are needed
* to establish an SSL connection. Note that this scenario is not fully secure since it is subject to
* man-in-the-middle attacks.
*/
typedef struct
{
/** The eyecatcher for this structure. Must be MQTS */
char struct_id[4];
/** The version number of this structure. Must be 0 */
int struct_version;
/** The file in PEM format containing the public digital certificates trusted by the client. */
char* trustStore;
/** The file in PEM format containing the public certificate chain of the client. It may also include
* the client's private key.
*/
char* keyStore;
/** If not included in the sslKeyStore, this setting points to the file in PEM format containing
* the client's private key.
*/
char* privateKey;
/** The password to load the client's privateKey if encrypted. */
char* privateKeyPassword;
/**
* The list of cipher suites that the client will present to the server during the SSL handshake. For a
* full explanation of the cipher list format, please see the OpenSSL on-line documentation:
* http://www.openssl.org/docs/apps/ciphers.html#CIPHER_LIST_FORMAT
* If this setting is ommitted, its default value will be "ALL", that is, all the cipher suites -excluding
* those offering no encryption- will be considered.
* This setting can be used to set an SSL anonymous connection ("aNULL" string value, for instance).
*/
char* enabledCipherSuites;
/** True/False option to enable verification of the server certificate **/
int enableServerCertAuth;
} MQTTAsync_SSLOptions;
#define MQTTAsync_SSLOptions_initializer { {'M', 'Q', 'T', 'S'}, 0, NULL, NULL, NULL, NULL, NULL, 1 }
/**
* MQTTAsync_connectOptions defines several settings that control the way the
* client connects to an MQTT server. Default values are set in
* MQTTAsync_connectOptions_initializer.
*/
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1 or 2.
* 0 signifies no SSL options and no serverURIs
* 1 signifies no serverURIs
*/
int struct_version;
/** The "keep alive" interval, measured in seconds, defines the maximum time
* that should pass without communication between the client and the server
* The client will ensure that at least one message travels across the
* network within each keep alive period. In the absence of a data-related
* message during the time period, the client sends a very small MQTT
* "ping" message, which the server will acknowledge. The keep alive
* interval enables the client to detect when the server is no longer
* available without having to wait for the long TCP/IP timeout.
* Set to 0 if you do not want any keep alive processing.
*/
int keepAliveInterval;
/**
* This is a boolean value. The cleansession setting controls the behaviour
* of both the client and the server at connection and disconnection time.
* The client and server both maintain session state information. This
* information is used to ensure "at least once" and "exactly once"
* delivery, and "exactly once" receipt of messages. Session state also
* includes subscriptions created by an MQTT client. You can choose to
* maintain or discard state information between sessions.
*
* When cleansession is true, the state information is discarded at
* connect and disconnect. Setting cleansession to false keeps the state
* information. When you connect an MQTT client application with
* MQTTAsync_connect(), the client identifies the connection using the
* client identifier and the address of the server. The server checks
* whether session information for this client
* has been saved from a previous connection to the server. If a previous
* session still exists, and cleansession=true, then the previous session
* information at the client and server is cleared. If cleansession=false,
* the previous session is resumed. If no previous session exists, a new
* session is started.
*/
int cleansession;
/**
* This controls how many messages can be in-flight simultaneously.
*/
int maxInflight;
/**
* This is a pointer to an MQTTAsync_willOptions structure. If your
* application does not make use of the Last Will and Testament feature,
* set this pointer to NULL.
*/
MQTTAsync_willOptions* will;
/**
* MQTT servers that support the MQTT v3.1 protocol provide authentication
* and authorisation by user name and password. This is the user name
* parameter.
*/
char* username;
/**
* MQTT servers that support the MQTT v3.1 protocol provide authentication
* and authorisation by user name and password. This is the password
* parameter.
*/
char* password;
/**
* The time interval in seconds to allow a connect to complete.
*/
int connectTimeout;
/**
* The time interval in seconds
*/
int retryInterval;
/**
* This is a pointer to an MQTTAsync_SSLOptions structure. If your
* application does not make use of SSL, set this pointer to NULL.
*/
MQTTAsync_SSLOptions* ssl;
/**
* A pointer to a callback function to be called if the connect successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the connect fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
/**
* The number of entries in the serverURIs array.
*/
int serverURIcount;
/**
* An array of null-terminated strings specifying the servers to
* which the client will connect. Each string takes the form <i>protocol://host:port</i>.
* <i>protocol</i> must be <i>tcp</i> or <i>ssl</i>. For <i>host</i>, you can
* specify either an IP address or a domain name. For instance, to connect to
* a server running on the local machines with the default MQTT port, specify
* <i>tcp://localhost:1883</i>.
*/
char** serverURIs;
} MQTTAsync_connectOptions;
#define MQTTAsync_connectOptions_initializer { {'M', 'Q', 'T', 'C'}, 2, 60, 1, 10, NULL, NULL, NULL, 30, 20, NULL, NULL, 0, NULL}
/**
* This function attempts to connect a previously-created client (see
* MQTTAsync_create()) to an MQTT server using the specified options. If you
* want to enable asynchronous message and status notifications, you must call
* MQTTAsync_setCallbacks() prior to MQTTAsync_connect().
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param options A pointer to a valid MQTTAsync_connectOptions
* structure.
* @return ::MQTTASYNC_SUCCESS if the client connect request was accepted.
* If the client was unable to connect to the server, an error code is
* returned via the onFailure callback, if set.
* Error codes greater than 0 are returned by the MQTT protocol:<br><br>
* <b>1</b>: Connection refused: Unacceptable protocol version<br>
* <b>2</b>: Connection refused: Identifier rejected<br>
* <b>3</b>: Connection refused: Server unavailable<br>
* <b>4</b>: Connection refused: Bad user name or password<br>
* <b>5</b>: Connection refused: Not authorized<br>
* <b>6-255</b>: Reserved for future use<br>
*/
DLLExport int MQTTAsync_connect(MQTTAsync handle, MQTTAsync_connectOptions* options);
typedef struct
{
/** The eyecatcher for this structure. Must be MQTD. */
char struct_id[4];
/** The version number of this structure. Must be 0 or 1. 0 signifies no SSL options */
int struct_version;
/**
* The client delays disconnection for up to this time (in
* milliseconds) in order to allow in-flight message transfers to complete.
*/
int timeout;
/**
* A pointer to a callback function to be called if the disconnect successfully
* completes. Can be set to NULL, in which case no indication of successful
* completion will be received.
*/
MQTTAsync_onSuccess* onSuccess;
/**
* A pointer to a callback function to be called if the disconnect fails.
* Can be set to NULL, in which case no indication of unsuccessful
* completion will be received.
*/
MQTTAsync_onFailure* onFailure;
/**
* A pointer to any application-specific context. The
* the <i>context</i> pointer is passed to success or failure callback functions to
* provide access to the context information in the callback.
*/
void* context;
} MQTTAsync_disconnectOptions;
#define MQTTAsync_disconnectOptions_initializer { {'M', 'Q', 'T', 'D'}, 0, 0, NULL, NULL, NULL }
/**
* This function attempts to disconnect the client from the MQTT
* server. In order to allow the client time to complete handling of messages
* that are in-flight when this function is called, a timeout period is
* specified. When the timeout period has expired, the client disconnects even
* if there are still outstanding message acknowledgements.
* The next time the client connects to the same server, any QoS 1 or 2
* messages which have not completed will be retried depending on the
* cleansession settings for both the previous and the new connection (see
* MQTTAsync_connectOptions.cleansession and MQTTAsync_connect()).
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param options The client delays disconnection for up to this time (in
* milliseconds) in order to allow in-flight message transfers to complete.
* @return ::MQTTASYNC_SUCCESS if the client successfully disconnects from
* the server. An error code is returned if the client was unable to disconnect
* from the server
*/
DLLExport int MQTTAsync_disconnect(MQTTAsync handle, MQTTAsync_disconnectOptions* options);
/**
* This function allows the client application to test whether or not a
* client is currently connected to the MQTT server.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @return Boolean true if the client is connected, otherwise false.
*/
DLLExport int MQTTAsync_isConnected(MQTTAsync handle);
/**
* This function attempts to subscribe a client to a single topic, which may
* contain wildcards (see @ref wildcard). This call also specifies the
* @ref qos requested for the subscription
* (see also MQTTAsync_subscribeMany()).
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param topic The subscription topic, which may include wildcards.
* @param qos The requested quality of service for the subscription.
* @param response A pointer to a response options structure. Used to set callback functions.
* @return ::MQTTASYNC_SUCCESS if the subscription request is successful.
* An error code is returned if there was a problem registering the
* subscription.
*/
DLLExport int MQTTAsync_subscribe(MQTTAsync handle, char* topic, int qos, MQTTAsync_responseOptions* response);
/**
* This function attempts to subscribe a client to a list of topics, which may
* contain wildcards (see @ref wildcard). This call also specifies the
* @ref qos requested for each topic (see also MQTTAsync_subscribe()).
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param count The number of topics for which the client is requesting
* subscriptions.
* @param topic An array (of length <i>count</i>) of pointers to
* topics, each of which may include wildcards.
* @param qos An array (of length <i>count</i>) of @ref qos
* values. qos[n] is the requested QoS for topic[n].
* @param response A pointer to a response options structure. Used to set callback functions.
* @return ::MQTTASYNC_SUCCESS if the subscription request is successful.
* An error code is returned if there was a problem registering the
* subscriptions.
*/
DLLExport int MQTTAsync_subscribeMany(MQTTAsync handle, int count, char** topic, int* qos, MQTTAsync_responseOptions* response);
/**
* This function attempts to remove an existing subscription made by the
* specified client.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param topic The topic for the subscription to be removed, which may
* include wildcards (see @ref wildcard).
* @param response A pointer to a response options structure. Used to set callback functions.
* @return ::MQTTASYNC_SUCCESS if the subscription is removed.
* An error code is returned if there was a problem removing the
* subscription.
*/
DLLExport int MQTTAsync_unsubscribe(MQTTAsync handle, char* topic, MQTTAsync_responseOptions* response);
/**
* This function attempts to remove existing subscriptions to a list of topics
* made by the specified client.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param count The number subscriptions to be removed.
* @param topic An array (of length <i>count</i>) of pointers to the topics of
* the subscriptions to be removed, each of which may include wildcards.
* @param response A pointer to a response options structure. Used to set callback functions.
* @return ::MQTTASYNC_SUCCESS if the subscriptions are removed.
* An error code is returned if there was a problem removing the subscriptions.
*/
DLLExport int MQTTAsync_unsubscribeMany(MQTTAsync handle, int count, char** topic, MQTTAsync_responseOptions* response);
/**
* This function attempts to publish a message to a given topic (see also
* ::MQTTAsync_sendMessage()). An ::MQTTAsync_token is issued when
* this function returns successfully. If the client application needs to
* test for successful delivery of messages, a callback should be set
* (see ::MQTTAsync_onSuccess() and ::MQTTAsync_deliveryComplete()).
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param destinationName The topic associated with this message.
* @param payloadlen The length of the payload in bytes.
* @param payload A pointer to the byte array payload of the message.
* @param qos The @ref qos of the message.
* @param retained The retained flag for the message.
* @param response A pointer to an ::MQTTAsync_responseOptions structure. Used to set callback functions.
* This is optional and can be set to NULL.
* @return ::MQTTASYNC_SUCCESS if the message is accepted for publication.
* An error code is returned if there was a problem accepting the message.
*/
DLLExport int MQTTAsync_send(MQTTAsync handle, char* destinationName, int payloadlen, void* payload, int qos, int retained,
MQTTAsync_responseOptions* response);
/**
* This function attempts to publish a message to a given topic (see also
* MQTTAsync_publish()). An ::MQTTAsync_token is issued when
* this function returns successfully. If the client application needs to
* test for successful delivery of messages, a callback should be set
* (see ::MQTTAsync_onSuccess() and ::MQTTAsync_deliveryComplete()).
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param destinationName The topic associated with this message.
* @param msg A pointer to a valid MQTTAsync_message structure containing
* the payload and attributes of the message to be published.
* @param response A pointer to an ::MQTTAsync_responseOptions structure. Used to set callback functions.
* @return ::MQTTASYNC_SUCCESS if the message is accepted for publication.
* An error code is returned if there was a problem accepting the message.
*/
DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, char* destinationName, MQTTAsync_message* msg, MQTTAsync_responseOptions* response);
/**
* This function sets a pointer to an array of tokens for
* messages that are currently in-flight (pending completion).
*
* <b>Important note:</b> The memory used to hold the array of tokens is
* malloc()'d in this function. The client application is responsible for
* freeing this memory when it is no longer required.
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param tokens The address of a pointer to an ::MQTTAsync_token.
* When the function returns successfully, the pointer is set to point to an
* array of tokens representing messages pending completion. The last member of
* the array is set to -1 to indicate there are no more tokens. If no tokens
* are pending, the pointer is set to NULL.
* @return ::MQTTASYNC_SUCCESS if the function returns successfully.
* An error code is returned if there was a problem obtaining the list of
* pending tokens.
*/
DLLExport int MQTTAsync_getPendingTokens(MQTTAsync handle, MQTTAsync_token **tokens);
/**
* This function frees memory allocated to an MQTT message, including the
* additional memory allocated to the message payload. The client application
* calls this function when the message has been fully processed. <b>Important
* note:</b> This function does not free the memory allocated to a message
* topic string. It is the responsibility of the client application to free
* this memory using the MQTTAsync_free() library function.
* @param msg The address of a pointer to the ::MQTTAsync_message structure
* to be freed.
*/
DLLExport void MQTTAsync_freeMessage(MQTTAsync_message** msg);
/**
* This function frees memory allocated by the MQTT C client library, especially the
* topic name. This is needed on Windows when the client libary and application
* program have been compiled with different versions of the C compiler. It is
* thus good policy to always use this function when freeing any MQTT C client-
* allocated memory.
* @param ptr The pointer to the client library storage to be freed.
*/
DLLExport void MQTTAsync_free(void* ptr);
/**
* This function frees the memory allocated to an MQTT client (see
* MQTTAsync_create()). It should be called when the client is no longer
* required.
* @param handle A pointer to the handle referring to the ::MQTTAsync
* structure to be freed.
*/
DLLExport void MQTTAsync_destroy(MQTTAsync* handle);
enum MQTTASYNC_TRACE_LEVELS
{
MQTTASYNC_TRACE_MAXIMUM = 1,
MQTTASYNC_TRACE_MEDIUM,
MQTTASYNC_TRACE_MINIMUM,
MQTTASYNC_TRACE_PROTOCOL,
MQTTASYNC_TRACE_ERROR,
MQTTASYNC_TRACE_SEVERE,
MQTTASYNC_TRACE_FATAL,
};
/**
* This function sets the level of trace information which will be
* returned in the trace callback.
* @param level the trace level required
*/
DLLExport void MQTTAsync_setTraceLevel(enum MQTTASYNC_TRACE_LEVELS level);
/**
* This is a callback function prototype which must be implemented if you want
* to receive trace information.
* @param level the trace level of the message returned
* @param meesage the trace message. This is a pointer to a static buffer which
* will be overwritten on each call. You must copy the data if you want to keep
* it for later.
*/
typedef void MQTTAsync_traceCallback(enum MQTTASYNC_TRACE_LEVELS level, char* message);
/**
* This function sets the trace callback if needed. If set to NULL,
* no trace information will be returned. The default trace level is
* MQTTASYNC_TRACE_MINIMUM.
* @param callback a pointer to the function which will handle the trace information
*/
DLLExport void MQTTAsync_setTraceCallback(MQTTAsync_traceCallback* callback);
typedef struct
{
const char* name;
const char* value;
} MQTTAsync_nameValue;
/**
* This function returns version information about the library.
* no trace information will be returned. The default trace level is
* MQTTASYNC_TRACE_MINIMUM
* @return an array of strings describing the library. The last entry is a NULL pointer.
*/
DLLExport MQTTAsync_nameValue* MQTTAsync_getVersionInfo();
/**
* @cond MQTTAsync_main
* @page async Threading
* The client application runs on several threads.
* Processing of handshaking and maintaining
* the network connection is performed in the background.
* Notifications of status and message reception are provided to the client
* application using callbacks registered with the library by the call to
* MQTTAsync_setCallbacks() (see MQTTAsync_messageArrived(),
* MQTTAsync_connectionLost() and MQTTAsync_deliveryComplete()).
*
* @page wildcard Subscription wildcards
* Every MQTT message includes a topic that classifies it. MQTT servers use
* topics to determine which subscribers should receive messages published to
* the server.
*
* Consider the server receiving messages from several environmental sensors.
* Each sensor publishes its measurement data as a message with an associated
* topic. Subscribing applications need to know which sensor originally
* published each received message. A unique topic is thus used to identify
* each sensor and measurement type. Topics such as SENSOR1TEMP,
* SENSOR1HUMIDITY, SENSOR2TEMP and so on achieve this but are not very
* flexible. If additional sensors are added to the system at a later date,
* subscribing applications must be modified to receive them.
*
* To provide more flexibility, MQTT supports a hierarchical topic namespace.
* This allows application designers to organize topics to simplify their
* management. Levels in the hierarchy are delimited by the '/' character,
* such as SENSOR/1/HUMIDITY. Publishers and subscribers use these
* hierarchical topics as already described.
*
* For subscriptions, two wildcard characters are supported:
* <ul>
* <li>A '#' character represents a complete sub-tree of the hierarchy and
* thus must be the last character in a subscription topic string, such as
* SENSOR/#. This will match any topic starting with SENSOR/, such as
* SENSOR/1/TEMP and SENSOR/2/HUMIDITY.</li>
* <li> A '+' character represents a single level of the hierarchy and is
* used between delimiters. For example, SENSOR/+/TEMP will match
* SENSOR/1/TEMP and SENSOR/2/TEMP.</li>
* </ul>
* Publishers are not allowed to use the wildcard characters in their topic
* names.
*
* Deciding on your topic hierarchy is an important step in your system design.
*
* @page qos Quality of service
* The MQTT protocol provides three qualities of service for delivering
* messages between clients and servers: "at most once", "at least once" and
* "exactly once".
*
* Quality of service (QoS) is an attribute of an individual message being
* published. An application sets the QoS for a specific message by setting the
* MQTTAsync_message.qos field to the required value.
*
* A subscribing client can set the maximum quality of service a server uses
* to send messages that match the client subscriptions. The
* MQTTAsync_subscribe() and MQTTAsync_subscribeMany() functions set this
* maximum. The QoS of a message forwarded to a subscriber thus might be
* different to the QoS given to the message by the original publisher.
* The lower of the two values is used to forward a message.
*
* The three levels are:
*
* <b>QoS0, At most once:</b> The message is delivered at most once, or it
* may not be delivered at all. Its delivery across the network is not
* acknowledged. The message is not stored. The message could be lost if the
* client is disconnected, or if the server fails. QoS0 is the fastest mode of
* transfer. It is sometimes called "fire and forget".
*
* The MQTT protocol does not require servers to forward publications at QoS0
* to a client. If the client is disconnected at the time the server receives
* the publication, the publication might be discarded, depending on the
* server implementation.
*
* <b>QoS1, At least once:</b> The message is always delivered at least once.
* It might be delivered multiple times if there is a failure before an
* acknowledgment is received by the sender. The message must be stored
* locally at the sender, until the sender receives confirmation that the
* message has been published by the receiver. The message is stored in case
* the message must be sent again.
*
* <b>QoS2, Exactly once:</b> The message is always delivered exactly once.
* The message must be stored locally at the sender, until the sender receives
* confirmation that the message has been published by the receiver. The
* message is stored in case the message must be sent again. QoS2 is the
* safest, but slowest mode of transfer. A more sophisticated handshaking
* and acknowledgement sequence is used than for QoS1 to ensure no duplication
* of messages occurs.
* @page publish Publication example
@code
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int finished = 0;
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
finished = 1;
}
void onSend(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = onDisconnect;
opts.context = client;
if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(-1);
}
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
printf("Successful connection\n");
opts.onSuccess = onSend;
opts.context = client;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start sendMessage, return code %d\n", rc);
exit(-1);
}
}
int main(int argc, char* argv[])
{
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
int rc;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(-1);
}
printf("Waiting for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
PAYLOAD, TOPIC, CLIENTID);
while (!finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return rc;
}
* @endcode
* @page subscribe Subscription example
@code
#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int disc_finished = 0;
int subscribed = 0;
int finished = 0;
void connlost(void *context, char *cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
}
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: ");
payloadptr = message->payload;
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
}
void onDisconnect(void* context, MQTTAsync_successData* response)
{
printf("Successful disconnection\n");
disc_finished = 1;
}
void onSubscribe(void* context, MQTTAsync_successData* response)
{
printf("Subscribe succeeded\n");
subscribed = 1;
}
void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
{
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
}
void onConnect(void* context, MQTTAsync_successData* response)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
printf("Successful connection\n");
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe;
opts.onFailure = onSubscribeFailure;
opts.context = client;
deliveredtoken = 0;
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start subscribe, return code %d\n", rc);
exit(-1);
}
}
int main(int argc, char* argv[])
{
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_token token;
int rc;
int ch;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connlost, msgarrvd, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(-1);
}
while (!subscribed)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
if (finished)
goto exit;
do
{
ch = getchar();
} while (ch!='Q' && ch != 'q');
disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(-1);
}
while (!disc_finished)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
exit:
MQTTAsync_destroy(&client);
return rc;
}
* @endcode
* @page tracing Tracing
*
* Runtime tracing can be controlled by environment variables or API calls.
*
* #### Environment variables
*
* Tracing is switched on by setting the MQTT_C_CLIENT_TRACE environment variable.
* A value of ON, or stdout, prints to stdout, any other value is interpreted as a file name to use.
*
* The amount of trace detail is controlled with the MQTT_C_CLIENT_TRACE_LEVEL environment
* variable - valid values are ERROR, PROTOCOL, MINIMUM, MEDIUM and MAXIMUM
* (from least to most verbose).
*
* The variable MQTT_C_CLIENT_TRACE_MAX_LINES limits the number of lines of trace that are output
* to a file. Two files are used at most, when they are full, the last one is overwritten with the
* new trace entries. The default size is 1000 lines.
*
* #### Trace API calls
*
* MQTTAsync_traceCallback() is used to set a callback function which is called whenever trace
* information is available. This will be the same information as that printed if the
* environment variables were used to control the trace.
*
* The MQTTAsync_setTraceLevel() calls is used to set the maximum level of trace entries that will be
* passed to the callback function. The levels are:
* 1. ::MQTTASYNC_TRACE_MAXIMUM
* 2. ::MQTTASYNC_TRACE_MEDIUM
* 3. ::MQTTASYNC_TRACE_MINIMUM
* 4. ::MQTTASYNC_TRACE_PROTOCOL
* 5. ::MQTTASYNC_TRACE_ERROR
* 6. ::MQTTASYNC_TRACE_SEVERE
* 7. ::MQTTASYNC_TRACE_FATAL
*
* Selecting ::MQTTASYNC_TRACE_MAXIMUM will cause all trace entries at all levels to be returned.
* Choosing ::MQTTASYNC_TRACE_ERROR will cause ERROR, SEVERE and FATAL trace entries to be returned
* to the callback function.
*
* ### MQTT Packet Tracing
*
* A feature that can be very useful is printing the MQTT packets that are sent and received. To
* achieve this, use the following environment variable settings:
* @code
MQTT_C_CLIENT_TRACE=ON
MQTT_C_CLIENT_TRACE_LEVEL=PROTOCOL
* @endcode
* The output you should see looks like this:
* @code
20130528 155936.813 3 stdout-subscriber -> CONNECT cleansession: 1 (0)
20130528 155936.813 3 stdout-subscriber <- CONNACK rc: 0
20130528 155936.813 3 stdout-subscriber -> SUBSCRIBE msgid: 1 (0)
20130528 155936.813 3 stdout-subscriber <- SUBACK msgid: 1
20130528 155941.818 3 stdout-subscriber -> DISCONNECT (0)
* @endcode
* where the fields are:
* 1. date
* 2. time
* 3. socket number
* 4. client id
* 5. direction (-> from client to server, <- from server to client)
* 6. packet details
*
* ### Default Level Tracing
*
* This is an extract of a default level trace of a call to connect:
* @code
19700101 010000.000 (1152206656) (0)> MQTTClient_connect:893
19700101 010000.000 (1152206656) (1)> MQTTClient_connectURI:716
20130528 160447.479 Connecting to serverURI localhost:1883
20130528 160447.479 (1152206656) (2)> MQTTProtocol_connect:98
20130528 160447.479 (1152206656) (3)> MQTTProtocol_addressPort:48
20130528 160447.479 (1152206656) (3)< MQTTProtocol_addressPort:73
20130528 160447.479 (1152206656) (3)> Socket_new:599
20130528 160447.479 New socket 4 for localhost, port 1883
20130528 160447.479 (1152206656) (4)> Socket_addSocket:163
20130528 160447.479 (1152206656) (5)> Socket_setnonblocking:73
20130528 160447.479 (1152206656) (5)< Socket_setnonblocking:78 (0)
20130528 160447.479 (1152206656) (4)< Socket_addSocket:176 (0)
20130528 160447.479 (1152206656) (4)> Socket_error:95
20130528 160447.479 (1152206656) (4)< Socket_error:104 (115)
20130528 160447.479 Connect pending
20130528 160447.479 (1152206656) (3)< Socket_new:683 (115)
20130528 160447.479 (1152206656) (2)< MQTTProtocol_connect:131 (115)
* @endcode
* where the fields are:
* 1. date
* 2. time
* 3. thread id
* 4. function nesting level
* 5. function entry (>) or exit (<)
* 6. function name : line of source code file
* 7. return value (if there is one)
*
* ### Memory Allocation Tracing
*
* Setting the trace level to maximum causes memory allocations and frees to be traced along with
* the default trace entries, with messages like the following:
* @code
20130528 161819.657 Allocating 16 bytes in heap at file /home/icraggs/workspaces/mqrtc/mqttv3c/src/MQTTPacket.c line 177 ptr 0x179f930
20130528 161819.657 Freeing 16 bytes in heap at file /home/icraggs/workspaces/mqrtc/mqttv3c/src/MQTTPacket.c line 201, heap use now 896 bytes
* @endcode
* When the last MQTT client object is destroyed, if the trace is being recorded
* and all memory allocated by the client library has not been freed, an error message will be
* written to the trace. This can help with fixing memory leaks. The message will look like this:
* @code
20130528 163909.208 Some memory not freed at shutdown, possible memory leak
20130528 163909.208 Heap scan start, total 880 bytes
20130528 163909.208 Heap element size 32, line 354, file /home/icraggs/workspaces/mqrtc/mqttv3c/src/MQTTPacket.c, ptr 0x260cb00
20130528 163909.208 Content
20130528 163909.209 Heap scan end
* @endcode
* @endcond
*/
#endif