How to Write an Output Adapter
Introduction
For job packages that produce test output, for example a job that runs a Pytest test suite, the LabScale job runner can parse those results and ingest them into the LabScale service for further analysis. However, since each test framework produces its own style of ouput, LabScale requires the implementer of the test suite to use an output adapter designed to parse the particular output from that test framework and convert it into something readable by LabScale. This guide will show you how to write an ouput adapter written for the Robot OS Test (ROSTest) framework and make it available to your own Robot OS test cases.
Writing the Adapter
An output adapter simply parses a job's output file at the end of execution and produces a resulting JSON structure in a format that understandable by the LabScale service. An adapter can be written in any language, for instance, bash or Python, the only requirement is that the output file be in JSON format that adheres to the LabScale standard. This standard is just a list of dictionaries containing the name of the test case and a result as pass
, fail
, or skip
. Below is an example of the JSON output expected from an adapter:
[
{"testcase": "testcase1_name", "result": "pass"},
{"testcase": "testcase2_name", "result": "fail"},
{"testcase": "testcase3_name", "result": "skip"}
]
The adapter script must also accept two parameters, --input
for the path to the job's log file, and --output
for the file and location where the parsed results will be written.
The Main Function
First, the script must accept the command line arguments --input
and --output
which contain the file paths to the job's log file and a path to the file containing the resulting JSON produced by the adapter, an example for this may look like this:
#!/usr/bin/env python3
import argparse
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="Input file")
parser.add_argument("--output", help="Output file", default="/dev/stdout")
args = parser.parse_args()
Parsing ROSTest output
Once the basic skeleton of the adapter is created, we now need to have it parse the output from ROSTest and translate it into the JSON format understood by the LabScale service. For ROSTest, each test case will produce output in a textual format similar to what is shown below:
[test_package_name.testcase_name/test_name][result_type]
The result_type
in this case can be one of three values, passed
, skipped
, and FAILURE
. With this in mind we can now write a simple regular expression to parse these fields, for example:
#!/usr/bin/env python3
import argparse
import re
# Regex for ROSTest output
RE_ROSTEST_RESULT = r"\[([^\]]+)\]\[(FAILURE|passed|skipped)\]"
# Parse the test case output
def parse_results(results_text):
return re.findall(RE_ROSTEST_RESULT, results_text)
# The main entrypoint of the script
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="Input file")
parser.add_argument("--output", help="Output file", default="/dev/stdout")
args = parser.parse_args()
# Open the ROSTest log file and parse its contents
with open(args.input, 'r') as input_file:
matches = rparse_results(input_file.read())
Transforming the results
The next step is to convert the results data into dictionary structure that can be easily ingested by the LabScale service:
#!/usr/bin/env python3
import argparse
import re
# Regex for ROSTest output
RE_ROSTEST_RESULT = r"\[([^\]]+)\]\[(FAILURE|passed|skipped)\]"
# Parse the test case output
def parse_results(results_text):
return re.findall(RE_ROSTEST_RESULT, results_text)
# The mapping betwen ROSTest and LabScale result types
TRANSFORM_MAP = {
"FAILURE": "fail",
"passed": "pass",
"skipped": "skip",
}
# Replace ROSTest result types with LabScale result types
def normalize_result(result):
new_result = TRANSFORM_MAP.get(result)
if not new_result:
raise RuntimeError(f"Unknown ROSTest Response: '{result}'")
return new_result
# Transform test case output into a JSON structure that can be
# easily ingested by the LabScale service.
def transform_results(results_text):
matches = parse_results(results_text)
# Transform results into a dictionary accepted by the LabScale service
return [{"testcase": match[0], "result": normalize_result(match[1])} for match in matches]
# The main entrypoint of the script
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="Input file")
parser.add_argument("--output", help="Output file", default="/dev/stdout")
args = parser.parse_args()
# Open the ROSTest log file and parse its contents
with open(args.input, "r") as input_file:
# Transform results into a dictionary accepted by the LabScale service
results = transform_results(input_file.read())
Writing the results
Once the results are transformed into a compatible structure, they will need to be writen to a file:
#!/usr/bin/env python3
import argparse
import json
import re
# Regex for ROSTest output
RE_ROSTEST_RESULT = r"\[([^\]]+)\]\[(FAILURE|passed|skipped)\]"
# Parse the test case output
def parse_results(results_text):
return re.findall(RE_ROSTEST_RESULT, results_text)
# The mapping betwen ROSTest and LabScale result types
TRANSFORM_MAP = {
"FAILURE": "fail",
"passed": "pass",
"skipped": "skip",
}
# Replace ROSTest result types with LabScale result types
def normalize_result(result):
new_result = TRANSFORM_MAP.get(result)
if not new_result:
raise RuntimeError(f"Unknown ROSTest Response: '{result}'")
return new_result
# Transform test case output into a JSON structure that can be
# easily ingested by the LabScale service.
def transform_results(results_text):
matches = parse_results(results_text)
# Transform results into a dictionary accepted by the LabScale service
return [{"testcase": match[0], "result": normalize_result(match[1])} for match in matches]
# The main entrypoint of the script
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="Input file")
parser.add_argument("--output", help="Output file", default="/dev/stdout")
args = parser.parse_args()
# Open the ROSTest log file and parse its contents
with open(args.input, "r") as input_file:
# Transform results into a dictionary accepted by the LabScale service
results = transform_results(input_file.read())
# Write the results to a file
with open(args.output, 'w') as output_file:
json.dump(results, output_file )
That is all there is to it! Now, create a folder named rostest_stdout_adapter
and save the code above to a file in that folder named rostest_stdout_adapter/rostest_sdout_adapter.py
. For good measure, also make the file executable, i.e. chmod +x rostest_stdout_adapter/rostest_sdout_adapter.py
.
Testing the adapter
Now that we have an adapter, we will want to test it. Create a ROSTest log file, this is an example of the output produced by the ROSTest test framework:
[ROSUNIT] Outputting test results to /home/user/projects/ros_example/artifacts/ros/test_results/beginner_tutorials/rostest-test_testAll.xml
[Testcase: testtest_bare_bones] ... ok
[ROSTEST]-----------------------------------------------------------------------
[beginner_tutorials.rosunit-test_bare_bones/test_neg_one_minus_one][FAILURE]----
-2 != -1 : -2 != -2
File "/usr/lib/python3.8/unittest/case.py", line 60, in testPartExecutor
yield
File "/usr/lib/python3.8/unittest/case.py", line 676, in run
self._callTestMethod(testMethod)
File "/usr/lib/python3.8/unittest/case.py", line 633, in _callTestMethod
method()
File "/home/user/projects/ros_example/catkin_ws/src/beginner_tutorials/test/test_add.py", line 33, in test_neg_one_minus_one
self.assertEqual(out, -1, "%s != -2" % out)
File "/usr/lib/python3.8/unittest/case.py", line 912, in assertEqual
assertion_func(first, second, msg=msg)
File "/usr/lib/python3.8/unittest/case.py", line 905, in _baseAssertEqual
raise self.failureException(msg)
--------------------------------------------------------------------------------
[beginner_tutorials.rosunit-test_bare_bones/test_neg_one_plus_one][passed]
[beginner_tutorials.rosunit-test_bare_bones/test_one_minus_one][passed]
[beginner_tutorials.rosunit-test_bare_bones/test_one_plus_one][passed]
SUMMARY
* RESULT: FAIL
* TESTS: 4
* ERRORS: 0
* FAILURES: 1
Copy this text and save it to a file named rostest_output.txt
in the current working folder. Now, on the command line, invoke the adapter providing it with a path to the input file:
python3 rostest_stdout_adapter.py --input ./rostest_output.txt
[{"testcase": "beginner_tutorials.rosunit-test_bare_bones/test_neg_one_minus_one", "result": "fail"}, {"testcase": "beginner_tutorials.rosunit-test_bare_bones/test_neg_one_plus_one", "result": "pass"}, {"testcase": "beginner_tutorials.rosunit-test_bare_bones/test_one_minus_one", "result": "pass"}, {"testcase": "beginner_tutorials.rosunit-test_bare_bones/test_one_plus_one", "result": "pass"}]
Packaging the Adapter
The adapter will now need to be packaged for distribution to the LabScale test runners. The package is a simple gzipped tar file common to Unix platforms that contains the adapter code (see above) and a config.yaml
file that describes the adapter and provides the test runner with the proper command line invocation. An example for this adapter would look like this:
name: rostest_stdout
version: 1.0.0
description: Adapter for rostest using stdout (i.e. does not parse XML logs).
cmd: python3 ${DIR}/rostest_sdout_adapter.py
Copy the above YAML code and save it to a file named rostest_stdout_adapter/config.yaml
in the same directory as the rostest_stdout_adapter.py
. And finally, create the tar package by running the following on the command line:
tar czf rostest_stdout_adapter.tgz rostest_stdout_adapter
This will produce a file named rostest_stdout_adapter.tgz
. the adapter is now ready for use by the test runner!
Attaching the Adapter to a test suite
Now that the output adapter has been created, it must be associated with a test suite or job that will require it. To do this, you must add it to the Job Package's config.yaml file, in this case we will add it to an existing job package that already contains a ROS test case based on the beginner example provided by the ROSTest framework:
name: ros_pubsub_example
version: 1.0.0
cmd: "${DIR}/exec.sh rostest beginner_tutorials testAll.test"
ignore_rc: false
# Declare what output adapter to use
adapter:
name: rostest_stdout_adapter
Make the Adapter known to the Agent
Finally, the agent will need to be informed of the existence of the new adapter and where to find it. This is done by adding an available_adapters
configuration to the agent's labscale-agent/config.yaml
configuration file.
agent_id: xxxxxxxxx
broker_url: ws://app.labscale.com/mqtt
config_version: 0
team_id: xxxxxxxxxx
# Custom output adapter locations
available_adapters:
rostest_stdout_adapter: file:///path/to/rostest_stdout_adapter.tgz
In this example, you will want to replace the /path/to/rostest_stdout_adapter.tgz
with a path to the the adapter you just created and can be replaced by any URL to a location that hosts the adapter.