mqtt: FIX: add unsubscribe function and update return codes

MQTT unsubscribe message was not included previously. See ZEP-623.
So, this commit adds the handling functions for the MQTT UNSUBSCRIBE
message.

MQTT high-level API return codes are now unified to only return:
- 0 on success
- -EINVAL when invalid parameters are received
- -EIO on network error

API documentation is also updated to reflect the previous changes.

Jira: ZEP-623

Change-Id: I04d65c303762ce2ecaca73a4f222f0b77fe70503
Signed-off-by: Flavio Santes <flavio.santes@intel.com>
This commit is contained in:
Flavio Santes 2016-08-03 19:41:56 -05:00 committed by Inaky Perez-Gonzalez
commit df22e226f0
2 changed files with 161 additions and 53 deletions

View file

@ -99,31 +99,31 @@ int mqtt_connect(struct mqtt_app_ctx_t *app)
rc = mqtt_pack_connect(tx_buf, app->client); rc = mqtt_pack_connect(tx_buf, app->client);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
rc = netz_tcp(netz_ctx); rc = netz_tcp(netz_ctx);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = netz_tx(netz_ctx, tx_buf); rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = netz_rx(netz_ctx, rx_buf); rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = mqtt_unpack_connack(rx_buf, &session, &conn_ack); rc = mqtt_unpack_connack(rx_buf, &session, &conn_ack);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
if (app->client->clean_session == 1) { if (app->client->clean_session == 1) {
@ -149,14 +149,16 @@ int mqtt_disconnect(struct mqtt_app_ctx_t *app)
rc = mqtt_pack_disconnect(app->tx_buf); rc = mqtt_pack_disconnect(app->tx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
rc = netz_tx(app->netz_ctx, app->tx_buf); rc = netz_tx(app->netz_ctx, app->tx_buf);
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
if (rc != 0) {
return -EIO;
}
return rc; return 0;
} }
@ -174,12 +176,12 @@ static inline int mqtt_publish_qos1(struct mqtt_app_ctx_t *app, uint16_t id)
rc = netz_rx(netz_ctx, rx_buf); rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) { if (rc != 0) {
return rc; return -EIO;
} }
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
if (pkt_type != MQTT_PUBACK) { if (pkt_type != MQTT_PUBACK) {
@ -215,7 +217,7 @@ static inline int mqtt_publish_qos2(struct mqtt_app_ctx_t *app,
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
if (pkt_type != MQTT_PUBREC) { if (pkt_type != MQTT_PUBREC) {
@ -227,7 +229,7 @@ static inline int mqtt_publish_qos2(struct mqtt_app_ctx_t *app,
rc = mqtt_pack_pubrel(tx_buf, dup, pkt_id); rc = mqtt_pack_pubrel(tx_buf, dup, pkt_id);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
rc = netz_tx(netz_ctx, tx_buf); rc = netz_tx(netz_ctx, tx_buf);
@ -242,7 +244,7 @@ static inline int mqtt_publish_qos2(struct mqtt_app_ctx_t *app,
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
if (pkt_type != MQTT_PUBCOMP) { if (pkt_type != MQTT_PUBCOMP) {
@ -280,7 +282,7 @@ int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg,
rc = mqtt_pack_publish(tx_buf, msg); rc = mqtt_pack_publish(tx_buf, msg);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
rc = netz_tx(netz_ctx, tx_buf); rc = netz_tx(netz_ctx, tx_buf);
@ -297,7 +299,7 @@ int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg,
rc = mqtt_publish_qos1(app, msg->pkt_id); rc = mqtt_publish_qos1(app, msg->pkt_id);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
break; break;
@ -305,7 +307,7 @@ int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg,
rc = mqtt_publish_qos2(app, msg->pkt_id); rc = mqtt_publish_qos2(app, msg->pkt_id);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
break; break;
@ -337,25 +339,25 @@ int mqtt_pingreq(struct mqtt_app_ctx_t *app)
rc = mqtt_pack_pingreq(tx_buf); rc = mqtt_pack_pingreq(tx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
rc = netz_tx(netz_ctx, tx_buf); rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = netz_rx(netz_ctx, rx_buf); rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id); rc = mqtt_unpack_ack(rx_buf, &pkt_type, &dup, &rcv_pkt_id);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
if (pkt_type != MQTT_PINGRESP) { if (pkt_type != MQTT_PINGRESP) {
@ -373,12 +375,10 @@ int mqtt_subscribe(struct mqtt_app_ctx_t *app,
struct netz_ctx_t *netz_ctx; struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf; struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf; struct app_buf_t *rx_buf;
int dup = 0;
uint16_t pkt_id; uint16_t pkt_id;
uint16_t rcv_pkt_id; uint16_t rcv_pkt_id;
int granted_qos; int granted_qos;
int dup = 0;
int rc; int rc;
netz_ctx = app->netz_ctx; netz_ctx = app->netz_ctx;
@ -392,25 +392,25 @@ int mqtt_subscribe(struct mqtt_app_ctx_t *app,
rc = mqtt_pack_subscribe(tx_buf, dup, pkt_id, topic, qos); rc = mqtt_pack_subscribe(tx_buf, dup, pkt_id, topic, qos);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
rc = netz_tx(netz_ctx, tx_buf); rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = netz_rx(netz_ctx, rx_buf); rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
rc = mqtt_unpack_suback(rx_buf, &rcv_pkt_id, &granted_qos); rc = mqtt_unpack_suback(rx_buf, &rcv_pkt_id, &granted_qos);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EINVAL;
} }
if (rcv_pkt_id != pkt_id) { if (rcv_pkt_id != pkt_id) {
@ -422,10 +422,60 @@ int mqtt_subscribe(struct mqtt_app_ctx_t *app,
return granted_qos; return granted_qos;
} }
int mqtt_unsubscribe(struct mqtt_app_ctx_t *app, char *topic)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
struct app_buf_t *rx_buf;
uint16_t pkt_id;
uint16_t rcv_pkt_id;
int dup = 0;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
rx_buf = app->rx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
mqtt_next_pktid(app->client, &pkt_id);
rc = mqtt_pack_unsubscribe(tx_buf, dup, pkt_id, topic);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
rc = mqtt_unpack_unsuback(rx_buf, &rcv_pkt_id);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EINVAL;
}
if (rcv_pkt_id != pkt_id) {
nano_sem_give(&app->sem);
return -EINVAL;
}
nano_sem_give(&app->sem);
return 0;
}
int mqtt_handle_publish(struct mqtt_app_ctx_t *app) int mqtt_handle_publish(struct mqtt_app_ctx_t *app)
{ {
struct mqtt_msg_t msg; struct mqtt_msg_t msg;
uint16_t rcv_pkt_id; uint16_t rcv_pkt_id;
int8_t rcv_pkt_type; int8_t rcv_pkt_type;
uint8_t dup; uint8_t dup;
@ -443,12 +493,12 @@ int mqtt_handle_publish(struct mqtt_app_ctx_t *app)
case MQTT_QoS1: case MQTT_QoS1:
rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBACK, msg.pkt_id, 0); rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBACK, msg.pkt_id, 0);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
rc = netz_tx(app->netz_ctx, app->tx_buf); rc = netz_tx(app->netz_ctx, app->tx_buf);
if (rc != 0) { if (rc != 0) {
return rc; return -EIO;
} }
break; break;
@ -456,23 +506,23 @@ int mqtt_handle_publish(struct mqtt_app_ctx_t *app)
case MQTT_QoS2: case MQTT_QoS2:
rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBREC, msg.pkt_id, 0); rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBREC, msg.pkt_id, 0);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
rc = netz_tx(app->netz_ctx, app->tx_buf); rc = netz_tx(app->netz_ctx, app->tx_buf);
if (rc != 0) { if (rc != 0) {
return rc; return -EIO;
} }
rc = netz_rx(app->netz_ctx, app->rx_buf); rc = netz_rx(app->netz_ctx, app->rx_buf);
if (rc != 0) { if (rc != 0) {
return rc; return -EIO;
} }
rc = mqtt_unpack_ack(app->rx_buf, &rcv_pkt_type, &dup, rc = mqtt_unpack_ack(app->rx_buf, &rcv_pkt_type, &dup,
&rcv_pkt_id); &rcv_pkt_id);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
if (rcv_pkt_type != MQTT_PUBREL) { if (rcv_pkt_type != MQTT_PUBREL) {
@ -485,12 +535,12 @@ int mqtt_handle_publish(struct mqtt_app_ctx_t *app)
rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBCOMP, msg.pkt_id, 0); rc = mqtt_pack_msg(app->tx_buf, MQTT_PUBCOMP, msg.pkt_id, 0);
if (rc != 0) { if (rc != 0) {
return rc; return -EINVAL;
} }
rc = netz_tx(app->netz_ctx, app->tx_buf); rc = netz_tx(app->netz_ctx, app->tx_buf);
if (rc != 0) { if (rc != 0) {
return rc; return -EIO;
} }
break; break;
@ -506,11 +556,36 @@ int mqtt_handle_publish(struct mqtt_app_ctx_t *app)
return 0; return 0;
} }
int mqtt_handle_pingreq(struct mqtt_app_ctx_t *app)
{
struct netz_ctx_t *netz_ctx;
struct app_buf_t *tx_buf;
int rc;
netz_ctx = app->netz_ctx;
tx_buf = app->tx_buf;
nano_sem_take(&app->sem, TICKS_UNLIMITED);
rc = mqtt_pack_pingresp(tx_buf);
if (rc != 0) {
return -EINVAL;
}
rc = netz_tx(netz_ctx, tx_buf);
if (rc != 0) {
nano_sem_give(&app->sem);
return -EIO;
}
nano_sem_give(&app->sem);
return 0;
}
int mqtt_read(struct mqtt_app_ctx_t *app) int mqtt_read(struct mqtt_app_ctx_t *app)
{ {
struct netz_ctx_t *netz_ctx; struct netz_ctx_t *netz_ctx;
struct app_buf_t *rx_buf; struct app_buf_t *rx_buf;
int pkt_type; int pkt_type;
int rc; int rc;
@ -523,7 +598,7 @@ int mqtt_read(struct mqtt_app_ctx_t *app)
rc = netz_rx(netz_ctx, rx_buf); rc = netz_rx(netz_ctx, rx_buf);
if (rc != 0) { if (rc != 0) {
nano_sem_give(&app->sem); nano_sem_give(&app->sem);
return rc; return -EIO;
} }
if (rx_buf->length < 2) { if (rx_buf->length < 2) {
@ -535,6 +610,9 @@ int mqtt_read(struct mqtt_app_ctx_t *app)
rc = -EINVAL; rc = -EINVAL;
/* This switch-case will be used for packet-handling routines
* that will be coded in future releases.
*/
switch (pkt_type) { switch (pkt_type) {
case MQTT_PUBLISH: case MQTT_PUBLISH:
rc = mqtt_handle_publish(app); rc = mqtt_handle_publish(app);
@ -551,6 +629,7 @@ int mqtt_read(struct mqtt_app_ctx_t *app)
case MQTT_UNSUBACK: case MQTT_UNSUBACK:
break; break;
case MQTT_PINGREQ: case MQTT_PINGREQ:
rc = mqtt_handle_pingreq(app);
break; break;
case MQTT_PINGRESP: case MQTT_PINGRESP:
break; break;

View file

@ -160,9 +160,11 @@ int mqtt_network(struct mqtt_app_ctx_t *app, struct netz_ctx_t *netz_ctx);
* @details This function packs and sends the MQTT * @details This function packs and sends the MQTT
* CONNECT message to the server. * CONNECT message to the server.
* @param app MQTT application context. * @param app MQTT application context.
* @return 0 on success.<br> * @return 0 on success.
* netz API return codes are also returned * @return -EINVAL if an invalid parameter was
* by this function.<br> * passed as argument or received from the
* server.
* @return -EIO on network error.
*/ */
int mqtt_connect(struct mqtt_app_ctx_t *app); int mqtt_connect(struct mqtt_app_ctx_t *app);
@ -171,9 +173,11 @@ int mqtt_connect(struct mqtt_app_ctx_t *app);
* @details This function packs and sends the MQTT * @details This function packs and sends the MQTT
* DISCONNECT message to the server. * DISCONNECT message to the server.
* @param app MQTT application context. * @param app MQTT application context.
* @return 0 on success.<br> * @return 0 on success.
* netz API return codes are also returned * @return -EINVAL if an invalid parameter was
* by this function.<br> * passed as argument or received from the
* server.
* @return -EIO on network error.
*/ */
int mqtt_disconnect(struct mqtt_app_ctx_t *app); int mqtt_disconnect(struct mqtt_app_ctx_t *app);
@ -187,9 +191,11 @@ int mqtt_disconnect(struct mqtt_app_ctx_t *app);
* @param msg MQTT Message structure. * @param msg MQTT Message structure.
* @param qos Message QoS. * @param qos Message QoS.
* @param retained Retained property. * @param retained Retained property.
* @return 0 on success.<br> * @return 0 on success.
* netz API return codes are also returned * @return -EINVAL if an invalid parameter was
* by this function.<br> * passed as argument or received from the
* server.
* @return -EIO on network error.
*/ */
int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg, int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg,
enum mqtt_qos qos, int retained); enum mqtt_qos qos, int retained);
@ -200,8 +206,10 @@ int mqtt_publish(struct mqtt_app_ctx_t *app, struct mqtt_msg_t *msg,
* PINGREQ message to the server. * PINGREQ message to the server.
* @param app MQTT application context. * @param app MQTT application context.
* @return 0 on success. * @return 0 on success.
* netz API return codes are also returned * @return -EINVAL if an invalid parameter was
* by this function. * passed as argument or received from the
* server.
* @return -EIO on network error.
*/ */
int mqtt_pingreq(struct mqtt_app_ctx_t *app); int mqtt_pingreq(struct mqtt_app_ctx_t *app);
@ -216,11 +224,30 @@ int mqtt_pingreq(struct mqtt_app_ctx_t *app);
* @param topic Topic to subscribe to. * @param topic Topic to subscribe to.
* @param qos The desired QoS. * @param qos The desired QoS.
* @return 0 on success. * @return 0 on success.
* netz API return codes are also returned * @return -EINVAL if an invalid parameter was
* by this function. * passed as argument or received from the
* server.
* @return -EIO on network error.
*/ */
int mqtt_subscribe(struct mqtt_app_ctx_t *app, char *topic, enum mqtt_qos qos); int mqtt_subscribe(struct mqtt_app_ctx_t *app, char *topic, enum mqtt_qos qos);
/**
* @brief mqtt_unsubscribe Sends the MQTT UNSUBSCRIBE message
* @details This functions packs and sends the MQTT
* UNSUBSCRIBE message to the server.
* So far, only one topic is processed by
* message.
* TODO: add an array of topics.
* @param app MQTT application context.
* @param topic Topic to unsubscribe from.
* @return 0 on success.
* @return -EINVAL if an invalid parameter was
* passed as argument or received from the
* server.
* @return -EIO on network error.
*/
int mqtt_unsubscribe(struct mqtt_app_ctx_t *app, char *topic);
/** /**
* @brief mqtt_read Read any received MQTT message * @brief mqtt_read Read any received MQTT message
* @details If a MQTT PUBLISH message is received, * @details If a MQTT PUBLISH message is received,
@ -229,8 +256,10 @@ int mqtt_subscribe(struct mqtt_app_ctx_t *app, char *topic, enum mqtt_qos qos);
* TODO: implement more messages' handlers. * TODO: implement more messages' handlers.
* @param app MQTT application context. * @param app MQTT application context.
* @return 0 on success. * @return 0 on success.
* netz API return codes are also returned * @return -EINVAL if an invalid parameter was
* by this function. * passed as argument or received from the
* server.
* @return -EIO on network error.
*/ */
int mqtt_read(struct mqtt_app_ctx_t *app); int mqtt_read(struct mqtt_app_ctx_t *app);