State Collection

DGI state collection provides a method for collecting a causally consistent state of the devices in a group of DGIs.

Using State Collection

State collections are requested through the DGI message passing interface. Modules submit requests to state collection; state collection runs the requested collections during its phase and returns the collected state to the requesting module in a message.

Supported Signal Types

The following device types and signals are supported. To support additional signals, you’ll need to modify the StateCollection code; see Adding New Signal Types.

Device Type    Supported Signal(s)
Sst            gateway
Drer           generation
Desd           storage
Load           drain
Fid            state
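For illustration, the table amounts to a lookup from device type to signal name. The sketch below encodes only the table above; SupportedSignal is a hypothetical helper, not part of the StateCollection code, but a check like it could catch a misspelled device type before a request is sent.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical helper (not part of DGI): returns the signal that state
// collection supports for a device type, per the table above, or an empty
// string if the device type is not supported.
std::string SupportedSignal(const std::string & deviceType)
{
    static const std::map<std::string, std::string> kSignals = {
        {"Sst", "gateway"},
        {"Drer", "generation"},
        {"Desd", "storage"},
        {"Load", "drain"},
        {"Fid", "state"},
    };
    std::map<std::string, std::string>::const_iterator it = kSignals.find(deviceType);
    return it == kSignals.end() ? std::string() : it->second;
}
```

The comparison is case-sensitive, like the dssm.type() == "Sst" checks in StateResponse().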

Requesting State Collection

State collection requests are initiated with a sc::RequestMessage. The message must contain:

  • module: the module requesting the state collection, so that the result can be returned to it. Set it with set_module().
  • One or more DeviceSignalRequestMessages, each naming a device type and a signal to collect. Add each one with add_device_signal_request_message() and set both of its fields: type is the type of device to read from (for example, “Sst”) and signal is the signal to read from that device (for example, “gateway”). State collection collects the value from the first device of that type attached to each DGI.
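The requirements above can be expressed as a small validity check. This is a sketch using hypothetical stand-in structs (Request, DeviceSignalRequest) and a hypothetical IsValidRequest function, not the generated protobuf classes; it only shows which fields must be set before a request is sent.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins for sc::DeviceSignalRequestMessage and
// sc::RequestMessage; they mirror only the fields described above.
struct DeviceSignalRequest
{
    std::string type;   // device type, e.g. "Sst"
    std::string signal; // signal on that device, e.g. "gateway"
};

struct Request
{
    std::string module;                       // requesting module, receives the result
    std::vector<DeviceSignalRequest> signals; // must contain at least one entry
};

// Returns true if the module is set and every signal request has both
// its type and its signal set.
bool IsValidRequest(const Request & r)
{
    if (r.module.empty() || r.signals.empty())
    {
        return false;
    }
    for (const DeviceSignalRequest & d : r.signals)
    {
        if (d.type.empty() || d.signal.empty())
        {
            return false;
        }
    }
    return true;
}
```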

Example: Collecting Single Device/Value

This example requests values from one device (SST) per DGI in the group:

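// Wrapper message; the request is one of its submessages
sc::StateCollectionMessage msg;
sc::RequestMessage * submsg = msg.mutable_request_message();
// Name of the requesting module, so the result can be returned to it
submsg->set_module("YOURMODULE");

// Ask for the gateway signal of each DGI's SST
sc::DeviceSignalRequestMessage * subsubmsg = submsg->add_device_signal_request_message();
subsubmsg->set_type("Sst");
subsubmsg->set_signal("gateway");

// Wrap the request in a ModuleMessage addressed to state collection ("sc")
ModuleMessage m;
m.mutable_state_collection_message()->CopyFrom(msg);
m.set_recipient_module("sc");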

The module prepares an sc::StateCollectionMessage, accesses its child request_message, and adds the signals to be collected (in this case, the SST’s gateway value). It then copies the message into a ModuleMessage addressed to State Collection.

Example: Multiple Devices

This example requests values from multiple devices (SST, DESD, DRER) per DGI in the group:

sc::StateCollectionMessage msg;
sc::RequestMessage * state_request = msg.mutable_request_message();
state_request->set_module("YOURMODULE");

sc::DeviceSignalRequestMessage * device_state;
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Sst");
device_state->set_signal("gateway");

device_state = state_request->add_device_signal_request_message();
device_state->set_type("Desd");
device_state->set_signal("storage");

device_state = state_request->add_device_signal_request_message();
device_state->set_type("Drer");
device_state->set_signal("generation");

ModuleMessage m;
m.mutable_state_collection_message()->CopyFrom(msg);
m.set_recipient_module("sc");

Collected State Response

State collection will send back the following state response, which contains the collected state:

message CollectedStateMessage
{
    repeated double gateway = 1;
    repeated double generation = 2;
    repeated double storage = 3;
    repeated double drain = 4;
    repeated double state = 5;
    required int32 num_intransit_accepts = 6;
}

Values are accessed by iterating over the repeated fields of sc::CollectedStateMessage. Each field has an accessor named after it; for example, gateway() returns all of the collected gateway values. As with any protobuf repeated field, gateway_size() returns the number of values and gateway(i) returns the value at index i.

Here is an example HandleCollectedState method:

void HandleCollectedState(const sc::CollectedStateMessage &m, const CPeerNode& peer)
{
    ...
    //for SST device
    BOOST_FOREACH(double v, m.gateway())
    {
        //print out each gateway value from collected message
        Logger.Info << "Gateway value is " << v << std::endl;
    }

    //for DRER device
    BOOST_FOREACH(double v, m.generation())
    {
        //print out each generation value from collected message
        Logger.Info << "Generation value is " << v << std::endl;
    }

    //for DESD device
    BOOST_FOREACH(double v, m.storage())
    {
        //print out each storage value from collected message
        Logger.Info << "Storage value is " << v << std::endl;
    }
}
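When a module needs an aggregate rather than individual values, the same iteration can feed a small helper. The sketch below is hypothetical (Summary and Summarize are not part of DGI). It is written as a template over any range so that, in isolation, a std::vector<double> can stand in for the protobuf repeated field.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical result type: how many values were collected (for example,
// one per DGI that has the device) and their sum.
struct Summary
{
    std::size_t count;
    double total;
};

// Works with any range of doubles, e.g. m.gateway() or a std::vector<double>.
template <typename Range>
Summary Summarize(const Range & values)
{
    Summary s{0, 0.0};
    for (double v : values)
    {
        ++s.count;
        s.total += v;
    }
    return s;
}
```

Inside HandleCollectedState it would be called as Summarize(m.gateway()), since the generated accessor returns a const RepeatedField<double>& that supports range-based iteration.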

Adding New Signal Types

To add a new signal or device to state collection, first add a new field to the CollectedStateMessage message in src/messages/StateCollection.proto. For example, to add a frequency signal for a new or existing device, add a frequency field:

message CollectedStateMessage
{
    repeated double gateway = 1;
    repeated double generation = 2;
    repeated double storage = 3;
    repeated double drain = 4;
    repeated double state = 5;
    repeated double frequency = 6; // New line for the new signal.
    required int32 num_intransit_accepts = 7;
}

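Each field needs a unique field number, so adjust the numbers accordingly. Renumbering an existing field (as with num_intransit_accepts above) changes the wire format, so every DGI in the group must be rebuilt from the same .proto file. Giving the new field the next unused number instead (here, 7) leaves the existing numbers unchanged.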

Next, add the new device or signal to the StateResponse() method in sc/StateCollection.cpp. In this example, we add both a new device (Omega) and a new signal for that device (frequency):

// Excerpt from StateResponse(): dssm is the device signal state being added
if (dssm.type() == "Sst")
{
    if(dssm.count()>0)
    {
        csm->add_gateway(dssm.value());
    }
    else
    {
        csm->clear_gateway();
    }
}
else if (dssm.type() == "Drer")
{
    if(dssm.count()>0)
    {
        csm->add_generation(dssm.value());
    }
    else
    {
        csm->clear_generation();
    }
}
else if (dssm.type() == "Desd")
{
    if(dssm.count()>0)
    {
       csm->add_storage(dssm.value());
    }
    else
    {
       csm->clear_storage();
    }
}
// New device (Omega) and its signal (frequency)
else if (dssm.type() == "Omega")
{
    if(dssm.count()>0)
    {
        csm->add_frequency(dssm.value());
    }
    else
    {
        csm->clear_frequency();
    }
}
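The if/else chain above can also be expressed as a lookup table, so that adding a device becomes a single entry. The following is a sketch with hypothetical stand-in types: DeviceSignalState plays the role of dssm, and a map of value lists (Collected) plays the role of CollectedStateMessage. It keeps the behavior shown above: a positive count appends the value and a zero count clears the field.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for one collected device signal state (dssm).
struct DeviceSignalState
{
    std::string type; // device type, e.g. "Sst"
    int count;        // number of matching devices found
    double value;     // signal value read from the device
};

// Hypothetical stand-in for CollectedStateMessage: values per field name.
typedef std::map<std::string, std::vector<double> > Collected;

// Device type -> CollectedStateMessage field; Omega is one new entry.
const std::map<std::string, std::string> kFieldForType = {
    {"Sst", "gateway"},
    {"Drer", "generation"},
    {"Desd", "storage"},
    {"Omega", "frequency"},
};

// Mirrors the branches above; unknown device types are ignored.
void AddToResponse(const DeviceSignalState & dssm, Collected & csm)
{
    std::map<std::string, std::string>::const_iterator it = kFieldForType.find(dssm.type);
    if (it == kFieldForType.end())
    {
        return;
    }
    std::vector<double> & field = csm[it->second];
    if (dssm.count > 0)
    {
        field.push_back(dssm.value);
    }
    else
    {
        field.clear();
    }
}
```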

If the load balancing module (LB) should request the Omega device’s frequency along with the SST, DESD, and DRER signals, add the Omega request to the request message in LoadBalance.cpp:

sc::StateCollectionMessage msg;
sc::RequestMessage * state_request = msg.mutable_request_message();
state_request->set_module("lb");

sc::DeviceSignalRequestMessage * device_state;
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Sst");
device_state->set_signal("gateway");

device_state = state_request->add_device_signal_request_message();
device_state->set_type("Desd");
device_state->set_signal("storage");

device_state = state_request->add_device_signal_request_message();
device_state->set_type("Drer");
device_state->set_signal("generation");

// New device and signal
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Omega");
device_state->set_signal("frequency");

When LB handles the returned state in LoadBalance.cpp, it can read the new frequency values as follows:

void HandleCollectedState(const sc::CollectedStateMessage &m)
{
    ...
    //for SST device
    BOOST_FOREACH(double v, m.gateway())
    {
        //print out each gateway value from collected message
        Logger.Info << "Gateway value is " << v << std::endl;
    }

    //for DRER device
    BOOST_FOREACH(double v, m.generation())
    {
        //print out each generation value from collected message
        Logger.Info << "Generation value is " << v << std::endl;
    }

    //for DESD device
    BOOST_FOREACH(double v, m.storage())
    {
        //print out each storage value from collected message
        Logger.Info << "Storage value is " << v << std::endl;
    }

    // New device and signal
    //for OMEGA device
    BOOST_FOREACH(double v, m.frequency())
    {
        //print out each frequency value from collected message
        Logger.Info << "Frequency value is " << v << std::endl;
    }
}

Implementation Details

Theory: Algorithm in State Collection

The DGI State Collection module is based on the Chandy-Lamport algorithm [1], which collects a consistent global state of all participants in a distributed system. A consistent global state is one that corresponds to a consistent cut, and a consistent cut is left-closed under the causal precedence (happened-before) relation. In other words, a cut is consistent if, for every event in the cut, all events that happened before that event also belong to the cut. For example, a cut that contains the receipt of a message must also contain the sending of that message. The algorithm works as follows:

  • The initiator starts state collection by recording its own state and broadcasting a marker to the other peers. It then records the messages it receives from each peer until that peer’s marker arrives.
  • When a peer receives a marker for the first time, it records its own state, forwards the marker to all other peers (including the initiator), and records the messages it receives from each of the remaining peers until that peer’s marker arrives. The channel on which the first marker arrived is recorded as empty.
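The steps above can be exercised in a small single-threaded simulation. This is a sketch of the Chandy-Lamport algorithm itself, not of SCAgent: the Msg, Process, and System types, the single integer held by each process, and the delivery loop are all hypothetical. Because each process’s value only moves between processes, a consistent snapshot must account for the whole total, split between recorded local states and recorded in-transit messages.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Illustration of Chandy-Lamport only; this is not the SCAgent implementation.
struct Msg { bool marker; int amount; };  // amount is unused for markers

struct Process
{
    int value = 0;                       // live local state
    bool recorded = false;               // local state already recorded?
    int snapshot = 0;                    // recorded local state
    std::vector<bool> markerSeen;        // markerSeen[j]: marker from j has arrived
    std::vector<std::vector<int>> chan;  // recorded in-transit messages from each j
};

class System
{
public:
    explicit System(const std::vector<int> & initial)
        : n_(initial.size()), procs_(n_), queues_(n_, std::vector<std::deque<Msg>>(n_))
    {
        for (std::size_t i = 0; i < n_; ++i)
        {
            procs_[i].value = initial[i];
            procs_[i].markerSeen.assign(n_, false);
            procs_[i].chan.assign(n_, std::vector<int>());
        }
    }

    // Application message: move `amount` from one process to another.
    void Transfer(std::size_t from, std::size_t to, int amount)
    {
        procs_[from].value -= amount;
        queues_[from][to].push_back(Msg{false, amount});
    }
    // The initiator records its state and sends a marker on every outgoing channel.
    void Initiate(std::size_t i) { RecordAndForward(i); }

    // Deliver everything in flight; each channel is drained oldest-first (FIFO).
    void DeliverAll()
    {
        bool progress = true;
        while (progress)
        {
            progress = false;
            for (std::size_t from = 0; from < n_; ++from)
                for (std::size_t to = 0; to < n_; ++to)
                    while (!queues_[from][to].empty())
                    {
                        Msg m = queues_[from][to].front();
                        queues_[from][to].pop_front();
                        Receive(from, to, m);
                        progress = true;
                    }
        }
    }
    // Recorded local states plus recorded in-transit messages.
    int SnapshotTotal() const
    {
        int total = 0;
        for (const Process & p : procs_)
        {
            total += p.snapshot;
            for (const std::vector<int> & c : p.chan)
                for (int a : c) total += a;
        }
        return total;
    }

private:
    void Receive(std::size_t from, std::size_t to, const Msg & m)
    {
        Process & p = procs_[to];
        if (m.marker)
        {
            p.markerSeen[from] = true;             // channel from->to is now closed
            if (!p.recorded) RecordAndForward(to); // first marker: record and forward
            return;
        }
        p.value += m.amount;
        if (p.recorded && !p.markerSeen[from])
            p.chan[from].push_back(m.amount);      // message was in transit at the cut
    }
    // Record local state; channels whose marker has not arrived are recorded
    // from now on. Then send a marker to every other process.
    void RecordAndForward(std::size_t i)
    {
        procs_[i].recorded = true;
        procs_[i].snapshot = procs_[i].value;
        for (std::size_t j = 0; j < n_; ++j)
            if (j != i) queues_[i][j].push_back(Msg{true, 0});
    }
    std::size_t n_;
    std::vector<Process> procs_;
    std::vector<std::vector<std::deque<Msg>>> queues_;  // queues_[from][to]
};
```

In the test scenario, a transfer sent before the snapshot but delivered after the receiver recorded its state is captured as channel state, while a transfer sent after its sender recorded its state is excluded, because the FIFO channel delivers it after the sender’s marker.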

The following diagram illustrates the Chandy-Lamport algorithm running on three nodes. The initiator is the leader node chosen by the Group Management module in DGI.

[Diagram: Chandy-Lamport state collection on three nodes (sc-algorithm.jpg)]
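The consistent-cut definition at the start of this section can also be checked mechanically. In this sketch (the Message struct and IsConsistentCut are hypothetical), a cut is described by how many events of each process it includes. For cuts of that form, left-closure under happened-before reduces to one rule: no message may be received inside the cut unless it was also sent inside the cut. A message sent inside the cut but received outside it is allowed; it is exactly the kind of in-transit message the algorithm records as channel state.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// A message sent as event number sendEvent of process `from` and received
// as event number recvEvent of process `to` (events are numbered from 1).
struct Message
{
    std::size_t from, sendEvent;
    std::size_t to, recvEvent;
};

// cut[p] == k means events 1..k of process p belong to the cut.
// The cut is consistent if no message is received inside it but sent outside it.
bool IsConsistentCut(const std::vector<std::size_t> & cut,
                     const std::vector<Message> & messages)
{
    for (const Message & m : messages)
    {
        bool received = m.recvEvent <= cut[m.to];
        bool sent = m.sendEvent <= cut[m.from];
        if (received && !sent)
        {
            return false;
        }
    }
    return true;
}
```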

Message Passing

State collection defines the following message types:

  • MarkerMessage
  • DeviceSignalStateMessage
  • StateMessage
  • DeviceSignalRequestMessage
  • RequestMessage
  • CollectedStateMessage
  • StateCollectionMessage

SCAgent Reference

State Collection Functions

  • HandleIncomingMessage: “Downcasts” incoming messages into a specific message type, and passes the message to an appropriate handler.
  • HandleRequest: Handles a RequestMessage from another module. Extracts the requested device types and signals and stores them in an internal device list.
  • Initiate: The initiator records its local state and broadcasts a marker to its peers.
  • TakeSnapshot: Records the local state of the devices in the device list.
  • HandleMarker: Handles a MarkerMessage.
  • SaveForward: Saves the local state and forwards the marker.
  • SendStateBack: Sends the collected state back to the initiator.
  • HandleState: Handles a StateMessage returned by a peer node.
  • StateResponse: Processes the collected states and sends a CollectedStateMessage back to the requesting module. The signal types it can return are the fields of CollectedStateMessage shown under Collected State Response.
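The “downcast” performed by HandleIncomingMessage can be pictured as a check of which submessage is present. The sketch below is an assumption about the general shape, not the SCAgent source: protobuf generates has_<field>() accessors for optional submessages, and plain boolean flags stand in for them here so that the example compiles without the generated classes. ScEnvelope and Route are hypothetical names.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a StateCollectionMessage: one flag per
// submessage, in place of the generated has_<field>() accessors.
struct ScEnvelope
{
    bool has_request_message = false;
    bool has_marker_message = false;
    bool has_state_message = false;
};

// Returns the name of the handler the message would be passed to.
std::string Route(const ScEnvelope & msg)
{
    if (msg.has_request_message) return "HandleRequest";
    if (msg.has_marker_message) return "HandleMarker";
    if (msg.has_state_message) return "HandleState";
    return "unhandled";
}
```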
class freedm::broker::sc::SCAgent

Description:
Declaration of the Chandy-Lamport algorithm. Each node that wants to initiate state collection records its local state and sends a marker message to all other peer nodes. Upon receiving a marker for the first time, a peer node records its local state and starts recording messages from its incoming channels until it receives markers from the other nodes (these messages belong to the channels between the nodes).

Public Functions

SCAgent()

Constructor.

SCAgent

Description:
Constructor for the state collection module.
Precondition:
PosixMain prepares parameters and invokes the module.
Postcondition:
Object initialized and ready to enter run state.
Limitations:
None

Protected Functions

CPeerNode GetMe()

Gets a CPeerNode representing this process.

GetMe

Description:
Gets a CPeerNode that refers to this process.
Return
A CPeerNode referring to this process.

std::string GetUUID() const

Gets the UUID of this process.

GetUUID

Description:
Gets this process’s UUID.
Return
This process’s UUID

Private Functions

CPeerNode AddPeer(CPeerNode peer)

Adds a peer to the peer set.

AddPeer

Description:
Adds a peer node to m_AllPeers, the peer set specific to the SC module.
Precondition:
m_AllPeers
Postcondition:
Add a peer to m_AllPeers
Return
the peer node
Parameters
  • peer -

    the peer node to add

CPeerNode GetPeer(std::string uuid)

Gets a peer by its UUID.

GetPeer

Description:
Gets a peer by its UUID. m_AllPeers is the peer set specific to the SC module.
Precondition:
m_AllPeers
Postcondition:
Add a peer to m_AllPeers
Return
the peer with the given UUID
Parameters
  • uuid -

    the UUID (string) of the peer to look up

void HandleAccept(CPeerNode peer)

Handles Accept messages.

This function handles Accept messages from the load balancing module. Normally, state collection can safely ignore these messages, but one that arrives during state collection’s own phase indicates a problem and must be added to the collected state.

Parameters
  • peer -

    the DGI that sent the message

virtual void HandleIncomingMessage(boost::shared_ptr< const ModuleMessage > msg, CPeerNode peer)

Handles received messages.

“Downcasts” incoming messages into a specific message type, and passes the message to an appropriate handler.

Parameters
  • msg -

    the incoming message

  • peer -

    the node that sent this message (could be this DGI)

void HandleMarker(const MarkerMessage & msg, CPeerNode peer)

SCAgent::HandleMarker

Description:
This function will be called to handle marker message.
Key:
sc.marker
Precondition:
Messages are obtained.
Postcondition:
parsing marker messages based on different conditions.
Interaction Peers:
Invoked by dispatcher, other SC
Parameters
  • msg -

    the received message

  • peer -

    the node that sent the message

void HandlePeerList(const gm::PeerListMessage & msg, CPeerNode peer)

SCAgent::HandlePeerList

Description:
This function will be called to handle PeerList message.
Key:
any.PeerList
Precondition:
Messages are obtained.
Postcondition:
Parses the message and resets to the default state if the PeerList comes from a different leader.
Interaction Peers:
Invoked by dispatcher, other SC
Parameters
  • msg -

    the received message

  • peer -

    the node that sent the message

void HandleRequest(const RequestMessage & msg, CPeerNode peer)

SCAgent::HandleRequest

Description:
This function will be called to handle a state collection request message.
Key:
sc.request
Precondition:
Messages are obtained.
Postcondition:
start state collection by calling Initiate().
Parameters
  • msg -

    the received message

  • peer -

    the node that sent the message

void HandleState(const StateMessage & msg, CPeerNode peer)

SCAgent::HandleState

Description:
This function will be called to handle state message.
Key:
sc.state
Precondition:
Messages are obtained.
Postcondition:
Parses the message as either a peer’s collected state or an in-transit channel message.
Interaction Peers:
Invoked by dispatcher, other SC
Parameters
  • msg -

    the received message

  • peer -

    the node that sent the message

void Initiate()

Initiator starts state collection.

Initiate

Description:
The initiator records its local state and broadcasts a marker.
Precondition:
Receiving state collection request from other module.
Postcondition:
The node (initiator) starts collecting state by saving its own states and broadcasting a marker out.
Device I/O:
TakeSnapshot()
Return
Sends a marker to all known peers.
Citation:
K. M. Chandy and L. Lamport, “Distributed Snapshots: Determining Global States of Distributed Systems,” ACM Transactions on Computer Systems, Vol. 3, No. 1, 1985, pp. 63-75

void SaveForward(StateVersion latest, const MarkerMessage & msg)

Peer saves its local state and forwards the marker.

SaveForward

Description:
SaveForward is used by the node to save its local state and send marker out.
Precondition:
Marker message is received.
Postcondition:
The node saves its local state and sends marker out.
Parameters
  • latest -

    the current marker’s version

  • msg -

    the message to send

void SendStateBack()

Peer sends collected states back to the initiator.

SendStateBack

Description:
SendStateBack is used by a peer to send its collected state back to the initiator.
Precondition:
The peer has finished collecting its local state.
Postcondition:
The peer sends its state back to the initiator.
Limitation:
Currently, only gateway values and in-transit channel messages are sent back.

void StateResponse()

Initiator sends the collected states back to the requesting module.

StateResponse

Description:
Processes the collected states (collectstate) and prepares them to be sent back.
Precondition:
The initiator has collected all states.
Postcondition:
Collected states are sent back to the requesting module.
Interaction Peers:
other SC processes
Return
A message containing gateway values and in-transit channel messages
Limitation:
Currently, only gateway values and channel transit messages are collected and sent back.

void TakeSnapshot(const std::vector< std::string > & devicelist)

Save local state.

TakeSnapshot

Description:
TakeSnapshot is used to collect local states.
Precondition:
The initiator starts state collection, or a peer receives a marker for the first time.
Postcondition:
The local state is saved in m_curstate.
Limitation:
Currently, it is used to collect only the gateway values for the LB module.

Private Members

std::multimap< StateVersion, StateMessage > collectstate

container of collected states, keyed by marker version (StateVersion)

PeerSet m_AllPeers

all known peers

unsigned int m_countdone

count of “Done” messages

unsigned int m_countmarker

count of markers

unsigned int m_countstate

count of states

StateMessage m_curstate

current state

StateVersion m_curversion

current version of marker

std::string m_module

module that requested state collection

bool m_NotifyToSave

flag indicating whether channel messages should be saved

std::string m_scleader

the saved leader
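Using a multimap for collectstate lets several peers’ states be stored under the same marker version while states from different versions stay apart. In the sketch below, StateVersion is assumed to be a pair of a leader UUID and a counter, and each state is reduced to a single double; both are simplifications for illustration, and SumForVersion is a hypothetical helper.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Assumption for this sketch: a marker version is a (leader UUID, counter) pair.
typedef std::pair<std::string, unsigned int> StateVersion;

// Sums the (simplified) states stored for one marker version; states that
// belong to other versions are not included.
double SumForVersion(const std::multimap<StateVersion, double> & states,
                     const StateVersion & version)
{
    double total = 0.0;
    std::pair<std::multimap<StateVersion, double>::const_iterator,
              std::multimap<StateVersion, double>::const_iterator>
        range = states.equal_range(version);
    for (std::multimap<StateVersion, double>::const_iterator it = range.first;
         it != range.second; ++it)
    {
        total += it->second;
    }
    return total;
}
```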

Private Static Functions

ModuleMessage PrepareForSending(const StateCollectionMessage & message, std::string recipient = "sc")

Wraps a StateCollectionMessage in a ModuleMessage.

Return
a ModuleMessage containing a copy of the StateCollectionMessage
Parameters
  • message -

    the message to prepare. If any required field is unset, the DGI will abort.

  • recipient -

    the module (sc/lb/gm/clk etc.) the message should be delivered to

References

[1] K. M. Chandy and L. Lamport, “Distributed Snapshots: Determining Global States of Distributed Systems,” ACM Transactions on Computer Systems, Vol. 3, No. 1, 1985, pp. 63-75.