import boto3
import json
import time
import urllib.request
import csv
import codecs
# Source dataset to export.
dataset = "telescope_daily"

# Resolve the presigned S3 URL of the latest generated dataset content.
iota = boto3.client('iotanalytics')
dataset_url = iota.get_dataset_content(
    datasetName=dataset,
    versionId="$LATEST",
)['entries'][0]['dataURI']

# Destination Firehose delivery stream, named after the dataset.
firehose = boto3.client('firehose')
streamName = 'export-firehose-' + dataset
# Create the delivery stream (best effort: if it already exists, the
# service raises and we simply log the error and carry on — the wait
# loop below will pick up the existing stream).
try:
    createdFirehose = firehose.create_delivery_stream(
        DeliveryStreamName=streamName,
        S3DestinationConfiguration={
            # NOTE(review): ACCOUNTID is a placeholder — substitute the
            # target AWS account id before running.
            'RoleARN': 'arn:aws:iam::ACCOUNTID:role/firehose_rtjm',
            'Prefix': dataset + '/',
            'BucketARN': 'arn:aws:s3:::rtjm.firehose.destination',
        },
    )
except Exception as E:
    print(E)
# Wait for the delivery stream to become ACTIVE before writing to it,
# polling DescribeDeliveryStream every 5 seconds.
# Fix: a freshly created stream can transiently raise (e.g.
# ResourceNotFoundException), and the original looped straight back
# with no delay — a tight busy loop hammering the API. Failures now
# back off with the same 5-second sleep as a non-ACTIVE status.
status = None
response = None
while status != 'ACTIVE':
    try:
        response = firehose.describe_delivery_stream(DeliveryStreamName=streamName)
        status = response['DeliveryStreamDescription']['DeliveryStreamStatus']
    except Exception as E:
        print(E)
    if status != 'ACTIVE':
        time.sleep(5)
# Stream the CSV dataset to Firehose, one put_record call per row.
# Each CSV row is serialized as a JSON object with a trailing newline
# delimiter (Firehose concatenates records in the destination object).
# Note this code does no error checking and should not be considered
# production ready, but just an example.
# Fix: the original never closed the urlopen response, leaking the
# HTTP connection — the context manager closes it when the loop ends.
rows = 0
with urllib.request.urlopen(dataset_url) as stream:
    # Decode the byte stream to text incrementally for csv.DictReader.
    reader = csv.DictReader(codecs.iterdecode(stream, 'utf-8'))
    for row in reader:
        record = json.dumps(row) + "\n"
        firehose.put_record(DeliveryStreamName=streamName, Record={'Data': record})
        rows += 1
print('Exported '+str(rows)+' rows')