MQTT
- Updated2026-04-22
- 5 minute(s) read
Use Message Queuing Telemetry Transport (MQTT) functions and data types to configure MQTT communication.
Introduced in PAtools 2026 Q2
Example: Using mqttClient and mqttSub
Note For easier use, use mqttClient and mqttSub in a PAscript
graph.
program
section globals
int32 Test_Start.ACT
endsection
section initialization
mqttClient client
mqttSub subText
mqttSub subInt
mqttSub subReal
mqttSub subBuffer
mqttSub subJson
int32 _timeout = 0
int32 _messages_checked = 0
mqttClient:connectionStatus connection
mqttClient:status status
mqttClient:allWritten written
mqttSub:status subStatus
list<mqttSub> subList
text topicIntReceive(20) = "topic/int2"
text topicRealReceive(20) = "topic/real2"
text topicTextReceive(20) = "topic/text2"
text topicBufferReceive(20) = "topic/buffer2"
text topicJsonValueReceive(20) = "topic/jsonValue2"
text topicIntSend(20) = "topic/int1"
text topicRealSend(20) = "topic/real1"
text topicTextSend(20) = "topic/text1"
text topicBufferSend(20) = "topic/buffer1"
text topicJsonValueSend(20) = "topic/jsonValue1"
int32 state = 1
int32 publish = 0
endsection
//addLastWill
if state == 1 and Test_Start.ACT == 1 then
status = client:addLastWill(topicIntSend, 0, mqttClient:QoS:level0, mqttClient:retain:deactive)
if status != mqttClient:status:ok then
sys:logError(sys:ui | sys:test, "error adding lastWill int")
endif
status = client:addLastWill(topicRealSend, 0.0, mqttClient:QoS:level0, mqttClient:retain:deactive)
status = client:addLastWill(topicTextSend, "", mqttClient:QoS:level0, mqttClient:retain:deactive)
buffer lastWillBuffer(0)
status = client:addLastWill(topicBufferSend, lastWillBuffer, mqttClient:QoS:level0, mqttClient:retain:deactive)
jsonValue lastWillJson("")
status = client:addLastWill(topicJsonValueSend, lastWillJson, mqttClient:QoS:level0, mqttClient:retain:deactive)
state = 2
endif
//connect
if state == 2 then
status = client:connect("broker")
if status != mqttClient:status:ok then
sys:logError(sys:ui | sys:test, "connection error")
endif
state = 3
_timeout = 0
endif
//getConnectionStatus
if state == 3 then
connection = client:getConnectionStatus()
_timeout++
if connection == mqttClient:connectionStatus:connected then
state = 4
_timeout = 0
endif
if _timeout > 100 then
sys:logError(sys:ui | sys:test, "timeout during getConnectionStatus")
state = 100
endif
endif
//subscribe and getStatus
if state == 4 then
subInt = client:subscribe(topicIntReceive, mqttClient:QoS:level1)
if subInt:getStatus() != mqttSub:status:ok then
sys:logError(sys:ui | sys:test, "error subscribe to int")
endif
subText = client:subscribe(topicTextReceive, mqttClient:QoS:level1)
subReal = client:subscribe(topicRealReceive, mqttClient:QoS:level1)
subStatus = subReal:getStatus()
subBuffer = client:subscribe(topicBufferReceive, mqttClient:QoS:level1)
subJson = client:subscribe(topicJsonValueReceive, mqttClient:QoS:level1)
state = 5
endif
//topicName and Lists
if state == 5 then
subList:add(subInt)
subList:add(subText)
list<text> names
names:add(topicIntReceive)
names:add(topicTextReceive)
int32 i = 0
mqttSub sub
foreach sub in subList
text subName(20)
subName = sub:topicName()
text name(20)
name = names[i]
if (subName != name) then
sys:logError(sys:ui | sys:test, "error topic Name not matching: %s %s", subName, name)
endif
i++
endforeach
subList:clear()
state = 6
endif
//publish
if state == 6 then
status = client:publish(topicTextSend, "client2", mqttClient:QoS:level1, mqttClient:retain:active)
if status != mqttClient:status:ok then
sys:logError(sys:ui | sys:test, "error publishing text")
endif
int32 i = 2
status = client:publish(topicIntSend, i, mqttClient:QoS:level1, mqttClient:retain:active)
real64 r = 2.2
status = client:publish(topicRealSend, r, mqttClient:QoS:level1, mqttClient:retain:active)
text setBufferText(20) = "hello2"
buffer bufferTopic(20) = setBufferText
status = client:publish(topicBufferSend, bufferTopic, mqttClient:QoS:level1, mqttClient:retain:active)
int32 counter_value = 2
jsonValue a(counter_value)
jsonObject b
b:addOrSetValue("Message",a)
jsonValue value2(b)
status = client:publish(topicJsonValueSend, value2, mqttClient:QoS:level1, mqttClient:retain:active)
if status != mqttClient:status:ok then
sys:logError(sys:ui | sys:test, "error publishing json")
endif
state = 7
endif
//checkAllWritten
if state == 7 then
written = client:checkAllWritten()
_timeout++
if written == mqttClient:allWritten:yes then
state = 8
_timeout = 0
endif
if _timeout > 1000 then
sys:logError(sys:ui | sys:test, "timeout checkAllWritten")
state = 9
endif
endif
//newMessages, payloadSize and Gets
if state == 8 then
if subInt:newMessages() > 0 and subInt:payloadSize() == 4 then
int32 intTopic = subInt:getInt32()
if intTopic == 1 then
_messages_checked++
else
sys:logError(sys:ui | sys:test, "error receiving correct int: %d", intTopic)
endif
endif
if subText:newMessages() > 0 and subText:payloadSize() == 7 then
text textTopic(20) = subText:getText()
if textTopic == "client1" then
_messages_checked++
else
sys:logError(sys:ui | sys:test, "error receiving correct text: %s, length: %d", textTopic, textTopic:length)
endif
endif
if subReal:newMessages() > 0 and subReal:payloadSize() == 8 then
real64 realTopic = subReal:getReal64()
if realTopic == 1.1 then
_messages_checked++
else
sys:logError(sys:ui | sys:test, "error receiving correct real: %f", realTopic)
endif
endif
if subJson:newMessages() > 0 and subJson:payloadSize() == 13 then
jsonValue value("")
value = subJson:getJsonValue()
jsonObject object = value:getValueAsJsonObject()
jsonMember member = object:getMember("Message")
value = member:getValue()
int32 jsonInt = value:getValueAsInt32()
if jsonInt == 1 then
_messages_checked++
else
sys:logError(sys:ui | sys:test, "error receiving correct json: %d", jsonInt)
endif
endif
if subBuffer:newMessages() > 0 and subBuffer:payloadSize() == 6 then
buffer bufferReceive(20)
bufferReceive = subBuffer:getBuffer()
text buffertext(30)
buffertext = bufferReceive:read(0, bufferReceive:length)
if buffertext == "hello1" then
_messages_checked++
else
sys:logError(sys:ui | sys:test, "error receiving correct buffer: %s, bufferlength: %d", buffertext, bufferReceive:length)
endif
endif
_timeout++
if _messages_checked == 5 then
state = 9
_timeout = 0
endif
if _timeout > 100 then
sys:logError(sys:ui | sys:test, "timeout check newMessages and Get, payloadSizes: %d %d %d %d %d", subInt:payloadSize(), subReal:payloadSize(), subText:payloadSize(), subBuffer:payloadSize(), subJson:payloadSize())
_timeout = 0
state = 9
endif
endif
//clear Topics
if state == 9 then
if publish == 0 then
status = client:clearTopic(topicTextSend, mqttClient:QoS:level0, mqttClient:retain:active)
if status != mqttClient:status:ok then
sys:logError(sys:ui | sys:test, "error clearing text")
endif
status = client:clearTopic(topicIntSend, mqttClient:QoS:level0, mqttClient:retain:active)
status = client:clearTopic(topicRealSend, mqttClient:QoS:level0, mqttClient:retain:active)
status = client:clearTopic(topicBufferSend, mqttClient:QoS:level0, mqttClient:retain:active)
status = client:clearTopic(topicJsonValueSend, mqttClient:QoS:level0, mqttClient:retain:active)
publish = 1
endif
written = client:checkAllWritten()
_timeout++
if written == mqttClient:allWritten:yes then
state = 10
publish = 0
_timeout = 0
endif
if _timeout > 100 then
sys:logError(sys:ui | sys:test, "timeout check allwritten clear")
state = 10
publish = 0
endif
endif
//disconnect
if state == 10 then
status = client:disconnect()
if status != mqttClient:status:ok then
sys:logError(sys:ui | sys:test, "error disconnecting")
endif
state = 11
endif
//check disconnect
if state == 11 then
connection = client:getConnectionStatus()
_timeout++
if connection == mqttClient:connectionStatus:disconnected then
state = 100
_timeout = 0
endif
if _timeout > 100 then
sys:logError(sys:ui | sys:test, "error disconnecting")
state = 100
endif
endif
//done
if state == 100 then
if Test_Start.ACT == 0 then
state = 1
_messages_checked = 0
endif
endif
endprogram
MQTT Quality of Service
In MQTT, the Quality of Service (QoS) levels determine the guarantee of message delivery between the publisher and the subscriber.
MQTT defines the following QoS levels:
If you use Mosquitto as a broker, refer to mosquitto.conf man page for information about configuring QoS in Mosquitto.