PIBridge Datapipes

The inmationPIBridge service implements the OSI PI (AF) SDK functionality called DataPipes.
https://docs.osisoft.com/bundle/af-sdk/page/html/T_OSIsoft_AF_Data_AFDataPipe.htm

This functionality allows to subscribe to different types of events of the OSI PI historian, including late delivery of data, which is the reason of the introduction of this functionality within the inmation PI Bridge.

Using DataPipes the inmation Connector can receive data with a historical timestamp (older than the most recent time) on an event basis via an event subscription. Please refer to the OSI PI documentation for more details.

This functionality is not used so far by any of the higher level objects like History Transporter within the.

Configuration of the inmation PI Bridge Service

Before using the new PI Bridge instance, be sure to check the configuration options applicable to the DataPipe subscription feature. You can find the options in the inmationPIBridge.exe.config file in the package installation folder.

pibridge datapipes 6
Table 1. PI Bridge Configuration Options related to DataPipe Subscription
parameter description default setting

SubscriptionMaxItemsCount

Events are buffered by the PI Bridge until they are retrieved by the Connector. This is maximum number of events being buffered. If events are coming in faster then being retrieved by the Connector and this maximum number is reached, the buffer will overflow and data will be lost.

Default: 10.000

SubscriptionMaxReturnedItemsCount

When retrieving events from the PI Bridge buffer with one of the Lua functions provided (see below), they will be returned as an array. This setting defines the maximum number of events retrieved (=maximum array size returned) with one Lua call.

Default: 1.000

SubscriptionMaxGetEventsIntervalSeconds

Events have to be retrieved from the PI Bridge buffer regularly. Otherwise the corresponding DataPipe will automatically be removed. This setting defines the maximum time in seconds that the buffer and DataPipe will be held in memory without the Connector retrieving data.

Default: 60

SubscriptionRefreshRateSeconds

Events have to be fetched by the inmation PI Bridge from the OSI PI Server that buffers events itself. This is done periodically in the rate specified by this setting.

Default: 10

Connector-PI Bridge Communication

The Connector communicates with inmation PI Bridge via a Lua interface. This interface is defined in an embedded Lua library called esi-pibridge.

The functions for the DataPipe functionality in the Lua library are:

  • DATAPIPESSUBSCRIBE

  • GETARCHIVESIGNUPSFROMPIDATAPIPE

  • GETSNAPSHOTSIGNUPSFROMPIDATAPIPE

  • GETSIGNUPSFROMAFDATAPIPE

  • GETSIGNUPSFROMAFDATAPIPEBYPISERVER

  • GETARCHIVESIGNUPSFROMPIDATAPIPEBYPISERVER

  • GETSNAPSHOTSIGNUPSFROMPIDATAPIPEBYPISERVER

  • GETTIMESERIESSIGNUPSFROMPIDATAPIPEBYPISERVER

  • GETTIMESERIESSIGNUPSFROMPIDATAPIPE

  • DATAPIPESUNSUBSCRIBE

  • ADDSIGNUPSWITHINITEVENTSTOAFDATAPIPE

  • ADDSIGNUPSTOAFDATAPIPE

  • ADDARCHIVESIGNUPSTOPIDATAPIPE

  • REMOVESIGNUPSFROMAFDATAPIPE

  • REMOVEARCHIVESIGNUPSFROMPIDATAPIPE

  • REMOVESNAPSHOTSIGNUPSFROMPIDATAPIPE

  • REMOVETIMESERIESSIGNUPSFROMPIDATAPIPE

  • ADDTIMESERIESSIGNUPSWITHINITEVENTSTOPIDATAPIPE

  • ADDSNAPSHOTSIGNUPSTOPIDATAPIPE

  • ADDTIMESERIESSIGNUPSTOPIDATAPIPE

  • GETEVENTSFROMAFDATAPIPE

  • GETARCHIVEEVENTSFROMPIDATAPIPE

  • GETSNAPSHOTEVENTSFROMPIDATAPIPE

  • GETTIMESERIESEVENTSFROMPIDATAPIPE

They directly map to the corresponding functions in the PI (AF) SDK.

DataPipes Configuration

DataPipes can be used by Lua executing objects (like Generic Items or Action Items) which are located underneath the Connector object for the PI Server host in the I/O Model.

For this example, right-click the Connector object on the PI Server host (or a folder underneath) and select Admin  New  Data Processing  Generic Item.

  1. In the Common section of the New Object wizard, provide a suitable name and, optionally, a description.

    Creating a new Generic Item - the Common Section
    Figure 1. Creating a new Generic Item - the Common Section
  2. Then move on to the Generation Type section. From the drop-down options list, select 'Lua Script Data Generator', then click on the …​ icon to open the Script Editor dialog.

    Opening the Script Editor dialog
    Figure 2. Opening the Script Editor dialog
  3. Copy the script below into the Script Editor dialog and adjust the values for the host and pisvr variables according to your situation.

  4. The click OK to close the dialog.

    The script in the Script Editor dialog
    Figure 3. The script in the Script Editor dialog
  5. Then click Create to create the Generic Item object in the I/O Model.

Underneath the Generic Item running the script, a number of Variable objects are created for data coming through the DataPipes.

Variables with the data coming through the DataPipes
Figure 4. Variables with the data coming through the DataPipes

The Example Lua Script

The following example shows the basic usage in a GenericItem.

-- DataPipes Example

local J = require'rapidjson'
local PB = require'esi-pibridge'
local host = "127.0.0.1"  --IP ADDRESS OF PI BRIDGE HOST
local pisvr = "PISVR001"  --NAME OF THE PISERVER TO CONNECT TO
local port = 6668     --LISTENING PORT OF PI BRIDGE
local timeout = 10      --CONNECTION TIME OUT TO PI BRIDGE
local cnt = 0       --GENITEM EXECUTION COUNTER
local subscriptionhandle   --VARIABLE PERSISTING ACROSS GENITEM EXECUTIONS
local tags = "*"      --TAG FILTER FOR, DO !-NOT-! USE "*" IN PRODUCTION

--MAIN function
return function()
  cnt = cnt + 1
  local result, exception
  local retval= {}
  --THIS SECTION ONLY EXECUTES ON THE FIRST CALL
  if cnt == 1 then (1)
    PB:SETCONNECTION{
      ["port"] = port,
      ["host"] = host,
      ["timeout"] = timeout,
      ["debugoption"] = false
    }

    --CONNECT TO PISERVER
    result, exception = PB:CONNECTTOPI{["piserver"] = pisvr}
    if exception ~= nil then return J.encode(exception) end

    --CREATE THE DATAPIPE SUBSCRIPTION, PRESERVRE THE HANDLE
    subscriptionhandle, exception = PB:DATAPIPESSUBSCRIBE{["piserver"] = pisvr} (2)
    if exception ~= nil then return J.encode(exception) end

    --ADD TAGS TO THE SUBSCRIPTION
    result, exception = PB:ADDSNAPSHOTSIGNUPSTOPIDATAPIPE{
                ["piserver"] = pisvr,
                ["handle"] = subscriptionhandle,
                ["tags"] = {tags}
    }

    if exception ~= nil then return J.encode(exception) end
    else

      --THIS PART EXECUTES FROM CALL #2 ONWARDS
      local moreevents = true

      while moreevents do
        result, exception = PB:GETSNAPSHOTEVENTSFROMPIDATAPIPE{
          ["piserver"] = pisvr,
          ["handle"] = subscriptionhandle
        }

      if exception ~= nil then
        --IF WE CAN NOT GET EVENTS ANYMORE, WE TRY TO RECONNECT ON NEXT CALL
        cnt = 0
        return J.encode(exception)
      end

      --SAVE RESULT FOR OUTPUT
      table.insert(retval, result)

      --#EVENTS ARE LIMTIED BY CONFIG, WHEN THERE ARE MORE EVENTS,
      --GET-FUNC IS CALLED IMMEDIATELY AGAIN BY WHILE LOOP
      moreevents = result.moreevents (3)

      --IF DATAPIPE OVERFLOWS, WE STEP OUT
      --RECONFIGURE YOUR PIBRIDGE OR PROCESS DATA FASTER IF YOU ENOUNTER THIS
      if result.overflow then error("Overflow in data pipe buffer!") end (4)

    end

  end

  --PROCESS THE RESULTS (5)
  --NOTE: DOES NOTE TAKE IDENTICAL TAG NAMES INTO ACCOUNT!
  local V = require("esi-variables")
  for i=1, #retval do
    local curres = retval[i]
    local curevents = curres.events
    for ii=1, #curevents do
      local p = curevents[ii].Value.name
      local t = curevents[ii].Value.t
      local v = curevents[ii].Value.v
      V:SET(p,v,0,t)
    end
  end

  --RETURN THE RESULT AS JSON IN ADDITION
  return J.encode(retval) (6)

end
1 The execution counter cnt is used to create the DataPipe subscription only once.
2 Note how subscriptionhandle is persisted over multiple executions of the GenericItem.
3 The moreevents and …​
4 overflow indicators in the results of the event retrieval are used to react on true/false values. In this example, the execution stops on an overflow on purpose to emphasize on this situation.
5 The processing of the return values results in variables automatically created beneath the executing GenericItem. This does not take into account that names in the result can be identical for different sources in OSI PI.
6 The full result is returned as JSON. If the result is a very large JSON this can fail, especially if the result is also stored in MongoDB (JSON document size limit).