State Collection¶
DGI state collection provides a method for collecting a causally consistent state of the devices attached to a group of DGIs.
Using State Collection¶
State collections are requested using the DGI message passing interface. Modules submit requests to state collection; state collection runs those collections during its phase and returns the collected state as a message to the requesting module.
Supported Signal Types¶
The following device types and signals are supported. To support more signals, you’ll need to modify the StateCollection code; see Adding New Signal Types.
Device Type | Supported Signal(s) |
---|---|
Sst | gateway |
Drer | generation |
Desd | storage |
Load | drain |
Fid | state |
Requesting State Collection¶
State collection requests are initiated with an sc::RequestMessage.
The message must contain:
- module - The module that requests the state collection, so that the result can be returned to it. Set with set_module().
- One or more DeviceSignalRequestMessages, which contain the device type and signal you wish to collect.
Each DeviceSignalRequestMessage must have its type and signal values set. type is the type of device to collect the value from (for example, “Sst”) and signal is the specific signal to read from that device (for example, “gateway”). State collection will collect the values from the first device of that type attached to the DGI.
Example: Collecting Single Device/Value¶
This example requests values from one device (SST) per DGI in the group:
sc::StateCollectionMessage msg;
sc::RequestMessage * submsg = msg.mutable_request_message();
submsg->set_module("YOURMODULE");
sc::DeviceSignalRequestMessage * subsubmsg = submsg->add_device_signal_request_message();
subsubmsg->set_type("Sst");
subsubmsg->set_signal("gateway");
ModuleMessage m;
m.mutable_state_collection_message()->CopyFrom(msg);
m.set_recipient_module("sc");
The module prepares an sc::StateCollectionMessage, then accesses its child request_message and adds the signals that need to be collected (in this case, the SST’s gateway values).
It then packs the message into a ModuleMessage and addresses it to State Collection.
Example: Multiple Devices¶
This example requests values from multiple devices (SST, DESD, DRER) per DGI in the group:
sc::StateCollectionMessage msg;
sc::RequestMessage * state_request = msg.mutable_request_message();
state_request->set_module("YOURMODULE");
sc::DeviceSignalRequestMessage * device_state;
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Sst");
device_state->set_signal("gateway");
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Desd");
device_state->set_signal("storage");
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Drer");
device_state->set_signal("generation");
ModuleMessage m;
m.mutable_state_collection_message()->CopyFrom(msg);
m.set_recipient_module("sc");
Collected State Response¶
State collection will send back the following state response, which contains the collected state:
message CollectedStateMessage
{
repeated double gateway = 1;
repeated double generation = 2;
repeated double storage = 3;
repeated double drain = 4;
repeated double state = 5;
required int32 num_intransit_accepts = 6;
}
Values can be accessed by iterating over the fields of the sc::CollectedStateMessage.
Each of the fields in the above message can be accessed through a function call to that field’s name.
For example, the gateway values can be accessed with the gateway() method of the sc::CollectedStateMessage.
Here is an example HandleCollectedState method:
HandleCollectedState(const sc::CollectedStateMessage &m, const CPeerNode& peer)
{
...
// The fields are declared as repeated double, so iterate with double
// to avoid narrowing the collected values.
//for SST device
BOOST_FOREACH(double v, m.gateway())
{
//print out each gateway value from collected message
Logger.Info << "Gateway value is " << v << std::endl;
}
//for DRER device
BOOST_FOREACH(double v, m.generation())
{
//print out each generation value from collected message
Logger.Info << "Generation value is " << v << std::endl;
}
//for DESD device
BOOST_FOREACH(double v, m.storage())
{
//print out each storage value from collected message
Logger.Info << "Storage value is " << v << std::endl;
}
}
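Once the values have been read out, a module will often want to aggregate them. The following is a minimal sketch of that step, not code from the DGI: `CollectedState` is a hypothetical stand-in for sc::CollectedStateMessage so that the example compiles without the generated protobuf headers, and `Total` and `Mean` are illustrative helper names.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Hypothetical stand-in for sc::CollectedStateMessage. Only three of the
// repeated double fields are modelled here.
struct CollectedState
{
    std::vector<double> gateway;
    std::vector<double> generation;
    std::vector<double> storage;
};

// Sum of one repeated field; 0.0 when nothing was collected for it.
double Total(const std::vector<double>& values)
{
    return std::accumulate(values.begin(), values.end(), 0.0);
}

// Mean of one repeated field. The caller must first check that the field
// is non-empty, since a signal that was not requested has no entries.
double Mean(const std::vector<double>& values)
{
    assert(!values.empty());
    return Total(values) / static_cast<double>(values.size());
}
```

With the real message type, the same helpers could be applied by first copying a field such as m.gateway() into a std::vector<double>.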
Adding New Signal Types¶
To add a new signal or device to state collection, first add a new entry to the CollectedStateMessage in src/messages/StateCollection.proto. For example, to add a new frequency signal to a new or existing device, add one line for the frequency signal type as follows:
message CollectedStateMessage
{
repeated double gateway = 1;
repeated double generation = 2;
repeated double storage = 3;
repeated double drain = 4;
repeated double state = 5;
repeated double frequency = 6; // New line for the new signal.
required int32 num_intransit_accepts = 7;
}
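Make sure you adjust the assigned numbers for the fields accordingly: every field number must be unique within the message, so num_intransit_accepts moves from 6 to 7 here.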
Next, in the StateResponse() method of sc/StateCollection.cpp, add the new device or signal. In this example, we have added both a new device (Omega) and a new signal to that device (frequency):
if (dssm.type() == "Sst")
{
if(dssm.count()>0)
{
csm->add_gateway(dssm.value());
}
else
{
csm->clear_gateway();
}
}
else if (dssm.type() == "Drer")
{
if(dssm.count()>0)
{
csm->add_generation(dssm.value());
}
else
{
csm->clear_generation();
}
}
else if (dssm.type() == "Desd")
{
if(dssm.count()>0)
{
csm->add_storage(dssm.value());
}
else
{
csm->clear_storage();
}
}
else if (dssm.type() == "Omega")
{
if(dssm.count()>0)
{
csm->add_frequency(dssm.value());
}
else
{
csm->clear_frequency();
}
}
When LB requests the state of the OMEGA device along with SST, DESD, and DRER, the request message built in LoadBalance.cpp needs the following additional code for the OMEGA device:
sc::StateCollectionMessage msg;
sc::RequestMessage * state_request = msg.mutable_request_message();
state_request->set_module("lb");
sc::DeviceSignalRequestMessage * device_state;
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Sst");
device_state->set_signal("gateway");
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Desd");
device_state->set_signal("storage");
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Drer");
device_state->set_signal("generation");
// New device and signal
device_state = state_request->add_device_signal_request_message();
device_state->set_type("Omega");
device_state->set_signal("frequency");
When LB handles the returned states in LoadBalance.cpp, the following code for the OMEGA device’s frequency signal should be added:
HandleCollectedState(const sc::CollectedStateMessage &m)
{
...
// The fields are declared as repeated double, so iterate with double
// to avoid narrowing the collected values.
//for SST device
BOOST_FOREACH(double v, m.gateway())
{
//print out each gateway value from collected message
Logger.Info << "Gateway value is " << v << std::endl;
}
//for DRER device
BOOST_FOREACH(double v, m.generation())
{
//print out each generation value from collected message
Logger.Info << "Generation value is " << v << std::endl;
}
//for DESD device
BOOST_FOREACH(double v, m.storage())
{
//print out each storage value from collected message
Logger.Info << "Storage value is " << v << std::endl;
}
// New device and signal
//for OMEGA device
BOOST_FOREACH(double v, m.frequency())
{
//print out each frequency value from collected message
Logger.Info << "Frequency value is " << v << std::endl;
}
}
Implementation Details¶
Theory: Algorithm in State Collection¶
The DGI State Collection module is implemented based on the Chandy-Lamport algorithm [1], which is used to collect consistent states of all participants in a distributed system. A consistent global state is one corresponding to a consistent cut. A consistent cut is left-closed under the causal precedence relation. In other words, if an event belongs to the cut, then every event that happened before it also belongs to the cut. The algorithm works as follows:
- The initiator starts state collection by recording its own state and broadcasting a marker to the other peers. At the same time, the initiator starts recording messages from other peers until it receives the marker back from them.
- Upon receiving the marker for the first time, a peer records its own state, forwards the marker to the others (including the initiator), and starts recording messages from other peers until it receives the marker back from them.
The following diagram illustrates the Chandy-Lamport algorithm working on three nodes. The initiator is the leader node chosen by the Group Management module in DGI.
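The two steps above can also be traced in a small in-memory simulation. This is a sketch of the textbook algorithm under stated assumptions, not the SCAgent implementation: all names (`Sim`, `Node`, `Msg`, `Transfer`, `Record`, `Deliver`, `Drain`, `SnapshotTotal`) are hypothetical, channels are modelled as FIFO queues, each node's state is a single integer balance, and the step in which peers send their recorded state back to the initiator is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// A message on a channel: either a marker or an application transfer.
struct Msg { bool marker; int amount; };

struct Node
{
    int balance;                  // live application state
    bool recorded;                // true once the local snapshot is taken
    int snapshot;                 // recorded local state
    std::vector<bool> listening;  // listening[j]: still recording channel j -> this node
    std::vector<int> inTransit;   // inTransit[j]: amount recorded on channel j -> this node
};

struct Sim
{
    std::size_t n;
    std::vector<Node> nodes;
    std::vector<std::vector<std::deque<Msg> > > chan;  // chan[i][j]: FIFO channel i -> j

    explicit Sim(const std::vector<int>& balances)
        : n(balances.size()), nodes(n), chan(n, std::vector<std::deque<Msg> >(n))
    {
        for (std::size_t i = 0; i < n; ++i)
        {
            nodes[i].balance = balances[i];
            nodes[i].recorded = false;
            nodes[i].snapshot = 0;
            nodes[i].listening.assign(n, false);
            nodes[i].inTransit.assign(n, 0);
        }
    }

    // Application message: move `amount` from node `from` to node `to`.
    void Transfer(std::size_t from, std::size_t to, int amount)
    {
        assert(from < n && to < n && from != to);
        nodes[from].balance -= amount;
        Msg m = { false, amount };
        chan[from][to].push_back(m);
    }

    // Record the local state, then send a marker on every outgoing channel.
    void Record(std::size_t i)
    {
        nodes[i].recorded = true;
        nodes[i].snapshot = nodes[i].balance;
        for (std::size_t j = 0; j < n; ++j)
        {
            if (j == i) continue;
            nodes[i].listening[j] = true;
            Msg m = { true, 0 };
            chan[i][j].push_back(m);
        }
    }

    // Deliver the oldest message on channel from -> to; false if it is empty.
    bool Deliver(std::size_t from, std::size_t to)
    {
        if (chan[from][to].empty()) return false;
        Msg m = chan[from][to].front();
        chan[from][to].pop_front();
        Node& nd = nodes[to];
        if (m.marker)
        {
            if (!nd.recorded) Record(to);  // first marker: snapshot and forward
            nd.listening[from] = false;    // stop recording this channel
        }
        else
        {
            nd.balance += m.amount;
            if (nd.listening[from]) nd.inTransit[from] += m.amount;
        }
        return true;
    }

    // Deliver messages round-robin until every channel is empty.
    void Drain()
    {
        bool progress = true;
        while (progress)
        {
            progress = false;
            for (std::size_t i = 0; i < n; ++i)
                for (std::size_t j = 0; j < n; ++j)
                    if (i != j && Deliver(i, j)) progress = true;
        }
    }

    // Recorded node states plus recorded in-transit amounts.
    int SnapshotTotal() const
    {
        int total = 0;
        for (std::size_t i = 0; i < n; ++i)
        {
            total += nodes[i].snapshot;
            for (std::size_t j = 0; j < n; ++j) total += nodes[i].inTransit[j];
        }
        return total;
    }
};
```

For example, with initial balances 10, 20, and 30, calling Transfer(1, 0, 5), Record(0), Transfer(2, 0, 7), and then Drain() gives a SnapshotTotal() of 60, the sum of the initial balances. Both transfers were sent before their senders saw a marker and arrived after node 0 had recorded its state, so they are captured as in-transit messages on node 0's incoming channels rather than being lost or counted twice.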
Message Passing¶
State collection defines the following message types:
- MarkerMessage
- DeviceSignalStateMessage
- StateMessage
- DeviceSignalRequestMessage
- RequestMessage
- CollectedStateMessage
- StateCollectionMessage
SCAgent Reference¶
State Collection Functions
- HandleIncomingMessage: “Downcasts” incoming messages into a specific message type, and passes the message to an appropriate handler.
- HandleRequest: Handles a RequestMessage from another module. Extracts the type and value of the devices and inserts them into a list with a specific format.
- Initiate: The initiator records its local state and broadcasts a marker to the peer nodes.
- TakeSnapshot: Records the local state according to the device list.
- HandleMarker: Handles a MarkerMessage.
- SaveForward: Saves the local state and sends the marker out.
- SendStateBack: Sends the collected state back to the initiator.
- HandleState: Handles a StateMessage returned from a peer node.
- StateResponse: Handles the collected states and prepares a CollectedStateMessage to send back to the requesting module. The signal types currently defined in the protocol buffers for CollectedStateMessage are listed under Supported Signal Types.
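The “downcast and dispatch” step attributed to HandleIncomingMessage in the list above can be pictured with the following sketch. It is an illustration under assumptions rather than the DGI source: the structs are hypothetical stand-ins for the generated protobuf classes, nullable pointers stand in for optional submessages, and `Dispatcher`, `Dispatch`, and `last` are made-up names.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-ins for the generated message classes.
struct MarkerMessage {};
struct StateMessage {};
struct RequestMessage {};

// Hypothetical container message: a null pointer means "submessage not set".
struct StateCollectionMessage
{
    const MarkerMessage* marker;
    const StateMessage* state;
    const RequestMessage* request;
    StateCollectionMessage() : marker(0), state(0), request(0) {}
};

class Dispatcher
{
public:
    std::string last;  // name of the handler that ran most recently

    // Pass the message to the handler matching whichever submessage is set.
    // Returns false when no known submessage is present.
    bool Dispatch(const StateCollectionMessage& m)
    {
        // This sketch assumes a message carries at most one submessage.
        assert((m.marker != 0) + (m.state != 0) + (m.request != 0) <= 1);
        if (m.marker) HandleMarker(*m.marker);
        else if (m.state) HandleState(*m.state);
        else if (m.request) HandleRequest(*m.request);
        else return false;
        return true;
    }

private:
    void HandleMarker(const MarkerMessage&) { last = "marker"; }
    void HandleState(const StateMessage&) { last = "state"; }
    void HandleRequest(const RequestMessage&) { last = "request"; }
};
```

Keeping the type test in one place means each handler only ever sees the message type it is written for.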
class freedm::broker::sc::SCAgent¶
- Description: Declaration of the Chandy-Lamport algorithm. Each node that wants to initiate state collection records its local state and sends a marker message to all other peer nodes. Upon receiving a marker for the first time, peer nodes record their local states and start recording any message from their incoming channels until they receive the marker from the other nodes (these messages belong to the channel between the nodes).
Public Functions
- SCAgent(): Constructor.
- Description: Constructor for the state collection module.
- Precondition: PosixMain prepares parameters and invokes module.
- Postcondition: Object initialized and ready to enter run state.
- Limitations: None
Protected Functions
- CPeerNode GetMe(): Gets a CPeerNode representing this process.
- Description: Gets a CPeerNode that refers to this process.
- Return: A CPeerNode referring to this process.
- std::string GetUUID() const: Gets the UUID of this process.
- Description: Gets this process’s UUID.
- Return: This process’s UUID
Private Functions
- CPeerNode AddPeer(CPeerNode peer): Add a peer to the peer set from a pointer to a peer node object.
- Description: Add a peer to the peer set from a pointer to a peer node object. m_AllPeers is a specific peer set for the SC module.
- Precondition: m_AllPeers
- Postcondition: Add a peer to m_AllPeers
- Return: a pointer to a peer node
- Parameters:
- peer
- CPeerNode GetPeer(std::string uuid): Get a pointer to a peer from UUID.
- Description: Get a pointer to a peer from UUID. m_AllPeers is a specific peer set for the SC module.
- Precondition: m_AllPeers
- Postcondition: Add a peer to m_AllPeers
- Return: a pointer to the peer
- Parameters:
- uuid: string
- void HandleAccept(CPeerNode peer): Handle receiving messages.
This function will be called to handle Accept messages from LoadBalancing. Normally, state collection can safely ignore these messages, but if they arrive during state collection’s own phase, then there is a problem and they need to be added to the collected state.
- Parameters:
- peer: the DGI that sent the message
- virtual void HandleIncomingMessage(boost::shared_ptr< const ModuleMessage > msg, CPeerNode peer): Handles received messages.
“Downcasts” incoming messages into a specific message type, and passes the message to an appropriate handler.
- Parameters:
- msg: the incoming message
- peer: the node that sent this message (could be this DGI)
- void HandleMarker(const MarkerMessage & msg, CPeerNode peer)
- Description: This function will be called to handle marker message.
- Key: sc.marker
- Precondition: Messages are obtained.
- Postcondition: parsing marker messages based on different conditions.
- Interaction Peers: Invoked by dispatcher, other SC
- Parameters:
- msg: the received message
- peer: the node
- void HandlePeerList(const gm::PeerListMessage & msg, CPeerNode peer)
- Description: This function will be called to handle PeerList message.
- Key: any.PeerList
- Precondition: Messages are obtained.
- Postcondition: parsing messages, reset to default state if receiving PeerList from different leader.
- Interaction Peers: Invoked by dispatcher, other SC
- Parameters:
- msg: the received message
- peer: the node
- void HandleRequest(const RequestMessage & msg, CPeerNode peer)
- Description: This function will be called to handle state collect request message.
- Key: sc.request
- Precondition: Messages are obtained.
- Postcondition: start state collection by calling Initiate().
- Parameters:
- msg
- peer
- void HandleState(const StateMessage & msg, CPeerNode peer)
- Description: This function will be called to handle state message.
- Key: sc.state
- Precondition: Messages are obtained.
- Postcondition: parsing messages based on state or in-transit channel message.
- Interaction Peers: Invoked by dispatcher, other SC
- Parameters:
- msg: the received message
- peer: the node
- void Initiate(): Initiator starts state collection.
- Description: Initiator records its local state and broadcasts marker.
- Precondition: Receiving state collection request from other module.
- Postcondition: The node (initiator) starts collecting state by saving its own states and broadcasting a marker out.
- Device I/O: TakeSnapshot()
- Return: Send a marker out to all known peers
- Citation: Distributed Snapshots: Determining Global States of Distributed Systems, ACM Transactions on Computer Systems, Vol. 3, No. 1, 1985, pp. 63-75
- void SaveForward(StateVersion latest, const MarkerMessage & msg): Peer saves local state and forwards marker.
- Description: SaveForward is used by the node to save its local state and send marker out.
- Precondition: Marker message is received.
- Postcondition: The node saves its local state and sends marker out.
- Parameters:
- latest: the current marker’s version
- msg: the message to send
- void SendStateBack(): Peer sends collected states back to the initiator.
- Description: SendStateBack is used by the peer to send collected states back to the initiator.
- Precondition: Peer has completed collecting its local states.
- Postcondition: Peer sends its states back to the initiator.
- Limitation: Currently, only sending back gateway value and channel transit messages.
- void StateResponse(): Initiator sends collected states back to the request module.
- Description: This function processes collectstate and prepares the states to be sent back.
- Precondition: The initiator has collected all states.
- Postcondition: Collected states are sent back to the request module.
- Interaction Peers: other SC processes
- Return: Send message which contains gateway values and channel transit messages
- Limitation: Currently, only gateway values and channel transit messages are collected and sent back.
- void TakeSnapshot(const std::vector< std::string > & devicelist): Save local state.
- Description: TakeSnapshot is used to collect local states.
- Precondition: The initiator starts state collection or the peer receives a marker for the first time.
- Postcondition: Save local state in container m_curstate
- Limitation: Currently, it is used to collect only the gateway values for LB module
Private Members
- std::multimap< StateVersion, StateMessage > collectstate: collect states container and its iterator
- PeerSet m_AllPeers: all known peers
- unsigned int m_countdone: count number of “Done” messages
- unsigned int m_countmarker: count number of markers
- unsigned int m_countstate: count number of states
- StateMessage m_curstate: current state
- StateVersion m_curversion: current version of marker
- std::string m_module: module that requested state collection
- bool m_NotifyToSave: flag to indicate save channel message
- std::string m_scleader: save leader
Private Static Functions
- ModuleMessage PrepareForSending(const StateCollectionMessage & message, std::string recipient = "sc"): Wraps a StateCollectionMessage in a ModuleMessage.
- Return: a ModuleMessage containing a copy of the StateCollectionMessage
- Parameters:
- message: the message to prepare. If any required field is unset, the DGI will abort.
- recipient: the module (sc/lb/gm/clk etc.) the message should be delivered to
References¶
[1] K. M. Chandy and L. Lamport, Distributed Snapshots: Determining Global States of Distributed Systems, ACM Transactions on Computer Systems, Vol. 3, No. 1, 1985, pp. 63-75.