Creating a data stream from NIST manufacturing lab data – Part 1
The Industry Experiences team has recently published a solution guide for extracting insights from existing IoT data. The solution consists of the following high-level components.
- Ingest data
- Hot path processing
- Cold path processing
- Analytics clients
This is the first of a series of blogs that go through those components in detail. We start with ingesting data into the solution and creating a data stream.
The NIST data
The solution uses the data published by US National Institute of Standards and Technology Smart Manufacturing Systems test bed that exposes the manufacturing lab’s data. We use the data from the lab's volatile data stream (VDS).
The VDS is implemented using the MTConnect standard. The standard offers a semantic vocabulary for manufacturing equipment. It defines a protocol to communicate with an MTConnect agent and the schema for the data returned.
The agent and its methods
The agent exposes an HTTP endpoint that supports the following API operations:
- probe: Returns the list of devices, components, subcomponents and data items.
- current: Returns a snapshot of the data item's most recent values or the state of the device at a point in time.
- sample: Returns Samples, Events and Condition in time series.
- asset: Most recent state of an asset known to this device.
An example of the protocol flow for this solution follows.
The solution needs to poll the VDS endpoint. Once it receives the data stream, it can post the data records on the data stream in name value pairs to Azure Event Hubs. Then we can process the stream with a stream processing technology on the other end.
The ingestion component
The ingestion component does the following tasks:
- Polls the VDS endpoint.
- Saves the raw response as XML to Azure blob storage.
- Transforms the hierarchical XML result into a flat data records structure, with name-value pairs and time stamps.
- Posts each record to an Azure Event Hub. A different Event Hub is used for each message type.
There are many Azure service options for implementing the ingestion component. All of the following services can poll an HTTP end-point, save the raw data to an interim storage solution and post the transformed data to Azure Event Hubs.
- Azure Logic Apps
- Custom code running on
I chose to use Azure Logic Apps, because it is very easy to setup a very simple workflow, make a few HTTP calls against the VDS endpoint, call the necessary REST operations, and continue polling during the scheduled interval. Azure Logic Apps provides more than 200 connectors for performing tasks with data. Since I would like to store the raw response to a data store, the Azure Blob Storage connector is the easiest solution.
Two tasks remain. The component must transform the XML payload to a flat data records structure, and post the results to Azure Event Hubs.
The response data is formatted as an MTConnectStreams document. Find the XSD schemas for various MTConnect documents. A closer look at the result document’s schema reveals that polymorphism is used extensively in the documents. For example, an element with the name temperature is of type TemperatureType which is polymorphic, using the XML feature Substitution Groups.
There is a Transform XML connector for Logic Apps. A quick look at this connector reveals the need for developing a map using the Enterprise Integration Pack. Although it is possible to develop a map, that activity may turn out to be a lot of effort when comparing it to doing the same with custom code.
I developed a MTConnect client during one of my fun coding weekends. The component uses Plain Old CLR Object (POCO) classes. The classes were generated using the Visual Studio’s xsd.exe tool. Although the generated code is not capable of de-serializing the polymorphic XML, modifying it to succeed is easy. Simply apply the XmlIncludeAttribute to the polymorphic generated classes.
I decoupled the following two tasks. Polling the VDS endpoint and saving the raw data, and transforming then posting the result to the Event Hub. Such a design moves the more complicated task of parsing and transforming the result, to an easier environment to work in which helps develop, debug, and maintain. The resulting design consists of two microservices. The first is developed as a Logic App, managed by the Azure Logic Apps service. The second custom code is deployed to an Azure Container Instances service. I use a Storage queue for communication between the two microservices.
Implementation of the Logic App
Going through the full implementation of the Logic App portion of the component may take a lot of space and repetition. I will highlight only the parts that need extra attention. The implementation follows the following flow chart. The full source code can be found on GitHub.
Logic Apps can use built-in functions for de-serializing and serializing the JSON and XML contained in the workflow definition language. The raw results are stored as JSON for optimizing the storage requirements. The JSON representation of the same data takes less space. XML tags are more verbose. The result of the current and sample requests is saved into variables using the following formula. If you are using the GUI for editing the workflow, you can use the first line. To edit in code view, make sure the function starts with @ so the interpreter can work:
json(replace(string(json(xml(trim(body('Current'))))), '"@', '"')) "value": "@json(replace(string(json(xml(trim(body('Current'))))), '"@', '"'))"
The key is to de-serialize the XML first, then serialize the result to JSON. This operation creates a JSON string with the attribute names prepended with the “@” character. The activity converts the JSON to a string. Remove the “@” and make a new JSON from the result.
The original XML looks like the following.
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="/styles/Streams.xsl"?> <MTConnectStreams xmlns:m="urn:mtconnect.org:MTConnectStreams:1.3" xmlns="urn:mtconnect.org:MTConnectStreams:1.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:x="urn:nist.gov:NistStreams:1.3" xsi:schemaLocation="urn:nist.gov:NistStreams:1.3 /schemas/NistStreams_1.3.xsd"> <Header creationTime="2018-08-24T22:02:06Z" sender="5c182aaf5077" instanceId="1535115106" version="1.3.0.18" bufferSize="1048576" nextSequence="438473124" firstSequence="437424548" lastSequence="438473123"/> <Streams> <DeviceStream name="GFAgie01" uuid="mtc_adapter001"> <ComponentStream component="Device" name="GFAgie01" componentId="GFAgie01"> <Events> <Availability dataItemId="GFAgie01-dtop_1" timestamp="2018-08-24T22:02:06.397326" name="avail" sequence="438473077">UNAVAILABLE</Availability> <EmergencyStop dataItemId="GFAgie01-dtop_2" timestamp="2018-08-24T22:02:06.397970" name="estop" sequence="438473086">UNAVAILABLE</EmergencyStop> <AssetChanged dataItemId="GFAgie01_asset_chg" timestamp="2018-08-24T22:02:06.390521" sequence="438472999" assetType="">UNAVAILABLE</AssetChanged>
And if the “@” character is not removed, the JSON looks like the following.
{ "?xml": { "@version": "1.0", "@encoding": "UTF-8" }, "?xml-stylesheet": "type="text/xsl" href="/styles/Streams.xsl"", "MTConnectStreams": { "@xmlns:m": "urn:mtconnect.org:MTConnectStreams:1.3", "@xmlns": "urn:mtconnect.org:MTConnectStreams:1.3", "@xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance", "@xmlns:x": "urn:nist.gov:NistStreams:1.3", "@xsi:schemaLocation": "urn:nist.gov:NistStreams:1.3 /schemas/NistStreams_1.3.xsd", "Header": { "@creationTime": "2018-08-24T22:02:06Z", "@sender": "5c182aaf5077", "@instanceId": "1535115106",
Converting the result into JSON also allows access to the attribute values for firstSequence and lastSequence.
"value": "@int(variables('CurrentResult').MTConnectStreams.Header.firstSequence)" Then I can plug in the values to the query parameter of the sample request. "Sample": { "type": "Http", "inputs": { "method": "GET", "uri": "https://smstestbed.nist.gov/vds/sample?from=@{variables('NextSequence')}" }, "runAfter": { "Delay": [ "Succeeded" ] } },
After each probe request is successfully received, the workflow saves the result to a blob on Azure Blob Storage. It then puts a message into Storage Queue with the URL of the saved blob, This notifies the other microservice to transform and post the data records to their subsequent Event Hubs.
Next steps
- Check out Azure Logic Apps documentation for more details.
- Clone the sample repo.
- Read Part 2 of this post when its released for the implementation of the communication between microservices and the implementation details of the second microservice.
Source: IoT