mqtt to InfluxDB with telegraf

Mqtt is a lightweight publish-subscribe protocol for message queues, typically used with IoT devices, but also for monitoring services. As such, messages are typically ephemeral. I use several Zigbee sensors together with zigbee2mqtt in our house, and I wanted to collect and store all the mqtt messages in a database to keep track of what’s going on. In this blog post I describe how to write all mqtt messages into an InfluxDB database using telegraf.

First, we discuss how telegraf can write mqtt messages to InfluxDB. Then follows a tutorial on how to run this as a systemd service using a podman container.

Telegraf as mqtt2influxdb bridge

InfluxDB is often a good choice for mqtt-based messages because such messages are a) naturally a time series and b) typically already json data following a schema, even if not a strict one. Measurements of various sensors typically follow this pattern, which in my opinion makes InfluxDB a very good fit for many lab measurements, such as this one (yes, it’s our home, but I treat it as a laboratory for these experiments).

Measurements are organized in buckets. Each data point is indexed by a timestamp and a measurement name and can carry an arbitrary number of fields, which is an exact match for all of the sensors that I have. Each measurement happens at a given time, is identified by a name and can contain multiple readings (“fields”). Take for instance a simple temperature/humidity sensor. Every other second it reports its readings as one single measurement, containing, besides the battery level and a last-seen timestamp, one reading for the temperature and one for the humidity:

"home/zigbee/livingroom" : {"battery":60,"humidity":60.4,"last_seen":1727811333827,"temperature":21.4}

Everything up to here works nicely out of the box with zigbee2mqtt. My task was to bridge the mqtt messages to InfluxDB, or in other words, to write the mqtt messages into an InfluxDB database. I figured that telegraf (a monitoring agent from the same company that also provides InfluxDB) is a capable tool for this task. I already use it as a monitoring agent on many of my servers, which is why I picked telegraf for this task as well.
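To make the mapping from an mqtt message to an InfluxDB data point concrete, here is a small Python sketch that renders the message above as InfluxDB line protocol. It assumes that the measurement is the last topic level and that every numeric JSON value becomes a field. As far as I know, telegraf's json parser turns JSON numbers into float fields, so the sketch does the same. The function name `to_line_protocol` and the timestamp are made up for illustration. This is not how telegraf works internally, and escaping of special characters is left out.

```python
import json


def to_line_protocol(topic: str, payload: str, timestamp_ns: int) -> str:
    """Render one zigbee2mqtt message as a single line-protocol point.

    Simplifying assumptions: the measurement is the last topic level, only
    numeric JSON values become fields (written as floats), and names contain
    no characters that would need escaping.
    """
    measurement = topic.split("/")[-1]
    data = json.loads(payload)
    fields = []
    for key, value in sorted(data.items()):
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            fields.append(f"{key}={float(value)}")
    return f"{measurement} {','.join(fields)} {timestamp_ns}"


line = to_line_protocol(
    "home/zigbee/livingroom",
    '{"battery":60,"humidity":60.4,"last_seen":1727811333827,"temperature":21.4}',
    1727811333827000000,  # hypothetical write time in nanoseconds
)
print(line)
# livingroom battery=60.0,humidity=60.4,last_seen=1727811333827.0,temperature=21.4 1727811333827000000
```

The whole message becomes one point: one measurement name, one timestamp and several fields.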

To see how it works, have a look at the following telegraf.conf configuration file:

# agent configuration (optional)
[agent]
omit_hostname = true

## zigbee2mqtt input
[[inputs.mqtt_consumer]]
servers = ["tcp://192.168.0.101:1883"]
topics = ["home/zigbee/+"]
data_format = "json" # see https://docs.influxdata.com/telegraf/v1/data_formats/input/
json_strict = true

  # take the measurement name from the mqtt topic
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "home/zigbee/+"
  measurement = "_/_/measurement"

## InfluxDB output
[[outputs.influxdb_v2]]
urls = ["http://192.168.0.102:8086"]
token = "<redacted>"
organization = "home"
bucket = "home"
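The topic_parsing rule can be read as a positional template. The topic is split at "/", the level at the position of the word measurement becomes the measurement name, and "_" marks levels to ignore. The following Python sketch mimics that reading, together with the usual mqtt wildcard matching ("+" for exactly one level, "#" for the rest). It is a simplified model under these assumptions, not telegraf's actual code, and the helper names `topic_matches` and `parse_measurement` are hypothetical.

```python
from typing import Optional


def topic_matches(pattern: str, topic: str) -> bool:
    """MQTT-style match: '+' matches exactly one level, '#' the remaining ones."""
    p_levels, t_levels = pattern.split("/"), topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":
            return True
        if i >= len(t_levels) or (p != "+" and p != t_levels[i]):
            return False
    return len(p_levels) == len(t_levels)


def parse_measurement(template: str, topic: str) -> Optional[str]:
    """Return the topic level sitting at the 'measurement' slot of the template."""
    slots, levels = template.split("/"), topic.split("/")
    if len(slots) != len(levels):
        return None  # in this sketch, template and topic must be equally long
    for slot, level in zip(slots, levels):
        if slot == "measurement":
            return level
    return None


topic = "home/zigbee/livingroom"
if topic_matches("home/zigbee/+", topic):
    print(parse_measurement("_/_/measurement", topic))  # livingroom
```

A deeper topic such as home/zigbee/livingroom/set does not match "home/zigbee/+", because "+" covers exactly one level.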

This configuration is all you need to write the readings from zigbee2mqtt to your InfluxDB database. It expects the message payloads to be json formatted and takes the measurement name from the topic. For me this works, e.g., with the mqtt message we already saw above:

"home/zigbee/livingroom" : {"battery":60,"humidity":60.4,"last_seen":1727811333827,"temperature":21.4}

Here telegraf correctly deduces “livingroom” as the measurement name and turns the json readings into fields. InfluxDB can work with those directly, no further processing needed.

Translating multiple metrics into one multi-field metric

I also have a smart meter wifi gateway that pushes the readings from our power smart meter to mqtt. This works great; the readings I get look like this:

home/smartgateway/dsmr/reading/electricity_delivered_1 21972.75
home/smartgateway/dsmr/reading/electricity_returned_1 2487.19
home/smartgateway/dsmr/reading/electricity_delivered_2 17605.84
home/smartgateway/dsmr/reading/electricity_returned_2 5700.87
home/smartgateway/dsmr/reading/electricity_currently_delivered 0.26
home/smartgateway/dsmr/reading/electricity_currently_returned 0.00

Here every reading is published on its own topic, and with the above configuration every single reading would end up as its own measurement. This works, but is not ideal. It would be better to aggregate them all into one measurement, with the different topics becoming different fields.

To aggregate multiple readings into one single multi-field measurement, telegraf ships the pivot processor plugin. It rotates single-valued metrics, which carry the reading name in a tag and the number in a field called value, so that the tag becomes the field name (e.g. electricity_delivered_1=21972.75). This means it transforms the following independent readings:

smartgateway,field=electricity_delivered_1 value=21972.75
smartgateway,field=electricity_returned_1 value=2487.19
smartgateway,field=electricity_delivered_2 value=17605.84
smartgateway,field=electricity_returned_2 value=5700.87
smartgateway,field=electricity_currently_delivered value=0.26
smartgateway,field=electricity_currently_returned value=0.00

Into fields of one single smartgateway measurement:

smartgateway electricity_delivered_1=21972.75,electricity_returned_1=2487.19,electricity_delivered_2=17605.84,electricity_returned_2=5700.87,electricity_currently_delivered=0.26,electricity_currently_returned=0.00

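This provides an elegant way to better organize the database data structures. Less clutter and more compact, concise data make your life much easier in the short and in the long run, at least in my experience. Strictly speaking, pivot still emits one metric per incoming reading, each carrying a single named field. Because they all share the smartgateway measurement, InfluxDB stores them as fields of that one measurement.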
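The pivot step can be mimicked in a few lines of Python. The sketch models a metric as a hypothetical `Metric` dataclass (name, tags, fields). It follows the pivot behaviour as I read the plugin documentation: the value of the tag named by tag_key becomes the new field name, the field named by value_key supplies its value, and that tag is dropped. Metrics without the tag pass through unchanged. This is an illustration, not telegraf's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Metric:
    """Hypothetical, minimal stand-in for a telegraf metric."""
    name: str
    tags: dict = field(default_factory=dict)
    fields: dict = field(default_factory=dict)


def pivot(metric: Metric, tag_key: str = "field", value_key: str = "value") -> Metric:
    """Turn tags[tag_key] into a field name holding fields[value_key]."""
    if tag_key not in metric.tags or value_key not in metric.fields:
        return metric  # nothing to rotate: pass the metric through unchanged
    tags = {k: v for k, v in metric.tags.items() if k != tag_key}
    fields = {k: v for k, v in metric.fields.items() if k != value_key}
    fields[metric.tags[tag_key]] = metric.fields[value_key]
    return Metric(metric.name, tags, fields)


readings = [
    Metric("smartgateway", {"field": "electricity_delivered_1"}, {"value": 21972.75}),
    Metric("smartgateway", {"field": "electricity_returned_1"}, {"value": 2487.19}),
]
pivoted = [pivot(m) for m in readings]
for m in pivoted:
    print(m)
```

Each pivoted metric still carries exactly one field. They only share the measurement name, which is what lets InfluxDB treat them as fields of one measurement.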

The following example telegraf.conf configuration achieves this for our smart gateway:

## Smart gateway
[[inputs.mqtt_consumer]]
servers = ["tcp://192.168.0.101:1883"]
topics = [
  "home/smartgateway/dsmr/reading/electricity_tariff",
  "home/smartgateway/dsmr/reading/electricity_delivered_1",
  "home/smartgateway/dsmr/reading/electricity_delivered_2",
  "home/smartgateway/dsmr/reading/electricity_returned_1",
  "home/smartgateway/dsmr/reading/electricity_returned_2",
  "home/smartgateway/dsmr/reading/electricity_currently_delivered",
  "home/smartgateway/dsmr/reading/electricity_currently_returned",
  "home/smartgateway/dsmr/consumption/gas/delivered"
]
data_format = "value"
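data_type = "float"
# Don't create the default "topic" tag: it would differ per reading and
# split the pivoted fields into separate series
topic_tag = ""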

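  # Tariff topic (meant to be parsed as int while the rest is float).
  # Caveat: data_format/data_type are parser options of the plugin itself and
  # are most likely ignored inside topic_parsing, so the tariff probably still
  # arrives as float. A separate [[inputs.mqtt_consumer]] with
  # data_type = "int" is the reliable way to get an integer.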
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "home/smartgateway/dsmr/reading/electricity_tariff"
  measurement = "_/measurement/_/_/_"
  tags = "_/_/_/_/field"
  data_format = "value"
  data_type = "int"

  # Parse the electricity readings
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "home/smartgateway/dsmr/reading/+"
  measurement = "_/measurement/_/_/_"
  tags = "_/_/_/_/field"

  # Parse the gas readings
  [[inputs.mqtt_consumer.topic_parsing]]
  topic = "home/smartgateway/dsmr/consumption/gas/delivered"
  measurement = "_/measurement/_/_/_/_"
  tags = "_/_/_/_/field/_"

# Aggregate the readings into one measurement
[[processors.pivot]]
  tag_key = "field"
  value_key = "value"

# Add the InfluxDB output section ([[outputs.influxdb_v2]]) from above here
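Why does the gas reading need its own pattern, one level longer? The Python sketch below applies a measurement template and a tags template positionally to a topic. It then parses the payload the way data_format = "value" with data_type = "float" would, as a single number. This is a simplified model that assumes template and topic must have the same number of levels. The name `parse_topic` and the gas value 1234.5 are hypothetical.

```python
from typing import Optional


def parse_topic(topic: str, payload: str, measurement_tpl: str,
                tags_tpl: str) -> Optional[tuple]:
    """Apply positional measurement/tags templates and parse a plain value.

    Returns (measurement, tags, fields), or None if a template has a different
    number of levels than the topic.
    """
    levels = topic.split("/")
    m_slots, t_slots = measurement_tpl.split("/"), tags_tpl.split("/")
    if len(m_slots) != len(levels) or len(t_slots) != len(levels):
        return None
    measurement = next((lvl for slot, lvl in zip(m_slots, levels)
                        if slot == "measurement"), None)
    tags = {slot: lvl for slot, lvl in zip(t_slots, levels) if slot != "_"}
    # data_format = "value" with data_type = "float": the payload is one number
    return measurement, tags, {"value": float(payload)}


electricity = "home/smartgateway/dsmr/reading/electricity_delivered_1"
gas = "home/smartgateway/dsmr/consumption/gas/delivered"

print(parse_topic(electricity, "21972.75", "_/measurement/_/_/_", "_/_/_/_/field"))
# The 5-level templates do not fit the 6-level gas topic (1234.5 is made up):
print(parse_topic(gas, "1234.5", "_/measurement/_/_/_", "_/_/_/_/field"))
print(parse_topic(gas, "1234.5", "_/measurement/_/_/_/_", "_/_/_/_/field/_"))
```

With the longer templates, the gas reading also lands in the smartgateway measurement, tagged field=gas. After pivoting, that tag becomes a gas field.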

telegraf supports a wide range of configuration options, from different input data formats (here we use json and plain value payloads) to many mqtt-specific settings. The example configurations in its documentation are easy to understand and a useful resource.

Running mqtt2influxdb as a systemd service using podman

I implemented my small mqtt2influxdb gateway/bridge as a simple systemd service that runs a podman container. To achieve this you need:

  • A telegraf.conf file (stored in /etc/mqtt2influxdb)
  • podman (docker works as well, but not with the quadlet approach described below)
  • The quadlet file described below

Quadlets are container description files that podman’s systemd generator transforms into regular systemd units. Quadlets are cool because they get rid of a lot of boilerplate. They are nothing special and can also be replaced by a traditional systemd unit file (e.g. like the ones I wrote about in an earlier post). I implemented my mqtt2influxdb service via the following quadlet configuration in /etc/containers/systemd/mqtt2influxdb.container:

[Unit]
Description=mqtt to influxdb writer

[Container]
# Update version to your liking or use latest
Image=docker.io/telegraf:1.32
ContainerName=mqtt2influxdb
AutoUpdate=registry

Volume=/etc/mqtt2influxdb/telegraf.conf:/etc/telegraf/telegraf.conf:ro
AddCapability=cap_net_raw

[Service]
Restart=always

[Install]
# Start by default on boot
WantedBy=multi-user.target default.target

After storing this, run systemctl daemon-reload && systemctl start mqtt2influxdb.service. Quadlets don’t need to be enabled (systemctl enable does not work on generated units anyway). Thanks to the [Install] section, they are started automatically at boot.

That’s all you need for your own mqtt2influxdb gateway. telegraf is a mighty tool and makes this whole task easy, without the need to hack together your own bridge. That will be a project for another rainy day.

Cheers.