This document gives a short description of the APIs that EnergyDataDK makes available to the user.
If you are looking for a general introduction to the EnergyDataDK platform, please refer to the user manual.
The platform has three APIs: Datastream, MQTT and Batch Upload. They are described in the following sections.
Please note that you need a token to access data through the APIs. The token serves as user ID and password to verify that you are allowed to access the data. A general introduction to the APIs can be found in the API section of the user manual.
Please report any errors in this document, or suggestions for improvement, to energydatadk@dtu.dk.
GET /api/v1/datastreams/values

The main parameter is ids, a comma-separated list of datastream IDs.
The resource can be used to retrieve either the latest values, or values which correspond to a timespan.
Please note that this resource does not return JSON for successful requests; in these cases, response data is streamed back to the client as CSV. You should still specify Accept: application/json in the request header, to ensure that errors are returned to the client as JSON. The parameters for the API are:
ids: list of property IDs to export
from: ISO8601 formatted date
to: ISO8601 formatted date
latest: Boolean, true if present

To retrieve the latest value for each datastream, the query parameter latest must be present:

GET /api/v1/datastreams/values?ids=65603,65607,65863&latest=true HTTP/1.1
Host: admin.energydata.dk
Accept: application/json
Authorization: Bearer <your-token>
HTTP/1.1 200 OK
Content-Description: File Transfer
Content-Disposition: attachment; filename=data.csv
Content-Type: text/csv; charset=UTF-8
Transfer-Encoding: chunked

65603,1547043555943,25.04
65607,1547043557698,25.12
65863,1544089165888,24
To retrieve all values for the given properties inside a timespan, the query parameters from and to must be present. These must be ISO8601 formatted dates, for example "2018-02-01T12:30:00". Note that both from and to are inclusive.
GET /api/v1/datastreams/values?ids=65603,65607,65863&from=2018-02-01T00:00:00&to=2018-02-02T00:00:00 HTTP/1.1
Host: admin.energydata.dk
Accept: application/json
Authorization: Bearer <your-token>
HTTP/1.1 200 OK
Content-Description: File Transfer
Content-Disposition: attachment; filename=data.csv
Content-Type: text/csv; charset=UTF-8
Transfer-Encoding: chunked

65607,1517443389806,25.12
65607,1517443449746,25.12
65607,1517443509696,25.12
65607,1517443569646,25.12
65607,1517443629586,25.12
65607,1517443689537,25.12
65863,1517443749495,25.12
65863,1517443809416,25.12
65863,1517443869366,24.04
65863,1517443929309,24.04
65863,1517443989256,24.04
65863,1517444049233,24.04
65863,1517444109146,24.04
65603,1517443389806,25.12
65603,1517443449746,25.12
65603,1517443509696,25.12
65603,1517443569646,25.12
65603,1517443629586,25.12
65603,1517443689537,25.12
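For convenience, the same two requests could be made from Python with the requests library. This is a minimal sketch; the token is a placeholder, and the datastream IDs are the ones from the examples above:

#! /usr/bin/env python
import requests

url = 'https://admin.energydata.dk/api/v1/datastreams/values'
headers = {
    'Authorization': 'Bearer <your-token>',
    'Accept': 'application/json',  # errors are returned as JSON; successful responses stream CSV
}

# Latest value for each datastream
response = requests.get(url, headers=headers,
                        params={'ids': '65603,65607,65863', 'latest': 'true'})
response.raise_for_status()
print(response.text)  # CSV rows: id,timestamp,value

# All values inside a timespan (from and to are inclusive, ISO8601 formatted)
response = requests.get(url, headers=headers,
                        params={'ids': '65603,65607,65863',
                                'from': '2018-02-01T00:00:00',
                                'to': '2018-02-02T00:00:00'})
response.raise_for_status()
print(response.text)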
To subscribe to data over MQTT you can, for example, use the Python paho-mqtt library (https://pypi.python.org/pypi/paho-mqtt):

#! /usr/bin/env python
import paho.mqtt.client as mqtt
broker_host = 'mqtts.energydata.dk'
broker_port = 8883
heartbeat = 60
def connect_logger(client, userdata, flags, rc):
    print('Connected with rc {}'.format(rc))
client = mqtt.Client(client_id='<your client_ID name>')
# NOTE: the above works with paho versions earlier than 2.0.0.
# If you use paho 2.0 or later, you must specify the callback API version, e.g.:
# client = mqtt.Client(client_id='myclient', callback_api_version=mqtt.CallbackAPIVersion.VERSION1)
client.username_pw_set('my-token')
client.on_connect = connect_logger
client.tls_set()
client.connect(broker_host, broker_port, heartbeat)
print('Connecting')
client.loop_forever()

The same subscription with the Mosquitto client:

#! /bin/bash
mosquitto_sub -t '#' \
-h mqtts.energydata.dk \
-p 8883 \
-i '<your client_ID name>' \
-u 'your-token'
In EnergyDataDK time series are defined as datastreams and grouped into a dataset.
To send data to EnergyDataDK using MQTT, you must create a dataset with a topic prefix, and within that dataset a datastream with a topic suffix. The topic prefix is a single string of alphanumeric characters; the suffix can be several alphanumeric strings separated by /. So if the prefix is my-topic-prefix and the suffix is my/topic/suffix, the complete MQTT topic of that datastream is my-topic-prefix/my/topic/suffix.
This topic is used on the MQTT broker to identify the datastream.
Examples with Mosquitto:
#! /bin/bash
mosquitto_sub -t 'my-topic-prefix/my/topic/suffix' \
-h mqtts.energydata.dk \
-p 8883 \
-i '<your client_ID name>' \
-u 'your-token'
#! /bin/bash
mosquitto_pub -t 'my-topic-prefix/my/topic/suffix' \
-h mqtts.energydata.dk \
-p 8883 \
-i '<your client_ID name>' \
-u 'your-token'
-m '{"value":"0.16300000250339508","timestamp":1521797973052}' MQTT messages can be formatted a one of two types:
Single value messages contain a timestamp and value, as described in the table below. The datastream associated with the message is inferred from the MQTT topic.
| Key | Type | Required | Description | Example |
|---|---|---|---|---|
| timestamp | Integer | Yes | Number of milliseconds since 1970-01-01 (Unix epoch) | 1521797973469 |
| value | String, Double or Integer | Yes | Value, encoded either as String, Double or Integer, depending on property definition in DMS | 14.47 |
Example message:
{
"timestamp": 1521797973469,
"value": 14.47
}
Multi-value messages contain values for several datastreams within the same dataset. They are published to the topic prefix alone, e.g. my-topic-prefix.
The payload has the following form:
{
"timestamp": 1521797973469,
"value": {
"my/topic/suffix1": 14.47,
"my/topic/suffix2": 34,
"my/topic/suffix3": 4.87,
"my/topic/suffix4": 1,
.... ,
"my/topic/suffixn": 300
}
}
All message payloads must be serialized as JSON.
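For example, a multi-value message could be published with paho-mqtt as follows. This is a sketch: it assumes multi-value messages go to the topic prefix alone, and the prefix, suffixes, and values are the hypothetical ones from the payload above:

#! /usr/bin/env python
import time
import json
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id='<your client_ID name>')
client.username_pw_set('my-token')
client.tls_set()
client.connect('mqtts.energydata.dk', 8883, 60)
client.loop_start()

# One message carrying values for several datastreams in the same dataset.
# Note that the topic is the prefix alone, without any suffix.
payload = json.dumps({
    'timestamp': int(time.time() * 1000),
    'value': {
        'my/topic/suffix1': 14.47,
        'my/topic/suffix2': 34,
    },
})
info = client.publish('my-topic-prefix', payload, qos=1)
info.wait_for_publish(timeout=10)
client.loop_stop()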
To publish messages you can, for example, use the paho-mqtt (https://pypi.python.org/pypi/paho-mqtt/) library with the function wait_for_publish:

#! /usr/bin/env python
import time
import json
import paho.mqtt.client as mqtt
broker_host = 'mqtts.energydata.dk'
broker_port = 8883
heartbeat = 60
client = mqtt.Client(client_id='<your client_ID name>')
client.username_pw_set('my-token')
client.tls_set()
client.connect(broker_host, broker_port, heartbeat)
# Remember to start the loop in order to receive QoS acknowledgements
client.loop_start()
for i in range(0, 10000):
    timestamp = int(time.time() * 1000)
    message = json.dumps({'timestamp': timestamp, 'value': i})
    # Publish with QoS 1 ('qos=1') and wait for the acknowledgement from the broker
    message_info = client.publish('my-topic-prefix/my/topic/suffix', message, qos=1)
    message_info.wait_for_publish(timeout=10)  # times out after 10 seconds if no acknowledgement is received

The Batch Upload API can be used directly with curl – this is intended for manual upload, or it can be embedded in a Python script. There's a Python helper class that makes this easier; see the next section.
All requests to the batch API must include the following headers:

Authorization: Bearer bzkMFuZgTzsuOldud98fJ5RkRHBI4Uoprdub1R
Accept: application/json
Timestamps must be formatted as YYYY-MM-DD[T]HH:mm:ss.SSS[Z], e.g. "2021-05-01T10:12:44.432Z" for May 1st 2021, 10:12:44.432 UTC. The timestamp must be in the UTC timezone.
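As a sketch, a timestamp in this format can be produced in Python as follows:

from datetime import datetime, timezone

now = datetime.now(timezone.utc)
# ISO8601 in UTC with millisecond precision and a trailing 'Z',
# e.g. '2021-05-01T10:12:44.432Z'
timestamp = now.strftime('%Y-%m-%dT%H:%M:%S.') + '{:03d}Z'.format(now.microsecond // 1000)
print(timestamp)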
The type of the data in a column must match the datatype of the datastream to which the column belongs. If a datastream is an integer, any values for that datastream must be integers as well.
If a field is left empty, the handling depends on the datatype of the corresponding datastream. If the datastream is of type string, an empty string will be imported. If the datastream is of type integer or double, no value will be imported.
Below is an example file. Note that the first datastream (117217) is of type string, the second datastream (my/topic) is of type double and the last datastream (119221) is of type integer.
;117217;my/topic;119221
2021-03-10T20:24:30.139Z;a_string;23.4121;-10
2021-03-10T20:24:31.144Z;"another string";999888777.121;0
2021-03-10T20:24:32.161Z;a third string;-1.33e-16;45
2021-03-10T20:24:33.186Z;;54.1;11
2021-03-10T20:24:34.201Z;a-fourth-string;;45

Note the empty fields in row 5 (datastream 117217) and row 6 (my/topic). For datastream 117217, an empty string will be imported with timestamp 2021-03-10T20:24:33.186Z. For datastream my/topic, no value will be imported for timestamp 2021-03-10T20:24:34.201Z.

To create an import, POST an import name to https://admin.energydata.dk/api/v1/import, with the import name as a URL parameter:
curl -H "Authorization: Bearer bzkMFuZgTzsuOldud98fJ5RkRHBI4Uoprdub1R" -H "Accept: application/json" -X POST
https://admin.energydata.dk/api/v1/import -G --data-urlencode "importname=batch-import-example"
# Response example
# {"user_id":11,"status":"stored","name":"batch-import-example2","updated_at":"2023-03-27T08:20:36.000000Z","created_at":"2023-03-27T08:20:36.000000Z","id":1340}
By calling GET or DELETE on the endpoint https://admin.energydata.dk/api/v1/import/1340 you can see or delete the import with id 1340.
Get the upload URL with the id returned in the creation above:
curl -H "Authorization: Bearer zGefiozgtCOfW70jnydNkTF35WOctQUnEMGAYKpi" -H "Accept: application/json"
https://admin.energydata.dk/api/v1/import/1340/upload_url
# Response example
# {
# "upload_url":
"https://s3.energydata.dk/import/inbox/example-user/b23sdfb23b163-4dfc-84c8-87ded19fdse1a.csv?X-Amz-ContentSha256=UNSIGNED-PAYLOAD&X-Amz-Algorithm=AWS4-HMAC-SHA256&XAmz-Credential=edk-s3root%2F20230327%2Fdefault%2Fs3%2Faws4_request&X-AmzDate=20230327T083455Z&X-Amz-SignedHeaders=host&X-AmzExpires=7200&X-AmzSignature=16697645e0f009c791e62f237dfd80e74d9ddfda83feb8801sd5 af186c81e0c0b"
}
Upload the file batch-import-example.csv to the returned URL:
curl -X PUT -H "Content-Type: application/octet-stream" --data-binary "@batch-import-example.csv" "https://s3.energydata.dk/import/inbox/example-user/b23sdfb23-b163-4dfc-84c8-87ded19fdse1a.csv?X-Amz-Content-Sha256=UNSIGNED-PAYLOAD&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=edk-s3root%2F20230327%2Fdefault%2Fs3%2Faws4_request&X-Amz-Date=20230327T083455Z&X-Amz-SignedHeaders=host&X-Amz-Expires=7200&X-Amz-Signature=16697645e0f009c791e62f237dfd80e74d9ddfda83feb8801sd5af186c81e0c0b" > /dev/null
To validate the file format and data, send a PUT to https://admin.energydata.dk/api/v1/import/1340/validate. In the following example the import id 1340 is used:

curl -H "Authorization: Bearer bzkMFuZgTzsuOldud98fJ5RkRHBI4Uoprdub1R" -H "Accept: application/json" -X PUT https://admin.energydata.dk/api/v1/import/1340/validate
A correct request will simply accept the task. To see the status of the validation, send a GET request as specified in section 2.1. The returned import will contain a status and any errors.
To ingest the validated data, send a PUT to https://admin.energydata.dk/api/v1/import/1340/ingest. In the following example the import id 1340 is used:

curl -H "Authorization: Bearer bzkMFuZgTzsuOldud98fJ5RkRHBI4Uoprdub1R" -H "Accept: application/json" -X PUT https://admin.energydata.dk/api/v1/import/1340/ingest

A correct request will simply accept the task. To see the status of the ingestion, send a GET request as specified in section 2.1. The returned import will contain a status and any errors.
This is the documentation for a class made to make importing into EnergyDataDK easier from Python. The code can be found in this repository:
https://git.elektro.dtu.dk/energydatadk/batch-import-python-api
EnergyDataImport
class EnergyDataImport()
A simple class to make batch importing to EnergyDataDK easier from Python
This class handles building a CSV file in the correct format to be used for importing to EnergyDataDK via the batch API. The batch API itself is documented separately (see the previous section). The class can handle everything from creating the proper CSV to uploading, validating, and ingesting the file. See the example_import.py script for an example of how to use this class.
__init__(upload_filename: str, properties: List, sftp_username: str, sftp_password: str, energydata_api_token: str, overwrite: bool = False, tmp_dir: str = '/tmp/energydata_batch_upload', autoclean_tmp_files: bool = True)
Constructor for the class. Create an instance of EnergyDataImport for each batch of data you want to upload. An instance cannot be reused between multiple imports.
Arguments:
upload_filename (string) – A filename for the import file generated behind the scenes. A local file in tmp_dir will be created with this name, and the same filename is used when uploading to the SFTP server.
properties (list) – List of properties to which data will be added. Can be either property ids or topics.
sftp_username (string) – Username for the SFTP server on api.energydata.dk from where batch imports run. Ask an admin for access credentials.
sftp_password (string) – Password for the SFTP server on api.energydata.dk from where batch imports run. Ask an admin for access credentials.
energydata_api_token (string) – API token as found in https://admin.energydata.dk/api-tokens
overwrite (bool, optional) – If a local file named {tmp_dir}/{upload_filename}, or a file named {upload_filename} in the SFTP upload folder, already exists, this setting controls whether the file should be overwritten. Defaults to False.
tmp_dir (string, optional) – A directory for temporarily storing data files before they are uploaded to SFTP. Defaults to '/tmp/energydata_batch_upload'.
autoclean_tmp_files (bool, optional) – If this parameter is set to False, the files generated in tmp_dir will not be removed once the import context is closed. Defaults to True.

add_values(time: datetime, values: List)
Call while in open state to keep adding values to be batch imported.
Arguments:
time (datetime) – Timestamp for the values to be added
values (list) – A list of the values to be added

Raises:

Exception – An exception is raised if the time argument is a datetime object without any timezone information.
Exception – The length of the values list must be equal to the number of properties passed in the constructor, otherwise an exception is raised.

upload(print_progress=True)
Once all values have been added with the add_values method, this method can be called to upload the generated CSV file to the SFTP server and create an import job with the EnergyDataDK API. Once called, the state of the job will transition from open to uploading, uploaded, and finally to stored once the job is created.

Arguments:
print_progress (bool, optional) – Whether the upload progress should be continuously printed. Defaults to True.

validate(block=True, print_progress=True)
When in stored state, this method can be called to trigger validation of the data to be imported. This will happen by calling the corresponding validate endpoint on EnergyDataDK. State will transition from stored to validating and finally to ready.
Arguments:
block (bool, optional) – Whether this method should block while validation is in progress. Defaults to True.
print_progress (bool, optional) – Whether the upload progress should be continuously printed. Defaults to True.

ingest(block=True, print_progress=True)
When in ready state, this method can be called to trigger ingestion of the data to be imported. This will happen by calling the corresponding ingest endpoint on EnergyDataDK. State will transition from ready to ingesting and finally to done.
Arguments:
block (bool, optional) – Whether this method should block while ingestion is in progress. Defaults to True.
print_progress (bool, optional) – Whether the upload progress should be continuously printed. Defaults to True.

refresh_status()
This method can be used to manually refresh and sync the internal state of the import job with the state in EnergyDataDK. If for instance validate is called with block=False, this method must be used to refresh the state before ingest can be called, once validation has completed on the server.
Returns:
EnergyDataImport.Status – Returns the current status as an enum of type EnergyDataImport.Status.
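Putting it together, a typical use of the class might look like the sketch below. The module name, credentials, properties, and values are placeholders and assumptions; see example_import.py in the repository for the authoritative example:

#! /usr/bin/env python
from datetime import datetime, timezone
from energydata_import import EnergyDataImport  # module name is an assumption

importer = EnergyDataImport(
    upload_filename='batch-import-example.csv',
    properties=[117217, 'my/topic', 119221],  # property ids or topics
    sftp_username='<sftp-user>',
    sftp_password='<sftp-password>',
    energydata_api_token='<your-token>',
)

# Timestamps must carry timezone information, otherwise add_values raises.
# One value per property, in the order given in the constructor.
importer.add_values(datetime.now(timezone.utc), ['a_string', 23.4121, -10])

importer.upload()              # open -> uploading -> uploaded -> stored
importer.validate(block=True)  # stored -> validating -> ready
importer.ingest(block=True)    # ready -> ingesting -> done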