Streams API¶
Streams Overview¶
Data Streams on Device Cloud provide a mechanism for storing time-series values over a long period of time. Each individual value in the time series is known as a Data Point.
Device Cloud supports a few basic operations on streams, all of which are also supported by this library. Here we give examples of each.
Listing Streams¶
Although it is not recommended for production applications, it is often useful
when building tools to be able to fetch a list of all streams. This can be
done by using StreamsAPI.get_streams():
from devicecloud import DeviceCloud

dc = DeviceCloud('user', 'pass')
for stream in dc.streams.get_streams():
    print("%s: %s" % (stream.get_stream_id(),
                      stream.get_description()))
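get_streams() also accepts an optional stream_prefix argument (see the API documentation below). The following is a minimal sketch of a listing helper built on that argument. The helper name describe_streams and the FakeStream/FakeStreams stand-ins are hypothetical and exist only so the sketch runs without a Device Cloud account; in real use you would pass dc.streams instead.

```python
def describe_streams(streams_api, prefix=None):
    """Return "id: description" lines for streams under an optional prefix.

    `streams_api` is expected to behave like `dc.streams`; only the
    documented get_streams(stream_prefix=None) method is used.
    """
    lines = []
    for stream in streams_api.get_streams(stream_prefix=prefix):
        lines.append("%s: %s" % (stream.get_stream_id(),
                                 stream.get_description()))
    return lines


# Hypothetical stand-ins so the sketch runs offline.
class FakeStream(object):
    def __init__(self, stream_id, description):
        self._stream_id = stream_id
        self._description = description

    def get_stream_id(self):
        return self._stream_id

    def get_description(self):
        return self._description


class FakeStreams(object):
    def __init__(self, streams):
        self._streams = streams

    def get_streams(self, stream_prefix=None):
        for stream in self._streams:
            stream_id = stream.get_stream_id()
            if stream_prefix is None or stream_id.startswith(stream_prefix):
                yield stream


fake = FakeStreams([FakeStream("mystreams/humidity", "Humidity"),
                    FakeStream("other/temperature", "Temperature")])
for line in describe_streams(fake, prefix="mystreams"):
    print(line)
```

Passing the API object in as an argument keeps the helper easy to test, since nothing in it depends on a live connection.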
Creating a Stream¶
Streams can be created in two ways, both of which are supported by this library:
- Create a stream explicitly using StreamsAPI.create_stream().
- Get a reference to a stream that does not yet exist using StreamsAPI.get_stream() and write a data point to it.
Here are examples of these two methods for creating a new stream:
from devicecloud import DeviceCloud
from devicecloud.streams import DataPoint, STREAM_TYPE_FLOAT

dc = DeviceCloud('user', 'pass')

# explicitly create a new data stream
humidity_stream = dc.streams.create_stream(
    stream_id="mystreams/humidity",
    data_type="float",
    description="Humidity")
humidity_stream.write(DataPoint(81.2))

# create a data stream implicitly by writing to a stream that does not exist yet
# (some_id is a placeholder for an identifier of your choosing)
temperature_stream = dc.streams.get_stream("/%s/temperature" % some_id)
temperature_stream.write(DataPoint(
    stream_id="/%s/temperature" % some_id,
    data=74.1,
    description="Outside Air Temperature in F",
    data_type=STREAM_TYPE_FLOAT,
    units="Degrees Fahrenheit"
))
Getting Information About A Stream¶
Whether we know a stream by id and have gotten a reference using StreamsAPI.get_stream() or have discovered it using StreamsAPI.get_streams(), the DataStream should be able to provide access to all metadata about the stream that you may need. Here we show several of the available accessors:
strm = dc.streams.get_stream("test")
print(strm.get_stream_id())
print(strm.get_data_type())
print(strm.get_units())
print(strm.get_description())
print(strm.get_data_ttl())
print(strm.get_rollup_ttl())
print(strm.get_current_value())  # returns a DataPoint object
Note
DataStream.get_current_value() does not use cached values by default and will make a web service call to get the most recent current value unless use_cached is set to True when called.
Deleting a Stream¶
Deleting a data stream is possible by calling DataStream.delete():
strm = dc.streams.get_stream("doomed")
strm.delete()
Updating Stream Metadata¶
This feature is currently not supported. Some stream information may be updated by writing a DataPoint and including updated stream info elements.
DataPoint objects¶
The DataPoint class encapsulates all information required both for writing data points and for retrieving information about data points stored on Device Cloud.
API Documentation¶
Module providing classes for interacting with device cloud data streams
- exception devicecloud.streams.StreamException¶
Base class for stream related exceptions
- exception devicecloud.streams.NoSuchStreamException¶
Failure to find a stream based on a given id
- exception devicecloud.streams.InvalidRollupDatatype¶
Roll-ups are only valid on numerical data types
- class devicecloud.streams.StreamsAPI(*args, **kwargs)¶
Provide interface for interacting with device cloud streams API
For further information, see devicecloud.streams.
- create_stream(stream_id, data_type, description=None, data_ttl=None, rollup_ttl=None, units=None)¶
Create a new data stream on Device Cloud
This method will attempt to create a new data stream on Device Cloud. This method will only succeed if the stream does not already exist.
- Parameters
stream_id (str) – The path/id of the stream being created on Device Cloud.
data_type (str) – The type of this stream. This must be in the set { INTEGER, LONG, FLOAT, DOUBLE, STRING, BINARY, UNKNOWN }. These values are available in constants like STREAM_TYPE_INTEGER.
description (str) – An optional description of this stream. See get_description().
data_ttl (int) – The TTL for data points in this stream. See get_data_ttl().
rollup_ttl (int) – The TTL for performing rollups on data. See get_rollup_ttl().
units (str) – Units for data in this stream. See get_units().
- get_streams(stream_prefix=None)¶
Return an iterator over the streams present on Device Cloud.
- Parameters
stream_prefix – An optional prefix to limit the iterator to; all streams are returned if it is not specified.
- Returns
iterator over all DataStream instances on Device Cloud
- get_stream(stream_id)¶
Return a reference to a stream with the given stream_id
Note that the requested stream may not exist yet. If this is the case, later calls on the stream itself may fail. To ensure that the stream exists, one can use get_stream_if_exists(), which will return None if the stream is not already created.
- Parameters
stream_id – The path of the stream on Device Cloud
- Raises
TypeError – if the stream_id provided is the wrong type
ValueError – if the stream_id is not properly formed
- Returns
DataStream instance with the provided stream_id
- Return type
DataStream
- get_stream_if_exists(stream_id)¶
Return a reference to a stream with the given stream_id if it exists
This works similarly to get_stream() but will return None if the stream is not already created.
- Parameters
stream_id – The path of the stream on Device Cloud
- Raises
TypeError – if the stream_id provided is the wrong type
ValueError – if the stream_id is not properly formed
- Returns
DataStream instance with the provided stream_id, or None if the stream does not exist
- Return type
DataStream or None
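Because create_stream() only succeeds when the stream does not already exist, while get_stream_if_exists() returns None for a missing stream, the two can be combined into a "get or create" helper. The sketch below is one way to do that. The name get_or_create_stream and the FakeStreams stand-in are hypothetical; only the documented get_stream_if_exists() and create_stream() calls are assumed on the API object.

```python
def get_or_create_stream(streams_api, stream_id, data_type, **stream_info):
    """Return the existing stream, or create it explicitly if it is missing.

    `stream_info` may carry the optional create_stream() arguments
    (description, data_ttl, rollup_ttl, units).
    """
    stream = streams_api.get_stream_if_exists(stream_id)
    if stream is None:
        stream = streams_api.create_stream(stream_id, data_type, **stream_info)
    return stream


# Hypothetical in-memory stand-in for dc.streams so the sketch runs offline.
class FakeStreams(object):
    def __init__(self):
        self.created = {}

    def get_stream_if_exists(self, stream_id):
        return self.created.get(stream_id)

    def create_stream(self, stream_id, data_type, **stream_info):
        stream = {"stream_id": stream_id, "data_type": data_type}
        stream.update(stream_info)
        self.created[stream_id] = stream
        return stream


api = FakeStreams()
first = get_or_create_stream(api, "mystreams/humidity", "FLOAT", units="percent")
second = get_or_create_stream(api, "mystreams/humidity", "FLOAT", units="percent")
print(first is second)
```

Note that the check and the create are two separate requests against a real Device Cloud account, so another client could create the stream in between; callers that care should be prepared for create_stream() to fail.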
- bulk_write_datapoints(datapoints)¶
Perform a bulk write (or set of writes) of a collection of data points
This method takes a list (or other iterable) of datapoints and writes them to Device Cloud in an efficient manner, minimizing the number of HTTP requests that need to be made.
As this call is performed from outside the context of any particular stream, each DataPoint object passed in must include information about the stream into which the point should be written.
If all data points being written are for the same stream, you may want to consider using DataStream.bulk_write_datapoints() instead.
Example:
datapoints = []
for i in range(300):
    datapoints.append(DataPoint(
        stream_id="my/stream%d" % (i % 3),
        data_type=STREAM_TYPE_INTEGER,
        units="meters",
        data=i,
    ))
dc.streams.bulk_write_datapoints(datapoints)
Depending on the size of the list of datapoints provided, this method may need to make multiple calls to Device Cloud (in chunks of 250).
- Parameters
datapoints (list) – a list of datapoints to be written to Device Cloud
- Raises
TypeError – if a list of datapoints is not provided
ValueError – if any of the provided data points do not have all required information (such as information about the stream)
DeviceCloudHttpException – in the case of an unexpected error in communicating with Device Cloud.
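The chunking behavior described for bulk_write_datapoints() (multiple calls in chunks of 250) can be illustrated with a short generic sketch. This is not the library's implementation; the names chunked and MAX_POINTS_PER_REQUEST are made up for illustration, and only the chunk size of 250 comes from the text above.

```python
from itertools import islice

MAX_POINTS_PER_REQUEST = 250  # chunk size stated in the documentation above


def chunked(iterable, size=MAX_POINTS_PER_REQUEST):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# 600 points would be sent as three requests.
chunk_sizes = [len(chunk) for chunk in chunked(range(600))]
print(chunk_sizes)  # [250, 250, 100]
```

Working from an iterator rather than slicing a list means any iterable of data points can be batched without first copying it.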
- class devicecloud.streams.DataPoint(data, stream_id=None, description=None, timestamp=None, quality=None, location=None, data_type=None, units=None, dp_id=None, customer_id=None, server_timestamp=None)¶
Encapsulate information about a single data point
This class encapsulates the data required both for pushing data points to Device Cloud and for storing and providing methods to access data from streams that has been retrieved from Device Cloud.
- classmethod from_json(stream, json_data)¶
Create a new DataPoint object from device cloud JSON data
- Parameters
stream (DataStream) – The DataStream out of which this data is coming
json_data (dict) – Deserialized JSON data from Device Cloud about this device
- Raises
ValueError – if the data is malformed
- Returns
- classmethod from_rollup_json(stream, json_data)¶
Rollup json data from the server looks slightly different
- Parameters
stream (DataStream) – The DataStream out of which this data is coming
json_data (dict) – Deserialized JSON data from Device Cloud about this device
- Raises
ValueError – if the data is malformed
- Returns
- get_id()¶
Get the ID of this data point if available
The ID will only exist for data points retrieved from Device Cloud and should not be set on data points that are being created.
- get_data()¶
Get the actual data value associated with this data point
- set_data(data)¶
Set the data for this data point
This data may be converted upon access at a later point in time based on the data type of this stream (if set).
- get_stream_id()¶
Get the stream ID for this data point if available
- set_stream_id(stream_id)¶
Set the stream id associated with this data point
- get_description()¶
Get the description associated with this data point if available
- set_description(description)¶
Set the description for this data point
- get_timestamp()¶
Get the timestamp of this datapoint as a datetime.datetime object
This is the client assigned timestamp for this datapoint. If this was not set by the client, it will be the same as the server timestamp.
- set_timestamp(timestamp)¶
Set the timestamp for this data point
The provided value should be either None, a datetime.datetime object, or a string with either ISO8601 or unix timestamp form.
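To make the accepted input forms concrete, here is a rough sketch of how such values could be normalized to a datetime. It is not the library's conversion code. The function name normalize_timestamp is hypothetical, and treating an all-digit string as seconds since the Unix epoch in UTC is an assumption; the library may interpret such strings differently.

```python
from datetime import datetime, timezone


def normalize_timestamp(value):
    """Convert None, a datetime, or a timestamp string into a datetime (or None)."""
    if value is None or isinstance(value, datetime):
        return value
    if not isinstance(value, str):
        raise TypeError("expected None, datetime, or str, got %r" % type(value))
    text = value.strip()
    if text.isdigit():
        # Assumption: an all-digit string is seconds since the Unix epoch (UTC).
        return datetime.fromtimestamp(int(text), tz=timezone.utc)
    # Older Python versions reject a trailing "Z" in fromisoformat(),
    # so rewrite it as an explicit UTC offset first.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    return datetime.fromisoformat(text)


print(normalize_timestamp("2015-01-01T12:30:00Z"))
print(normalize_timestamp("0"))
```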
- get_server_timestamp()¶
Get the date and time at which the server received this data point
- get_quality()¶
Get the quality as an integer
This is a user-defined value whose meaning (if any) could vary per stream. May not always be set.
- set_quality(quality)¶
Set the quality for this sample
Quality is stored on Device Cloud as a 32-bit integer, so the input to this function should be either None, an integer, or a string that can be converted to an integer.
- get_location()¶
Get the location for this data point
The location will be either None or a 3-tuple of floats in the form (latitude-degrees, longitude-degrees, altitude-meters).
- set_location(location)¶
Set the location for this data point
The location must be either None (if no location data is known) or a 3-tuple of floating point values in the form (latitude-degrees, longitude-degrees, altitude-meters).
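A small sketch of checking a location value before handing it to set_location() follows. The function name normalize_location is hypothetical, and the latitude/longitude range checks are an assumption added for illustration; the text above only requires None or a 3-tuple of floats.

```python
def normalize_location(location):
    """Return location as a (latitude, longitude, altitude) tuple of floats, or None."""
    if location is None:
        return None
    parts = tuple(location)
    if len(parts) != 3:
        raise ValueError("expected (latitude, longitude, altitude), got %r" % (location,))
    latitude, longitude, altitude = (float(part) for part in parts)
    # Assumption: ordinary geographic ranges in degrees; the documentation
    # above does not say whether the library enforces these.
    if not -90.0 <= latitude <= 90.0:
        raise ValueError("latitude out of range: %r" % latitude)
    if not -180.0 <= longitude <= 180.0:
        raise ValueError("longitude out of range: %r" % longitude)
    return (latitude, longitude, altitude)


print(normalize_location((44.98, -93.27, 250)))
```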
- get_data_type()¶
Get the data type for this data point
The data type is associated with the stream itself but may also be included in data point writes. The data type information in the point is also used to determine which type conversions should be applied to the data.
- set_data_type(data_type)¶
Set the data type for this data point
The data type is actually associated with the stream itself and should not (generally) vary on a point-per-point basis. That being said, if creating a new stream by writing a datapoint, it may be beneficial to include this information.
The data type provided should be in the set of available data types of { INTEGER, LONG, FLOAT, DOUBLE, STRING, BINARY, UNKNOWN }.
- get_units()¶
Get the units of this data point's stream if available
- set_units(unit)¶
Set the unit for this data point
Units, as with data_type, are actually associated with the stream and not the individual data point. As such, changing this within a stream is not encouraged. Setting the unit on the data point is useful when the stream might be created with the write of a data point.
- to_xml()¶
Convert this datapoint into a form suitable for pushing to device cloud
An XML string will be returned that will contain all pieces of information set on this datapoint. Values not set (e.g. quality) will be omitted.
- class devicecloud.streams.DataStream(conn, stream_id, cached_data=None)¶
Encapsulation of a DataStream’s methods and attributes
- get_data_type(use_cached=True)¶
Get the data type of this stream if it exists
The data type is the type of data stored in this data stream. Valid types include:
INTEGER - data can be represented with a network (= big-endian) 32-bit two’s-complement integer. Data with this type maps to a python int.
LONG - data can be represented with a network (= big-endian) 64-bit two’s complement integer. Data with this type maps to a python int.
FLOAT - data can be represented with a network (= big-endian) 32-bit IEEE754 floating point. Data with this type maps to a python float.
DOUBLE - data can be represented with a network (= big-endian) 64-bit IEEE754 floating point. Data with this type maps to a python float.
STRING - UTF-8. Data with this type maps to a python string.
BINARY - Data with this type maps to a python string.
UNKNOWN - Data with this type maps to a python string.
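The numeric encodings listed above correspond directly to format codes in Python's standard struct module, which gives a simple way to check whether a value fits a given stream type before writing it. This is only an illustrative sketch; the names STRUCT_FORMATS, fits_data_type, and smallest_integer_type are hypothetical and not part of the library.

```python
import struct

# struct format codes for the big-endian ("network" order) encodings above.
STRUCT_FORMATS = {
    "INTEGER": ">i",  # 32-bit two's-complement integer
    "LONG": ">q",     # 64-bit two's-complement integer
    "FLOAT": ">f",    # 32-bit IEEE 754 floating point
    "DOUBLE": ">d",   # 64-bit IEEE 754 floating point
}


def fits_data_type(value, data_type):
    """Return True if `value` can be packed using the encoding for `data_type`."""
    try:
        struct.pack(STRUCT_FORMATS[data_type], value)
    except (struct.error, OverflowError):
        return False
    return True


def smallest_integer_type(value):
    """Return "INTEGER" or "LONG", whichever is the smallest type holding `value`."""
    for data_type in ("INTEGER", "LONG"):
        if fits_data_type(value, data_type):
            return data_type
    return None


print(smallest_integer_type(2 ** 31 - 1))  # INTEGER
print(smallest_integer_type(2 ** 31))      # LONG
```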
- get_units(use_cached=True)¶
Get the unit of this stream if it exists
Units are a user-defined field stored as a string
- get_description(use_cached=True)¶
Get the description associated with this data stream
- Parameters
use_cached (bool) – If False, the function will always request the latest from Device Cloud. If True, the function will not make a request if it already has cached data.
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
devicecloud.streams.NoSuchStreamException – if this stream has not yet been created
- Returns
The description associated with this stream
- Return type
str or None
- get_data_ttl(use_cached=True)¶
Retrieve the dataTTL for this stream
The dataTtl is the time to live (TTL) in seconds for data points stored in the data stream. A data point expires after the configured amount of time and is automatically deleted.
- Parameters
use_cached (bool) – If False, the function will always request the latest from Device Cloud. If True, the function will not make a request if it already has cached data.
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
devicecloud.streams.NoSuchStreamException – if this stream has not yet been created
- Returns
The dataTtl associated with this stream in seconds
- Return type
int or None
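Since the TTL is expressed in seconds, working out when a point is due to expire is plain datetime arithmetic. The sketch below assumes the TTL is counted from the point's timestamp and treats a TTL of None as "unknown"; both are assumptions, as are the helper names expiry_time and is_expired.

```python
from datetime import datetime, timedelta


def expiry_time(timestamp, data_ttl):
    """Return when a point stamped `timestamp` expires, or None if the TTL is unknown."""
    if data_ttl is None:
        return None
    # Assumption: the TTL is measured from the data point's timestamp.
    return timestamp + timedelta(seconds=data_ttl)


def is_expired(timestamp, data_ttl, now):
    """Return True if a point with the given timestamp and TTL has expired at `now`."""
    expires = expiry_time(timestamp, data_ttl)
    return expires is not None and now >= expires


written = datetime(2015, 1, 1)
print(expiry_time(written, 86400))  # 2015-01-02 00:00:00
```

In practice the TTL argument would come from a call such as strm.get_data_ttl().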
- get_rollup_ttl(use_cached=True)¶
Retrieve the rollupTtl for this stream
The rollupTtl is the time to live (TTL) in seconds for the aggregate roll-ups of data points stored in the stream. A roll-up expires after the configured amount of time and is automatically deleted.
- Parameters
use_cached (bool) – If False, the function will always request the latest from Device Cloud. If True, the function will not make a request if it already has cached data.
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
devicecloud.streams.NoSuchStreamException – if this stream has not yet been created
- Returns
The rollupTtl associated with this stream in seconds
- Return type
int or None
- get_current_value(use_cached=False)¶
Return the most recent DataPoint value written to a stream
The current value is the last recorded data point for this stream.
- Parameters
use_cached (bool) – If False, the function will always request the latest from Device Cloud. If True, the function will not make a request if it already has cached data.
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
devicecloud.streams.NoSuchStreamException – if this stream has not yet been created
- Returns
The most recent value written to this stream (or None if nothing has been written)
- Return type
DataPoint or None
- delete()¶
Delete this stream from Device Cloud along with its history
This call will return None on success and raise an exception in the event of an error performing the deletion.
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
devicecloud.streams.NoSuchStreamException – if this stream has already been deleted
- delete_datapoint(datapoint)¶
Delete the provided datapoint from this stream
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
- delete_datapoints_in_time_range(start_dt=None, end_dt=None)¶
Delete datapoints from this stream between the provided start and end times
If neither a start nor an end time is specified, all data points in the stream will be deleted.
- Parameters
start_dt – The datetime after which data points should be deleted or None if all data points from the beginning of time should be deleted.
end_dt – The datetime before which data points should be deleted or None if all data points until the current time should be deleted.
- Raises
devicecloud.DeviceCloudHttpException – in the case of an unexpected http error
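A common use of delete_datapoints_in_time_range() is pruning old history. The following sketch wraps the documented call in a helper that deletes everything older than a given number of days. The helper name purge_older_than and the RecordingStream stand-in are hypothetical, and the sketch assumes timezone-aware datetimes are acceptable for end_dt.

```python
from datetime import datetime, timedelta, timezone


def purge_older_than(stream, days, now=None):
    """Delete points older than `days` days and return the cutoff that was used."""
    if now is None:
        # Assumption: timezone-aware UTC datetimes are accepted by the stream.
        now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    # start_dt=None means "from the beginning of time" per the parameter docs.
    stream.delete_datapoints_in_time_range(start_dt=None, end_dt=cutoff)
    return cutoff


# Hypothetical stand-in for a DataStream that records calls instead of deleting.
class RecordingStream(object):
    def __init__(self):
        self.calls = []

    def delete_datapoints_in_time_range(self, start_dt=None, end_dt=None):
        self.calls.append((start_dt, end_dt))


recorder = RecordingStream()
cutoff = purge_older_than(recorder, days=30,
                          now=datetime(2015, 3, 1, tzinfo=timezone.utc))
print(cutoff.isoformat())
```

Always pass at least one bound when pruning: with both bounds left as None the call deletes every data point in the stream.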
- bulk_write_datapoints(datapoints)¶
Perform a bulk write of a number of datapoints to this stream
It is assumed that all datapoints here are to be written to this stream, and the stream_id on each will be set by this method to this stream's id (regardless of whether it is already set). To write multiple datapoints which span multiple streams, use StreamsAPI.bulk_write_datapoints() instead.
- Parameters
datapoints (list) – A list of datapoints to be written into this stream
- write(datapoint)¶
Write some raw data to a stream using the DataPoint API
This method will mutate the datapoint provided to populate it with information available from the stream as it is available (but without making any new HTTP requests). For instance, we will add in information about the stream data type if it is available so that proper type conversion happens.
Values already set on the datapoint will not be overridden (except for path)
- read(start_time=None, end_time=None, use_client_timeline=True, newest_first=True, rollup_interval=None, rollup_method=None, timezone=None, page_size=1000)¶
Read one or more DataPoints from a stream
Warning
The data points from Device Cloud are a paged data set. When iterating over the result set, there may be delays each time the end of a page is reached. If this is undesirable, the caller should collect all results into a data structure before iterating over them.
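Collecting the results up front, as the warning suggests, amounts to draining the generator into a list. The sketch below shows the idea. The helper name read_all and the FakePagedStream stand-in (which only imitates paging by counting page fetches) are hypothetical; in real use you would pass a DataStream.

```python
def read_all(stream, **read_kwargs):
    """Drain stream.read() into a list so that all paging delays happen up front."""
    return list(stream.read(**read_kwargs))


# Hypothetical stand-in that imitates a paged result set.
class FakePagedStream(object):
    def __init__(self, values, page_size):
        self._values = list(values)
        self._page_size = page_size
        self.pages_fetched = 0

    def read(self, **kwargs):
        for start in range(0, len(self._values), self._page_size):
            # A real stream would make an HTTP request at this point.
            self.pages_fetched += 1
            for value in self._values[start:start + self._page_size]:
                yield value


fake_stream = FakePagedStream(range(5), page_size=2)
points = read_all(fake_stream, newest_first=False)
print(len(points), fake_stream.pages_fetched)  # 5 3
```

The trade-off is memory: the whole result set is held at once, so a bounded time window is advisable for streams with long histories.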
- Parameters
start_time (datetime.datetime or None) – The start time for the window of data points to read. None means that we should start with the oldest data available.
end_time (datetime.datetime or None) – The end time for the window of data points to read. None means that we should include all points received until this point in time.
use_client_timeline (bool) – If True, the times used will be those provided by clients writing data points into the cloud (which also default to server time if a timestamp was not included by the client). This is usually what you want. If False, the server timestamp will be used, which records when the data point was received.
newest_first (bool) – If True, results will be ordered from newest to oldest (descending order). If False, results will be returned oldest to newest.
rollup_interval (str or None) – the roll-up interval that should be used if one is desired at all. Rollups will not be performed if None is specified for the interval. Valid roll-up interval values are None, “half”, “hourly”, “day”, “week”, and “month”. See DataPoints documentation for additional details on these values.
rollup_method (str or None) – The aggregation applied to values in the points within the specified rollup_interval. Available methods are None, “sum”, “average”, “min”, “max”, “count”, and “standarddev”. See DataPoint documentation for additional details on these values.
timezone (str or None) – timezone for calculating roll-ups. This determines roll-up interval boundaries and only applies to roll-ups of a day or larger (for example, day, week, or month). Note that it does not apply to the startTime and endTime parameters. See the Timestamps and Supported Time Zones sections for more information.
page_size (int) – The number of results that we should attempt to retrieve from the device cloud in each page. Generally, this can be left at its default value unless you have a good reason to change the parameter for performance reasons.
- Returns
A generator object that can be used to iterate over the DataPoints read.
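Roll-ups are computed by Device Cloud itself, but a small client-side imitation can clarify what rollup_interval and rollup_method mean. The sketch below groups (timestamp, value) pairs by clock hour and applies one of the listed methods. It is purely illustrative and not how the server or library is implemented: aligning buckets to clock hours and using a population standard deviation for "standarddev" are both assumptions, and the names ROLLUP_METHODS and hourly_rollup are hypothetical.

```python
from collections import OrderedDict
from datetime import datetime
import statistics

# Client-side equivalents of the roll-up methods named above.
ROLLUP_METHODS = {
    "sum": sum,
    "average": statistics.mean,
    "min": min,
    "max": max,
    "count": len,
    "standarddev": statistics.pstdev,  # assumption: population standard deviation
}


def hourly_rollup(points, method):
    """Aggregate (timestamp, value) pairs into one value per clock hour."""
    aggregate = ROLLUP_METHODS[method]
    buckets = OrderedDict()
    for timestamp, value in sorted(points):
        # Assumption: buckets are aligned to the start of each clock hour.
        hour = timestamp.replace(minute=0, second=0, microsecond=0)
        buckets.setdefault(hour, []).append(value)
    return [(hour, aggregate(values)) for hour, values in buckets.items()]


points = [
    (datetime(2015, 1, 1, 9, 5), 70.0),
    (datetime(2015, 1, 1, 9, 45), 74.0),
    (datetime(2015, 1, 1, 10, 15), 80.0),
]
for hour, value in hourly_rollup(points, "average"):
    print(hour, value)
```

With a real stream, the equivalent request would be expressed through the rollup_interval and rollup_method arguments of read(), leaving the aggregation to the server.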