/*
* Copyright (c) 2017 Intel Corporation
*
* SPDX-License-Identifier: Apache-2.0
*/
#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);
#include <zephyr/kernel.h>
#include <zephyr/net/socket.h>
#include <zephyr/net/mqtt.h>
#include <zephyr/random/rand32.h>
#include <string.h>
#include <errno.h>
#include "config.h"
#if defined(CONFIG_USERSPACE)
#include <zephyr/app_memory/app_memdomain.h>
K_APPMEM_PARTITION_DEFINE(app_partition);
struct k_mem_domain app_domain;
#define APP_BMEM K_APP_BMEM(app_partition)
#define APP_DMEM K_APP_DMEM(app_partition)
#else
#define APP_BMEM
#define APP_DMEM
#endif
/* Buffers for MQTT client. */
static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
/* Making RX buffer large enough that the full IPv6 packet can fit into it */
#define MQTT_LIB_WEBSOCKET_RECV_BUF_LEN 1280
/* Websocket needs temporary buffer to store partial packets */
static APP_BMEM uint8_t temp_ws_rx_buf[MQTT_LIB_WEBSOCKET_RECV_BUF_LEN];
#endif
/* The mqtt client struct */
static APP_BMEM struct mqtt_client client_ctx;
/* MQTT Broker details. */
static APP_BMEM struct sockaddr_storage broker;
#if defined(CONFIG_SOCKS)
static APP_BMEM struct sockaddr socks5_proxy;
#endif
static APP_BMEM struct zsock_pollfd fds[1];
static APP_BMEM int nfds;
static APP_BMEM bool connected;
#if defined(CONFIG_MQTT_LIB_TLS)
#include "test_certs.h"
#define TLS_SNI_HOSTNAME "localhost"
#define APP_CA_CERT_TAG 1
#define APP_PSK_TAG 2
static APP_DMEM sec_tag_t m_sec_tags[] = {
#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
APP_CA_CERT_TAG,
#endif
#if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
APP_PSK_TAG,
#endif
};
static int tls_init(void)
{
int err = -EINVAL;
#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
ca_certificate, sizeof(ca_certificate));
if (err < 0) {
LOG_ERR("Failed to register public certificate: %d", err);
return err;
}
#endif
#if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK,
client_psk, sizeof(client_psk));
if (err < 0) {
LOG_ERR("Failed to register PSK: %d", err);
return err;
}
err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID,
client_psk_id, sizeof(client_psk_id) - 1);
if (err < 0) {
LOG_ERR("Failed to register PSK ID: %d", err);
}
#endif
return err;
}
#endif /* CONFIG_MQTT_LIB_TLS */
static void prepare_fds(struct mqtt_client *client)
{
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
fds[0].fd = client->transport.tcp.sock;
}
#if defined(CONFIG_MQTT_LIB_TLS)
else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
fds[0].fd = client->transport.tls.sock;
}
#endif
fds[0].events = ZSOCK_POLLIN;
nfds = 1;
}
static void clear_fds(void)
{
nfds = 0;
}
static int wait(int timeout)
{
int ret = 0;
if (nfds > 0) {
ret = zsock_poll(fds, nfds, timeout);
if (ret < 0) {
LOG_ERR("poll error: %d", errno);
}
}
return ret;
}
void mqtt_evt_handler(struct mqtt_client *const client,
const struct mqtt_evt *evt)
{
int err;
switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT connect failed %d", evt->result);
break;
}
connected = true;
LOG_INF("MQTT client connected!");
break;
case MQTT_EVT_DISCONNECT:
LOG_INF("MQTT client disconnected %d", evt->result);
connected = false;
clear_fds();
break;
case MQTT_EVT_PUBACK:
if (evt->result != 0) {
LOG_ERR("MQTT PUBACK error %d", evt->result);
break;
}
LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);
break;
case MQTT_EVT_PUBREC:
if (evt->result != 0) {
LOG_ERR("MQTT PUBREC error %d", evt->result);
break;
}
LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);
const struct mqtt_pubrel_param rel_param = {
.message_id = evt->param.pubrec.message_id
};
err = mqtt_publish_qos2_release(client, &rel_param);
if (err != 0) {
LOG_ERR("Failed to send MQTT PUBREL: %d", err);
}
break;
case MQTT_EVT_PUBCOMP:
if (evt->result != 0) {
LOG_ERR("MQTT PUBCOMP error %d", evt->result);
break;
}
LOG_INF("PUBCOMP packet id: %u",
evt->param.pubcomp.message_id);
break;
case MQTT_EVT_PINGRESP:
LOG_INF("PINGRESP packet");
break;
default:
break;
}
}
static char *get_mqtt_payload(enum mqtt_qos qos)
{
#if APP_BLUEMIX_TOPIC
static APP_BMEM char payload[30];
snprintk(payload, sizeof(payload), "{d:{temperature:%d}}",
(uint8_t)sys_rand32_get());
#else
static APP_DMEM char payload[] = "DOORS:OPEN_QoSx";
payload[strlen(payload) - 1] = '0' + qos;
#endif
return payload;
}
static char *get_mqtt_topic(void)
{
#if APP_BLUEMIX_TOPIC
return "iot-2/type/"BLUEMIX_DEVTYPE"/id/"BLUEMIX_DEVID
"/evt/"BLUEMIX_EVENT"/fmt/"BLUEMIX_FORMAT;
#else
return "sensors";
#endif
}
static int publish(struct mqtt_client *client, enum mqtt_qos qos)
{
struct mqtt_publish_param param;
param.message.topic.qos = qos;
param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
param.message.topic.topic.size =
strlen(param.message.topic.topic.utf8);
param.message.payload.data = get_mqtt_payload(qos);
param.message.payload.len =
strlen(param.message.payload.data);
param.message_id = sys_rand32_get();
param.dup_flag = 0U;
param.retain_flag = 0U;
return mqtt_publish(client, ¶m);
}
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
#define PRINT_RESULT(func, rc) \
LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
static void broker_init(void)
{
#if defined(CONFIG_NET_IPV6)
struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
broker6->sin6_family = AF_INET6;
broker6->sin6_port = htons(SERVER_PORT);
zsock_inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
#if defined(CONFIG_SOCKS)
struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy;
proxy6->sin6_family = AF_INET6;
proxy6->sin6_port = htons(SOCKS5_PROXY_PORT);
zsock_inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr);
#endif
#else
struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
broker4->sin_family = AF_INET;
broker4->sin_port = htons(SERVER_PORT);
zsock_inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
#if defined(CONFIG_SOCKS)
struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;
proxy4->sin_family = AF_INET;
proxy4->sin_port = htons(SOCKS5_PROXY_PORT);
zsock_inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr);
#endif
#endif
}
static void client_init(struct mqtt_client *client)
{
mqtt_client_init(client);
broker_init();
/* MQTT client configuration */
client->broker = &broker;
client->evt_cb = mqtt_evt_handler;
client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
client->client_id.size = strlen(MQTT_CLIENTID);
client->password = NULL;
client->user_name = NULL;
client->protocol_version = MQTT_VERSION_3_1_1;
/* MQTT buffers configuration */
client->rx_buf = rx_buffer;
client->rx_buf_size = sizeof(rx_buffer);
client->tx_buf = tx_buffer;
client->tx_buf_size = sizeof(tx_buffer);
/* MQTT transport configuration */
#if defined(CONFIG_MQTT_LIB_TLS)
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
client->transport.type = MQTT_TRANSPORT_SECURE_WEBSOCKET;
#else
client->transport.type = MQTT_TRANSPORT_SECURE;
#endif
struct mqtt_sec_config *tls_config = &client->transport.tls.config;
tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = m_sec_tags;
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
tls_config->hostname = TLS_SNI_HOSTNAME;
#else
tls_config->hostname = NULL;
#endif
#else
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
client->transport.type = MQTT_TRANSPORT_NON_SECURE_WEBSOCKET;
#else
client->transport.type = MQTT_TRANSPORT_NON_SECURE;
#endif
#endif
#if defined(CONFIG_MQTT_LIB_WEBSOCKET)
client->transport.websocket.config.host = SERVER_ADDR;
client->transport.websocket.config.url = "/mqtt";
client->transport.websocket.config.tmp_buf = temp_ws_rx_buf;
client->transport.websocket.config.tmp_buf_len =
sizeof(temp_ws_rx_buf);
client->transport.websocket.timeout = 5 * MSEC_PER_SEC;
#endif
#if defined(CONFIG_SOCKS)
mqtt_client_set_proxy(client, &socks5_proxy,
socks5_proxy.sa_family == AF_INET ?
sizeof(struct sockaddr_in) :
sizeof(struct sockaddr_in6));
#endif
}
/* In this routine we block until the connected variable is 1 */
static int try_to_connect(struct mqtt_client *client)
{
int rc, i = 0;
while (i++ < APP_CONNECT_TRIES && !connected) {
client_init(client);
rc = mqtt_connect(client);
if (rc != 0) {
PRINT_RESULT("mqtt_connect", rc);
k_sleep(K_MSEC(APP_SLEEP_MSECS));
continue;
}
prepare_fds(client);
if (wait(APP_CONNECT_TIMEOUT_MS)) {
mqtt_input(client);
}
if (!connected) {
mqtt_abort(client);
}
}
if (connected) {
return 0;
}
return -EINVAL;
}
static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
{
int64_t remaining = timeout;
int64_t start_time = k_uptime_get();
int rc;
while (remaining > 0 && connected) {
if (wait(remaining)) {
rc = mqtt_input(client);
if (rc != 0) {
PRINT_RESULT("mqtt_input", rc);
return rc;
}
}
rc = mqtt_live(client);
if (rc != 0 && rc != -EAGAIN) {
PRINT_RESULT("mqtt_live", rc);
return rc;
} else if (rc == 0) {
rc = mqtt_input(client);
if (rc != 0) {
PRINT_RESULT("mqtt_input", rc);
return rc;
}
}
remaining = timeout + start_time - k_uptime_get();
}
return 0;
}
#define SUCCESS_OR_EXIT(rc) { if (rc != 0) { return 1; } }
#define SUCCESS_OR_BREAK(rc) { if (rc != 0) { break; } }
static int publisher(void)
{
int i, rc, r = 0;
LOG_INF("attempting to connect: ");
rc = try_to_connect(&client_ctx);
PRINT_RESULT("try_to_connect", rc);
SUCCESS_OR_EXIT(rc);
i = 0;
while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) {
r = -1;
rc = mqtt_ping(&client_ctx);
PRINT_RESULT("mqtt_ping", rc);
SUCCESS_OR_BREAK(rc);
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
SUCCESS_OR_BREAK(rc);
rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
PRINT_RESULT("mqtt_publish", rc);
SUCCESS_OR_BREAK(rc);
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
SUCCESS_OR_BREAK(rc);
rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
PRINT_RESULT("mqtt_publish", rc);
SUCCESS_OR_BREAK(rc);
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
SUCCESS_OR_BREAK(rc);
rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
PRINT_RESULT("mqtt_publish", rc);
SUCCESS_OR_BREAK(rc);
rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
SUCCESS_OR_BREAK(rc);
r = 0;
}
rc = mqtt_disconnect(&client_ctx);
PRINT_RESULT("mqtt_disconnect", rc);
LOG_INF("Bye!");
return r;
}
static int start_app(void)
{
int r = 0, i = 0;
while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS ||
i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
r = publisher();
if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
k_sleep(K_MSEC(5000));
}
}
return r;
}
#if defined(CONFIG_USERSPACE)
#define STACK_SIZE 2048
#if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
#else
#define THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif
K_THREAD_DEFINE(app_thread, STACK_SIZE,
start_app, NULL, NULL, NULL,
THREAD_PRIORITY, K_USER, -1);
static K_HEAP_DEFINE(app_mem_pool, 1024 * 2);
#endif
int main(void)
{
#if defined(CONFIG_MQTT_LIB_TLS)
int rc;
rc = tls_init();
PRINT_RESULT("tls_init", rc);
#endif
#if defined(CONFIG_USERSPACE)
int ret;
struct k_mem_partition *parts[] = {
#if Z_LIBC_PARTITION_EXISTS
&z_libc_partition,
#endif
&app_partition
};
ret = k_mem_domain_init(&app_domain, ARRAY_SIZE(parts), parts);
__ASSERT(ret == 0, "k_mem_domain_init() failed %d", ret);
ARG_UNUSED(ret);
k_mem_domain_add_thread(&app_domain, app_thread);
k_thread_heap_assign(app_thread, &app_mem_pool);
k_thread_start(app_thread);
k_thread_join(app_thread, K_FOREVER);
#else
exit(start_app());
#endif
return 0;
}