Example Application Code

The complete scripts below demonstrate how to use the available class methods.

Read/Write Example

import asyncio
from random import randint
from inmation_api_client import Client, Options, Item, ItemValue

# Shared API client instance; assigned in main() and used by the read/write coroutines.
CLIENT = None

# Example credentials and WebSocket endpoint -- replace with the values for your environment.
USERNAME = 'user'
PASSWORD = 'pass'
WS_URL = "ws://127.0.0.1:8000/ws"

def main():
    """Connect the client, read three items, then write random values to them.

    The connected client is stored in the module-level ``CLIENT`` so that
    ``read_items_at_once`` and ``write_items_at_once`` can use it.
    """
    global CLIENT
    # asyncio.get_event_loop() is deprecated when no loop is running and
    # raises RuntimeError on recent Python versions, so create the loop
    # explicitly and register it as the current one.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    CLIENT = create_api_client(event_loop)

    items_path = "/System/Core/"
    item_names = ('Item01', 'Item02', 'Item03')

    # Read items
    items = [Item(items_path + name) for name in item_names]
    event_loop.run_until_complete(read_items_at_once(items))

    # Write items: each one gets a random integer between 1 and 1024 (inclusive)
    write_items = [
        ItemValue(items_path + name, randint(1, 2**10))
        for name in item_names
    ]
    event_loop.run_until_complete(write_items_at_once(write_items))

async def read_items_at_once(items):
    """Pass all ``items`` to ``CLIENT.read`` in one call and print the results.

    The callback treats a truthy first argument as an error and prints its
    message. Otherwise the second argument, when it is a list, is printed
    entry by entry as ``path - value``.
    """
    def read_cbk(*args):
        failure = args[0]
        if failure:
            print(failure.message)
            return

        results = args[1]
        if not isinstance(results, list):
            return

        print("{} - {}".format('Item', 'Value'))
        for entry in results:
            if 'error' in entry:
                print('Error: {}'.format(entry['error']['msg']))
                continue
            # Entries without a 'v' key are reported with a placeholder.
            value = 'No Value'
            if 'v' in entry:
                value = entry['v']
            print("{} - {}".format(entry['p'], value))

    await CLIENT.read(items, read_cbk)

async def write_items_at_once(items):
    """Pass all ``items`` to ``CLIENT.write`` in one call and print the results.

    The callback prints the error message when its first argument is truthy.
    Otherwise, if the second argument is a list, it prints one line per
    entry: either the entry's error message or its path and value.
    """
    def write_cbk(*args):
        failure = args[0]
        if failure:
            print(failure.message)
            return

        results = args[1]
        if not isinstance(results, list):
            return

        print("{} - {}".format('Item', 'Value'))
        for entry in results:
            if 'error' in entry:
                line = 'Error: {}'.format(entry['error']['msg'])
            else:
                line = "{} - {}".format(entry['p'], entry['v'])
            print(line)

    await CLIENT.write(items, write_cbk)

def create_api_client(ioloop):
    """Create a ``Client`` on ``ioloop``, connect it, and attach print handlers.

    The connection to ``WS_URL`` is established synchronously by running the
    connect coroutine on ``ioloop``. Afterwards a connection-state handler
    and an error handler are registered. Returns the client.
    """
    def on_error(err):
        """Print the message of a reported error, ignoring falsy values."""
        if not err:
            return
        print("Error: {}".format(err.message))

    def connection_changed(conn_info):
        """Print the connection state and authentication flag."""
        details = (conn_info.state, conn_info.state_string, conn_info.authenticated)
        print('Connection state: {}, {}, authenticated: {}'.format(*details))

    client = Client(ioloop)
    credentials = Options(USERNAME, PASSWORD)
    ioloop.run_until_complete(client.connect_ws(WS_URL, credentials))

    # Handlers are registered after the connection attempt has completed.
    client.on_ws_connection_changed(connection_changed)
    client.on_error(on_error)

    return client

# Run the example only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()

Read Raw Historical Data Example

import asyncio
from datetime import datetime, timedelta
from inmation_api_client import Client, Options, HistoricalDataItem

# Shared API client instance; assigned in main() and used by read_historical_data().
CLIENT = None

# Example credentials and WebSocket endpoint -- replace with the values for your environment.
USERNAME = 'user'
PASSWORD = 'pass'
WS_URL = "ws://127.0.0.1:8000/ws"

def main():
    """Connect the client, schedule a raw historical read, and run the loop.

    The connected client is stored in the module-level ``CLIENT`` so that
    ``read_historical_data`` can use it. The event loop runs until it is
    stopped or interrupted and is always closed afterwards.
    """
    global CLIENT
    # asyncio.get_event_loop() is deprecated when no loop is running and
    # raises RuntimeError on recent Python versions. Creating the loop
    # explicitly and registering it as current also lets the
    # asyncio.ensure_future() call in read_historical_data() find it.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    try:
        CLIENT = create_api_client(event_loop)

        items_path = "/System/Core/"
        items = [
            HistoricalDataItem(items_path + 'Item01', "AGG_TYPE_RAW"),
            HistoricalDataItem(items_path + 'Item02', "AGG_TYPE_RAW"),
        ]

        # Read historical data (scheduled here, executed once the loop runs)
        read_historical_data(items)

        event_loop.run_forever()
    finally:
        # Close the loop even when run_forever() exits through an exception
        # such as KeyboardInterrupt.
        event_loop.close()

def read_historical_data(items):
    """Schedule a historical-data read for ``items`` over the last 30 days.

    The request is split into three intervals and scheduled on the current
    event loop with ``asyncio.ensure_future``; this function returns
    immediately. When the result arrives, the callback prints one line per
    item and interval, or the error message if the request failed.

    Args:
        items: Historical data items passed through to
            ``CLIENT.read_historical_data``.
    """
    # Local import: the file-level import only brings in datetime and timedelta.
    from datetime import timezone

    # The timestamp pattern ends in a literal 'Z' (the UTC designator), so the
    # clock must be read in UTC rather than in local time.
    now = datetime.now(timezone.utc)
    now_minus_month = now - timedelta(days=30)
    time_format = '%Y-%m-%dT%H:%M:%S.000Z'

    hd_stime = now_minus_month.strftime(time_format)
    hd_etime = now.strftime(time_format)

    num_of_intervals = 3

    def rhd_cbk(*args):
        err = args[0]
        if err:
            print(err.message)
            return

        result = args[1]
        if not isinstance(result, dict):
            return

        print("{} {}{}".format("Item", '\t'*7, "Values"))

        for item in result.get('items') or []:
            # Only walk the intervals actually returned (at most the number
            # requested) instead of indexing a fixed range.
            intervals = (item.get('intervals') or [])[:num_of_intervals]
            for number, interval in enumerate(intervals, start=1):
                # Look the value up per interval so a missing 'V' prints None
                # rather than repeating the previous interval's value.
                val = interval.get('V')
                print("{} (Interval {:d}): {}".format(item['p'], number, val))

    asyncio.ensure_future(CLIENT.read_historical_data(items, hd_stime, hd_etime, num_of_intervals, rhd_cbk))

def create_api_client(ioloop):
    """Create a ``Client`` on ``ioloop``, connect it, and attach print handlers.

    The connect coroutine for ``WS_URL`` is run to completion on ``ioloop``
    before the connection-state and error handlers are registered. Returns
    the client.
    """
    def connection_changed(conn_info):
        """Print the connection state and authentication flag."""
        template = 'Connection state: {}, {}, authenticated: {}'
        print(template.format(conn_info.state, conn_info.state_string, conn_info.authenticated))

    def on_error(err):
        """Print the message of a reported error, ignoring falsy values."""
        if not err:
            return
        print("Error\t {}".format(err.message))

    client = Client(ioloop)
    connect = client.connect_ws(WS_URL, Options(USERNAME, PASSWORD))
    ioloop.run_until_complete(connect)

    # Handlers are registered after the connection attempt has completed.
    client.on_ws_connection_changed(connection_changed)
    client.on_error(on_error)

    return client

# Run the example only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()