The Garden of Tomorrow – Architecture and workflows

In an earlier blog, I showed the video of my automated garden irrigation system that is powered by a couple of IoT devices with the control logic being handled by AWS IoT Analytics. In this post I’ll go a bit deeper into how it all works.

The overall system architecture looks like this – I have 2 micro-controllers powering the system, one handles all the environmental monitoring and one handles the flow of the water. I could do it all with a single micro-controller OK, this is just how my system happened to evolve. In both cases I’m using the ESP8266 and the Arduino IDE to write the code.

The problem can be broken down into getting the data, analyzing the data and then taking the appropriate actions depending on the analysis. In this project I’m going to use CloudWatch Rules to trigger Lambda Functions that control the water flow and use the Analysis to configure those Rules.

Data Sources
  1. Environmental sensor data on illumination & temperature
  2. Flow rate data from the irrigation pipe
  3. Future weather forecast data from darksky
Analysis
  1. Determine when to start watering by looking at the past illumination data
  2. Determine how much water by looking at the past temperature data
Command and Control
  1. Configure CloudWatch Rules to Trigger Water_On and Water_Off Lambda
  2. Send SNS notifications of planned activity
  3. Send SNS alarms if the water is flowing when it should not be

Before diving in to the analysis, a few words on where and how I’m storing all my device data.

My devices publish sensor data on a variety of MQTT topics, a sub-set of the topics I use would be;

rtjm/<DEVICEID>/weather/temperature
rtjm/<DEVICEID>/weather/humidity
rtjm/<DEVICEID>/photoresistor
rtjm/<DEVICEID>/infrared
rtjm/<DEVICEID>/barometer/pressure
rtjm/<DEVICEID>/barometer/temperature

The messages on all the topics are somewhat different, but typically contain a timestamp, a measurement and other small pieces of information.

I send all this data to a single channel and store it all in a single data store as I will use SQL queries in datasets and Jupyter Notebooks to extract the data I want.

The Analysis

The first step is to create data sets to collect all the data for the period we will analyze in the notebook. I’ve chosen to use 1 data set for the temperature data and another data set for the illumination data, but it would be equally possible to use a single data set with the right query.

What does my temperature data set look like?

SELECT * FROM telescope_data where __dt >= current_date - interval '7' day AND (unit='Celcius' OR unit='C') order by epoch desc

What does my illumination data set look like?

SELECT * FROM telescope_data where __dt >= current_date - interval '7' day AND unit='lumen' order by epoch desc

I’ve set both data sets to execute daily as the preparation step for the next stage of analysis.

The Notebook

The crux of this entire project is the Jupyter Notebook, so we’re going to look at that in some detail. The full code for the notebook is available here.

Let’s start with the basics, to read the contents of a dataset, we can use some code like this;

iota = boto3.client('iotanalytics')
dataset = "illumination"
dataset_url = iota.get_dataset_content(datasetName = dataset,versionId = "$LATEST")['entries'][0]['dataURI']
df_light = pd.read_csv(dataset_url,low_memory=False)

This reads the latest version of the dataset content (every time the dataset is executed, a new version will be generated) for the dataset called illumination and reads it into a panda dataframe called df_light.

df_light['datetime']=pd.DatetimeIndex(pd.to_datetime(df_light["received"]/1000, unit='s')) \
    .tz_localize('UTC') \
    .tz_convert('US/Pacific')

df_light.index = df_light['datetime']

This adds a datetime index to the dataframe using the ‘received’ column in the data and converts it to the appropriate timezone.

Next we do some analysis with this data to figure out when dawn is and when we should turn the water on. I’m not going to explain this in detail as really the analysis you will be doing is totally dependent on the actual problem you want to solve, but you can review the code for the notebook here.

Typically in my notebooks I plot the data I am working with so I can visually inspect whether the data aligns with my expectations. Here’s the illumination data plotted by the notebook for example;

And here’s the temperature data from the other dataset.

Looking at the notebook code, you’ll see that we distill this data down to a time to turn the water on and a time to turn the water off.

water on (local)= 2018-08-23 06:06:10
water off (local)= 2018-08-23 07:31:10

You will recall I mentioned we would look at the weather forecast to determine if it was going to rain or not? How does that work?

lambdaClient = boto3.client('lambda')
response = lambdaClient.invoke(FunctionName='IsItGoingToRain')
result = json.loads(response['Payload'].read().decode("utf-8"))
willItRain = result

I’ve encapsulated the ‘IsItGoingToRain’ into a Lambda function that is executed by the notebook and this beings me to an important but sometimes overlooked point – I can use the entire AWS SDK from within my notebook and this gives me a great deal of flexibility to design a solution that leverages many other services. This lambda function is really simple, the code looks like this;

import json
from urllib.request import urlopen

def lambda_handler(event, context):
    url = "https://api.darksky.net/forecast/<REDACTED>/<LAT>,<LON>?units=si&exclude=currently,flags,alerts,minutely,daily"
    response = urlopen(url)
    weather = json.load(response)
    hourly=weather["hourly"]["data"]
    willItRain=False
    for hour in hourly:
        if ( hour["precipIntensity"] > 3 and hour["precipProbability"]>0.8) :
            willItRain = True
    return willItRain

Next the notebook leverages CloudWatch Event Rules to trigger another pair of lambda functions – one to turn the water on and one to turn the water off. Let’s take a look at the rule configuration to see how straight-forward that is as well.

ruleStatus = 'DISABLED' if (willItRain) else 'ENABLED'
cwe = boto3.client('events')

response = cwe.put_rule(
    Name='water-on',\
    ScheduleExpression='cron('+str(water_on.minute)+' '+str(water_on.hour)+' ? * * *)',\
    State=ruleStatus,\
    Description='Autogenerated rule to turn the water ON at the specified time')

response = cwe.put_rule(
    Name='water-off',\
    ScheduleExpression='cron('+str(water_off.minute)+' '+str(water_off.hour)+' ? * * *)',\
    State=ruleStatus,\
    Description='Autogenerated rule to turn the water OFF at the specified time')

The notebook goes on to publish the analysis back into another datastore, send messages to my phone etc, so please read the full notebook code here to get a sense of the variety of possibilities.

Great, so I have my notebook for analysis and I’ve tested it so that I’m happy, but how do I automate execution – it’s not very convenient having to manually run the notebook every time I want to adjust the irrigation and manual execution kind of misses the point of the project.

The key is to ‘containerize’ the notebook. This process is started by simply clicking on the containerize button you should see on the upper menu bar;

This process, launched at jupytercon 2018, allows you to package up your notebook into a docker image stored in an Amazon Elastic Container Registry repository and then you can use a container data set within IoT Analytics to execute the docker image on demand – either on a fixed schedule or triggered by the completion of another data set (which could be a SQL data set that prepares the data for the notebook).

The Result

Once a day my notebook is executed and determines when to turn the water on and off using both local environmental sensor readings and the weather forecast for the day ahead, the configuration drives CloudWatch Event Rules that invoke Lambda functions to turn the water on and off. The system has been up and running all summer without incident and the garden is still thriving.

UPDATE

Learn more about containerizing your notebook on the official AWS Iot Blog

 

Quick Tip – creating export workflows from AWS IoT Analytics

AWS IoT Analytics provides access to the results of a SQL query as a data set that you download using a pre-signed URL, but what if you want to export the results somewhere else automatically?

Although AWS IoT Analytics doesn’t contain this functionality natively, we can leverage the power of a triggered notebook container to achieve our desired outcome. For flexibility, in this example I’m going to use Amazon Kinesis Firehose to stream the data into S3 and in a future post we’ll look at how we can also use Firehose to stream the data into Redshift.

Http iframes are not shown in https pages in many major browsers. Please read this post for details.

That’s pretty straightforward.

Let’s take a closer look at the crux of the job, streaming the CSV out of IoT Analytics and putting JSON records into S3 via firehose.

stream = urllib.request.urlopen(dataset_url)
reader = csv.DictReader(codecs.iterdecode(stream, 'utf-8'))
rows=0
for row in reader:
    record = json.dumps(row)+"\n"
    response = firehose.put_record(DeliveryStreamName=streamName,Record={'Data': record})

This isn’t especially efficient as we are calling Put Record once for each row in our CSV, but it keeps the code simple. If we migrated to using Put Records instead, which is a batch API, it would be much faster but we would have to introduce complexity to keep the batch size within the limits.

All we need to do now is setup a container notebook triggered from the dataset execution in a similar way to how we did it in an earlier post and we’ll have the data set streamed into S3 on every execution.

You may be wondering why you can’t just do this with a Lambda function? You could, if you could trigger a Lambda function when the data set content has been generated, but that’s not currently possible 🙁

 

Detecting clouds and clear skies (part three)

Last time we saw how we could take the the results of our cloud sensor data set and explore them using a Jupyter notebook.  Typically you use the notebook to implement the data science part of your projects but once you have the notebook ready, how do you run it automatically on a schedule?

First let’s start with the data science we would like to do. I’m going to do some analysis of my sensor readings to determine if it is night or day and if the sky is clear, has low or high cloud, or it’s raining (or snowing). Then, if conditions have changed since the last update, I’m going to publish a message on an SNS topic which will result in a message on my mobile phone for this example.

The first new feature I’m going to use is that of delta windows for my dataset.

In the last example, I scheduled a data set every 15 minutes to retrieve the last 5 days of data to plot on a graph. I’m going to narrow this down now to just retrieve the incremental data that has arrived since the last time the query was executed. For this project, it really doesn’t matter if I re-analyse data that I analysed before, but for other workloads it can be really important that the data is analysed in batches that do not overlap and that’s where the delta window feature comes in.

We will edit the data set and configure the delta time window like this;

The Timestamp expression is the most important option, IoT Analytics needs to know how to determine the timestamp of your message to ensure that only those falling within the window are used by the data set. You can also set a time offset that lets you adjust for messages in flight when the data set is scheduled.

Note that my Timestamp expression is;

from_unixtime(received/1000)

In many of my projects I use the Rule Engine Action SQL to add a received timestamp to my messages in case the device clock is incorrect or the device simply doesn’t report a time. This generates epoch milliseconds hence I’m dividing by 1000 to turn this into seconds before conversion to the timestamp object.

We’re going to make some changes to our Jupyter notebook as well, to make it easier to see what I’ve done, the complete notebook is available here.

First thing to note is that the delta window with a query scheduled every 15 minutes means we will only have data for a 15 minute window, here’s what a typical plot of that data will look like;

And here’s the ‘data science’ bit – the rules we will use to determine whether it is night or day and whether it is cloudy or not. Obviously in this example we could basically do this in real-time from the incoming data stream, but imagine that you needed to do much more complex analysis … that’s where the real power of Jupyter notebooks and Amazon Sagemaker comes to the fore. For now though, we’ll just do something simple;

mean = statistics.mean(df_delta)
sigma = statistics.stdev(df_delta)

sky='Changeable'

if (sigma < 5 and mean > 20) :
    sky = 'Clear'
if (sigma < 1 and mean > 25) :
    sky = 'Very Clear'
if (sigma < 5 and mean <= 3) :
    sky = 'Rain or Snow'
if (sigma < 5 and mean > 3 and mean <= 10) :
    sky = 'Low cloud'
if (sigma < 5 and mean >12 and mean <= 15) :
    sky = 'High cloud'

mean,sigma,sky

So we’ll basically report Very Clear, Clear, Rain or Snow, Low cloud or High cloud depending on the difference between the temperature of the sky and ground which is a viable measure of cloud height.

We’ll also determine if it is night or day by looking at the light readings from another sensor in the same physical location.

Automation

We can test our new notebook by running it as normal, but when we’re ready to automate the workflow we need to containerize the notebook so it can be independently executed without any human intervention. Full details on this process are documented over at AWS

Trigger the notebook container after the data set

Once you’ve completed the containerization, the next step is to create a new data set that will execute it once the SQL data set has completed.

Select Create Container and on the next screen name your data set so you can easily find it in the list of data sets later.

Now you want to select the trigger for the analysis. You don’t have to trigger the container execution from a data set, but it is quite a common workflow and the one we’re going to use today, so click Link to select the trigger from the 3 options below.

Next we have to select which data set we want to link this analysis to.

And then we need to configure the source container that will be executed.

Note that you can choose to deploy any arbitrary container from Amazon ECR but we’re going to choose the container we created earlier. Note that the latest image is tagged to help you locate it since typically you will want to run the most recent version you have containerised.

On the next page, note that you can select between different compute resources depending on the complexity of the analysis you need to run. I typically pick the 4 vCPU / 16GiB version just to be frugal.

The final step is to configure the retention period for your data set and then we’re all set.

Although there are a lot of steps, once you’ve done this a couple of times it all becomes very straight-forward indeed. We now have the capability to execute a powerful piece of analysis triggered by the output of the SQL data set and do this entire workflow on a schedule of our choosing. The automation possibilities this opens up are significant and go beyond my simple example of sending me a message when the weather changes locally.

 

 

 

 

 

 

Connecting the ESP8266 to AWS IoT Core over MQTT

Securely sending IoT data to the cloud is an important consideration, especially if you can receive messages from the cloud and then activate equipment. It might be annoying if my house lights are turned on or off by someone else, but if my garage or front door can be opened by a malicious person, that’s much more serious.

AWS IoT Core is a secure platform for sending IoT device data, but this in turn presents challenges for developers using some of the popular micro-controllers like the ESP8266 which has very little RAM and a relatively slow processor. AWS IoT Core uses X.509 client certificates to identify devices and you’ll need to be able to negotiate a TLS 1.2 connection – which can be quite a challenge for a constrained device. Prior to the end of 2017 this was a real issue for the ESP8266 but thanks to work on the SSL libraries, it is now possible to easily make a secure connection – with one caveat.

While the ESP8266 can now make a TLS 1.2 negotiated connection to AWS IoT Core and identify itself using an X.509 client certificate, for a secure connection the client also needs to verify that the server really is who it claims to be. This is done by verifying the certificate authority that signed the server certificate and currently, this is beyond the memory capabilities of the ESP8266.

So caveat aside, what does the code look like for making a secure connection to AWS IoT Core over MQTT?

There are a few ways of handling the certificate encoding, and there is a nice example of how to do this over on github written by one of the contributors to the ESP8266 Arduino project.

While the risk of a compromise here is low, you should be cautious about any data you send or receive without verifying the identity of the server. Sending temperature readings or receiving commands to turn on some small projects is low risk, but I wouldn’t be sending my credit card details over MQTT from the ESP8266 for example (although this would be a stupid thing to do in any event). Remember this is not a security issue with AWS IoT Core, it’s with the Arduino library running on the ESP8266 which currently doesn’t have the capability of verifying the certificate chain. This may change in the future.

The good news is, there is an easy solution, upgrade your projects to the more recent and more powerful ESP32 – the big brother of the ESP8266 from the same manufacturer.

The code for the ESP32 is similar but simpler;

Copy and paste the certificate and key files you get when you create your device Thing in AWS IoT Core and tweak the formatting so you can use them like this;

const char*  certificatePemCrt = \
"-----BEGIN CERTIFICATE-----\n" \
"MIIDWjDCAkKgAwIBAlIVAO4oCOcEtp6ex+nzUkv1+Nd4ZcgEMA0GCFqGSIL3DQ3B" \
"---------------- redacted for clarity and privacy --------------" \
"AcVdn0SlXDZ2eqEIXs79tsOuw7awrkWvMRyZ8A4lQlin53dA77jXEzwbAOp6dp==" \
"-----END CERTIFICATE-----\n";

const char*  privatePemKey = \
"-----BEGIN RSA PRIVATE KEY-----\n" \
"MIIZpAIBFFKCAQEArKDwRPmAnkF0lomDj6i8I8qDRyTuJOLmCbn8CtPl12QlT7Yc" \
"---------------- redacted for clarity and privacy --------------" \
"Neawrz1V983PPKSrXeim6f6/gZq92ut5mCZZFwkN+muQtlLDixpFjL==" \
"-----END RSA PRIVATE KEY-----\n";

const String AmazonCACert = \
"MIIE0zCCA7ugAwIBAgIQGNrRniZ96LtKIVjNzGs7SjANBgkqhkiG9w0BAQUFADCB" \
"yjELMAkGA1UEBhMCVVMxFzAVBgNVBAoTDlZlcmlTaWduLCBJbmMuMR8wHQYDVQQL" \
"ExZWZXJpU2lnbiBUcnVzdCBOZXR3b3JrMTowOAYDVQQLEzEoYykgMjAwNiBWZXJp" \
"U2lnbiwgSW5jLiAtIEZvciBhdXRob3JpemVkIHVzZSBvbmx5MUUwQwYDVQQDEzxW" \
"ZXJpU2lnbiBDbGFzcyAzIFB1YmxpYyBQcmltYXJ5IENlcnRpZmljYXRpb24gQXV0" \
"aG9yaXR5IC0gRzUwHhcNMDYxMTA4MDAwMDAwWhcNMzYwNzE2MjM1OTU5WjCByjEL" \
"MAkGA1UEBhMCVVMxFzAVBgNVBAoTDlZlcmlTaWduLCBJbmMuMR8wHQYDVQQLExZW" \
"ZXJpU2lnbiBUcnVzdCBOZXR3b3JrMTowOAYDVQQLEzEoYykgMjAwNiBWZXJpU2ln" \
"biwgSW5jLiAtIEZvciBhdXRob3JpemVkIHVzZSBvbmx5MUUwQwYDVQQDEzxWZXJp" \
"U2lnbiBDbGFzcyAzIFB1YmxpYyBQcmltYXJ5IENlcnRpZmljYXRpb24gQXV0aG9y" \
"aXR5IC0gRzUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCvJAgIKXo1" \
"nmAMqudLO07cfLw8RRy7K+D+KQL5VwijZIUVJ/XxrcgxiV0i6CqqpkKzj/i5Vbex" \
"t0uz/o9+B1fs70PbZmIVYc9gDaTY3vjgw2IIPVQT60nKWVSFJuUrjxuf6/WhkcIz" \
"SdhDY2pSS9KP6HBRTdGJaXvHcPaz3BJ023tdS1bTlr8Vd6Gw9KIl8q8ckmcY5fQG" \
"BO+QueQA5N06tRn/Arr0PO7gi+s3i+z016zy9vA9r911kTMZHRxAy3QkGSGT2RT+" \
"rCpSx4/VBEnkjWNHiDxpg8v+R70rfk/Fla4OndTRQ8Bnc+MUCH7lP59zuDMKz10/" \
"NIeWiu5T6CUVAgMBAAGjgbIwga8wDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8E" \
"BAMCAQYwbQYIKwYBBQUHAQwEYTBfoV2gWzBZMFcwVRYJaW1hZ2UvZ2lmMCEwHzAH" \
"BgUrDgMCGgQUj+XTGoasjY5rw8+AatRIGCx7GS4wJRYjaHR0cDovL2xvZ28udmVy" \
"aXNpZ24uY29tL3ZzbG9nby5naWYwHQYDVR0OBBYEFH/TZafC3ey78DAJ80M5+gKv" \
"MzEzMA0GCSqGSIb3DQEBBQUAA4IBAQCTJEowX2LP2BqYLz3q3JktvXf2pXkiOOzE" \
"p6B4Eq1iDkVwZMXnl2YtmAl+X6/WzChl8gGqCBpH3vn5fJJaCGkgDdk+bW48DW7Y" \
"5gaRQBi5+MHt39tBquCWIMnNZBU4gcmU7qKEKQsTb47bDN0lAtukixlE0kF6BWlK" \
"WE9gyn6CagsCqiUXObXbf+eEZSqVir2G3l6BFoMtEMze/aiCKm0oHw0LxOXnGiYZ" \
"4fQRbxC1lfznQgUy286dUV4otp6F01vvpX1FQHKOtw5rDgb7MzVIcbidJ4vEZV8N" \
"hnacRHr2lVz2XTIIM6RUthg/aFzyQkqFOFSDX9HoLPKsEdao7WNq";

And now you can configure your wiFiClient to use the certificate and verify the CA certificate with;

wiFiClient.setCertificate(certificatePemCrt);
wiFiClient.setPrivateKey(privatePemKey);
wiFiClient.setCACert(AmazonCACert);

You could use similar code to the github project linked earlier – this ESP32 example just shows another way of encoding the certificate in your Arduino sketch.

SECURITY CAUTION

Anyone who can get physical access to your device can access the private key if you don’t take additional steps to protect it – and you’d also be surprised how many people upload the code containing the private keys to github, you may not want to do that either! There are steps you can take to make your system more secure, such as storing the private key in a dedicated crypto store like the ATECC508A microchip or, if using the ESP32, consider using the flash encryption features to add further layers of protection. Another great option is to make sure you don’t reuse certificates and keys on multiple devices and ensure that you only grant the minimum necessary permissions so that even if your private key was compromised, you could revoke the certificate paired with the key and only one single device would be impacted.

For more information on the AWS IoT security model, this blog post has more detail.

Vibration analysis with the ESP8266 & MPU6050

Our furnace blower motor began making an awful noise recently and despite best efforts to persuade it to run smoothly by adjusting the belt tension, there was an annoying rhythmical thump-thump-thump noise coming from it. Although detecting this degraded operation was super easy after the fact, I wondered how easy it would be do detect the early signs of a problem like this where essentially I would want to look for unusual vibration patterns to spot them well in advance of being able to hear that anything was wrong.

While looking at vibration sensors, I came across various small gyros and accelerometers and figured that they might be just the thing, so I ordered a few different types and prototyped a small project using the MPU6050 6 axis gyro / accelerometer package.

I used an ESP8266 micro-controller to gather the data and send it to an MQTT topic using AWS IoT Core and the 8×8 display segment is used to tell me when the device is capturing and when it is sending.

The accelerometer package is on the small board with the long plastic stick attached. I decided to use this so I could clip it into a photo hook that I could stick on the furnace motor. I know the physics of this are distinctly questionable, but I was interested if I could make any sense of the accelerometer readings.

Here it is all hooked up and capturing data – hence the large ‘C’ on the display.

The code for the ESP8266 was written using the Arduino IDE and makes use of the MIT licensed i2cdevlib for code to handle the MPU6050 accelerometer which is a remarkably competent sensor in a small package that can do a lot more than this simple project demonstrates.

Hopefully if you’ve been reading previous blogs, you’ll recall that we can use our standard pattern here of;

  1. Send data to AWS IoT Core MQTT topic
  2. Use a Rule to route the message to an AWS IoT Analytics Channel
  3. Connect the Channel to a Pipeline to a Data Store for collecting all the data
  4. Use data sets to perform the analysis

For sending the data to AWS IoT Core, I use the well established Arduino pubsubclient library and my publication method looks like this, with much of the code being for debugging purposes and helping me see what the device is doing.

int publish_mqtt(JsonObject &root,char const *topic) {

    int written = 0;
    if (root.success()) {    
       written = root.printTo(msg);
    }

    sprintf(outTopic,"sensor/%s/%s",macAddrNC,topic);
  
    Serial.print(F("INFO: "));    
    Serial.print(outTopic);
    Serial.print("->");
    Serial.print(msg);
    Serial.print("=");
    
    int published = (pubSubClient.publish(outTopic, msg))? 1:0;
    Serial.println(published);    
    return published;
}

The Rule simply routes all the sensor data to the appropriate topic like this;

But let’s take a look at the dataset – what information are we actually recording from this sensor?

Of course we can look at the C code running on the micro-controller to see what I send, and that looks like this;

void publish_data(int index) {
 
    if (!pubSubClient.connected()) { return; }
 
    DynamicJsonBuffer jsonBuffer(256);
    JsonObject &root = jsonBuffer.createObject();
   
    VectorInt16 datapoint = capture[index];
    root["seq"]=sequence;
    root["i"]= index;
    root["x"]=datapoint.x;
    root["y"]=datapoint.y;
    root["z"]=datapoint.z;
    publish_mqtt(root,"vibration/mpu6050");   
    jsonBuffer.clear();
           
}

And when we extract that data with a simple SQL query to get all of the data, we see a preview like this;

The x/y/z readings are the accelerometer readings for each of the x/y/z axes.  These aren’t quite raw sensor readings, they are the acceleration with the effect of gravity removed, and while this isn’t directly important for this example, the code that does that with the MPU6050 in my C code looks like this;

mpu.dmpGetQuaternion(&q, fifoBuffer);
mpu.dmpGetAccel(&aa, fifoBuffer);
mpu.dmpGetGravity(&gravity, &q);
mpu.dmpGetLinearAccel(&aaReal, &aa, &gravity);
VectorInt16 datapoint = VectorInt16(aaReal.x,aaReal.y,aaReal.z);

What about the sequence number and the i value?

My example code samples data for a few seconds from the sensor at 200Hz and then stops sampling and switches to sending mode, then it repeats this cycle. To help me make sense of it all, the sequence number is the epoch time for the start of each capture run and the i value is simply an index that counts from 0 up through n where n is the number of samples. This helps me analyse each chunk of data separately if I want to.

I was quite excited to see what this data looked like, so I created a Notebook in AWS IoT Analytics and did a simple graph of one of the samples. Hopefully the pattern of reading a dataset and plotting a graph is becoming familiar now so I won’t include all the setup code, but here’s the relevant extract from the Jupyter Notebook;

# Read the dataset

client = boto3.client('iotanalytics')
dataset = "vibration"
dataset_url = client.get_dataset_content(datasetName = dataset)['entries'][0]['dataURI']
df = pd.read_csv(dataset_url)

# Extract 1000 sample points from the sequence that began at 1518074892

analysis = df[((df['seq'] == 1518074892) & (df['i'] < 1000))].sort_values(by='i', ascending=True, inplace=False)

# Graph the accelerometer X axis readings

analysis.plot(title='Vibration Analysis x', \
                         kind='line',x='i',y='x',figsize=(20,8), \
                         color='red',linewidth=1,grid=True)

I was really pretty excited when I saw this first result. The data is clearly cyclical and it looks like the sample rate of 200Hz might have been fast enough to get something usable.

Let’s check this isn’t a fluke and look at the y-axis data as well. It’s worth saying that because I just randomly stuck the sensor onto the motor, my vibration data will be spread across the x,y,z axes and I was interested to see if this rendered the data unusable or whether something as simple as this could work.

This looks slightly cleaner than the x-axis data, so I chose to use that for the next steps.

Now for some basic data science

I have the raw data and what I want to know is – what are the key vibration energies of this motor. This helps answer the question is it running smoothly or is there a problem? How do I turn the waveform above into an energy plot of the main vibration frequencies? This is a job for a fast Fourier transform which “is an algorithm that samples a signal over a period of time and divides it into its frequency components”. Just what I need.

Well almost – perfect. So I now know I want to use a FFT to analyse the data, but how do I do that? This is where the standard data science libraries available with Amazon Sagemaker Jupyter Notebooks come to the rescue and I can use scipy and fftpack with a quick import like this;

import scipy.fftpack

This lets me do the FFT analysis with just a few lines of code;

sig = analysis['y']
sig_fft = scipy.fftpack.fft(sig)

# Why 0.005? The data is being sampled at 200Hz
time_step = 0.005

# And the power (sig_fft is of complex dtype)
power = np.abs(sig_fft)

# The corresponding frequencies
sample_freq = scipy.fftpack.fftfreq(sig.size, d=time_step)

# Only interested in the positive frequencies, the negative just mirror these. 
# Also drop the first data point for 0Hz

sample_freq = sample_freq[1:int(len(sample_freq)/2)]
power = power[1:int(len(power)/2)]

For the moment of truth, let’s plot this on a graph and see if we have a clear signal we can interpret from the data.

plt.figure(figsize=(20, 8))
plt.xlabel('Frequency [Hz]')
plt.ylabel('Power')
plt.title("FFT Spectrum for single axis")
plt.xticks(np.arange(0, max(sample_freq)+1, 2.0))
plt.plot(sample_freq, power, color='blue')

I was pretty excited when I saw this as the plot of power against frequency made sense. The large spike at around 11Hz aligned with the thump-thump-thump noise I could hear and the smaller, but still significant spike at 30Hz could well be the ‘normal’ operating vibration since the mains frequency is 60Hz. I’m guessing a bit at this since I’m neither a data scientist, a motor expert or an electrician, but it made sense to me. The important thing is that we have extracted a clear signal from the data that can be used to provide an insight.

Detecting clouds and clear skies (part two)

Last time we covered how to route data from a cloud sensor to IoT Analytics and how to create a SQL data set that would be executed every 15 minutes containing the most recent data. Now that we have that data, what sort of analysis can we do on it to find out if the sky is cloudy or clear?

AWS IoT Analytics is integrated with a powerful data science tool, Amazon Sagemaker, which has easy to use data exploration and visualization capabilities that you can run from your browser using Jupyter Notebooks. Sounds scary, but actually it’s really straight forward and there are plenty of web based resources to help you learn and explore increasingly advanced capabilities.

Let’s begin by drawing a simple graph of our cloud sensor data as often visualizing the data is the first step towards deciding how to do some analysis. From the IoT Analytics console, tap Aalyze and then Notebooks from the left menu. Tap Create Notebook to reach the screen below.

There are a number of pre-built templates you can explore, but for our project, we’re going to start from a Blank Notebook so tap on that.

To create your Jupyter notebook (and the instance on which it will run), follow the official documentation Explore your Data section and get yourself to the stage where you have a blank notebook in your browser.

Let’s start writing some code. We’ll be using Python for writing our analysis in this example.

Enter the following code in the first empty cell of the notebook. This code loads the boto3 AWS SDK , the pandas library which is great for slicing and dicing your data, and mathplotlib which we will use for drawing our graph. The final statement allows the graph output to appear inline in the notebook when executed.

import boto3
import pandas as pd
from matplotlib import pyplot as plt
%matplotlib inline

Your notebook should start looking like the image below – we’ll explain the rest of the code shortly.

client = boto3.client('iotanalytics')
dataset = "cloudy"
dataset_url = client.get_dataset_content(datasetName = dataset)['entries'][0]['dataURI']
df = pd.read_csv(dataset_url)

This code reads the dataset produced by our SQL query into a panda data frame. One way of thinking about a data frame is that it’s like an Excel spreadsheet of your data with rows and columns and this is a great fit for our data set from IoT Analytics which is already in tabular format as a CSV – so we can use the read_csv function as above.

Finally, to draw a graph of the data, we can write this code in another cell.

df['datetime'] = pd.to_datetime(df["received"]/1000, unit='s')
ax1 = df.plot(kind='line',x='datetime',y='object',color='blue',linewidth=4)

df.plot(title='Is it cloudy?',ax=ax1, \
                         kind='line',x='datetime',y='ambient',figsize=(20,8), \
                         color='cyan',linewidth=4,grid=True)

When you run this cell, you will see the output like this for example

Here’s all the code in one place to give a sense of how little code you need to write to achieve this.

import boto3
import pandas as pd
from matplotlib import pyplot as plt
%matplotlib inline

client = boto3.client('iotanalytics')
dataset = "cloudy"
dataset_url = client.get_dataset_content(datasetName = dataset)['entries'][0]['dataURI']
df = pd.read_csv(dataset_url)
df['datetime'] = pd.to_datetime(df["received"]/1000, unit='s')

ax1 = df.plot(kind='line',x='datetime',y='object',color='blue',linewidth=4)
df.plot(title='Is it cloudy?',ax=ax1, \
                         kind='line',x='datetime',y='ambient',figsize=(20,8), \
                         color='cyan',linewidth=4,grid=True)

Of course what would be really nice would be to be able to run analysis like this automatically every 15 minutes and notify us when conditions change, this will be the topic of a future post that harnesses a recently released feature of IoT Analytics  for automating your workflow and in the meantime you can read more about that in the official documentation.

 

 

Detecting clouds and clear skies (part one)

As a keen, yet lazy, amateur astronomer, my quest for a fully automated observatory continues. My ideal morning would start with a lovely cup of coffee and an email from my observatory telling me what it was able to image overnight along with some nice photos. To achieve this, one of the pieces of information I need the computer system to know is whether the sky is clear or not. If it’s clear, then we can open the observatory roof, if it’s cloudy, we should stop the observation session – that sort of thing.

Unsurprisingly, finding sensors to detect clouds isn’t that straight forward, but it turns out that a possible solution comes from a neat little infra-red temperature sensor. Point one of these straight up to the sky, and you’ll get quite different readings when it’s cloudy or clear, so a bit of data analysis can easily determine if it’s likely to be worth rolling back the observatory roof or not.

For my project, I used some gorilla glue to fix the sensor inside a cable gland and then mounted it on the top of a small project box like this.

Inside the box, all we need is a trusty ESP8266 Micro-controller, a power connector and a few resistors – total project cost around $30. Commercial cloud sensors (yes, you can buy such a thing) start at several hundred $ and up, so if we can get this to work, it will be a very frugal option.

As you can see, I’ve left the USB cable connected to the device so that I can easily re-program the MCU later if required. I could of course do this with an OTA (over the air) update, but for this project the cable is fine.

Here it is, screwed onto the fence in the garden.

So what does the data look like? The upper cyan line is the ‘ambient’ or local temperature at sensor level whereas the dark blue like is the ‘object’ or remote temperature. The larger the difference, the clearer the skies, and when they are reading the same, that typically means there is rain or snow directly on the sensor window.

The software running on the MCU is written in C using the Arduino IDE and the ESP8266 SDK. It doesn’t do anything complex, it connects to the local WiFi network, establishes a secure MQTT connection with AWS IoT Core, and then every 30 seconds or so it reads the temperature sensor and then publishes the data to an MQTT topic. It really is a ‘dumb’ data collector since it makes no attempt to infer the state of the sky locally on the box.

So how do we pick up the MQTT data and analyze it? I’d like to be able to infer the state of the sky now, but also to have a historic record of my data for later analysis, and perhaps to use for training a machine learning model against other sources of data (images of the sky for example). For scenarios where you want to store the connected device data, AWS IoT Analytics is often a good fit and so what I’m going to do is as follows;

  1. Create a Data Store in AWS IoT Analytics to collect all my data
  2. Create a Channel to receive the data from the MQTT Topic
  3. Create a Pipeline to join the Channel to the Data Store, and perhaps send some real-time data to CloudWatch at the same time.
  4. Create a Rule in AWS IoT Core to route data from the MQTT topic to my channel
  5. Schedule a dataset to analyze the data every 15 minutes
  6. Publish to an SNS topic when it’s both dark and the sky seems clear

I covered steps 1 to 4 in an earlier introductory blog with part one and part two, and the principle is the same for any project like this. Let’s turn our attention to the analysis part of the project.

Head back to the IoT Analytics console and from the Analyze sub-menu, select Data sets and then tap Create

SQL Data sets are used when you want to execute a query against your data store and this is the common use case and what we will want to start with. Container Data sets are more advanced and let you trigger the execution of arbitrary Python (or indeed a custom container) once the SQL Data set is ready. Container Data sets are both powerful and flexible as we will see a bit later on.

So let’s start by creating the SQL Data set, tap on Create SQL and pick a suitable name and select the Data Store that you want to execute the query against.

Tap Next and now we get the SQL editing screen where we can enter our query that will run every 15 minutes.

The query I’m using in more detail is;

SELECT ambient,object,status.uptime,status.rssi,status.heap,epoch,received FROM cloudy_skies 
WHERE __dt >= current_date - interval '5' day 
AND full_topic like '%infrared/temperature'

An important note here is the __dt WHERE clause. IoT Analytics stores your messages partitioned by ingest date to make query performance faster and lower your costs. Without this line, the whole data store would be scanned and depending on how much data you have, this could take a very long time to complete. In this case, I’m choosing to pull out the most recent 5 days of data, which is more than I actually need to know if it is currently cloudy or not, but gives me flexibility in the next stage when I author a Jupyter Notebook to do the analysis.

Once you have your query, tap Next to configure the data selection window.

I’m going to use the default ‘None’ option here. The other option, delta windows, is a powerful option that enables you to perform analysis on only the new data that has arrived since you last queried the data. I’ll cover this more advanced topic in a future post, but for now just tap on Next to move on to the scheduling page.

Setting a schedule is entirely optional, but in this case we want to check on sky conditions every 15 minutes, so we can choose that option from the drop-down menu and tap Next to move to the final step, setting the retention policy.

Retention policies are useful when you might have large data sets that are incurring storage costs you’d prefer to avoid and you don’t need the data to be available for long periods. For this project, my data sets are small and I don’t need to take any special action, so just tap on the final Create data set button and we’re done.

Let’s review what we’ve done

We’ve created a Channel connected to a Pipeline feeding a Data store where all the IoT device data will be collected.

We’ve created a rule in IoT Core to route data from the appropriate MQTT topic into the Channel.

We’ve created a Data set that will execute a SQL query every 15 minutes to gather the most recent data.

How do we do some analysis on this data to see if the sky is clear though? I’ll cover that in part two.

 

Realtime metrics with AWS IoT Analytics and CloudWatch

IoT Analytics is great for doing analysis of your IOT device data on a regular cadence, for example daily or hourly. Faster scheduled analysis is possible, and the minimum scheduling frequency was lowered to 15 minutes in August this year, but what if you want something near real-time? Although there isn’t a built in feature for this, if you just want to setup a basic alarm or build some straightforward near real-time dashboards, there’s a simple solution using the power of the IoT Analytics Lambda Activity coupled with AWS CloudWatch.

Messages from my devices flow into AWS IoT Analytics from a number of MQTT topics that are all routed to a Channel using Rules that I’ve setup with AWS IoT.

Data flowing into a Channel passes through a Pipeline before reaching the Datastore and this Pipeline is the key to getting our near real-time metrics.

The Lambda Activity in AWS IoT Analytics

All we have to do is add a Lambda function into our Pipeline and have the Lambda function route appropriate data to AWS CloudWatch so that we can then use all the features of CloudWatch for building our dashboards, configuring alarms and combining our new custom metrics with data from other sources. In the example above, I’ve named the Lambda function filter_to_cloudwatch and let’s take a closer look at how simple this function can be.

import json
import boto3

def cw(topic,value,name):
 cloudwatch = boto3.client('cloudwatch')
 cloudwatch.put_metric_data( MetricData=[ 
 {
  'MetricName': name,
  'Dimensions': [{'Name': 'topic','Value': topic}],
  'Unit': 'None', 
  'Value': value
 }],Namespace='Telescope/Monitoring-Test')
 return

def lambda_handler(event, context):
 for e in event:
  if 'uptime' in e :
   cw(e["full_topic"],e["uptime"],"uptime")
  if 'illumination' in e :
   cw(e["full_topic"],e["illumination"],"illumination")
  if 'temperature' in e :
   cw(e["full_topic"],e["temperature"],"temperature")
 return event

Yes, that is all of the code, it really is that short.

Hopefully the code is self-explanatory, but in essence what happens is that the Lambda function in a Pipeline is passed an array of messages with the size of the array dependent on the batch size that you configure for the activity (the default of 1 means that the Lambda function will be invoked for each individual message, which is fine for scenarios where messages only arrive in the channel every few seconds).

The handler then loops through all the messages and looks to see if the message has attributes for uptime, illumination or temperature – these are the attributes that I want to graph in near real-time in this example. If the attribute exists in the message, we call the cw() function to emit the custom metric to AWS CloudWatch.

Note that the Lambda function simply returns the messages that it received so that they flow through the rest of the Pipeline.

Probably the biggest gotcha is that you need to make sure you have granted IoT Analytics permission to invoke your Lambda function, and you can do this with the following simple AWS CLI command.

aws lambda add-permission --function-name filter_to_cloudwatch --statement-id filter_to_cloudwatch_perms --principal iotanalytics.amazonaws.com --action lambda:InvokeFunction

If you forget this, and you have configured your logging options, you’ll see error messages like this for your cloudwatch log stream aws/iotanalytics/pipelines

[ERROR] Unable to execute Lambda function due to insufficient permissions; dropping the messages, number of messages dropped : 1, functionArn : arn:aws:lambda:us-west-2:<accountid>:function:filter_to_cloudwatch

And that’s it! We can now use all the loveliness of AWS CloudWatch to plot our custom metrics on graphs and dashboards with fine grained time resolution, for example this is a graph of my data every 30 seconds.

Example real-time metrics from the IoT Analytics Pipeline showing in CloudWatch

In conclusion, we’ve seen that with a little lateral thinking, we can leverage the Lambda Activity that is available in the AWS IoT Analytics Pipeline to route just the message attributes we want to a near real-time dashboard in AWS CloudWatch.

Happy Dashboarding!

Data Exploration with Amazon Quicksight and IoT Analytics

We discussed previously how to get data out of your data store by writing SQL queries to produce data sets in CSV format. What about exploring and visualising that data to make insights easier? In this post I’ll talk about the native integration between AWS IoT Analytics and Amazon QuickSight that makes this possible and fun.

First, let’s take a quick look at one of my dashboards.

This shows a variety of data about some of the sensors located in our garden (the locations on the map are not real) and as you can see it does a decent job of visualising the data at hand – but where does that data actually come from?

QuickSight has native support for AWS IoT Analytics Datasets and so your starting point is to create a new QuickSight Dataset using the Data Source selection page that looks like this.

Once you’ve selected a data set in IoT Analytics (remember, a data set is the result of a SQL query against the data store) you have some pretty nice data preparation tools in QuickSight that let you alter column types and even create new calculated columns – and this is an especially useful feature as you can, for example, take a unix timestamp and convert it into a date time object that QuickSight can use as the x-axis on many of your graphs. I wrote more about this here.