mqtt - Examples
publish with luasocket
An adapter with the luasocket API is needed.
local mqtt = require'mqtt311'
local socket = require'socket' -- https://lunarmodules.github.io/luasocket/

-- Plain TCP connection to the public test broker; abort if it cannot be opened.
local tcp = assert(socket.connect('test.mosquitto.org', mqtt.PORT))

-- Adapter handed to the MQTT client as its `socket`: it maps the
-- close/read/write calls onto the LuaSocket TCP object.
local adapter = {}

function adapter.close ()
    tcp:shutdown()
end

-- Forwards every value returned by receive (data, or nil plus error details).
function adapter.read (_, n)
    return tcp:receive(n)
end

function adapter.write (_, data)
    tcp:send(data)
end

local client = mqtt({
    socket = adapter,
    logger = require'logging.console'({}),
})

client:connect({ clean = true, id = '' })

-- Only publish when a CONNACK was read and its return code is 0.
local connack = client:read()
local accepted = connack and connack.rc == 0
if accepted then
    client:publish('foo/bar', 'Hello world!', { qos = 1 })
    client:read() -- reply to the QoS 1 publish (expected: PUBACK)
    client:disconnect()
end

tcp:shutdown()
publish with luasec
An adapter with the luasec API is needed.
local mqtt = require'mqtt311'
local socket = require'socket' -- https://lunarmodules.github.io/luasocket/
local ssl = require'ssl' -- https://github.com/brunoos/luasec/

-- Open the TCP connection, then upgrade it to TLS. Each step is checked so
-- that a failure stops the example with the library's own error message
-- instead of a later "attempt to index a nil value" or a silent no-op.
local conn = assert(socket.connect('test.mosquitto.org', mqtt.PORT_TLS))
conn = assert(ssl.wrap(conn, {
    mode = 'client',
    protocol = 'any',
    options = { 'all' },
    -- NOTE(review): no verify/CA parameters are given here, so the broker
    -- certificate is presumably not validated -- confirm before reusing.
}))
assert(conn:dohandshake())

-- Adapter handed to the MQTT client as its `socket`: it maps the
-- close/read/write calls onto the TLS connection object.
local client = mqtt({
    socket = {
        close = function ()
            conn:close()
        end,
        -- Forwards every value returned by receive.
        read = function (_, n)
            return conn:receive(n)
        end,
        write = function (_, data)
            conn:send(data)
        end,
    },
    logger = require'logging.console'({}),
})

client:connect({ clean = true })

-- Only publish when a CONNACK was read and its return code is 0.
local connack = client:read()
if connack and connack.rc == 0 then
    client:publish('foo/bar', 'Hello world!', { qos = 1 })
    client:read() -- reply to the QoS 1 publish (expected: PUBACK)
    client:disconnect()
end

conn:close()
publish with http.websocket
An adapter with the http.websocket API is needed.
local mqtt = require'mqtt311'
local websocket = require'http.websocket' -- https://daurnimator.github.io/lua-http/

local conn = assert(websocket.new_from_uri('http://test.mosquitto.org:8080/', { 'mqtt' }))
-- Stop here when the WebSocket handshake fails (5 second timeout).
assert(conn:connect(5))

-- Adapter handed to the MQTT client as its `socket`. WebSocket delivers
-- whole frames, so the last received frame is buffered in `data` and
-- `pos` is the index of the next byte to hand out.
local socket
socket = {
    data = '',
    pos = 1,
    close = function ()
        conn:close()
    end,
    -- Returns up to n bytes from the buffered frame, fetching the next
    -- frame first when the buffer is exhausted. Returns nil (plus the
    -- error from receive) when no frame arrives within 5 seconds or the
    -- connection fails; the buffer is left untouched in that case so a
    -- later call can try again.
    -- NOTE(review): a request that crosses a frame boundary only gets the
    -- bytes left in the current frame -- confirm the broker never splits
    -- an MQTT packet across frames.
    read = function (_, n)
        if socket.pos > #socket.data then
            local frame, err = conn:receive(5)
            if not frame then
                return nil, err
            end
            socket.data = frame
            socket.pos = 1
        end
        local s = string.sub(socket.data, socket.pos, socket.pos + n - 1)
        socket.pos = socket.pos + n
        return s ~= '' and s or nil
    end,
    write = function (_, data)
        conn:send(data, 'binary')
    end,
}

local client = mqtt({
    socket = socket,
    logger = require'logging.console'({}),
})

client:connect({ clean = true })

-- Only publish when a CONNACK was read and its return code is 0.
local connack = client:read()
if connack and connack.rc == 0 then
    client:publish('foo/bar', 'Hello world!', { qos = 1 })
    client:read() -- reply to the QoS 1 publish (expected: PUBACK)
    client:disconnect()
end

conn:close()
subscribe with Copas
Copas (Coroutine Oriented Portable Asynchronous Services) has an event loop which requires non-blocking sockets.
local mqtt = require'mqtt311'
local socket = require'socket' -- https://lunarmodules.github.io/luasocket/
local copas = require'copas' -- https://lunarmodules.github.io/copas/

local conn = assert(socket.connect('test.mosquitto.org', mqtt.PORT))
conn:settimeout(0) -- non-blocking

-- Declared before construction because the on_connect callback refers to it.
local client
client = mqtt({
    -- Adapter mapping close/read/write onto the Copas I/O functions,
    -- which yield to the event loop instead of blocking.
    socket = {
        close = function ()
            conn:shutdown()
        end,
        read = function (_, n)
            return copas.receive(conn, n)
        end,
        write = function (_, data)
            copas.send(conn, data)
        end,
    },
    logger = require'logging.console'({}),
    -- Subscribe once the connection is reported with return code 0.
    on_connect = function (rc)
        if rc == 0 then
            client:subscribe({
                '$SYS/broker/version', 0,
                '$SYS/broker/publish/#', 0,
            })
        end
    end,
    on_message = function (topic, payload)
        print(topic, payload)
    end,
})

-- Cleared by the reader thread when it stops, so that the keep-alive
-- thread also ends and copas.loop() can return instead of pinging a
-- dead connection forever.
local alive = true

-- Reader thread: connect, check the CONNACK, then read until read() fails.
copas.addthread(function()
    client:connect({ clean = true })
    local connack = client:read()
    local accepted = connack and connack.rc == 0
    if accepted then
        while client:read() do
        end
    end
    alive = false
    assert(accepted, 'no CONNACK received or connection refused')
end)

-- Keep-alive thread: ping every 15 seconds while the reader is running.
copas.addthread(function()
    while alive do
        copas.sleep(15)
        if alive then
            client:ping()
        end
    end
end)

copas.loop()
subscribe with cqueues
Note: cqueues is not available on Windows.
local mqtt = require'mqtt5'
local cqueues = require'cqueues' -- https://25thandclement.com/~william/projects/cqueues.html
local socket = require'cqueues.socket'

--- Open a TLS connection to the test broker.
-- Raises an error when the socket cannot be created, when the connection
-- is not established within 5 seconds, or when the TLS handshake fails.
-- The socket is closed before raising, so a failed attempt leaves nothing
-- open and callers never receive an unconnected socket.
-- @return the connected cqueues socket
local function new_socket ()
    local sock = assert(socket.connect({
        host = 'test.mosquitto.org',
        port = mqtt.PORT_TLS,
        family = socket.AF_INET, -- IPv4
        reuseaddr = true,
    }))
    sock:setmode('b-', 'bn') -- binary mode, no output buffering
    local ok, err = sock:connect(5)
    if ok then
        ok, err = sock:starttls()
    end
    if not ok then
        sock:close()
        error('cannot connect to the broker: ' .. tostring(err), 0)
    end
    return sock
end

-- Declared before construction because the callbacks refer to it.
local client
client = mqtt({
    socket = new_socket(),
    logger = require'logging.console'({}),
    -- Subscribe once the connection is reported with return code 0.
    on_connect = function (rc)
        if rc == 0 then
            client:subscribe({
                '$SYS/broker/version', 0,
                '$SYS/broker/publish/#', 0,
            })
        end
    end,
    on_message = function (topic, payload)
        print(topic, payload)
    end,
    -- Try up to 10 times, 2 seconds apart, to open a fresh socket and hand
    -- it to client:reconnect. Returns true after a reconnect, false when
    -- every attempt failed.
    on_error = function ()
        for _ = 1, 10 do
            cqueues.sleep(2)
            -- pcall turns a failed attempt into (false, message).
            local ok, sock = pcall(new_socket)
            if ok and socket.type(sock) == 'socket' then
                client:reconnect(sock)
                return true
            end
        end
        return false
    end,
})

client:connect({
    id = 'LuaMQTT',
    username = 'guest',
    password = 'guest',
}, {
    mqtt.PROP.SESSION_EXPIRY_INTERVAL, 0xFFFFFFFF,
    mqtt.PROP.MAXIMUM_PACKET_SIZE, 0x3F,
})

-- read() may yield nothing when the connection fails, so check for that
-- before looking at the return code.
local connack = client:read()
assert(connack and connack.rc == 0, 'no CONNACK received or connection refused')

local loop = cqueues.new()

-- Reader: keep reading until read() fails.
loop:wrap(function()
    while client:read() do
    end
end)

-- Keep-alive: ping every 15 seconds.
loop:wrap(function()
    while true do
        cqueues.sleep(15)
        client:ping()
    end
end)

assert(loop:loop())
dispatch messages with Rotas
Rotas was designed as a web server router, but it also does this job (it simply matches on an MQTT topic instead of an HTTP URI).
local mqtt = require'mqtt5'
local cqueues = require'cqueues' -- https://25thandclement.com/~william/projects/cqueues.html
local socket = require'cqueues.socket'
local logger = require'logging.console'({})

local conn = assert(socket.connect({
    host = 'test.mosquitto.org',
    port = mqtt.PORT_TLS,
    family = socket.AF_INET, -- IPv4
    reuseaddr = true,
}))
conn:setmode('b-', 'bn') -- binary mode, no output buffering
-- Stop here when the connection (5 second timeout) or the TLS handshake
-- fails, rather than handing an unusable socket to the MQTT client.
assert(conn:connect(5))
assert(conn:starttls())

-- Declared before construction because the on_connect callback refers to it.
local client
client = mqtt({
    socket = conn,
    logger = logger,
    -- Subscribe once the connection is reported with return code 0.
    on_connect = function (rc)
        if rc == 0 then
            client:subscribe({
                '$SYS/broker/version', 0,
                '$SYS/broker/publish/#', 0,
            })
        end
    end,
    -- Look up the handler registered for this topic in app_rotas and call
    -- it in protected mode so that a failing handler is logged instead of
    -- propagating out of the callback.
    on_message = function (topic, payload, props)
        local app = require'app_rotas'
        local fn = app('MQTT', topic)
        if not fn then
            logger:warn(topic .. ' --> no MQTT handler')
        else
            local status, msg = pcall(fn, topic, payload, props)
            if not status then
                -- An error value can be any Lua value; tostring keeps the
                -- concatenation from raising a second error.
                logger:error(topic .. ' --> throw error: ' .. tostring(msg))
            end
        end
    end,
})

client:connect({ clean = true })

-- read() may yield nothing when the connection fails, so check for that
-- before looking at the return code.
local connack = client:read()
assert(connack and connack.rc == 0, 'no CONNACK received or connection refused')

local loop = cqueues.new()

-- Reader: keep reading until read() fails.
loop:wrap(function()
    while client:read() do
    end
end)

assert(loop:loop())
-- file: app_rotas.lua
local rotas = require'Rotas' -- https://fperrad.frama.io/lua-Rotas/
rotas.http_methods = { 'MQTT' } -- replace all HTTP verbs
local app = rotas()

-- Build a matcher for Rotas: the returned function tests a topic against
-- the MQTT topic filter `patt` by delegating to mqtt5.match.
local function filter (patt)
    local topic_match = require'mqtt5'.match
    local function matcher (topic)
        return topic_match(topic, patt)
    end
    return matcher
end

-- Handler for the broker publish statistics: print topic and payload.
app.MQTT[filter'$SYS/broker/publish/#'] = function (topic, payload)
    print(topic, payload)
end

-- Handler for the broker version: print the payload with a fixed label.
app.MQTT[filter'$SYS/broker/version'] = function (_, payload)
    print('VERSION', payload)
end

return app