Streams API

Streams Overview

Data Streams on the device cloud provide a mechanism for storing time-series values over a long period of time. Each individual value in the time series is known as a Data Point.

The device cloud and this library support a few basic operations on streams. Here we give examples of each.

Listing Streams

Although it is not recommended for production applications, it is often useful when building tools to be able to fetch a list of all streams. This can be done by using StreamsAPI.get_streams():

from devicecloud import DeviceCloud

dc = DeviceCloud('user', 'pass')
for stream in dc.streams.get_streams():
    print("%s: %s" % (stream.get_stream_id(),
                      stream.get_description()))

Creating a Stream

Streams can be created in two ways, both of which are supported by this library.

  1. Create a stream explicitly using StreamsAPI.create_stream()
  2. Get a reference to a stream that does not yet exist using StreamsAPI.get_stream() and write a data point to it.

Here are examples of these two methods for creating a new stream:

from devicecloud import DeviceCloud
from devicecloud.streams import DataPoint, STREAM_TYPE_FLOAT

dc = DeviceCloud('user', 'pass')

# explicitly create a new data stream
humidity_stream = dc.streams.create_stream(
    stream_id="mystreams/humidity",
    data_type="float",
    description="Humidity")
humidity_stream.write(DataPoint(81.2))

# create a data stream implicitly by writing to a stream that does not exist yet
temperature_stream = dc.streams.get_stream("mystreams/temperature")
temperature_stream.write(DataPoint(
        stream_id="mystreams/temperature",
        data=74.1,
        description="Outside Air Temperature in F",
        data_type=STREAM_TYPE_FLOAT,
        units="Degrees Fahrenheit"
))

Getting Information About A Stream

Whether you obtained a reference to a stream by id using StreamsAPI.get_stream() or discovered it using StreamsAPI.get_streams(), the DataStream provides access to the metadata stored about the stream. Here we show several of the accessors:

strm = dc.streams.get_stream("test")
print(strm.get_stream_id())
print(strm.get_data_type())
print(strm.get_units())
print(strm.get_description())
print(strm.get_data_ttl())
print(strm.get_rollup_ttl())
print(strm.get_current_value())  # returns a DataPoint object

Note

DataStream.get_current_value() does not use cached values by default and will make a web service call to get the most recent current value unless use_cached is set to True when called.
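The use_cached behaviour described in this note can be pictured with a small stand-alone sketch. This is not the library's implementation: CachedField and its fetch callable are hypothetical names used only to illustrate the pattern of skipping the web service call when a cached value is available and permitted.

```python
class CachedField(object):
    """Hypothetical helper illustrating the use_cached pattern."""

    def __init__(self, fetch):
        self._fetch = fetch        # callable standing in for a web service call
        self._cached = None
        self._has_cached = False

    def get(self, use_cached=True):
        # Only hit the "network" when caching is disallowed or nothing is cached yet.
        if not (use_cached and self._has_cached):
            self._cached = self._fetch()
            self._has_cached = True
        return self._cached


request_log = []


def fake_request():
    # Stand-in for the real HTTP request; records each call.
    request_log.append("GET")
    return "value-%d" % len(request_log)


field = CachedField(fake_request)
first = field.get(use_cached=True)    # nothing cached yet, so one request is made
second = field.get(use_cached=True)   # served from the cache
fresh = field.get(use_cached=False)   # forces a second request
```

Under this model, a getter whose use_cached parameter defaults to False behaves like the last call: every invocation makes a request unless the caller opts in to the cache.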

Deleting a Stream

Deleting a data stream is possible by calling DataStream.delete():

strm = dc.streams.get_stream("doomed")
strm.delete()

Updating Stream Metadata

This feature is currently not supported. Some stream information may be updated by writing a DataPoint and including updated stream info elements.

DataPoint objects

The DataPoint class encapsulates all information required for both writing data points and retrieving information about data points stored on the device cloud.

API Documentation

Module providing classes for interacting with device cloud data streams

exception devicecloud.streams.StreamException

Base class for stream related exceptions

exception devicecloud.streams.NoSuchStreamException

Failure to find a stream based on a given id

exception devicecloud.streams.InvalidRollupDatatype

Roll-ups are only valid on numerical data types

class devicecloud.streams.StreamsAPI(*args, **kwargs)

Provide interface for interacting with device cloud streams API

For further information, see devicecloud.streams.

create_stream(stream_id, data_type, description=None, data_ttl=None, rollup_ttl=None, units=None)

Create a new data stream on the device cloud

This method will attempt to create a new data stream on the device cloud. This method will only succeed if the stream does not already exist.

Parameters:
  • stream_id (str) – The path/id of the stream being created on the device cloud.
  • data_type (str) – The type of this stream. This must be in the set { INTEGER, LONG, FLOAT, DOUBLE, STRING, BINARY, UNKNOWN }. These values are available in constants like STREAM_TYPE_INTEGER.
  • description (str) – An optional description of this stream. See get_description().
  • data_ttl (int) – The TTL for data points in this stream. See get_data_ttl().
  • rollup_ttl (int) – The TTL for performing rollups on data. See get_rollup_ttl().
  • units (str) – Units for data in this stream. See get_units().
get_streams(stream_prefix=None)

Return an iterator over the streams present on the device cloud.

Parameters: stream_prefix – An optional prefix to limit the iterator to; all streams are returned if it is not specified.
Returns: iterator over the matching DataStream instances on the device cloud
get_stream(stream_id)

Return a reference to a stream with the given stream_id

Note that the requested stream may not exist yet. If this is the case, later calls on the stream itself may fail. To ensure that the stream exists, one can use get_stream_if_exists() which will return None if the stream is not already created.

Parameters:

stream_id – The path of the stream on the device cloud

Raises:
  • TypeError – if the stream_id provided is the wrong type
  • ValueError – if the stream_id is not properly formed
Returns:

DataStream instance with the provided stream_id

Return type:

DataStream

get_stream_if_exists(stream_id)

Return a reference to a stream with the given stream_id if it exists

This works similarly to get_stream() but will return None if the stream is not already created.

Parameters:

stream_id – The path of the stream on the device cloud

Raises:
  • TypeError – if the stream_id provided is the wrong type
  • ValueError – if the stream_id is not properly formed
Returns:

DataStream instance with the provided stream_id

Return type:

DataStream

bulk_write_datapoints(datapoints)

Perform a bulk write (or set of writes) of a collection of data points

This method takes a list (or other iterable) of datapoints and writes them to the device cloud in an efficient manner, minimizing the number of HTTP requests that need to be made.

As this call is performed from outside the context of any particular stream, each DataPoint object passed in must include information about the stream into which the point should be written.

If all data points being written are for the same stream, you may want to consider using DataStream.bulk_write_datapoints() instead.

Example:

datapoints = []
for i in range(300):
    datapoints.append(DataPoint(
        stream_id="my/stream%d" % (i % 3),
        data_type=STREAM_TYPE_INTEGER,
        units="meters",
        data=i,
    ))
dc.streams.bulk_write_datapoints(datapoints)

Depending on the size of the list of datapoints provided, this method may need to make multiple calls to the device cloud (in chunks of 250).
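The chunking described above can be sketched as follows. This is a minimal illustration rather than the library's code: chunked is a hypothetical helper, and only the chunk size of 250 comes from the text.

```python
from itertools import islice

# Chunk size taken from the text above; everything else is illustrative.
MAX_DATAPOINTS_PER_REQUEST = 250


def chunked(items, size=MAX_DATAPOINTS_PER_REQUEST):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# 300 points would be split into two requests: one of 250 and one of 50.
chunk_sizes = [len(chunk) for chunk in chunked(range(300))]
```

Because the helper consumes a plain iterator, it works for lists and generators alike, which matches the "list (or other iterable)" wording in the description.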

Parameters:

datapoints (list) – a list of datapoints to be written to the device cloud

Raises:
  • TypeError – if a list of datapoints is not provided
  • ValueError – if any of the provided data points do not have all required information (such as information about the stream)
  • DeviceCloudHttpException – in the case of an unexpected error in communicating with the device cloud.
class devicecloud.streams.DataPoint(data, stream_id=None, description=None, timestamp=None, quality=None, location=None, data_type=None, units=None, dp_id=None, customer_id=None, server_timestamp=None)

Encapsulate information about a single data point

This class encapsulates the data required for pushing data points to the device cloud, as well as for storing and providing methods to access data that has been retrieved from streams on the device cloud.

classmethod from_json(stream, json_data)

Create a new DataPoint object from device cloud JSON data

Parameters:
  • stream (DataStream) – The DataStream out of which this data is coming
  • json_data (dict) – Deserialized JSON data from the device cloud about this data point
Raises:
  • ValueError – if the data is malformed

Returns:

(DataPoint) newly created DataPoint

classmethod from_rollup_json(stream, json_data)

Create a new DataPoint object from device cloud roll-up JSON data, which looks slightly different from the JSON for regular data points

Parameters:
  • stream (DataStream) – The DataStream out of which this data is coming
  • json_data (dict) – Deserialized roll-up JSON data from the device cloud about this data point
Raises:
  • ValueError – if the data is malformed

Returns:

(DataPoint) newly created DataPoint

get_id()

Get the ID of this data point if available

The ID will only exist for data points retrieved from the device cloud. It is not designed to be set when creating data points.

get_data()

Get the actual data value associated with this data point

set_data(data)

Set the data for this data point

This data may be converted upon access at a later point in time based on the data type of this stream (if set).

get_stream_id()

Get the stream ID for this data point if available

set_stream_id(stream_id)

Set the stream id associated with this data point

get_description()

Get the description associated with this data point if available

set_description(description)

Set the description for this data point

get_timestamp()

Get the timestamp of this datapoint as a datetime.datetime object

This is the client-assigned timestamp for this datapoint. If this was not set by the client, it will be the same as the server timestamp.

set_timestamp(timestamp)

Set the timestamp for this data point

The provided value should be None, a datetime.datetime object, or a string in either ISO 8601 or Unix timestamp form.
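To make the accepted forms concrete, here is a stand-alone sketch of how such a value might be normalized. normalize_timestamp is a hypothetical helper, not the library's code, and it rests on two labeled assumptions: naive datetimes are treated as UTC, and a purely numeric string is read as milliseconds since the epoch.

```python
import datetime

UTC = datetime.timezone.utc


def normalize_timestamp(value):
    """Return None or a timezone-aware UTC datetime for the accepted forms."""
    if value is None:
        return None
    if isinstance(value, datetime.datetime):
        # Assumption: a naive datetime is interpreted as UTC.
        if value.tzinfo is None:
            return value.replace(tzinfo=UTC)
        return value.astimezone(UTC)
    if isinstance(value, str):
        text = value.strip()
        if text.isdigit():
            # Assumption: numeric strings are milliseconds since the epoch.
            return datetime.datetime.fromtimestamp(int(text) / 1000.0, tz=UTC)
        # Older Python versions reject a trailing "Z" in fromisoformat().
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        parsed = datetime.datetime.fromisoformat(text)  # ValueError if malformed
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=UTC)
        return parsed.astimezone(UTC)
    raise TypeError("timestamp must be None, a datetime, or a string")
```

Normalizing everything to one timezone-aware representation keeps later comparisons between client and server timestamps unambiguous.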

get_server_timestamp()

Get the date and time at which the server received this data point

get_quality()

Get the quality as an integer

This is a user-defined value whose meaning (if any) could vary per stream. May not always be set.

set_quality(quality)

Set the quality for this sample

Quality is stored on the device cloud as a 32-bit integer, so the input to this function should be either None, an integer, or a string that can be converted to an integer.
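The input rules above can be sketched as a small validation function. normalize_quality is a hypothetical name, and the range check assumes a signed 32-bit integer, which the text does not state explicitly.

```python
# Assumption: "32-bit integer" means a signed two's-complement range.
INT32_MIN = -2 ** 31
INT32_MAX = 2 ** 31 - 1


def normalize_quality(quality):
    """Return None or an int that fits in 32 bits, following the rules above."""
    if quality is None:
        return None
    if isinstance(quality, bool):
        # bool is a subclass of int in Python; reject it to avoid surprises.
        raise TypeError("quality must be None, an integer, or a numeric string")
    if isinstance(quality, int):
        value = quality
    elif isinstance(quality, str):
        value = int(quality)  # raises ValueError for non-integer text
    else:
        raise TypeError("quality must be None, an integer, or a numeric string")
    if not INT32_MIN <= value <= INT32_MAX:
        raise ValueError("quality does not fit in a 32-bit integer")
    return value
```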

get_location()

Get the location for this data point

The location will be either None or a 3-tuple of floats in the form (latitude-degrees, longitude-degrees, altitude-meters).

set_location(location)

Set the location for this data point

The location must be either None (if no location data is known) or a 3-tuple of floating point values in the form (latitude-degrees, longitude-degrees, altitude-meters).
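A short sketch of the shape check implied above is given here. normalize_location is a hypothetical helper written for illustration; it is not taken from the library, and the choice of ValueError for a badly shaped value is an assumption.

```python
def normalize_location(location):
    """Return None or a (latitude, longitude, altitude) tuple of floats."""
    if location is None:
        return None
    try:
        latitude, longitude, altitude = location
    except (TypeError, ValueError):
        # Not iterable, or not exactly three elements.
        raise ValueError("location must be None or a 3-tuple")
    # float() raises ValueError or TypeError if an element is not numeric.
    return (float(latitude), float(longitude), float(altitude))


# Integers are accepted and converted, so the result is always three floats.
minneapolis = normalize_location((44.98, -93.27, 250))
```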

get_data_type()

Get the data type for this data point

The data type is associated with the stream itself but may also be included in data point writes. The data type information in the point is also used to determine which type conversions should be applied to the data.

set_data_type(data_type)

Set the data type for this data point

The data type is actually associated with the stream itself and should not (generally) vary on a point-by-point basis. That being said, if creating a new stream by writing a datapoint, it may be beneficial to include this information.

The data type provided should be in the set of available data types of { INTEGER, LONG, FLOAT, DOUBLE, STRING, BINARY, UNKNOWN }.

get_units()

Get the units of this data point's stream if available

set_units(unit)

Set the unit for this data point

Units, as with data_type, are actually associated with the stream and not the individual data point. As such, changing this within a stream is not encouraged. Setting the unit on the data point is useful when the stream might be created with the write of a data point.

to_xml()

Convert this datapoint into a form suitable for pushing to device cloud

An XML string will be returned that will contain all pieces of information set on this datapoint. Values not set (e.g. quality) will be omitted.
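The "omit what is not set" behaviour can be illustrated with the standard library. This is a sketch only: datapoint_to_xml is a hypothetical function, and the element names used here (DataPoint, data, streamId, quality) are assumptions for the example rather than a statement of what to_xml() emits.

```python
import xml.etree.ElementTree as ET


def datapoint_to_xml(fields):
    """Serialize (name, value) pairs, skipping any value that is None."""
    root = ET.Element("DataPoint")  # element name is an assumption
    for name, value in fields:
        if value is None:
            continue  # unset values are left out of the document entirely
        child = ET.SubElement(root, name)
        child.text = str(value)
    return ET.tostring(root, encoding="unicode")


xml_text = datapoint_to_xml([
    ("data", 74.1),
    ("streamId", "mystreams/temperature"),
    ("quality", None),  # not set, so no <quality> element is produced
])
```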

class devicecloud.streams.DataStream(conn, stream_id, cached_data=None)

Encapsulation of a DataStream’s methods and attributes

get_stream_id()

Get the id/path of this stream

Returns: id/path of this stream
Return type: str
get_data_type(use_cached=True)

Get the data type of this stream if it exists

The data type is the type of data stored in this data stream. Valid types include:

  • INTEGER - data can be represented with a network (= big-endian) 32-bit two’s-complement integer. Data with this type maps to a Python int.
  • LONG - data can be represented with a network (= big-endian) 64-bit two’s-complement integer. Data with this type maps to a Python int.
  • FLOAT - data can be represented with a network (= big-endian) 32-bit IEEE754 floating point. Data with this type maps to a Python float.
  • DOUBLE - data can be represented with a network (= big-endian) 64-bit IEEE754 floating point. Data with this type maps to a Python float.
  • STRING - UTF-8. Data with this type maps to a Python string.
  • BINARY - Data with this type maps to a Python string.
  • UNKNOWN - Data with this type maps to a Python string.
Parameters: use_cached (bool) – If False, the function will always request the latest from the device cloud. If True, the function will not make a request if it already has cached data.
Returns: The data type of this stream as a string
Return type: str
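The type mapping listed above can be summarized as a lookup table. The sketch below is illustrative and not the library's conversion code; TYPE_CONVERTERS and convert_value are hypothetical names, and treating the type name case-insensitively is an assumption.

```python
# Mapping from stream data type to the Python type named in the list above.
TYPE_CONVERTERS = {
    "INTEGER": int,
    "LONG": int,
    "FLOAT": float,
    "DOUBLE": float,
    "STRING": str,
    "BINARY": str,
    "UNKNOWN": str,
}


def convert_value(data_type, raw_value):
    """Convert raw_value according to data_type; pass it through if no type is known."""
    if data_type is None:
        return raw_value
    converter = TYPE_CONVERTERS[data_type.upper()]  # KeyError for unsupported types
    return converter(raw_value)


reading = convert_value("FLOAT", "74.1")   # text from the wire becomes a float
counter = convert_value("INTEGER", "42")   # and an int for integer streams
```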
get_units(use_cached=True)

Get the unit of this stream if it exists

Units are a user-defined field stored as a string

Parameters: use_cached (bool) – If False, the function will always request the latest from the device cloud. If True, the function will not make a request if it already has cached data.
Returns: The unit of this stream as a string
Return type: str or None
get_description(use_cached=True)

Get the description associated with this data stream

Parameters:

use_cached (bool) – If False, the function will always request the latest from the device cloud. If True, the function will not make a request if it already has cached data.

Returns:

The description associated with this stream

Return type:

str or None

get_data_ttl(use_cached=True)

Retrieve the dataTtl for this stream

The dataTtl is the time to live (TTL) in seconds for data points stored in the data stream. A data point expires after the configured amount of time and is automatically deleted.

Parameters:

use_cached (bool) – If False, the function will always request the latest from the device cloud. If True, the function will not make a request if it already has cached data.

Returns:

The dataTtl associated with this stream in seconds

Return type:

int or None

get_rollup_ttl(use_cached=True)

Retrieve the rollupTtl for this stream

The rollupTtl is the time to live (TTL) in seconds for the aggregate roll-ups of data points stored in the stream. A roll-up expires after the configured amount of time and is automatically deleted.

Parameters:

use_cached (bool) – If False, the function will always request the latest from the device cloud. If True, the function will not make a request if it already has cached data.

Returns:

The rollupTtl associated with this stream in seconds

Return type:

int or None

get_current_value(use_cached=False)

Return the most recent DataPoint value written to a stream

The current value is the last recorded data point for this stream.

Parameters:

use_cached (bool) – If False, the function will always request the latest from the device cloud. If True, the function will not make a request if it already has cached data.

Returns:

The most recent value written to this stream (or None if nothing has been written)

Return type:

DataPoint or None

delete()

Delete this stream from the device cloud along with its history

This call will return None on success and raise an exception in the event of an error performing the deletion.

delete_datapoint(datapoint)

Delete the provided datapoint from this stream

Raises:
  • devicecloud.DeviceCloudHttpException – in the case of an unexpected HTTP error
delete_datapoints_in_time_range(start_dt=None, end_dt=None)

Delete datapoints from this stream between the provided start and end times

If neither a start nor an end time is specified, all data points in the stream will be deleted.

Parameters:
  • start_dt – The datetime after which data points should be deleted or None if all data points from the beginning of time should be deleted.
  • end_dt – The datetime before which data points should be deleted or None if all data points until the current time should be deleted.
Raises:
  • devicecloud.DeviceCloudHttpException – in the case of an unexpected HTTP error

bulk_write_datapoints(datapoints)

Perform a bulk write of a number of datapoints to this stream

It is assumed that all datapoints here are to be written to this stream, and the stream_id on each will be set by this method to this stream's id (regardless of whether it is already set). To write datapoints which span multiple streams, use StreamsAPI.bulk_write_datapoints() instead.

Parameters: datapoints (list) – A list of datapoints to be written into this stream
write(datapoint)

Write some raw data to a stream using the DataPoint API

This method will mutate the datapoint provided to populate it with information available from the stream as it is available (but without making any new HTTP requests). For instance, we will add in information about the stream data type if it is available so that proper type conversion happens.

Values already set on the datapoint will not be overridden (except for path).

Parameters: datapoint (DataPoint) – The DataPoint that should be written to the device cloud
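The merge rule described for write() can be sketched with plain dictionaries standing in for the DataPoint and the stream's cached metadata. fill_from_stream and the field names are hypothetical; the sketch only illustrates "fill in what is missing, never override what is set, always set the path".

```python
def fill_from_stream(datapoint_fields, stream_id, cached_stream_fields):
    """Mutate datapoint_fields in place using locally cached stream metadata."""
    for key, value in cached_stream_fields.items():
        # Only fill fields the caller left unset; explicit values win.
        if datapoint_fields.get(key) is None and value is not None:
            datapoint_fields[key] = value
    # The stream path is the one exception: it is always overwritten.
    datapoint_fields["stream_id"] = stream_id
    return datapoint_fields


point = {"data": 74.1, "data_type": None, "units": "F", "stream_id": "other"}
fill_from_stream(point, "mystreams/temperature",
                 {"data_type": "FLOAT", "units": "C"})
```

After the call, point carries the stream's data type (it was unset), keeps its own units, and has its stream_id replaced. No network access is involved because only cached metadata is consulted.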
read(start_time=None, end_time=None, use_client_timeline=True, newest_first=True, rollup_interval=None, rollup_method=None, timezone=None, page_size=1000)

Read one or more DataPoints from a stream

Warning

The data points from the device cloud are returned as a paged data set. When iterating over the result set there could be delays when we hit the end of a page. If this is undesirable, the caller should collect all results into a data structure first before iterating over the result set.

Parameters:
  • start_time (datetime.datetime or None) – The start time for the window of data points to read. None means that we should start with the oldest data available.
  • end_time (datetime.datetime or None) – The end time for the window of data points to read. None means that we should include all points received until this point in time.
  • use_client_timeline (bool) – If True, the times used will be those provided by clients writing data points into the cloud (which also default to server time if a timestamp was not included by the client). This is usually what you want. If False, the server timestamp will be used which records when the data point was received.
  • newest_first (bool) – If True, results will be ordered from newest to oldest (descending order). If False, results will be returned oldest to newest.
  • rollup_interval (str or None) – the roll-up interval that should be used if one is desired at all. Rollups will not be performed if None is specified for the interval. Valid roll-up interval values are None, “half”, “hourly”, “day”, “week”, and “month”. See DataPoints documentation for additional details on these values.
  • rollup_method (str or None) – The aggregation applied to values in the points within the specified rollup_interval. Available methods are None, “sum”, “average”, “min”, “max”, “count”, and “standarddev”. See DataPoint documentation for additional details on these values.
  • timezone (str or None) – timezone for calculating roll-ups. This determines roll-up interval boundaries and only applies to roll-ups of a day or larger (for example, day, week, or month). Note that it does not apply to the startTime and endTime parameters. See the Timestamps and Supported Time Zones sections for more information.
  • page_size (int) – The number of results that we should attempt to retrieve from the device cloud in each page. Generally, this can be left at its default value unless you have a good reason to change the parameter for performance reasons.
Returns:

A generator object which yields the DataPoints read.
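The paging behaviour mentioned in the warning can be modelled with a small generator. This is a sketch under stated assumptions: read_paged, fetch_page, and the cursor scheme are hypothetical stand-ins for the web service, not the library's internals.

```python
def read_paged(fetch_page, page_size=1000):
    """Yield items one at a time, requesting a new page only when needed.

    fetch_page(cursor, page_size) must return (items, next_cursor), where
    next_cursor is None once the final page has been delivered.
    """
    cursor = None
    while True:
        items, cursor = fetch_page(cursor, page_size)
        for item in items:
            yield item
        if cursor is None:
            return


def make_fake_fetcher(all_items):
    """Build an in-memory stand-in for the remote service that logs each request."""
    calls = []

    def fetch_page(cursor, page_size):
        start = cursor or 0
        calls.append(start)
        end = start + page_size
        next_cursor = end if end < len(all_items) else None
        return all_items[start:end], next_cursor

    return fetch_page, calls


fetch_page, calls = make_fake_fetcher(list(range(2500)))

# Collecting into a list up front pays for every page request immediately,
# so later iteration over `points` never pauses at a page boundary.
points = list(read_paged(fetch_page, page_size=1000))
```

With 2500 items and a page size of 1000, the fake service is asked for three pages. Iterating the generator lazily instead would spread those three requests across the loop.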