By | 27 July 2015

Using Chunking Rules with OnRouteSend PeopleCode

In a previous article, I wrote about chunking rules and how they can work with FULLSYNCs. If you have not read that article then please do so prior to continuing.

The chunking rules work great with the delivered FULLSYNC push processes and configuration. The FULLSYNC push process figures out what set of data from the source table needs to get to each subscribing node. Each subscribing node gets only the data that it cares about and nothing more. In that case, the logic is built into the publication process and happens at run time.

In the following schematic, we have two different subscribers to the project table but each one only cares about certain business units.

However, what happens when you have a near real-time “SYNC” operation configured which creates publications every time someone hits save on the data entry page?

In a completely vanilla configuration, if you had a “SYNC” operation turned on for the project table (or really any setup table like department, location, etc.) the publication would go to all subscriber nodes. Said another way, delivered near-real-time “SYNC” operations do NOT honor any chunking rules. What happens in this situation is that SavePostChange PeopleCode triggers a SYNC operation built from the changed data in the component buffer. The integration broker then finds every node that has an active Service Operation Routing, creates a publication contract for each one, and publishes to all of them.

That is represented in this schematic.

In a vanilla scenario, “subscriber 1” would have to know to ignore any messages that had a business unit of “NYC”. Additionally, “subscriber 2” would have to know to ignore messages that had a business unit of “SAN”. This is fine in some situations. However, there are situations where this would be unacceptable and the publisher should send each subscribing system only the data that it cares about. Luckily we have a method to deal with this situation.

Enabling Chunking Rules for SYNC Operations

These “near-real-time” SYNC publications are really “Outbound Asynchronous Service Operations”. Integration Broker delivers an event that we can tap into with custom PeopleCode, which reads the data in the message and decides at runtime which nodes the message should be published to. We can remove a node when the message contains data that should not go to it, and we can also stop the message from being published to any node at all. This is accomplished with an OnRouteSend service operation handler. The OnRouteSend event fires after the integration engine has received the message and before the publication contracts are created, which lets you programmatically remove or modify the contracts that would be created. You can inspect the message content and override the Service Operation routings. Here we use OnRouteSend to override node publication and selectively not publish to certain nodes at certain times. There is a section in PeopleBooks called Content Based Routing which describes this.
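To make the position of the hook concrete, here is a small Python model of the flow. It is only a sketch of the idea, not how Integration Broker is implemented: the function `contracts_for`, the class `SkipNodeRouter`, the node names, and the string constants are all made up for illustration, and the assumption that the handler's node list is used as-is for the "some" case is mine.

```python
# Stand-ins for the PeopleCode constants %IntBroker_ROUTE_ALL,
# %IntBroker_ROUTE_SOME and %IntBroker_ROUTE_NONE (values are arbitrary).
ROUTE_ALL, ROUTE_SOME, ROUTE_NONE = "ALL", "SOME", "NONE"


class SkipNodeRouter:
    """Toy handler: drop one node whenever the message carries one business unit."""

    def __init__(self, routed_nodes, node_to_skip, blocked_business_unit):
        self.routed_nodes = routed_nodes
        self.node_to_skip = node_to_skip
        self.blocked_business_unit = blocked_business_unit
        self.destination_nodes = []

    def on_route_send(self, message):
        # Inspect the message content and decide which nodes stay in the list.
        blocked = message["BUSINESS_UNIT"] == self.blocked_business_unit
        self.destination_nodes = [
            node for node in self.routed_nodes
            if not (blocked and node == self.node_to_skip)
        ]
        return ROUTE_SOME if self.destination_nodes else ROUTE_NONE


def contracts_for(message, routed_nodes, router=None):
    """Model which nodes end up with a publication contract."""
    if router is None:
        return list(routed_nodes)  # vanilla SYNC: every routed node
    decision = router.on_route_send(message)  # runs before contracts exist
    if decision == ROUTE_ALL:
        return list(routed_nodes)
    if decision == ROUTE_NONE:
        return []
    return list(router.destination_nodes)  # "some": the handler's list is used


nodes = ["SUBSCRIBER_1", "SUBSCRIBER_2"]
message = {"BUSINESS_UNIT": "NYC"}
print(contracts_for(message, nodes))
print(contracts_for(message, nodes, SkipNodeRouter(nodes, "SUBSCRIBER_1", "NYC")))
```

Without a handler, both nodes get a contract. With the toy handler in place, only SUBSCRIBER_2 gets one for the NYC message.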

First we need to create an application class that implements the OnRouteSend method.

I will call this class CHG_IB_UTILS:onRouteSendBusUnitChunking and include the code below.

The pseudo code of this is:

  • Pull the BUSINESS_UNIT value out of LEVEL zero.
    • We do this in a generic manner so this one class can be used across different service operations that have different record names.
  • Get all nodes that the current service operation has outbound routing to and stash those in an array.
  • Loop through each Node configured and for each
    • Check if the Business Unit Chunking rules stored in record EO_BUSUNT_EOC have a setup to publish the current business unit to that node.
      • If the business unit should be sent to the node then push that node name to a new array (%This.destinationNodes) that serves as the list of nodes we want to publish to.
      • If the business unit is not referenced for that node then do not copy it to the list of publication nodes.
  • Once all the nodes have been processed, then determine if %This.destinationNodes has any nodes defined.
    • If there are no nodes we return %IntBroker_ROUTE_NONE which tells the integration engine to not create any publication contracts.
    • If there are some nodes we return %IntBroker_ROUTE_SOME which tells the integration broker that we have overridden the routing definitions and to check the %This.destinationNodes for the list of nodes that should get a publication contract.
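The steps above can be traced with a few lines of Python before reading the PeopleCode. This is a sketch under assumptions: the set `CHUNKING_RULES` stands in for the rows of EO_BUSUNT_EOC, the node names and business units are hypothetical, and the string constants only mirror the names of the real routing constants.

```python
# Stand-ins for %IntBroker_ROUTE_NONE and %IntBroker_ROUTE_SOME.
ROUTE_NONE, ROUTE_SOME = "ROUTE_NONE", "ROUTE_SOME"

# Hypothetical chunking setup: each pair says "publish this business unit
# to this node". In PeopleSoft this lives in record EO_BUSUNT_EOC.
CHUNKING_RULES = {
    ("SUBSCRIBER_1", "SAN"),
    ("SUBSCRIBER_2", "NYC"),
}


def route(business_unit, routed_nodes, rules=CHUNKING_RULES):
    """Return (routing decision, destination nodes) for one message."""
    destination_nodes = []
    for node in routed_nodes:
        # Copy the node across only when a rule exists for this business unit.
        if (node, business_unit) in rules:
            destination_nodes.append(node)
    if not destination_nodes:
        return ROUTE_NONE, destination_nodes
    return ROUTE_SOME, destination_nodes


nodes = ["SUBSCRIBER_1", "SUBSCRIBER_2"]
print(route("SAN", nodes))  # ('ROUTE_SOME', ['SUBSCRIBER_1'])
print(route("NYC", nodes))  # ('ROUTE_SOME', ['SUBSCRIBER_2'])
print(route("LAX", nodes))  # ('ROUTE_NONE', [])
```

A business unit that no node has a rule for, such as "LAX" here, produces no publication contracts at all.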

Here is the full PeopleCode Class.

import PS_PT:Integration:IRouter;

class onRouteSendBusUnitChunking implements PS_PT:Integration:IRouter
   property array of any destinationNodes;
   
   method OnRouteSend(&msgParm As Message) Returns integer;
   method getIBRoutings(&msgParm As Message) Returns array of string;
end-class;



method OnRouteSend
   /+ &msgParm as Message +/
   /+ Returns Integer +/
   /+ Extends/implements PS_PT:Integration:IRouter.OnRouteSend +/
   
   
   /***********************************************************************\
   * Send to all, none or some destinationNodes:                           *
   * %IntBroker_ROUTE_SOME; must then set array of any destinationNodes    *
   * %IntBroker_ROUTE_ALL                                                  *
   * %IntBroker_ROUTE_NONE                                                 *
   \***********************************************************************/
   %This.destinationNodes = CreateArrayAny();
   
   /* Read BUSINESS_UNIT from the level 0 record by position so this class */
   /* works for any service operation, whatever its record is named.       */
   Local string &BUSINESS_UNIT_IN_MESSAGE;
   &BUSINESS_UNIT_IN_MESSAGE = &msgParm.GetRowset().GetRow(1).GetRecord(1).BUSINESS_UNIT.Value;
   
   Local array of string &aFullRoutingNodeList;
   &aFullRoutingNodeList = %This.getIBRoutings(&msgParm);
   
   Local integer &i;
   

   /* Keep only the nodes that have a chunking rule for this business unit. */
   Local Record &recBUToNodeMapping;
   For &i = 1 To &aFullRoutingNodeList.Len
      /* A fresh record on each pass so no values carry over between nodes. */
      &recBUToNodeMapping = CreateRecord(Record.EO_BUSUNT_EOC);
      &recBUToNodeMapping.CHUNK_RULE_ID.Value = "BUSINESS_UNIT";
      &recBUToNodeMapping.MSGNODENAME.Value = &aFullRoutingNodeList [&i];
      &recBUToNodeMapping.BUSINESS_UNIT.Value = &BUSINESS_UNIT_IN_MESSAGE;
      
      /* A matching row means this node should receive the business unit. */
      If &recBUToNodeMapping.SelectByKeyEffDt(%Date) Then
         %This.destinationNodes.Push(&aFullRoutingNodeList [&i]);
      End-If;
   End-For;
   
   If %This.destinationNodes.Len = 0 Then
      
      Return %IntBroker_ROUTE_NONE;
   Else
      Return %IntBroker_ROUTE_SOME;
      
   End-If;
end-method;

method getIBRoutings
   /+ &msgParm as Message +/
   /+ Returns Array of String +/
   
   Local array of string &targetNodes = CreateArrayRept("", 0);
   Local Rowset &IBroutes = CreateRowset(Record.PSIBRTNGDEFN_VW);
   Local number &index, &rowCount;
   
   &rowCount = &IBroutes.Fill("WHERE FILL.IB_OPERATIONNAME = :1 AND FILL.EFFDT = (SELECT MAX(B.EFFDT) FROM PSIBRTNGDEFN_VW B WHERE B.IB_OPERATIONNAME = FILL.IB_OPERATIONNAME AND B.EFF_STATUS = 'A') AND FILL.EFF_STATUS = 'A'", &msgParm.OperationName);
   
   For &index = 1 To &rowCount
      &targetNodes.Push(&IBroutes(&index).PSIBRTNGDEFN_VW.RECEIVERNODENAME.Value);
   End-For;
   
   Return &targetNodes;
   
end-method;

Once you have your application package in place, you need to configure your code to run for the service operation you want to apply the chunking rules to. That is done by adding an OnRouteSend handler on the Service Operation definition page. Here is a screenshot of how to set this up. You can only have one OnRouteSend handler configured per Service Operation. Generally, you do not see handlers configured for outbound publications, so this handler may be the only active handler.

SETID OnRouteSend Handler Code

The code sample above was for filtering on BUSINESS_UNIT. I am including an example of a SETID filter.

Application Class: CHG_IB_UTILS:onRouteSendSETIDChunking

import PS_PT:Integration:IRouter;

class onRouteSendSETIDChunking implements PS_PT:Integration:IRouter
   property array of any destinationNodes;
   
   method OnRouteSend(&msgParm As Message) Returns integer;
   method getIBRoutings(&msgParm As Message) Returns array of string;
end-class;



method OnRouteSend
   /+ &msgParm as Message +/
   /+ Returns Integer +/
   /+ Extends/implements PS_PT:Integration:IRouter.OnRouteSend +/
   
   
   /***********************************************************************\
   * Send to all, none or some destinationNodes:                           *
   * %IntBroker_ROUTE_SOME; must then set array of any destinationNodes    *
   * %IntBroker_ROUTE_ALL                                                  *
   * %IntBroker_ROUTE_NONE                                                 *
   \***********************************************************************/
   %This.destinationNodes = CreateArrayAny();
   
   /* Read SETID from the level 0 record by position so this class */
   /* works for any service operation, whatever its record is named. */
   Local string &SetIDForDept;
   &SetIDForDept = &msgParm.GetRowset().GetRow(1).GetRecord(1).SETID.Value;
   
   Local array of string &aFullRoutingNodeList;
   &aFullRoutingNodeList = %This.getIBRoutings(&msgParm);
   
   Local integer &i;
   
   
   /* Keep only the nodes that have a chunking rule for this SETID. */
   Local Record &recSetidToNodeMapping;
   For &i = 1 To &aFullRoutingNodeList.Len
      /* A fresh record on each pass so no values carry over between nodes. */
      &recSetidToNodeMapping = CreateRecord(Record.EO_SETID_EOC);
      &recSetidToNodeMapping.CHUNK_RULE_ID.Value = "SETID";
      &recSetidToNodeMapping.MSGNODENAME.Value = &aFullRoutingNodeList [&i];
      &recSetidToNodeMapping.SETID.Value = &SetIDForDept;
      
      /* A matching row means this node should receive the SETID. */
      If &recSetidToNodeMapping.SelectByKeyEffDt(%Date) Then
         %This.destinationNodes.Push(&aFullRoutingNodeList [&i]);
      End-If;
   End-For;
   
   
   If %This.destinationNodes.Len = 0 Then
      
      Return %IntBroker_ROUTE_NONE;
   Else
      Return %IntBroker_ROUTE_SOME;
      
   End-If;
end-method;

method getIBRoutings
   /+ &msgParm as Message +/
   /+ Returns Array of String +/
   
   Local array of string &targetNodes = CreateArrayRept("", 0);
   Local Rowset &IBroutes = CreateRowset(Record.PSIBRTNGDEFN_VW);
   Local number &index, &rowCount;
   
   &rowCount = &IBroutes.Fill("WHERE FILL.IB_OPERATIONNAME = :1 AND FILL.EFFDT = (SELECT MAX(B.EFFDT) FROM PSIBRTNGDEFN_VW B WHERE B.IB_OPERATIONNAME = FILL.IB_OPERATIONNAME AND B.EFF_STATUS = 'A') AND FILL.EFF_STATUS = 'A'", &msgParm.OperationName);
   
   For &index = 1 To &rowCount
      &targetNodes.Push(&IBroutes(&index).PSIBRTNGDEFN_VW.RECEIVERNODENAME.Value);
   End-For;
   
   Return &targetNodes;
   
end-method;

Additional Reading