GTSB with Cloud Sink

The Generic Time Series Buffer (GTSB) provides flexible management and acknowledgement of received data. With the GTSB, users have full control over the data sent to the cloud. Several levels of reliable data buffering can be implemented to ensure that no data is lost during the transfer.

Creating the GTSB Object

  1. Select a parent object (Core, Connector, DataStoreGroup) and right-click, then select Admin > New > Data Stores > Generic Time Series Buffer from the context menu to open the Create Object wizard.

  2. Give the object a unique name and enter a description if necessary. The Lua processing script will be added later.

    Figure 1. The Common Section of the Create Generic Time Series Buffer Wizard
  3. Expand the Interface Extension section and for the Extension Selector property select the intended cloud interface from the drop-down list. For this example use 'MQTT Publisher'.

    For more information regarding the configuration of MQTT Datasources, see the MQTT Datasource Hands On.
  4. In the MQTT Publisher Parameters subsection, provide a Topic.

    Figure 2. The MQTT Publisher Parameter Subsection
  5. Click Create to create the new GTSB object.

  6. The next step is to register the Generic Time Series Buffer in the executing Core. To do this …

    1. Select the Core object in the I/O Model.

    2. In the Properties Panel of the Core, expand the Data Store Configuration section and click on the table icon for the Data Store Sets property.

    3. Add a new row to the Data Store Sets configuration table, providing a name and selecting the newly created GTSB object from the drop-down list in the Data Stores column. For ease of later reference, it is called 'The GTSB DSS' in this example.

      Figure 3. The Data Store Sets Table Property
    4. Then click OK to close the table and Apply to apply the changes to the properties of the Core.

First Lua Functionality

For background information regarding the configuration of Generic Time Series Buffers and how Lua functionality is added to these objects, see the Userview for the Generic Time Series Buffer object.

The first version of the script illustrates a simple use case where a single message is published to the cloud sink and a success/failure notification is received. If the cloud interface reports success, the message is acknowledged so that it is removed from the GTSB queue.

A Simple Processing Script
local json = require("rapidjson")

local helper = {}
function helper.PAYLOADBUILDER(_, pid, v, q, t)
  return json.encode({ pid = pid, v = v, q = q, t = t })
end

return function(...)
  local iter, sink = ...

  if iter.length > 0 then
    for saf_id, prp_id, v, q, t, d in iter() do
      local payload = helper:PAYLOADBUILDER(prp_id, v, q, t)
      local suc, err = sink:SEND(payload)
      if not suc then error(err) else iter:ack(saf_id) end
    end
  end
end

Explanation

First the helper table is created. It only provides the PAYLOADBUILDER function, which in this example converts the data into JSON format. It stands in for any kind of data preprocessing implementation.

local helper = {}
function helper.PAYLOADBUILDER(_, pid, v, q, t)
  return json.encode({ pid = pid, v = v, q = q, t = t })
end
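
For illustration, calling the builder with hypothetical example values (property id, value, quality, timestamp) yields a JSON string like the following; note that rapidjson does not guarantee a particular key order:

-- hypothetical example values, using the helper defined above
local payload = helper:PAYLOADBUILDER(4711, 42, 0, 1700000000000)
-- payload now contains something like: {"pid":4711,"v":42,"q":0,"t":1700000000000}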

In the main function of the processing script, 'iter' and 'sink' are variables which are passed to the script by the Generic Time Series Buffer. 'iter' is an interface to the Store and Forward system which is used to retrieve messages waiting to be sent, and 'sink' represents the interface to the cloud. For more details see the Generic Time Series Buffer.

Then the script checks if there is at least one message in the SaF queue which can be processed.

if iter.length > 0 then ... end

If that’s the case, the script iterates through the messages, using the following loop:

for saf_id, prp_id, v, q, t, d in iter() do ... end

Optionally, the message can be preprocessed. In this example the 'helper:PAYLOADBUILDER()' function converts the data into JSON format.

local payload = helper:PAYLOADBUILDER(prp_id, v, q, t)

Now the payload can be sent to the cloud, using the 'SEND' function of the sink object.

local suc, err = sink:SEND(payload)

If writing to the cloud sink was successful, the message can be acknowledged. Acknowledged messages are removed from the GTSB queue.
If writing failed, this is indicated as an error. In this case the message is not acknowledged, so it remains in the GTSB queue.

if not suc then error(err) else iter:ack(saf_id) end
Calling the standard Lua function error makes the GTSB go into failure processing mode. This results in the GTSB object state turning red, so that the user can see that something is not working as expected.
SaFGenericBufferRetryLatency can be used to specify how often the system attempts to send the data. This helps to avoid unnecessary spamming in cases where the cloud interface is temporarily unavailable (e.g. due to a network outage).
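
To make such failures easier to diagnose, the text passed to error can include the error returned by SEND. A minimal variation of the loop body above (sketch only):

local suc, err = sink:SEND(payload)
if not suc then
  -- include the sink's error text so the failure reason is visible on the GTSB object
  error(string.format("publishing to the cloud sink failed: %s", tostring(err)))
else
  iter:ack(saf_id)
end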

Connecting a Data Source to the GTSB and Writing Demo Data

The easiest way to provide simulation data is to enter it manually into a Variable object, which will act as the data source for this example.

To create this Variable object in the I/O Model …

  1. Right-click on the same component where the GTSB is running, and select Admin > New > Data Processing > Variable from the context menu.

  2. In the Common section of the New Object wizard provide a name.

    Figure 4. The Common Section of the New Variable Wizard
  3. To connect this new Variable object to the GTSB, navigate to the Archive Options section and, for the Archive Selection property, select the Data Store Set which you created for the GTSB in the Core from the drop-down list - 'The GTSB DSS' in this example.

  4. In the Value Storage Strategy subsection, enable 'Raw History'.

    Figure 5. The Archive Options Section of the New Variable Wizard
  5. Then click Create to create the new Variable object in the I/O Model.

To write a value into the Variable object, select it in the I/O Model and, in the Property Panel, double-click the Faceplate to open the Write Values dialog. Enter a value which matches the selected data type (Int32 by default), then click Write.

Figure 6. Writing Data to the Variable Object
Figure 7. The Message Was Received and Sent by the MQTT Broker
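
Alternatively, a demo value can be written from a Lua script context (for example an Action item), using the same syslib calls that appear in the final script of this section. This is only a sketch; the path below is a placeholder and has to be adjusted to the location of the Variable in your I/O Model:

-- hedged sketch: adjust the placeholder path to the Variable created above
local prop_id = syslib.getpropertyid("/System/Core/The GTSB Input Variable")
-- write a demo value which matches the Variable's data type (Int32 by default)
syslib.setvalue(prop_id, 42)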

Performance Improvements

The previous example works, but it is slow. To process thousands of messages per second, sending them one by one is not efficient.

Instead of sending messages individually, they should be collected in a Lua table which will then be passed to the SEND function of the sink interface.

The Improved Script
local json = require("rapidjson")
local helper = {}
function helper.PAYLOADBUILDER(_, pid, v, q, t)
  return json.encode({ pid = pid, v = v, q = q, t = t })
end

return function(...)
  local iter, sink = ...

  if iter.length > 0 then
    local payload = {}
    local last_saf_id = nil
    for saf_id, prp_id, v, q, t, d in iter() do
      table.insert(payload, helper:PAYLOADBUILDER(prp_id, v, q, t))
      last_saf_id = saf_id
    end

    local suc, err = sink:SEND(payload)

    if not suc then error(err) else iter:ack(last_saf_id) end
  end
end

Explanation

The script iterates over all the available messages, does the processing, and packs them into a Lua table. A variable (last_saf_id) is needed to store the last known SaF ID in order to perform the acknowledgement.

local payload = {}
local last_saf_id = nil
for saf_id, prp_id, v, q, t, d in iter() do
    table.insert(payload, helper:PAYLOADBUILDER(prp_id, v, q, t))
    last_saf_id = saf_id
end

Then the whole table is sent by the sink:SEND function.

local suc, err = sink:SEND(payload)

Finally, if the delivery was successful, the batch is acknowledged.

if not suc then error(err) else iter:ack(last_saf_id) end

Avoiding Duplication

The sink:SEND function works in such a way that it returns 'false' if a single message from the batch fails. It is not possible to acknowledge individual messages in the GTSB; only a sequence of messages can be acknowledged. With the cloud ingestion of each message happening asynchronously, this is problematic. Situations may occur where, for example, the first 10 messages have been delivered, the next 10 failed, and the last 10 have been delivered again. Currently this cannot be acknowledged without losing or duplicating messages. All the messages have to be resent, even those which have already been acknowledged by the cloud server.

In the following version of the script, a third parameter is returned by the 'sink:SEND' function: 'context'. It has information about each individual message in the batch. This information is used to identify and collect the individual messages which failed and need to be resent.

local json = require("rapidjson")
local helper = {}
function helper.PAYLOADBUILDER(_, pid, v, q, t)
  return json.encode({ pid = pid, v = v, q = q, t = t })
end

return function(...)
  local iter, sink = ...

  if iter.length > 0 then
    -- Building the payload
    local payload = {}
    local last_saf_id = nil
    for saf_id, prp_id, v, q, t, d in iter() do
      table.insert(payload, helper:PAYLOADBUILDER(prp_id, v, q, t))
      last_saf_id = saf_id
    end

    -- Retry loop: resend only the messages which failed, up to 5 attempts
    local attempt = 0
    while true do
      attempt = attempt + 1
      local suc, err, context = sink:SEND(payload)
      local failed_messages = {}
      if suc then
        break
      else
        if context.proced == 0 then error(err) end
        if attempt >= 5 then error("Still could not send in 5 attempts") end
        for _, message in pairs(context.messages) do
          if message.success == false then
            table.insert(failed_messages, message.message)
          end
        end
        payload = failed_messages
      end
    end

    iter:ack(last_saf_id)
  end
end

Explanation

This loop limits the attempts to resend the messages which failed previously:

local attempt = 0
while true do
  attempt = attempt + 1
  …
  if attempt >= 5 then error("Still could not send in 5 attempts") end
end

After attempting to send the messages, the script iterates over the context information to identify the messages which could not be delivered. Then these messages are sent again.

for _, message in pairs(context.messages) do
  if message.success == false then
    table.insert(failed_messages, message.message)
  end
end

The context.messages field will only appear if context.proced is > 0. It can be 0 if the cloud interface was not able to connect to the server, in which case no message processing took place.

if context.proced == 0 then error(err) end

The Schema of the 'Context' Parameter

The Context Parameter for MQTT Datasources
context = {
    cloud = Boolean, Flag that indicates whether processing has reached the cloud interface or not
    success = Boolean, Flag that indicates the successful finish of cloud processing
    error = String, Error message, in case success is false
    heartbeat = Timestamp, The last update of the context table
    proced = Number, Number of processed messages
    acked = Number, Number of messages acknowledged by the cloud server
    failed = Number, Number of failed messages
    messages = { Table, an array of information about processed messages
        {
            procid = Number, Message process ID
            success = Boolean, Flag that indicates that this message has been successfully processed and acknowledged by the cloud server
            error = String, Error message, in case success is false
            topic = String, MQTT topic of the message
            message = String, MQTT message
            qos = Number, MQTT quality of service
            retain = Boolean, MQTT retain flag
        },
        …
    }
}
The Context Parameter for KAFKA Datasources
context = {
    cloud = Boolean, Flag that indicates whether processing has reached the cloud interface or not
    success = Boolean, Flag that indicates the successful finish of cloud processing
    error = String, Error message, in case success is false
    heartbeat = Timestamp, The last update of the context table
    proced = Number, Number of processed messages
    acked = Number, Number of messages acknowledged by the cloud server
    failed = Number, Number of failed messages
    messages = { Table, an array of information about processed messages
        {
            procid = Number, Message process ID
            success = Boolean, Flag that indicates that this message has been successfully processed and acknowledged by the cloud server
            error = String, Error message, in case success is false
            topic = String, KAFKA topic of the message
            message = String, KAFKA message
            partition = String, KAFKA partition
            key = String, KAFKA key
        },
        …
    }
}
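
Based on this schema, the per-message information can be reduced to whatever the processing script needs. The following is a minimal sketch in plain Lua, assuming the context layout shown above, which collects the payloads of all failed messages and builds a combined error summary:

-- minimal sketch, assuming the context schema shown above
local function collect_failures(context)
  local failed, errors = {}, {}
  if context and context.proced and context.proced > 0 then
    for _, message in pairs(context.messages) do
      if message.success == false then
        table.insert(failed, message.message)
        table.insert(errors, tostring(message.error))
      end
    end
  end
  return failed, table.concat(errors, "; ")
end

-- usage inside the processing script (sketch):
-- local suc, err, context = sink:SEND(payload)
-- if not suc then local failed_messages, summary = collect_failures(context) end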

Handling Long Network Disruptions

So far the script has been improved so that it can handle failed messages without creating duplicates. However, this only works for sporadic failures and short network disruptions. To cover long network disruptions, a better solution is to circulate failed messages back to the event buffer queue.

For this a new Variable object is required. To create it …

  1. Right-click on the same component where the GTSB is running, and select Admin > New > Data Processing > Variable from the context menu.

  2. In the Common section of the New Object wizard provide a name, e.g. 'The GTSB Loop Buffer'.

    Figure 8. The Common Section of 'The GTSB Loop Buffer' Variable Object
  3. In the Archive Options section, for the Archive Selection property, select the Data Store Set for the GTSB ('The GTSB DSS') from the drop-down list, and in the Value Storage Strategy subsection enable Raw History - as for 'The GTSB Input Variable' before.

    Figure 9. The Archive Selection Section of 'The GTSB Loop Buffer' Variable Object
  4. Then click Create.

In the following version of the script, all failed messages are written to the 'Loop Buffer' Variable object and are thereby redirected back to the GTSB with new SaF IDs, and the table is acknowledged regardless of the cloud writing result.

local loop_buffer_prop_id = syslib.getpropertyid("../The GTSB Loop Buffer")

local json = require("rapidjson")
local helper = {}
function helper.PAYLOADBUILDER(_, pid, v, q, t)
  return json.encode({ pid = pid, v = v, q = q, t = t })
end

return function(...)
  local iter, sink = ...
  if iter.length > 0 then
    local payload = {}
    local last_saf_id = nil
    for saf_id, prp_id, v, q, t, d in iter() do
      if prp_id == loop_buffer_prop_id then
        table.insert(payload, v)
      else
        table.insert(payload, helper:PAYLOADBUILDER(prp_id, v, q, t))
      end
      last_saf_id = saf_id
    end
    local suc, err, context = sink:SEND(payload)
    if not suc then
      if context.proced == 0 then error(err) end
      for _, message in pairs(context.messages) do
        if message.success == false then
          syslib.setvalue(loop_buffer_prop_id, message.message)
        end
      end
    end
    iter:ack(last_saf_id)
  end
end

Explanation

Before the message is added to the table, the script checks if this message comes from the loop buffer. In this case, no preprocessing is needed, since this already happened on the previous attempt to send.

if prp_id == loop_buffer_prop_id then table.insert(payload, v) end

All failed messages are now being sent to the loop buffer variable, so those messages will circulate through the GTSB until they are finally accepted by the cloud server.

for _, message in pairs(context.messages) do
  if message.success == false then
    syslib.setvalue(loop_buffer_prop_id, message.message)
  end
end