The custom Starlark MQTT integration aims to make it easy to connect your existing MQTT application or edge controller to Neowit. You write code that transforms the topics and payloads into something that Neowit understands.
Technical details
Starlark
The integration uses the Starlark language, together with our extension modules, to define devices and metrics that can be stored in the Neowit application. Starlark is a small dialect of Python that should be easy to learn. It is fairly limited, but should be powerful enough to handle typical JSON MQTT payloads.
See .
Connecting to the MQTT broker
The integration settings page shows the connection details you need to connect to our MQTT broker. See our page for more details on this.
Troubleshooting
The Starlark implementation is intended for advanced users, and we currently don't have any good ways to help you debug the Starlark code.
If your code does not compile correctly due to syntax errors, the integration will show up as not connected. The built-in editor should give you some hints of what is wrong, but it's not perfect.
If no devices or metrics show up, your code may be throwing errors. We currently don't have a good way of showing those errors to you, but feel free to contact support so that we can help you out.
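Because runtime errors are not currently surfaced, one option is to exercise the handler logic locally before pasting it into the editor. The sketch below is plain Python, not Starlark, and it rests on assumptions: the classes FakeJson, FakeDevices, FakeSeries and FakeSensors are hypothetical stand-ins that only mimic the calls used in the examples on this page. They are not Neowit's modules, and the real modules may validate their input differently.

```python
import json as pyjson

# Hypothetical local stand-ins for the modules the integration provides.
# They only record calls so that a handler can be inspected offline.
class FakeJson:
    @staticmethod
    def decode(text):
        return pyjson.loads(text)

class FakeDevices:
    def __init__(self):
        self.upserted = []

    def Device(self, **fields):
        return dict(fields)

    def upsert(self, device):
        self.upserted.append(device)

class FakeSeries:
    def __init__(self):
        self.published = []

    def publish(self, external_id, timestamp, sensor, value):
        self.published.append((external_id, timestamp, sensor, value))

class FakeSensors:
    TEMP = "TEMP"

json, devices, series, sensors = FakeJson(), FakeDevices(), FakeSeries(), FakeSensors()

# The handler under test, written in the Starlark-compatible subset of Python.
def on_publish(topic, payload):
    data = json.decode(payload)
    if "at" not in data or "temperature" not in data:
        return
    external_id = topic.replace("mytopic/", "")
    devices.upsert(devices.Device(external_id=external_id, name="My device 1"))
    series.publish(external_id, data["at"], sensors.TEMP, data["temperature"])

on_publish("mytopic/location1/device1", '{"at": 1719065842, "temperature": 30.3}')
print(devices.upserted)
print(series.published)
```

A Python traceback from a run like this points at the failing line, which can help narrow down a problem before contacting support. It does not prove that the code behaves the same way inside the integration.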
Writing code
Our Starlark executor looks for a function called on_publish, which accepts the arguments topic and payload. The topic argument is the MQTT topic that the payload was published to, and the payload argument is the published message body.
The goal of the code you need to define is to convert the topic and payload into something that Neowit understands using the following modules:
Examples
Simple example
Consider the following JSON payload published to the topic mytopic/location1/device1. The code below creates one unique device and publishes a temperature value to the Neowit metrics store.
PUBLISH mytopic/location1/device1
{"at": 1719065842, "temperature": 30.3}
# called when MQTT receives a PUBLISH from the app or edge controller
def on_publish(topic, payload):
    # the payload is JSON formatted, so we need to decode
    # it into a Starlark data structure.
    data = json.decode(payload)

    # ignore payloads that don't have the expected input
    if "at" not in data or "temperature" not in data:
        return

    # the topic here includes the information we
    # need to identify a unique device, so we use it
    # as our basis. The other attributes are mocked up,
    # but could also be defined if the data is available
    # in this or other payloads.
    external_id = topic.replace("mytopic/", "")
    device = devices.Device(
        external_id="external_id_placeholder" if False else external_id,
        name="My device 1",
        vendor="My device vendor",
        model="My device model",
        status="STATUS_CONNECTED",
        status_reason="Received something"
    )

    # this will register or update the device.
    devices.upsert(device)

    # publish the metric with the given unix timestamp
    # and value on the sensors.TEMP sensor.
    series.publish(external_id, data["at"], sensors.TEMP, data["temperature"])
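The external ID above is derived by removing "mytopic/" with replace, which accepts topics of any depth and would also alter a topic that contains that substring somewhere other than the start. A stricter variant could split the topic and check its shape first. This is a minimal sketch: parse_topic is a hypothetical helper name, not part of the integration, and it uses only string operations that exist in both Starlark and Python.

```python
def parse_topic(topic):
    """Return "<location>/<device>" for topics shaped like
    mytopic/<location>/<device>, or None for anything else."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "mytopic":
        return None
    if parts[1] == "" or parts[2] == "":
        return None
    return parts[1] + "/" + parts[2]

print(parse_topic("mytopic/location1/device1"))  # location1/device1
print(parse_topic("other/location1/device1"))    # None
```

Inside on_publish, the handler could return early when such a helper yields None, in the same way it already does for payloads that lack the expected keys.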
More advanced example
This example decodes a JSON structure and upserts the devices and series extracted from the JSON document.
# Go-style reference layout describing the format of the "updated" field
time_layout = "2006-01-02 15:04:05.000-0700"

##
## Main entry point
##
def on_publish(topic, payload):
    data = json.decode(payload)
    ts = time.parse_time(data["updated"], time_layout)

    if "rooms" in data:
        for room in data["rooms"]:
            publish_room(ts, room)

    if "dampers" in data:
        for damper in data["dampers"]:
            publish_damper(ts, damper)

##
## Upsert damper device and publish series
##
def publish_damper(ts, damper):
    # '564.001-SQ402 Rom 5.1826 .....'
    descr = damper["airflowDescription"].split(" ")
    if len(descr) < 3:
        return

    # first word is the damper code, third word is the room
    code, room = descr[0], descr[2]
    id = 'damper%s@%s' % (code, room)

    device = devices.Device(
        external_id = id,
        name = "Damper: %s %s" % (code, room),
        vendor = "My damper vendor",
        model = "My damper model",
        status = "STATUS_CONNECTED",
        status_reason = "OK"
    )
    devices.upsert(device)

    series.publish(id, ts.unix, sensors.AIR_FLOW_CUBIC_HOUR, get_number_or_none(damper, "airflow"))
    series.publish(id, ts.unix, sensors.AIR_FLOW_DEMAND_PERCENTAGE, get_number_or_none(damper, "demandInPrc"))

##
## Upsert room device and publish series
##
def publish_room(ts, room):
    device = devices.Device(
        external_id = room["location"],
        name = "Room: %s" % room["location"],
        vendor = "My vendor",
        model = "My model",
        status = "STATUS_CONNECTED",
        status_reason = "OK"
    )
    devices.upsert(device)

    series.publish(device.external_id, ts.unix, sensors.TEMP, get_number_or_none(room, "temperature"))
    series.publish(device.external_id, ts.unix, sensors.CO2, get_number_or_none(room, "airQuality"))
    series.publish(device.external_id, ts.unix, sensors.MOTION_DETECTED, get_bool_or_none(room, "pir"))
    series.publish(device.external_id, ts.unix, sensors.LIGHT_LUX, get_number_or_none(room, "lightIntensity"))
    series.publish(device.external_id, ts.unix, sensors.AIR_FLOW_DEMAND_PERCENTAGE, get_number_or_none(room, "airDemand"))
    series.publish(device.external_id, ts.unix, sensors.SETPOINT_CO2_PPM, get_number_or_none(room, "setpointCo2Actual"))
    series.publish(device.external_id, ts.unix, sensors.SETPOINT_TEMPERATURE_C, get_number_or_none(room, "setpointActual"))

##
## Helpers
##
# Both helpers return None when the key is missing or
# when the value is the literal string "NULL".
def get_number_or_none(room, key):
    if key not in room:
        return None
    value = room[key]
    return None if value == "NULL" else value

def get_bool_or_none(room, key):
    if key not in room:
        return None
    value = room[key]
    return None if value == "NULL" else value
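To make the identifier derivation in publish_damper concrete, the sketch below isolates its string handling so that it can be checked against the sample description from the comment in that function. damper_id is a hypothetical helper name introduced only for illustration. The code stays within the Starlark-compatible subset of Python.

```python
def damper_id(description):
    # Split on single spaces, as publish_damper does.
    descr = description.split(" ")
    if len(descr) < 3:
        return None
    # First word is the damper code, third word is the room.
    code, room = descr[0], descr[2]
    return "damper%s@%s" % (code, room)

print(damper_id("564.001-SQ402 Rom 5.1826 ....."))  # damper564.001-SQ402@5.1826
```

Descriptions with fewer than three space-separated words produce None here, which corresponds to the early return in publish_damper.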