How to Add Metrics to a DIP
The LabScale DIP is the interface to your hardware and is the mechanism by which the agent communicates with it. It not only monitors device status, but it can also produce metrics and send them to the LabScale server where they can be visualized. In this tutorial we will cover adding device metrics (also known as telemetry) to the Example DIP created in the How to Create a DIP guide. If you have not read it, please familiarize yourself with how to build a DIP before continuing.
Terminology
- Agent - A locally running service that executes jobs on behalf of the LabScale service.
- Device - The hardware under test.
- DIP - Device Integration Package, provides hooks required by the agent for probing target devices.
- Host - The machine that an agent is running on.
- OpenTelemetry (OTEL) - An observability framework and toolkit designed to create and manage telemetry data such as traces, metrics, and logs. We use its metrics protocol and Software Development Kit (SDK) to report results.
Device Metrics and OpenTelemetry
The LabScale agent calls the DIP periodically to get device metrics if the DIP provides a get_metrics call-in hook. The hook can be an executable or a parameter passed to an executable; what matters is that the data returned from the call conforms to the structure, or schema, required by LabScale. For device metrics, the agent accepts only OpenTelemetry Protocol (OTLP) data, as a single Resource Metric in JSON string format. The agent automatically wraps this data in a Protobuf and sends it to the server. You are not required to use the libraries provided by OpenTelemetry; the only requirements are that the resulting data conforms to the standard OTLP schema and that certain keys are present in the JSON so that LabScale can find them.
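To make the shape of that data concrete, here is a minimal sketch that builds a single Resource Metric by hand, using only the Python standard library. It mirrors the structure of the sample output shown later in this guide. The helper names make_gauge and make_payload are hypothetical, the sketch handles integer attributes only, and it does not claim to cover every key LabScale may look for.

```python
import json
import time
from typing import Optional


def make_gauge(name: str, unit: str, value: float,
               attributes: Optional[dict] = None) -> dict:
    """Build one gauge metric holding a single data point."""
    point = {
        # OTLP JSON encodes 64-bit integers as strings
        "timeUnixNano": str(time.time_ns()),
        "asDouble": float(value),
    }
    if attributes:
        # Assumption: integer attributes only, to keep the sketch short
        point["attributes"] = [
            {"key": key, "value": {"intValue": str(val)}}
            for key, val in attributes.items()
        ]
    return {"name": name, "unit": unit, "gauge": {"dataPoints": [point]}}


def make_payload(metrics: list) -> str:
    """Wrap the metrics in exactly one resourceMetrics entry and serialize."""
    payload = {
        "resourceMetrics": [
            {
                "resource": {"attributes": []},
                "scopeMetrics": [
                    {"scope": {"name": "dip", "version": "1.0.0"},
                     "metrics": metrics}
                ],
            }
        ]
    }
    return json.dumps(payload, separators=(",", ":"))


print(make_payload([make_gauge("device.mem.usage", "%", 63.3)]))
```

The rest of this guide uses the OpenTelemetry SDK instead, which produces the same structure and fills in the resource attributes for you.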
The Example DIP
We are going to add a get_metrics hook to the Python-based Example DIP we created in the previous how-to guide. The first thing to be aware of is that the OpenTelemetry module does not ship with Python and must be installed on the same host as the agent and DIP. Instead of installing it manually, we can have the agent do this as part of the DIP installation procedure. To do this, we create an installer script that sets up a Python virtual environment and installs all of the resources required to use OpenTelemetry. The script may look something like this:
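#!/usr/bin/env bash
# Create a Python virtual environment in the current directory and activate it
python3 -m venv .
source "./bin/activate"
# Install the OpenTelemetry SDK and the helper modules used by dip.py
python3 -m pip install opentelemetry-sdk
python3 -m pip install opentelemetry-exporter-otlp-proto-common
python3 -m pip install protobuf
python3 -m pip install JSON-minify
python3 -m pip install psutil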
Save this to a file named install.sh. Now update the Example DIP dip.yaml file and add the dip_install hook; it should look something like this:
name: example_dip
version: 1.0.0
commands:
  get_status: "python3 ${DIP_ROOT}/dip.py get_status"
  get_state: "python3 ${DIP_ROOT}/dip.py get_state"
  dip_install: "${DIP_ROOT}/install.sh"
For development purposes, you should install the OpenTelemetry modules and start the virtual environment in your workspace by invoking the installer script, like this:
chmod +x ./install.sh
./install.sh
source bin/activate
(example_dip) example_dip %
Add the Get Metrics Hook
Once the appropriate OpenTelemetry modules are installed, we can update the DIP to collect some metrics. In the Example DIP, open the dip.py file and add the following code, which is required to create output compatible with OTLP:
import io
import json
import time
import math
from os import linesep
from os.path import splitext, basename

import psutil
from opentelemetry.metrics import (
    get_meter_provider,
    set_meter_provider,
    CallbackOptions,
    Observation,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    PeriodicExportingMetricReader,
    ConsoleMetricExporter,
    MetricsData,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import encode_metrics
from google.protobuf.json_format import MessageToJson
from json_minify import json_minify


# Format the JSON to conform to the OTLP standard for export.
# For more information about OTel JSON serialization, see:
# https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#json-file-serialization
def json_formatter(metric_data: MetricsData) -> str:
    export_metrics_service_request = encode_metrics(metric_data)
    json_msg = MessageToJson(message=export_metrics_service_request)
    return json_minify(json_msg) + linesep


# Collect the exported JSON in memory instead of writing it to the console
buf = io.StringIO()
exporter = ConsoleMetricExporter(
    out=buf,
    formatter=json_formatter,
)

# The PeriodicExportingMetricReader takes the preferred aggregation
# from the passed in exporter
reader = PeriodicExportingMetricReader(
    exporter,
    export_interval_millis=math.inf,
)
provider = MeterProvider(metric_readers=[reader])
set_meter_provider(provider)
meter = get_meter_provider().get_meter(splitext(basename(__file__))[0], "1.0.0")
OpenTelemetry is designed for monitoring and aggregating streams of data from various data sources that are polled in callback functions at a particular frequency. We need to implement a callback for each kind of metric we want to report. Examples for CPU usage, disk usage, memory usage, and uptime are shown below:
# Callback to gather cpu usage
def get_cpu_usage_callback(_: CallbackOptions):
    for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)):
        attributes = {"cpu_number": int(number)}
        yield Observation(percent, attributes)


# Callback to gather disk usage
def get_disk_usage_callback(_: CallbackOptions):
    disk_usage = psutil.disk_usage(".")
    attributes = {"disk_total": int(disk_usage.total), "disk_used": int(disk_usage.used)}
    yield Observation(disk_usage.percent, attributes)


# Callback to gather memory usage
def get_mem_usage_callback(_: CallbackOptions):
    mem = psutil.virtual_memory()
    attributes = {"mem_total": int(mem.total), "mem_used": int(mem.used)}
    yield Observation(mem.percent, attributes)


# Callback to gather uptime
def get_uptime_callback(_: CallbackOptions):
    uptime = float(time.time() - psutil.boot_time())
    yield Observation(uptime)
We now need to write the get_metrics function, where we register these callbacks with OpenTelemetry and gather the data. See the example below:
def get_metrics(**kwargs):
    meter.create_observable_gauge(
        callbacks=[get_cpu_usage_callback],
        name="device.cpu.usage",
        description="per-cpu usage",
        unit="%"
    )
    meter.create_observable_gauge(
        callbacks=[get_disk_usage_callback],
        name="device.disk.usage",
        description="disk usage",
        unit="%"
    )
    meter.create_observable_gauge(
        callbacks=[get_mem_usage_callback],
        name="device.mem.usage",
        description="memory usage",
        unit="%"
    )
    meter.create_observable_gauge(
        callbacks=[get_uptime_callback],
        name="device.uptime",
        description="uptime",
        unit="hours"
    )
    # Run every registered callback once and write the result to buf
    reader.collect()
    return json.loads(buf.getvalue())
Then add this function to the list of available commands in the Example DIP.
if __name__ == "__main__":
    from argparse import ArgumentParser

    commands = {
        "get_metrics": get_metrics,  # Add get_metrics
        "get_status": get_status,
        "get_state": get_state,
    }
    parser = ArgumentParser()
    parser.add_argument("command", nargs=1, choices=commands.keys())
    args = parser.parse_args()
    data = commands[args.command[0]]()
    print(json.dumps(data))
    buf.close()
At this point we can test the script to see if it works. The output should look something like this:
(example_dip) example_dip % python ./dip.py get_metrics
{"resourceMetrics":[{"resource":{"attributes":[{"key":"telemetry.sdk.language","value":{"stringValue":"python"}},{"key":"telemetry.sdk.name","value":{"stringValue":"opentelemetry"}},{"key":"telemetry.sdk.version","value":{"stringValue":"1.27.0"}},{"key":"service.name","value":{"stringValue":"unknown_service"}}]},"scopeMetrics":[{"scope":{"name":"dip","version":"1.0.0"},"metrics":[{"name":"device.cpu.usage","description":"per-cpu usage","unit":"%","gauge":{"dataPoints":[{"timeUnixNano":"1730250340528414000","asDouble":56.5,"attributes":[{"key":"cpu_number","value":{"intValue":"0"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"1"}}]},{"timeUnixNano":"1730250340528414000","asDouble":26.1,"attributes":[{"key":"cpu_number","value":{"intValue":"2"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"3"}}]},{"timeUnixNano":"1730250340528414000","asDouble":17.4,"attributes":[{"key":"cpu_number","value":{"intValue":"4"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"5"}}]},{"timeUnixNano":"1730250340528414000","asDouble":13.6,"attributes":[{"key":"cpu_number","value":{"intValue":"6"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"7"}}]},{"timeUnixNano":"1730250340528414000","asDouble":8.7,"attributes":[{"key":"cpu_number","value":{"intValue":"8"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"9"}}]},{"timeUnixNano":"1730250340528414000","asDouble":4.5,"attributes":[{"key":"cpu_number","value":{"intValue":"10"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"11"}}]},{"timeUnixNano":"1730250340528414000","asDouble":4.5,"attributes":[{"key":"cpu_number","value":{"intValue":"12"}}]},{"timeUnixNano":"1730250
340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"13"}}]},{"timeUnixNano":"1730250340528414000","asDouble":4.3,"attributes":[{"key":"cpu_number","value":{"intValue":"14"}}]},{"timeUnixNano":"1730250340528414000","asDouble":0.0,"attributes":[{"key":"cpu_number","value":{"intValue":"15"}}]}]}},{"name":"device.disk.usage","description":"disk usage","unit":"%","gauge":{"dataPoints":[{"timeUnixNano":"1730250340528414000","asDouble":28.0,"attributes":[{"key":"disk_total","value":{"intValue":"1000240963584"}},{"key":"disk_used","value":{"intValue":"271611240448"}}]}]}},{"name":"device.mem.usage","description":"memory usage","unit":"%","gauge":{"dataPoints":[{"timeUnixNano":"1730250340528414000","asDouble":63.3,"attributes":[{"key":"mem_total","value":{"intValue":"17179869184"}},{"key":"mem_used","value":{"intValue":"9688121344"}}]}]}},{"name":"device.uptime","description":"uptime","unit":"hours","gauge":{"dataPoints":[{"timeUnixNano":"1730250340528414000","asDouble":1649124.5283441544}]}}]}]}]}
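Because the output is a single long line, it can be hard to check by eye. The following is a minimal sketch of a structural check, using only the standard library, that confirms the payload holds exactly one resource metric and that every metric is a gauge with a unique name. The function name summarize_metrics is hypothetical and the check is not an official LabScale validator.

```python
import json


def summarize_metrics(raw: str) -> dict:
    """Return {metric name: number of data points} for a get_metrics payload.

    Raises ValueError when the payload does not hold exactly one resource
    metric, when a metric is not a gauge, or when a name is repeated.
    """
    payload = json.loads(raw)
    resource_metrics = payload.get("resourceMetrics", [])
    if len(resource_metrics) != 1:
        raise ValueError(
            f"expected 1 resource metric, found {len(resource_metrics)}"
        )

    summary = {}
    for scope in resource_metrics[0].get("scopeMetrics", []):
        for metric in scope.get("metrics", []):
            name = metric.get("name")
            if "gauge" not in metric:
                raise ValueError(f"{name}: not a gauge")
            if name in summary:
                raise ValueError(f"{name}: duplicate metric name")
            summary[name] = len(metric["gauge"].get("dataPoints", []))
    return summary
```

One way to use it is to capture the output of python ./dip.py get_metrics in a string and pass it to summarize_metrics; the returned dictionary should list each metric name once, with one data point per CPU for device.cpu.usage.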
Once you have verified that it works, you can add the hook to the dip.yaml configuration file:
name: example_dip
version: 1.0.0
commands:
  get_status: "python3 ${DIP_ROOT}/dip.py get_status"
  get_state: "python3 ${DIP_ROOT}/dip.py get_state"
  get_metrics: "python3 ${DIP_ROOT}/dip.py get_metrics"
  dip_install: "${DIP_ROOT}/install.sh"
That's it! You can now upload the DIP to your team account. Once it is associated with a device in LabScale, you will see the metrics graphs for that device begin to fill with data!
Adding Network Metrics
For network statistics, LabScale expects the difference in the number of bytes between samples, not the absolute count that most operating systems provide. It is therefore up to the DIP to calculate this difference. However, the DIP cannot remember values between consecutive calls, because its process lives only as long as it takes to complete the operation. To address this, the DIP needs to save state between calls: in this case, a file for each device is read and written to retrieve the previous network counters and store the current ones.
The example function below can be used to do this:
import os

# Use the temp path already created for the agent
LABSCALE_RUN_TMP = "/tmp/labscale_agent/run"
# Get the device ID from the environment and use it in the filename.
DEVICE_ID = os.environ.get("LS_DIP_device_id", "test")
METRICS_TEMP_PATH = os.path.join(LABSCALE_RUN_TMP, f"device-metrics-{DEVICE_ID}.json")


def calc_netstats(net_bytes_recv: int|None, net_bytes_sent: int|None):
    # Default to the current counters so the first sample reports a delta of 0
    prev_bytes_sent = net_bytes_sent
    prev_bytes_recv = net_bytes_recv
    if os.path.isfile(METRICS_TEMP_PATH):
        try:
            with open(METRICS_TEMP_PATH, "r") as f:
                data = json.load(f)
            # A stored value is null until that counter has been sampled once
            if data.get("bytesSent") is not None:
                prev_bytes_sent = data["bytesSent"]
            if data.get("bytesRecv") is not None:
                prev_bytes_recv = data["bytesRecv"]
        except (OSError, ValueError, AttributeError):
            # Unreadable or malformed state file: fall back to the defaults
            pass
    os.makedirs(os.path.dirname(METRICS_TEMP_PATH), exist_ok=True)
    with open(METRICS_TEMP_PATH, "w") as f:
        json.dump({
            "bytesRecv": prev_bytes_recv if net_bytes_recv is None else net_bytes_recv,
            "bytesSent": prev_bytes_sent if net_bytes_sent is None else net_bytes_sent,
        }, f)
    new_bytes_recv, new_bytes_sent = None, None
    # Clamp to 0 so a counter reset (for example after a reboot) is not reported as negative
    if net_bytes_recv is not None:
        new_bytes_recv = max(net_bytes_recv - prev_bytes_recv, 0)
    if net_bytes_sent is not None:
        new_bytes_sent = max(net_bytes_sent - prev_bytes_sent, 0)
    return new_bytes_recv, new_bytes_sent
Then add the network metrics callbacks:
# Callbacks to gather network stats
def get_net_recv_callback(_: CallbackOptions):
    counts = psutil.net_io_counters()
    recv, _ = calc_netstats(counts.bytes_recv, None)
    yield Observation(recv)


def get_net_sent_callback(_: CallbackOptions):
    counts = psutil.net_io_counters()
    _, sent = calc_netstats(None, counts.bytes_sent)
    yield Observation(sent)
Finally, register the callbacks with OpenTelemetry in the get_metrics() function:
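def get_metrics(**kwargs):
    # ... the gauges registered earlier in this guide stay as they are ...
    meter.create_observable_gauge(
        callbacks=[get_net_recv_callback],
        name="device.net.recv",
        description="network bytes received",
        unit="bytes"
    )
    meter.create_observable_gauge(
        callbacks=[get_net_sent_callback],
        name="device.net.sent",
        description="network bytes sent",
        unit="bytes"
    )
    # ... followed by the existing reader.collect() and return statement ...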
The complete dip.py from this guide can be viewed here:
import io
import json
import math
import os
from os import linesep
from os.path import splitext, basename
import psutil
import time

from opentelemetry.metrics import (
    get_meter_provider,
    set_meter_provider,
    CallbackOptions,
    Observation,
)
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    PeriodicExportingMetricReader,
    ConsoleMetricExporter,
    MetricsData,
)
from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import encode_metrics
from google.protobuf.json_format import MessageToJson
from json_minify import json_minify


# Format the JSON to conform to the OTLP standard for export.
# For more information about OTel JSON serialization, see:
# https://opentelemetry.io/docs/specs/otel/protocol/file-exporter/#json-file-serialization
def json_formatter(metric_data: MetricsData) -> str:
    export_metrics_service_request = encode_metrics(metric_data)
    json_msg = MessageToJson(message=export_metrics_service_request)
    return json_minify(json_msg) + linesep


# Collect the exported JSON in memory instead of writing it to the console
buf = io.StringIO()
exporter = ConsoleMetricExporter(
    out=buf,
    formatter=json_formatter,
)

# The PeriodicExportingMetricReader takes the preferred aggregation
# from the passed in exporter
reader = PeriodicExportingMetricReader(
    exporter,
    export_interval_millis=math.inf,
)
provider = MeterProvider(metric_readers=[reader])
set_meter_provider(provider)
meter = get_meter_provider().get_meter(splitext(basename(__file__))[0], "1.0.0")

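# Get the device ID from the environment and use it in the filename.
DEVICE_ID = os.environ.get("LS_DIP_device_id", "test")
# Use the temp path already created for the agent
LABSCALE_RUN_TMP = "/tmp/labscale_agent/run"
METRICS_TEMP_PATH = os.path.join(LABSCALE_RUN_TMP, f"device-metrics-{DEVICE_ID}.json")


def calc_netstats(net_bytes_recv: int|None, net_bytes_sent: int|None):
    # Default to the current counters so the first sample reports a delta of 0
    prev_bytes_sent = net_bytes_sent
    prev_bytes_recv = net_bytes_recv
    if os.path.isfile(METRICS_TEMP_PATH):
        try:
            with open(METRICS_TEMP_PATH, "r") as f:
                data = json.load(f)
            # A stored value is null until that counter has been sampled once
            if data.get("bytesSent") is not None:
                prev_bytes_sent = data["bytesSent"]
            if data.get("bytesRecv") is not None:
                prev_bytes_recv = data["bytesRecv"]
        except (OSError, ValueError, AttributeError):
            # Unreadable or malformed state file: fall back to the defaults
            pass
    os.makedirs(os.path.dirname(METRICS_TEMP_PATH), exist_ok=True)
    with open(METRICS_TEMP_PATH, "w") as f:
        json.dump({
            "bytesRecv": prev_bytes_recv if net_bytes_recv is None else net_bytes_recv,
            "bytesSent": prev_bytes_sent if net_bytes_sent is None else net_bytes_sent,
        }, f)
    new_bytes_recv, new_bytes_sent = None, None
    # Clamp to 0 so a counter reset (for example after a reboot) is not reported as negative
    if net_bytes_recv is not None:
        new_bytes_recv = max(net_bytes_recv - prev_bytes_recv, 0)
    if net_bytes_sent is not None:
        new_bytes_sent = max(net_bytes_sent - prev_bytes_sent, 0)
    return new_bytes_recv, new_bytes_sent
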

# Callback to gather cpu usage
def get_cpu_usage_callback(_: CallbackOptions):
    for (number, percent) in enumerate(psutil.cpu_percent(percpu=True)):
        attributes = {"cpu_number": int(number)}
        yield Observation(percent, attributes)


# Callback to gather disk usage
def get_disk_usage_callback(_: CallbackOptions):
    disk_usage = psutil.disk_usage(".")
    attributes = {"disk_total": int(disk_usage.total), "disk_used": int(disk_usage.used)}
    yield Observation(disk_usage.percent, attributes)


# Callback to gather memory usage
def get_mem_usage_callback(_: CallbackOptions):
    mem = psutil.virtual_memory()
    attributes = {"mem_total": int(mem.total), "mem_used": int(mem.used)}
    yield Observation(mem.percent, attributes)


# Callback to gather uptime
def get_uptime_callback(_: CallbackOptions):
    uptime = float(time.time() - psutil.boot_time())
    yield Observation(uptime)


# Callbacks to gather network stats
def get_net_recv_callback(_: CallbackOptions):
    counts = psutil.net_io_counters()
    recv, _ = calc_netstats(counts.bytes_recv, None)
    yield Observation(recv)


def get_net_sent_callback(_: CallbackOptions):
    counts = psutil.net_io_counters()
    _, sent = calc_netstats(None, counts.bytes_sent)
    yield Observation(sent)


def get_metrics(**kwargs):
    meter.create_observable_gauge(
        callbacks=[get_cpu_usage_callback],
        name="device.cpu.usage",
        description="per-cpu usage",
        unit="%"
    )
    meter.create_observable_gauge(
        callbacks=[get_disk_usage_callback],
        name="device.disk.usage",
        description="disk usage",
        unit="%"
    )
    meter.create_observable_gauge(
        callbacks=[get_mem_usage_callback],
        name="device.mem.usage",
        description="memory usage",
        unit="%"
    )
    meter.create_observable_gauge(
        callbacks=[get_uptime_callback],
        name="device.uptime",
        description="uptime",
        unit="hours"
    )
    meter.create_observable_gauge(
        callbacks=[get_net_recv_callback],
        name="device.net.recv",
        description="network bytes received",
        unit="bytes"
    )
    meter.create_observable_gauge(
        callbacks=[get_net_sent_callback],
        name="device.net.sent",
        description="network bytes sent",
        unit="bytes"
    )
    # Run every registered callback once and write the result to buf
    reader.collect()
    return json.loads(buf.getvalue())


def get_state() -> dict:
    """
    The get_state call is an invasive function that
    probes the device and returns not only status,
    but any other bits of platform metadata.
    """
    import platform
    data = get_status()
    data.update({
        'serialNumber': platform.node(),
        'softwareVersion': platform.release(),
    })
    return data


def get_status() -> dict:
    """
    The get_status function will return an online or offline
    state of the virtual device. Since a virtual device is
    a shell on the local host, we will assume that it will
    always be online.
    """
    return {"status": "online"}


if __name__ == "__main__":
    from argparse import ArgumentParser

    commands = {
        "get_status": get_status,
        "get_state": get_state,
        "get_metrics": get_metrics,  # Add get_metrics
    }
    parser = ArgumentParser()
    parser.add_argument("command", nargs=1, choices=commands.keys())
    args = parser.parse_args()
    data = commands[args.command[0]]()
    print(json.dumps(data))
    buf.close()
Adding Custom Metrics
It is possible to include other metrics beyond the ones presented here by adding more data sources and gauges (OTEL Gauge is the only supported format at the moment). The only requirement is that the metric names be unique and not duplicate the existing metric names from above.
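As a small illustration of the naming requirement, the sketch below checks a list of custom metric names for repeats and for clashes with the metric names used in this guide. The helper find_name_conflicts and the names device.temp and device.fan.rpm are hypothetical examples, not part of LabScale.

```python
# Metric names already used by the examples in this guide
BUILT_IN_METRICS = {
    "device.cpu.usage",
    "device.mem.usage",
    "device.disk.usage",
    "device.uptime",
    "device.net.recv",
    "device.net.sent",
}


def find_name_conflicts(custom_names: list) -> list:
    """Return a sorted list of names that repeat or clash with built-ins."""
    seen = set()
    conflicts = set()
    for name in custom_names:
        if name in BUILT_IN_METRICS or name in seen:
            conflicts.add(name)
        seen.add(name)
    return sorted(conflicts)


# "device.temp" and "device.fan.rpm" are made-up custom metric names
print(find_name_conflicts(["device.temp", "device.fan.rpm", "device.uptime"]))
```

An empty list means the names are safe to register with meter.create_observable_gauge alongside the existing gauges.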
Viewing Metrics
Built-in Metrics
The built-in metrics mentioned in the examples above can be viewed from the host/device details page (Labs & Devices -> <lab> -> Hosts/Devices -> <host/device> -> System Metrics) without additional configuration.
- device.cpu.usage -> cpu_usage
- device.mem.usage -> memory_usage
- device.disk.usage -> disk_usage
- device.uptime -> uptime
- device.net.recv -> net_bytes_recv
- device.net.sent -> net_bytes_sent
Custom Metrics
Custom metrics need to be onboarded to the system before they will be processed and shown in the host/device details page under the Custom Metrics section.
To onboard a custom metric, go to Admin -> Custom Metrics, and add a new metric definition.
- Name: the same name used for the metric in the DIP.
- Description: shown as the tooltip in the metrics chart.
- Units: built-in units have pre-defined formatting in the chart.