mqtt - Examples


publish with luasocket

The client needs a small adapter table (close, read, write) built on top of the luasocket API.

local mqtt = require'mqtt311'
local socket = require'socket'  -- https://lunarmodules.github.io/luasocket/

-- Plain TCP connection to the public test broker.
local tcp = assert(socket.connect('test.mosquitto.org', mqtt.PORT))

-- Adapter handed to the MQTT client as its `socket`;
-- each entry simply forwards to the LuaSocket connection.
local adapter = {}

function adapter.close ()
    tcp:shutdown()
end

function adapter.read (_, count)
    return tcp:receive(count)
end

function adapter.write (_, bytes)
    tcp:send(bytes)
end

local console = require('logging.console')

local client = mqtt({
    socket = adapter,
    logger = console({}),
})

client:connect({ clean = true, id = '' })
local connack = client:read()
-- publish only when a CONNACK came back with return code 0
local accepted = connack and connack.rc == 0
if accepted then
    client:publish('foo/bar', 'Hello world!', { qos = 1 })
    client:read()       -- read the next packet (expected to be the PUBACK)
    client:disconnect()
end
tcp:shutdown()

publish with luasec

The client needs a small adapter table (close, read, write) built on top of the luasec API.

local mqtt = require'mqtt311'
local socket = require'socket'  -- https://lunarmodules.github.io/luasocket/
local ssl = require'ssl'        -- https://github.com/brunoos/luasec/

-- Open the TCP connection to the TLS port, then upgrade it with LuaSec.
local conn = assert(socket.connect('test.mosquitto.org', mqtt.PORT_TLS))
-- NOTE(review): no `verify`/`cafile` is given, so the broker certificate is
-- presumably not validated -- confirm before reusing outside of a demo.
conn = assert(ssl.wrap(conn, {
    mode     = 'client',
    protocol = 'any',
    options  = { 'all' },
}))
-- fail right here if the TLS handshake does not succeed
assert(conn:dohandshake())

-- Adapter handed to the MQTT client: forwards close/read/write to `conn`.
local client = mqtt({
    socket = {
        close = function ()
            conn:close()
        end,
        read  = function (_, n)
            return conn:receive(n)
        end,
        write = function (_, data)
            conn:send(data)
        end,
    },
    logger = require'logging.console'({}),
})

client:connect({ clean = true })
local connack = client:read()
-- publish only when a CONNACK came back with return code 0
if connack and connack.rc == 0 then
    client:publish('foo/bar', 'Hello world!', { qos = 1 })
    client:read()       -- read the next packet (expected to be the PUBACK)
    client:disconnect()
end
conn:close()

publish with http.websocket

The client needs a small adapter table (close, read, write) built on top of the http.websocket API.

local mqtt = require'mqtt311'
local websocket = require'http.websocket'       -- https://daurnimator.github.io/lua-http/

local conn = assert(websocket.new_from_uri('http://test.mosquitto.org:8080/', { 'mqtt' }))
-- fail right here if the WebSocket handshake does not complete (timeout: 5)
assert(conn:connect(5))

-- Adapter handed to the MQTT client.
-- The WebSocket delivers whole frames while the client asks for `n` bytes,
-- so the last frame is kept in `data` and consumed from position `pos`.
local socket
socket = {
    data  = '',
    pos   = 1,
    close = function ()
        conn:close()
    end,
    read  = function (_, n)
        if socket.pos > #socket.data then
            -- buffer exhausted: fetch the next frame
            local data, err = conn:receive(5)
            if not data then
                -- timeout or closed connection: report it instead of
                -- storing nil in the buffer (which would break string.sub)
                return nil, err
            end
            socket.data = data
            socket.pos = 1
        end
        local s = string.sub(socket.data, socket.pos, socket.pos + n - 1)
        socket.pos = socket.pos + n
        return s ~= '' and s or nil
    end,
    write = function (_, data)
        conn:send(data, 'binary')
    end,
}

local client = mqtt({
    socket = socket,
    logger = require'logging.console'({}),
})

client:connect({ clean = true })
local connack = client:read()
-- publish only when a CONNACK came back with return code 0
if connack and connack.rc == 0 then
    client:publish('foo/bar', 'Hello world!', { qos = 1 })
    client:read()       -- read the next packet (expected to be the PUBACK)
    client:disconnect()
end
conn:close()

subscribe with Copas

Copas (Coroutine Oriented Portable Asynchronous Services) provides an event loop, which requires a non-blocking socket.

local mqtt = require'mqtt311'
local socket = require'socket'  -- https://lunarmodules.github.io/luasocket/
local copas = require'copas'    -- https://lunarmodules.github.io/copas/

local conn = assert(socket.connect('test.mosquitto.org', mqtt.PORT))
conn:settimeout(0)              -- non-blocking

-- `client` is declared first so that the callbacks below can refer to it.
local client
client = mqtt({
    -- adapter: reads and writes go through Copas
    socket     = {
        close = function ()
            conn:shutdown()
        end,
        read  = function (_, n)
            return copas.receive(conn, n)
        end,
        write = function (_, data)
            copas.send(conn, data)
        end,
    },
    logger     = require'logging.console'({}),
    on_connect = function (rc)
        if rc == 0 then
            -- flat list alternating a topic filter and a number (presumably the QoS)
            client:subscribe({
                '$SYS/broker/version', 0,
                '$SYS/broker/publish/#', 0,
            })
        end
    end,
    on_message = function (topic, payload)
        print(topic, payload)
    end,
})

-- reader thread: connect, check the CONNACK, then keep reading packets
copas.addthread(function()
    client:connect({ clean = true })
    local connack = client:read()
    -- client:read() may return nothing, so guard before indexing
    assert(connack and connack.rc == 0, 'no CONNACK received or connection refused')
    while client:read() do
    end
end)

-- keep-alive thread: send a ping every 15 seconds
-- NOTE(review): this loop never ends, so copas.loop() presumably keeps
-- running after the reader thread has finished -- confirm this is intended.
copas.addthread(function()
    while true do
        copas.sleep(15)
        client:ping()
    end
end)

copas.loop()

subscribe with cqueues

Note: cqueues is not available on Windows.

local mqtt = require'mqtt5'
local cqueues = require'cqueues'        -- https://25thandclement.com/~william/projects/cqueues.html
local socket = require'cqueues.socket'

--- Open a TLS connection to the test broker.
-- Every step is asserted, so a failure raises an error instead of
-- handing back a socket that never finished connecting.
-- @return the connected cqueues socket
local function new_socket ()
    local sock = assert(socket.connect({
        host      = 'test.mosquitto.org',
        port      = mqtt.PORT_TLS,
        family    = socket.AF_INET,     -- IPv4
        reuseaddr = true,
    }))
    sock:setmode('b-', 'bn')            -- binary mode, no output buffering
    assert(sock:connect(5))             -- wait for the connection (timeout: 5)
    assert(sock:starttls())             -- switch the socket to TLS
    return sock
end

-- `client` is declared first so that the callbacks below can refer to it.
local client
client = mqtt({
    socket     = assert(new_socket()),
    logger     = require'logging.console'({}),
    on_connect = function (rc)
        if rc == 0 then
            -- flat list alternating a topic filter and a number (presumably the QoS)
            client:subscribe({
                '$SYS/broker/version', 0,
                '$SYS/broker/publish/#', 0,
            })
        end
    end,
    on_message = function (topic, payload)
        print(topic, payload)
    end,
    -- Up to 10 reconnection attempts, 2 seconds apart.
    -- Returns true once a new socket was handed to the client, false otherwise.
    on_error   = function ()
        for _ = 1, 10 do
            cqueues.sleep(2)
            local ok, sock = pcall(new_socket)
            -- on failure `sock` holds the error value, not a socket
            if ok and socket.type(sock) == 'socket' then
                client:reconnect(sock)
                return true
            end
        end
        return false
    end,
})

-- second argument: flat list alternating property identifier and value
client:connect({
    id       = 'LuaMQTT',
    username = 'guest',
    password = 'guest',
}, {
    mqtt.PROP.SESSION_EXPIRY_INTERVAL, 0xFFFFFFFF,
    mqtt.PROP.MAXIMUM_PACKET_SIZE, 0x3F,
})
local connack = client:read()
-- client:read() may return nothing, so guard before indexing
assert(connack and connack.rc == 0, 'no CONNACK received or connection refused')

local loop = cqueues.new()

-- reader coroutine: keep reading packets until read() returns a false value
loop:wrap(function()
    while client:read() do
    end
end)

-- keep-alive coroutine: send a ping every 15 seconds
loop:wrap(function()
    while true do
        cqueues.sleep(15)
        client:ping()
    end
end)

assert(loop:loop())

dispatch messages with Rotas

Rotas was designed as a router for web servers, but it can do this job as well: it simply routes on the MQTT topic instead of the HTTP URI.

local mqtt = require'mqtt5'
local cqueues = require'cqueues'        -- https://25thandclement.com/~william/projects/cqueues.html
local socket = require'cqueues.socket'

local logger = require'logging.console'({})

local conn = assert(socket.connect({
        host      = 'test.mosquitto.org',
        port      = mqtt.PORT_TLS,
        family    = socket.AF_INET,     -- IPv4
        reuseaddr = true,
}))
conn:setmode('b-', 'bn')                -- binary mode, no output buffering
assert(conn:connect(5))                 -- wait for the connection (timeout: 5)
assert(conn:starttls())                 -- switch the socket to TLS

-- `client` is declared first so that the callbacks below can refer to it.
local client
client = mqtt({
    socket     = conn,
    logger     = logger,
    on_connect = function (rc)
        if rc == 0 then
            -- flat list alternating a topic filter and a number (presumably the QoS)
            client:subscribe({
                '$SYS/broker/version', 0,
                '$SYS/broker/publish/#', 0,
            })
        end
    end,
    -- Look up a handler for the topic in the Rotas application and run it
    -- under pcall, so that a failing handler is only logged.
    on_message = function (topic, payload, props)
        local app = require'app_rotas'
        local fn = app('MQTT', topic)
        if not fn then
            logger:warn(topic .. ' --> no MQTT handler')
        else
            local status, msg = pcall(fn, topic, payload, props)
            if not status then
                -- the error value is not necessarily a string
                logger:error(topic .. ' --> throw error: ' .. tostring(msg))
            end
        end
    end,
})

client:connect({ clean = true })
local connack = client:read()
-- client:read() may return nothing, so guard before indexing
assert(connack and connack.rc == 0, 'no CONNACK received or connection refused')

local loop = cqueues.new()

-- reader coroutine: keep reading packets until read() returns a false value
loop:wrap(function()
    while client:read() do
    end
end)

assert(loop:loop())
-- file: app_rotas.lua

local rotas = require'Rotas'            -- https://fperrad.frama.io/lua-Rotas/

-- Register 'MQTT' as the only method name known to Rotas.
rotas.http_methods = { 'MQTT' }         -- replace all HTTP verbs

-- NOTE(review): presumably http_methods is read when the application is
-- instantiated, so this call has to stay after the assignment -- confirm.
local app = rotas()

--- Build a matcher for Rotas from an MQTT topic filter.
-- @param patt the topic filter handed to mqtt5.match
-- @return a function taking a topic and returning the result of
--         mqtt5.match(topic, patt)
local function filter (patt)
    local topic_match = require('mqtt5').match
    local function matcher (topic)
        return topic_match(topic, patt)
    end
    return matcher
end

-- Handler registered under '$SYS/broker/publish/#': print topic and payload.
local function on_publish_stats (topic, payload)
    print(topic, payload)
end

-- Handler registered under '$SYS/broker/version': the topic is ignored.
local function on_version (_, payload)
    print('VERSION', payload)
end

app.MQTT[filter('$SYS/broker/publish/#')] = on_publish_stats
app.MQTT[filter('$SYS/broker/version')] = on_version

return app