Pale blue cloud

Request-response message exchange with MQTT and Python

For years I've enjoyed the features and possibilities of professional-grade (i.e. expensive) message-based software in my day-to-day work. So I was very happy to learn that similar software, albeit with fewer features, is available for hobbyist use. I am talking about MQTT, "a lightweight messaging protocol for small sensors and mobile devices, optimized for high-latency or unreliable networks".

MQTT is not exactly new, having been around since 1999, but what makes it very attractive now is the proliferation of small, low-cost devices running some form of Linux, Android, etc. Libraries are available for many popular programming languages. In this article I'll use the Paho library for Python and I'll talk to the Mosquitto MQTT broker.

MQTT follows the publish/subscribe message pattern: a client connects to a central message server (the 'broker'), publishes a message to a topic without knowing who is subscribed to that topic, and moves on without waiting for an answer. This pattern can be used, for example, by a remote temperature sensor that publishes its own name and the temperature it measured to a topic, where a home automation system (e.g. Domoticz) picks it up for display to a user.

In contrast, the request/response (or request/reply) message pattern is one where a client contacts a remote service with a request and waits for the service to return an answer. For example, software like Domoticz uses a request/response pattern to obtain weather information from the Weather Underground website.

Many request/response services are implemented synchronously: while the client waits for an answer from the service, it does nothing else. For example, when your browser requests a page from a server, it waits and shows some kind of hourglass or spinner while the page loads. When implementing request/reply over MQTT, the client can be programmed so that it publishes its request and then carries on with other work while the reply makes its way back from the service. This is convenient and economical for small devices that have few resources and many things to do.
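One way to picture this non-blocking style is a table of outstanding requests, each paired with a callback that runs when its reply shows up. The sketch below uses only the standard library and no broker at all. `PendingRequests`, `expect` and `deliver` are hypothetical names made up for illustration, and `deliver` merely stands in for what an MQTT message callback would do when a reply arrives.

```python
class PendingRequests:
    """Track outstanding requests and run a callback when each reply arrives."""

    def __init__(self):
        self._callbacks = {}          # request ID -> callback

    def expect(self, request_id, callback):
        """Remember what to do when the reply for request_id comes in."""
        self._callbacks[request_id] = callback

    def deliver(self, request_id, payload):
        """Hand a reply to its callback; return False for unknown IDs."""
        callback = self._callbacks.pop(request_id, None)
        if callback is None:
            return False
        callback(payload)
        return True


pending = PendingRequests()
replies = []

pending.expect('12345', replies.append)    # "publish" the request, then move on
other_work = sum(range(10))                # the client is free to do other things
pending.deliver('12345', '14:03:07')       # later, the reply arrives
print(replies)
```

The client never sits idle between `expect` and `deliver`; whatever it does in the meantime is unaffected by how long the service takes.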

It is possible, and indeed not very difficult, to write a request/response service for MQTT. The examples below were made with the following setup:

  • Raspberry Pi (two actually, but it's perfectly fine to run the MQTT broker, the client and the service on a single device)
  • Python 3
  • Paho MQTT client library for Python (the scripts below use the callback signatures of the 1.x releases)

 Basic principle

The basic principle for this request/response implementation is as follows:

  1. the service connects to a message broker and subscribes to a topic using a wildcard (e.g. 'services/foo/request/+')
  2. the client connects to the same broker and subscribes to a topic where it expects to receive a response later (e.g. 'services/foo/response/12345')
  3. the client publishes its request to a topic using the same last hierarchical element as the topic where it is subscribing ('services/foo/request/12345')
  4. the service, subscribing to the wildcard topic, picks up the request and processes it
  5. the service publishes the response to topic 'services/foo/response/12345'
  6. the client receives the response on 'services/foo/response/12345'
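The topic bookkeeping in these six steps can be sketched without a broker. The helper names below (`request_topic`, `response_topic`, `matches`, `request_id_from`) are made up for this illustration, and `matches` handles only the single-level '+' wildcard. In real code Paho's own `topic_matches_sub` function would probably be the more natural choice.

```python
SERVICE = 'foo'   # service name taken from the example topics above


def request_topic(request_id):
    return 'services/{}/request/{}'.format(SERVICE, request_id)


def response_topic(request_id):
    return 'services/{}/response/{}'.format(SERVICE, request_id)


def matches(subscription, topic):
    """Match a topic against a subscription that may contain '+' levels.

    Only the single-level wildcard '+' is handled; '#' is not.
    """
    sub_levels = subscription.split('/')
    topic_levels = topic.split('/')
    if len(sub_levels) != len(topic_levels):
        return False
    return all(s == '+' or s == t for s, t in zip(sub_levels, topic_levels))


def request_id_from(topic):
    """The request ID is the last level of the topic."""
    return topic.rsplit('/', 1)[-1]


incoming = request_topic('12345')                        # step 3
assert matches('services/foo/request/+', incoming)       # step 4
reply_to = response_topic(request_id_from(incoming))     # step 5
print(reply_to)    # services/foo/response/12345
```

Because the request ID is the only part that differs between the two topics, the service needs no other information to know where the answer should go.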

A simple time service

The two Python scripts below implement a simple example in which a client asks a time service what time it is.

For simplicity's sake this example does not take the following real world problems into account:

  • exception handling of any sort (service unavailable, broker unavailable, etc.)
  • delay in the response - can the client deal with a time that has become incorrect because there were delays on the network or a slow response by an overworked service?
  • multiple clients using the same 'random' request id
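The last two points could be approached roughly as follows. This is a sketch, not part of the scripts in this article: it swaps the small random number for a UUID, and it wraps the time in a JSON payload carrying a send timestamp so the client can reject stale answers. The function names, the payload fields 'time' and 'sent', and the one-second limit are all assumptions, and the staleness check only makes sense if the clocks of client and service are reasonably in sync.

```python
import json
import time
import uuid


def new_request_id():
    """A random UUID is far less likely to collide than a number below 10000."""
    return uuid.uuid4().hex


def build_response(now=None):
    """Service side: wrap the time in JSON together with a send timestamp."""
    now = time.time() if now is None else now
    return json.dumps({
        'time': time.strftime('%H:%M:%S', time.localtime(now)),
        'sent': now,
    })


def is_fresh(payload, max_age=1.0, now=None):
    """Client side: accept the response only if it is at most max_age seconds old."""
    now = time.time() if now is None else now
    return now - json.loads(payload)['sent'] <= max_age


payload = build_response(now=1000.0)
print(is_fresh(payload, now=1000.4))   # True
print(is_fresh(payload, now=1003.0))   # False
```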

Service implementation

#!/usr/bin/python3
#
# Name:         timeService.py
# Purpose:      Simple time service to demonstrate request/response pattern
#               with MQTT (Mosquitto).
# Author:       Martijn
# Date:         Feb 2016
#
#-----------------------------------------------------------------------------

import configparser
from time import localtime, strftime
import paho.mqtt.client as mqtt

#
# Global variables
#
config = configparser.ConfigParser()
config.read('/home/pi/bin/py.conf')     # Broker connection config.

requestTopic  = 'services/timeservice/request/+'        # Request comes in here. Note wildcard.
responseTopic = 'services/timeservice/response/'        # Response goes here. Request ID will be appended later


#
# Callback that is executed when the client receives a CONNACK response from the server.
#
def onConnect(client, userdata, flags, rc):
   print("Connected with result code " + str(rc))

   # Subscribe on request topic with a single-level wildcard.
   # Subscribing in on_connect() means that if we lose the connection and
   # reconnect then subscriptions will be renewed.
   client.subscribe(requestTopic, 0)    # topic, QoS


#
# Callback that is executed when a message is received.
#
def onMessage(client, userdata, message):
   # Use a distinct local name so the global requestTopic is not shadowed
   topic = message.topic
   requestID = topic.split('/')[-1]             # obtain requestID as last field from the topic

   print("Received a time request on topic " + topic + ".")


   # Get and format the local time
   lTime = strftime('%H:%M:%S', localtime())

   # Publish the time to the response topic
   client.publish((responseTopic + requestID), payload=lTime, qos=0, retain=False)


#
# Callback that is executed when we disconnect from the broker.
#
def onDisconnect(client, userdata, rc):     # third argument is the result code, not a message
   print("Disconnected from the broker.")


#-----------------------------------------------------------------------------
# Main
#-----------------------------------------------------------------------------


# Create MQTT client instance
mqttc = mqtt.Client(client_id='raspberrypi', clean_session=True)

mqttc.on_connect = onConnect
mqttc.on_message = onMessage
mqttc.on_disconnect = onDisconnect

# Connect to the broker
mqttc.username_pw_set(config['MQTT']['userMQTT'], password=config['MQTT']['passwdMQTT'])
mqttc.connect(config['MQTT']['hostMQTT'], port=int(config['MQTT']['portMQTT']), keepalive=60, bind_address="")


# This is a blocking form of the network loop and will not return until the client
# calls disconnect(). It automatically handles reconnecting.
mqttc.loop_forever()

# End
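The script reads its broker settings from py.conf, which is not shown in this article. Judging by the keys the script looks up, the file might be laid out like the sample below. Only the section name and the four key names come from the script; every value is a placeholder, and the sample is parsed from a string here so that it runs without touching the file system.

```python
import configparser

# Placeholder values; only the section and key names come from the script.
SAMPLE_CONF = """
[MQTT]
hostMQTT = localhost
portMQTT = 1883
userMQTT = someuser
passwdMQTT = somepassword
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CONF)

host = config['MQTT']['hostMQTT']
port = int(config['MQTT']['portMQTT'])    # values are strings, hence int()
print(host, port)    # localhost 1883
```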

Client implementation

#!/usr/bin/python3
#
# Name:         timeClient.py
# Purpose:      Simple time client that requests the time from a remote time
#               service to demonstrate request/response pattern with MQTT
#               (Mosquitto)
# Author:       Martijn
# Date:         Feb 2016
#
#-----------------------------------------------------------------------------

import configparser
import paho.mqtt.client as mqtt
import random
import time


#
# Global variables
#
config = configparser.ConfigParser()
config.read('/home/pi/bin/py.conf')     # Broker connection config.

# Name of this client. Don't use identical client IDs for different clients
clientID='nestcam'
DEBUG = 0


# Topic prefixes. The request ID is appended to both: the request is published
# to the first, and the service sends its response to the second.
requestTopic  = 'services/timeservice/request/'         # Request goes here. Request ID will be appended later
responseTopic = 'services/timeservice/response/'        # Response comes here. Request ID will be appended later


#
# Callback that is executed when the client receives a CONNACK response from the server.
#
def onConnect(client, userdata, flags, rc):
   print("Connected with result code " + str(rc))

#
# Callback that is executed when subscribing to a topic
#
def onSubscribe(client, userdata, mid, granted_qos):
   if DEBUG: print('Subscribed on topic.')


#
# Callback that is executed when a message is received.
# This displays the time from the remote service.
#
def onMessage(client, userdata, message):
   # The payload arrives as bytes; decode it to get a plain string:
   print('It is ' + message.payload.decode("utf-8"))


#
# Callback that is executed when we disconnect from the broker.
#
def onDisconnect(client, userdata, rc):     # third argument is the result code, not a message
   print("Disconnected from the broker.")


#-----------------------------------------------------------------------------
# Main
#-----------------------------------------------------------------------------


# Create MQTT client instance
mqttc = mqtt.Client(client_id=clientID, clean_session=True)


# Define the callback functions
mqttc.on_connect    = onConnect
mqttc.on_subscribe  = onSubscribe
mqttc.on_message    = onMessage
mqttc.on_disconnect = onDisconnect


# Connect to the broker
mqttc.username_pw_set(config['MQTT']['userMQTT'], password=config['MQTT']['passwdMQTT'])
mqttc.connect(config['MQTT']['hostMQTT'], port=int(config['MQTT']['portMQTT']), keepalive=60, bind_address="")
mqttc.loop_start()


#
# Keep looping, asking for the time every <n> seconds, until interrupted (Ctrl-C).
#
try:
   while True:
      requestID = str(random.randrange(10000))     # Create a new random request (topic) ID

      # Subscribe to the topic where we expect the response
      mqttc.subscribe(responseTopic+requestID, 0)    # topic name, QoS

      # Publish time request on request topic
      # Note that we send no payload in the request
      mqttc.publish(requestTopic+requestID, qos=0, retain=False)

      time.sleep(2)        # blocking sleep, 2 seconds

      # Unsubscribe from the temporary response topic. The request topic was
      # only published to, never subscribed to, so it needs no cleanup.
      mqttc.unsubscribe(responseTopic+requestID)
except KeyboardInterrupt:
   pass


# Clean up when done
mqttc.loop_stop()
mqttc.disconnect()

# End
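The client above simply sleeps for two seconds and hopes the answer has arrived by then. A possible refinement, sketched here with the standard library only, is to wait on a `threading.Event` that the message callback sets, so the client continues as soon as the response is in and can tell when none came at all. `ResponseWaiter` is a hypothetical helper, and the timer below merely simulates a service replying through the broker.

```python
import threading


class ResponseWaiter:
    """Wait for one response, but give up after a timeout."""

    def __init__(self):
        self._arrived = threading.Event()
        self.payload = None

    def on_response(self, payload):
        """Call this from the message callback when the response arrives."""
        self.payload = payload
        self._arrived.set()

    def wait(self, timeout):
        """Return the payload, or None if nothing arrived within timeout seconds."""
        return self.payload if self._arrived.wait(timeout) else None


# Simulate a service that answers after 0.1 s
answered = ResponseWaiter()
threading.Timer(0.1, answered.on_response, args=['14:03:07']).start()
print(answered.wait(timeout=2))      # 14:03:07

# Simulate a service that never answers
silent = ResponseWaiter()
print(silent.wait(timeout=0.2))      # None
```

A `None` result gives the client a place to handle the "service unavailable" case that the example scripts leave out.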

Running examples

In the snapshots below, timeClient, running on the host 'nestcam', is asking the time from timeService, running on the host 'raspberrypi'.

[Screenshot: timeService output]

[Screenshot: timeClient output]

   
© Palebluedot