In Memory Aggregation

We used the syslib.buffer() function to create an in-memory buffer to store the raw values from a data-producing object, and then used the syslib.peek() function to visualize the buffer and check that it contained values. The next step is to create another buffer that will aggregate the raw values from the first buffer.

Creating an Aggregation Buffer

To do this we will use the syslib.buffer() Lua function again but in a different variation: syslib.buffer(object_or_path, “buffer name”, input, duration, size, aggregation_period, aggregation_type) In this variation, an object or path to the item holding the buffer must still be defined, along with a name for the aggregation buffer and it size and duration. There are also two extra arguments for creating an aggregation buffer: aggregation period and aggregation type.

  • The input is the name of the buffer (as a string) that will supply the raw values for the aggregation buffer. In this example, we will use the name of the first buffer we created (“buff”)

  • The aggregation type is entered as a string and must match one of the Aggregation coding group values (e.g. AGG_TYPE_AVERAGE). See full list of aggregates and descriptions here.

  • The aggregation period represents the interval size for the aggregation in milliseconds and must be a non-negative integer. If set to zero, an aggregate is calculated every time the input buffer receives a new raw value. The aggregate will be calculated over the specified aggregate period. Each aggregate period will be started on a “whole” number time for the specified aggregate period. For example, specifying an aggregation period of 1 minute (60,000 ms) will mean that the aggregation will be calculated at 0.0 seconds at the start of each minute.

Create Objects for Aggregation Buffer

We will use an ActionItem to execute the script to create the aggregation buffer:

  1. Create an ActionItem underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → ActionItem.

  2. In the Create Object wizard, name the object “Aggregation Buffer” and in the Lua script body enter the following code:

    local obj = syslib.getobject("/System/CORE/Examples/DemoData/ProcessData/FC4711")
    syslib.buffer(obj, "minavg", "buff", 60000, 1, 60000, "AGG_TYPE_AVERAGE")
  3. Click Ok to confirm the Lua script and return to the Create Object wizard then click Create to create the object in the I/O Model tree.

The above script creates a buffer “minavg”, that calculates an average aggregate every minute from the values in the original buffer “buff”. The buffer has a duration of 60,000 milliseconds and a matching aggregate period of 60,000 milliseconds. The buffer size is set to only hold one value so every minute a new aggregate is calculated and the old one purged.

We can now create another GenItem to read the values using the syslib.peek() Lua function. We will also create a Data Holder item to display the aggregated value.

  1. First, create a Data Holder item underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → Data Holder Item.

  2. In the Create Object wizard, name the object “FC4711 Average” and click Create.

  3. Next, create an GenItem underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → Generic Item.

  4. In the Create Object wizard, name the object “Avg Buffer Reader” and change the generation period to 60,000 milliseconds. click Next.

  5. Click Next on the Limits page to go to the Generation Type section of the Wizard.

  6. On the Generation Type page, change the generation type to Lua Script Data Generator and in the Lua script body enter the following code:

    local obj = syslib.getobject("/System/CORE/Examples/DemoData/ProcessData/FC4711")
    local avg, qualities, timestamps, count = syslib.peek(obj, "minavg")
     local set = syslib.setvalue(“/System/Core/FC4711 Average”, avg[1], qualities[1], timestamps[1])
  7. Click Ok to confirm the Lua script and return to the Create Object wizard then click Create to create the object in the I/O Model tree.

The above script uses the syslib.peek() function to access the aggregate buffer value and the syslib.setvalue() function to write the VQT data to the “FC4711 Average” Holder item. After creating all the items the I/O Model tree should look similar to below:

Aggregation Buffer - I/O Model Tree
Figure 1. Aggregation Buffer - I/O Model Tree

After 1 minute, select the “FC4711 Average” Holder Item in the I/O Model tree and look at the faceplate in the Object Properties panel. An aggregated value should be written along with a quality and timestamp.

Aggregated Buffer Value in Faceplate - Every 60 seconds
Figure 2. Aggregated Buffer Value in Faceplate - Every 60 seconds

As was mentioned earlier, the timestamp is a “round” number of the 60 second aggregation period specified and starts at the beginning of each minute (e.g. the seconds section of the timestamp is at 00). You will notice that the timestamp is not the previous minute interval that passed but the one before. For example, if the current time is 3.18.35 PM the last recorded timestamp will be for 3.17.00 PM not 3.18.00 PM. The timestamp assigned to each aggregate is the timestamp at the beginning of the aggregation period not the end. To assign the timestamp to the end of the aggregation period you can add on the length of the aggregation period to the timestamp when setting the value of the Data Holder item in the Lua script (e.g. timestamps[1] + 60000).

Calculate a Rolling or Moving Aggregation value

So far, we have demonstrated how to set up an in-memory buffer from which we could set up aggregation buffers to perform in-memory aggregation over set interval periods. We can also set up in-memory aggregation that updates at the same rate as the item producing the raw values. The aggregation is calculated every time a new value enters the buffer. This rolling aggregation gives a constantly updating aggregated value over the interval period specified when creating the buffer. Rolling aggregates are useful as KPIs and it is often helpful to calculate an average over a longer period of time to pick up on long term trends and even out the short-term oscillations present in raw data.

Create Objects for Rolling Aggregation Buffer

Setting up a rolling aggregation buffer is similar to setting up a normal aggregation buffer. We will again use the syslib.buffer() Lua function we will use the raw values buffer “buff” as the input for the aggregation buffer.

  1. Create an ActionItem underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → Action Item.

  2. In the Create Object wizard, name the object “Rolling Buffer” and in the Lua script body enter the following code:

    local obj = syslib.getobject("/System/CORE/Examples/DemoData/ProcessData/FC4711")
    syslib.buffer(obj, "rollavg", "buff", 1000, 1, 0, "AGG_TYPE_AVERAGE")
  3. Click Ok to confirm the Lua script and return to the Create Object wizard then click Create to create the object in the I/O Model tree.

The above code attaches the buffer to the FC4711 flow item that is populating the “buff” buffer with values. We name our rolling aggregate buffer rollavg and set the duration to 1000 milliseconds and the size to 1 (this buffer only holds one value and it purges that value every one second). The aggregation period is set to zero, this means that the buffer is updated every time the input buffer “buff” receives a new value (every second).

Next, we set up the GenItem and Data Holder items to read and display the rolling average values.

  1. Create a Data Holder Item underneath the Core by selecting the Core Object, right clicking and selecting Admin  New  DataProcessing  Data Holder Item.

  2. In the Create Object wizard, name the object “FC4711 Rolling Average” and click Create.

  3. Next, create an GenItem underneath the Core by selecting the Core Object, right clicking and selecting Admin  New  DataProcessing  Generic Item.

  4. In the Create Object wizard, name the object “Rolling Buffer Reader” and keep the generation period as 1000 milliseconds. Click Next.

  5. Click Next on the Limits page to go to the Generation Type section of the Wizard

  6. On the Generation Type page, change the generation type to Lua Script Data Generator and in the Lua script body enter the following code:

    local obj = syslib.getobject("/System/Core/Examples/DemoData/ProcessData/FC4711")
    local avg, qualities, timestamps, count = syslib.peek(obj, "rollavg")
    syslib.setvalue(
        "/System/Core/FC4711 Rolling Average",
        avg[1],
        qualities[1],
        timestamps[1]
    )
  7. Click Ok to confirm the Lua script and return to the Create Object wizard then click Create to create the object in the I/O Model tree.

After creating the three objects, the I/O model tree should look something like below:

Rolling Buffer - I/O Model Tree
Figure 3. Rolling Buffer - I/O Model Tree

Select the “FC4711 Rolling Average” Data Holder item in the I/O Model and observe the values updating in the Faceplate in the Object Properties panel.

Rolling Average Aggregate - Changing Value in Faceplate
Figure 4. Rolling Average Aggregate - Changing Value in Faceplate

The values are the rolling average calculated over the 60 seconds of data from the FC4711 flow item. The average aggregate is calculated every second as a new value enters the “buff” buffer. Why is the rolling average calculating the aggregate over an interval of 60 seconds when we do not specify this in the rolling aggregate buffer creation? The aggregation buffer calculates the average using every value currently in the input buffer (when we set up the input buffer “buff” we set its duration and size to 60000 ms and 60 respectively – a value every second for 60 seconds). To calculate a rolling average over different time periods a new input buffer must be created with the duration and size set to your specifications.

The rolling average value has the same timestamp as the original updating value in the FC4711 item except it is exactly one second earlier (the timestamp in this case is assigned according to the last value that enters the original buffer). To see the difference in timestamps between the original "FC4711" simulation item, the "FC4711 Average" and "FC4711 Rolling Average", add all three items to a RealTime Grid in DataStudio and observe the timestamps.

Original and Aggregated Items - RealTimeGrid
Figure 5. Original and Aggregated Items - RealTimeGrid

Custom Aggregations and Functions using the syslib.buffer

Another variation of the syslib.buffer() Lua function allows you to specify your own aggregation type by entering a custom function to aggregate data. syslib.buffer(object_or_path, “buffer name”, input, duration, size, function) The arguments for this variation are similar to the variations we have seen before with the addition of the “function” argument. The function argument takes the shape of a Lua function, entered as a string that is executed every time the input buffer receives a new value. The return value of the function is then entered into the aggregation buffer. The function has limitations though and has to be constructed within those limitations. The following example shows a template that can be used as a base to create your own custom aggregation function:

local func = [[
return function(input, peek, tear)
    local values = peek(input)
    if #values >= 10 then
        local sum = 0
        for i=1,10 do
            sum = sum + values[i]
        end
        tear(input)
        return sum / 10
    end
end
]]

The defined function has three arguments input, peek and tear. “input” is a reference to the input buffer, and “peek” and “tear” work similarly to the peek and tear functions mentioned and used earlier. This function uses “peek” to read the input buffer, when the buffer contains 10 values it uses a ‘for’ loop to add all the values in the buffer. The ‘tear’ is then used to empty the input buffer before returning the mean average of the last 10 values (which is subsequently added to the aggregation buffer).

The internal calculation part of the function can be changed but the general structure, including the arguments, is necessary for the function to work. To see this in action we will set up an aggregation buffer of this type:

  1. Create an ActionItem underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → Action Item.

  2. In the Create Object wizard, name the object “Function Buffer” and in the Lua script body enter the following code:

    local obj = syslib.getobject("/System/CORE/Examples/DemoData/ProcessData/FC4711")
    local func = [[
        return function(input, peek, tear)
            local values = peek(input)
            if #values >= 10 then
                local sum = 0
                for i=1,10 do
                    sum = sum + values[i]
                end
                tear(input)
                return sum / 10
            end
        end
    ]]
    
    syslib.buffer(obj, "buff2", ".ItemValue", 10000, 10)
    syslib.buffer(obj, "funcbuff", "buff2", 10000, 1, func)
  3. Click Ok to confirm the Lua script and return to the Create Object wizard then click Create to create the object in the I/O Model tree.

In this ActionItem executed Lua script we firstly define the function “func”. Then, we create an initial buffer “buff2” to buffer the raw values from the FC4711 item. The function aggregation buffer "funcbuff" is then created using “buff2” as input as populating itself with the return values of the “func” function.

Next, we set up the GenItem and Data Holder items to read and display the aggregation function value:

  1. Create a Data Holder Item underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → Data Holder Item.

  2. In the Create Object wizard, name the object “FC4711 Function Average” and click Create.

  3. Next, create an GenItem underneath the Core by selecting the Core Object, right clicking and selecting Admin → New → DataProcessing → Generic Item.

  4. In the Create Object wizard, name the object “Function Buffer Reader” and keep the generation period as 1000 milliseconds. click Next.

  5. Click Next on the Limits page to go to the Generation Type section of the Wizard

  6. On the Generation Type page, change the generation type to Lua Script Data Generator and in the Lua script body enter the following code:

    local obj = syslib.getobject("/System/CORE/Examples/DemoData/ProcessData/FC4711")
    local avg, qualities, timestamps, count = syslib.peek(obj, "funcbuff")
    local set = syslib.setvalue(“/System/Core/FC4711 Function Average”, avg[1], qualities[1], timestamps[1])
  7. Click Ok to confirm the Lua script and return to the Create Object wizard then click Create to create the object in the I/O Model tree.

Select the “FC4711 Function Average” Data Holder Item in the I/O model tree and observe the values updating (a value will be written after 10 seconds when the function has an input of 10 values). The item should update every 10 seconds with an average aggregation of the last 10 values. To convert this to a Rolling average you can remove the following line from the function definition: tear(input)

Removing this line stops the initial buffer being emptied after each aggregation so the function will calculate the average over the last 10 values each time a new value enters the initial buffer.