Jordan Johnson posted Maurizio Giubilato commented

MQTT: Accessing Retained Messages in FlexSim using Python

MQTT is a communication protocol designed for IoT devices. Clients (the devices) connect to a Broker, which allows them to publish (send) messages and subscribe to (receive) messages. Each message is associated with a Topic, and each client chooses which topics it publishes and subscribes to.

Usually, the MQTT broker discards a message once it has been sent to all current subscribers. However, a publisher can mark a message as "retained." The broker then keeps that message for its topic and delivers it to any client that subscribes later. Only the most recent retained message per topic is kept.
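To make the retained-message behavior concrete, here is a toy in-memory model. It is only a sketch: ToyBroker is a hypothetical class written for this illustration, not part of any MQTT library, and it ignores wildcards, QoS, and everything else a real broker does.

```python
# Toy in-memory model of retained-message semantics.
# ToyBroker is hypothetical and exists only for illustration.
class ToyBroker:
    def __init__(self):
        self.retained = {}      # topic -> last retained payload
        self.subscribers = {}   # topic -> list of callbacks

    def publish(self, topic, payload, retain=False):
        # Deliver to every client that is subscribed right now.
        for callback in self.subscribers.get(topic, []):
            callback(topic, payload)
        # Keep only the most recent retained payload per topic.
        if retain:
            self.retained[topic] = payload

    def subscribe(self, topic, callback):
        self.subscribers.setdefault(topic, []).append(callback)
        # A new subscriber immediately receives the retained payload, if any.
        if topic in self.retained:
            callback(topic, self.retained[topic])


broker = ToyBroker()
broker.publish("demo/stock", "10 units", retain=True)
broker.publish("demo/stock", "7 units", retain=True)   # replaces the first
broker.publish("demo/log", "not retained")             # nobody subscribed, so it is lost

received = []
broker.subscribe("demo/stock", lambda t, p: received.append((t, p)))
broker.subscribe("demo/log", lambda t, p: received.append((t, p)))
print(received)  # [('demo/stock', '7 units')]
```

The late subscriber sees only the newest retained payload for demo/stock and nothing for demo/log, which is the behavior the rest of this article relies on.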

When developing a digital twin, you may need to access these retained messages. This article describes how to do that using FlexSim's Python connection. It is not a comprehensive guide but a starting point that you will almost certainly need to adapt to your circumstances.

Here are the Python scripts and FlexSim model used in this example:

MQTT.zip

A quick note: since MQTT is a device communication protocol, support for MQTT in the Emulation module is in progress. This article describes how to import data from an MQTT broker like any other file/database/http server, rather than connecting for Emulation purposes.

Step 1: Gaining Access to an MQTT Broker

Your facility may already have an MQTT Broker running. In the end, your digital twin will need to connect to that broker to retrieve the latest retained messages.

For testing, or if you don't have an MQTT Broker yet, you can easily set one up. One possibility is to use Docker to run the EMQX Broker, although there are dozens of other brokers and installation methods to choose from.

You can download Docker Desktop here: https://www.docker.com/products/docker-desktop/

Once Docker is running, you can use the following command in a terminal to download and run the EMQX Broker:

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx

Note that this command runs a Linux container, so if Docker Desktop is set to Windows containers, you'll need to switch to Linux containers before running it.
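Before moving on, it can help to confirm that something is actually listening on the broker's MQTT port. The following is a minimal sketch that assumes the broker is on localhost at port 1883, as in the docker command above. The helper name broker_reachable is made up for this example. It only checks that a TCP connection opens, not that the service speaks MQTT.

```python
import socket

def broker_reachable(host="localhost", port=1883, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False

print("Broker reachable:", broker_reachable())
```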

Step 2: Publishing Messages to an MQTT Broker

A real facility will have many publishers and subscribers. However, for testing, it can be convenient to create publishers and subscribers that you control.

Once your broker is running, you can use code like the following to create a publishing client:

https://github.com/emqx/MQTT-Client-Examples/blob/master/mqtt-client-Python3/pub_sub_tcp.py

For this example, I created two clients that publish tabular data: one that publishes a set of "raw materials" available for use in the process, and another that publishes a set of "finished goods" available to fulfill orders.

To create some data, run the finished_goods_client.py and raw_materials_client.py files for a few seconds each. Note that both of these clients mark the messages to be retained, so FlexSim can access the latest message, even if neither client is actively running.
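For readers without the attached files at hand, a retained publish might look roughly like the sketch below. It uses paho-mqtt's publish.single helper with retain=True. The payload shape (a JSON object with a "data" key holding rows) and the sample rows are assumptions for illustration, and the helper names build_payload and publish_retained are hypothetical. The actual client scripts in the attachment may differ.

```python
import json

TOPIC = "python-mqtt/raw-materials"  # topic name used elsewhere in this article

def build_payload(rows):
    """Wrap tabular rows in a JSON object with a 'data' key (assumed shape)."""
    return json.dumps({"data": rows})

def publish_retained(rows, hostname="localhost", port=1883):
    """Publish one retained message; requires paho-mqtt and a running broker."""
    # Imported here so build_payload can be used without paho-mqtt installed.
    import paho.mqtt.publish as publish
    publish.single(
        TOPIC, payload=build_payload(rows), qos=0, retain=True,
        hostname=hostname, port=port,
        auth={"username": "emqx", "password": "public"})

# Sample rows are made up for illustration.
sample_rows = [["Steel", 120], ["Plastic", 45]]
print(build_payload(sample_rows))
```

Calling publish_retained(sample_rows) while the broker is running would leave this message stored on the broker until another retained message replaces it.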

Step 3: Writing a Python Script to Retrieve Messages

The script fs_client.py subscribes to the given topics just long enough to receive their retained messages, then returns the decoded payloads:

import paho.mqtt.subscribe as subscribe
import json

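def get_retained_msgs(topics):
    """Return the retained message for each topic as decoded JSON."""
    auth = {
        'username': 'emqx',
        'password': 'public',
    }
    # Blocks until msg_count messages have arrived, then disconnects.
    msgs = subscribe.simple(
        topics, qos=0, msg_count=len(topics), retained=True, hostname="localhost",
        port=1883, client_id=None, auth=auth, keepalive=5)

    # simple() returns a single message when msg_count is 1, otherwise a list.
    if isinstance(msgs, list):
        return [json.loads(m.payload) for m in msgs]
    else:
        return [json.loads(msgs.payload)]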

if __name__ == "__main__":
    topics = ["python-mqtt/raw-materials", "python-mqtt/finished-goods"]
    print(get_retained_msgs(topics))

Run this script to verify that it returns the latest data from the two publishers.

When FlexSim calls this function, it converts Python lists and dictionaries into FlexSim Arrays and Maps, so you can return complex data structures directly to FlexSim.
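One detail to watch: the FlexScript in Step 5 pairs each returned message with a table by position, but messages come back in arrival order, which may not match the order of the topics list. A possible safeguard, sketched below, is to reorder the results by each message's topic attribute before returning them. The helper name order_by_topic is hypothetical, and the fake messages stand in for paho's message objects so the sketch runs without a broker.

```python
import json
from types import SimpleNamespace

def order_by_topic(msgs, topics):
    """Return decoded payloads in the same order as `topics`.

    Topics that have no message map to None.
    """
    if not isinstance(msgs, list):
        msgs = [msgs]
    by_topic = {m.topic: json.loads(m.payload) for m in msgs}
    return [by_topic.get(t) for t in topics]

# Stand-ins for paho message objects, which expose .topic and .payload.
fake_msgs = [
    SimpleNamespace(topic="python-mqtt/finished-goods", payload=b'{"data": [[1, 2]]}'),
    SimpleNamespace(topic="python-mqtt/raw-materials", payload=b'{"data": [[3, 4]]}'),
]
topics = ["python-mqtt/raw-materials", "python-mqtt/finished-goods"]
ordered = order_by_topic(fake_msgs, topics)
print(ordered)
```

Returning a dictionary keyed by topic would work as well, since FlexSim would receive it as a Map.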

Step 4: Writing a User Command to Call the Python Function

In FlexSim, create a new user command, and edit the code. Be sure to toggle the node for external use, and set it to the following:

/**external python: */ /**/"fs_client"/**/
/** \nfunction name:*/ /**/"get_retained_msgs"/**/

To use Python like this, Python must be installed and available on your system PATH. The paho-mqtt package must also be installed globally (for example, with pip install paho-mqtt). Finally, verify that your FlexSim global preferences specify the correct version of Python.
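A quick way to check these requirements is to run a small script with the same interpreter that is on your PATH. This is a sketch only, and the function name check_environment is made up for this example. It reports the interpreter version and location and whether the paho.mqtt package can be found, which you can compare with the Python version selected in FlexSim.

```python
import importlib.util
import sys

def check_environment():
    """Report the interpreter version and whether paho-mqtt can be imported."""
    try:
        has_paho = importlib.util.find_spec("paho.mqtt") is not None
    except ModuleNotFoundError:
        # Raised when the parent package 'paho' is not installed at all.
        has_paho = False
    return {
        "python_version": "{}.{}".format(*sys.version_info[:2]),
        "executable": sys.executable,
        "paho_mqtt_installed": has_paho,
    }

print(check_environment())
```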

For more information on connecting to python functions, see FlexSim's documentation:

https://docs.flexsim.com/en/23.2/Reference/DeveloperAdvancedUser/ConnectingToExternalCode/ConnectingToExternalCode.html

Step 5: Writing a User Command to Write Messages to Global Tables

In this example, the messages we are interested in store tabular data, so it makes sense to store the data in Global Tables:

Array tables = [Table("GlobalTable1"), Table("GlobalTable2")];
Array topics = ["python-mqtt/raw-materials", "python-mqtt/finished-goods"];
Array msgs = getRetainedMessages(topics);

for (int m = 1; m <= msgs.length; m++) {
    Table fsTable = tables[m];
    // header code elided; see example
    Array msgData = msgs[m].data;
    Table msgTable = Table(msgData);
    msgTable.cloneTo(fsTable);
    // header code elided; see example
}

If you run this new user command, you can see the latest data pulled into FlexSim.

Conclusion

This example shows just one way to import data from an MQTT broker into FlexSim. What kind of data you import, and what you do with it, is up to you.


Maurizio Giubilato commented:

Jordan,

MQTT connection will be released in FlexSim 2024.0 Emulation



jordan.johnson contributed to this article