Generally speaking, this article repeats the "Broadcast Engine" section of the Programming Guide. If you use Broadcast Engine and it meets all of your needs, you will not find much new in this article.
By an event, I specifically mean a process that satisfies the following statements:
- The caller sends the same message to several receivers.
- All calls are performed concurrently.
- The caller eventually learns the receivers' replies.
I hope the first statement does not require any additional explanation.
All calls are performed concurrently. If the connection to a specific client is slow (or broken), sending to other clients is not delayed until that client replies (or until the server recognizes its unavailability after a timeout).
The third statement stems from real business needs. Usually a caller has to know whether its recipients have successfully received the message. It is also good to gather their replies when possible.
.NET Native Events
The sample is here. Open the "Server\Server.sln" solution file.
The well-known layer contains a delegate declaration and a publicly available event.
    /// <summary>
    /// Is called by the server when a message is sent.
    /// </summary>
    public delegate void MessageDeliveredEventHandler(string message);

    /// <summary>
    /// ChatRoom provides common methods for the chatting.
    /// </summary>
    public interface IChatRoom
    {
        /// <summary>
        /// Sends the message to all clients.
        /// </summary>
        /// <param name="message">Message to send.</param>
        void SendMessage(string message);

        /// <summary>
        /// Message delivered event.
        /// </summary>
        event MessageDeliveredEventHandler MessageDelivered;
    }
In my implementation the caller calls the SendMessage method, which in turn fires the event. A client can also make this call directly.
Clients create delegate instances pointing to a MarshalByRefObject-derived class and add them to the event as handlers. The only issue here is that the delegate must point to a well-known class, so as a workaround I declared a well-known class that invokes a late-bound method (the MessageReceiver class).
    IChatRoom iChatRoom = (IChatRoom) Activator.GetObject(typeof(IChatRoom),
        "gtcp://127.0.0.1:8737/ChatRoom.rem");
    iChatRoom.MessageDelivered +=
        new MessageDeliveredEventHandler(messageReceiver.MessageDelivered);

    // ... ask user to enter the message
    // and force the event
    iChatRoom.SendMessage(str);
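The MessageReceiver wrapper itself is not shown in the snippets. A minimal sketch of what such a class might look like follows. The MessageDeliveredHandler field name is taken from the [OneWay] snippet further down; its type, its public visibility and the MessageReceiverDemo class are assumptions made for illustration only.

```csharp
using System;

public delegate void MessageDeliveredEventHandler(string message);

// Placed in the well-known layer, so the server can resolve the delegate target type.
public class MessageReceiver : MarshalByRefObject
{
    // Assumed: a late-bound handler supplied by client code the server knows nothing about.
    public MessageDeliveredEventHandler MessageDeliveredHandler;

    // The server-side event points at this method.
    public void MessageDelivered(string message)
    {
        if (this.MessageDeliveredHandler != null)
            this.MessageDeliveredHandler(message);
    }
}

public static class MessageReceiverDemo
{
    public static string LastMessage;

    public static void Main()
    {
        MessageReceiver messageReceiver = new MessageReceiver();

        // Client-specific logic is attached at run time.
        messageReceiver.MessageDeliveredHandler =
            delegate(string message) { LastMessage = message; };

        // Simulates the server firing the event locally.
        messageReceiver.MessageDelivered("hello");
        Console.WriteLine(LastMessage);
    }
}
```

The server only ever sees MessageReceiver, while the real handler stays in a client assembly.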
The server provides an instance implementing the IChatRoom interface.
    /// <summary>
    /// Sends the message to all clients.
    /// </summary>
    /// <param name="message">Message to send.</param>
    public void SendMessage(string message)
    {
        Console.WriteLine("\"{0}\" message will be sent to all clients.", message);

        if (this.MessageDelivered != null)
            this.MessageDelivered(message);
    }

    /// <summary>
    /// Message delivered event.
    /// </summary>
    public event MessageDeliveredEventHandler MessageDelivered;
The pros:
- Very easy to implement if all business objects are located in a well-known layer.
The cons:
- Late binding is required for business objects located in a DLL that is unknown to clients.
- Calls are made consecutively. The next client will be called only when the previous one returns a result.
- If a client is unreachable or throws an exception, invocation stops and the remaining clients do not receive the message.
- You should manage sponsorship separately.
You can mark the MessageReceiver.MessageDelivered method with the [OneWay] attribute to solve the second problem. But you should understand that there is no way to get call results in this case. Disconnected clients are never removed from the event recipient list, which acts like a memory leak.
    [OneWay]
    public void MessageDelivered(string message)
    {
        if (this.MessageDeliveredHandler != null)
            this.MessageDeliveredHandler(message);
    }
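The consecutive invocation and the "one failure stops everything" behaviour come from the way a multicast delegate is invoked, and they can be reproduced locally without any Remoting. The sketch below uses three hypothetical in-process handlers (A, B and C, where B always throws) and compares a direct invocation with a per-handler loop over GetInvocationList. All names except MessageDeliveredEventHandler are invented for the illustration.

```csharp
using System;
using System.Collections.Generic;

public delegate void MessageDeliveredEventHandler(string message);

public static class EventInvocationDemo
{
    // Builds three handlers; the second one simulates an unreachable client.
    private static MessageDeliveredEventHandler BuildHandlers(List<string> delivered)
    {
        MessageDeliveredEventHandler handlers = null;
        handlers += delegate(string m) { delivered.Add("A"); };
        handlers += delegate(string m) { throw new InvalidOperationException("client B is unreachable"); };
        handlers += delegate(string m) { delivered.Add("C"); };
        return handlers;
    }

    // Invokes the whole delegate at once, like this.MessageDelivered(message).
    public static List<string> FireDirectly()
    {
        List<string> delivered = new List<string>();
        MessageDeliveredEventHandler handlers = BuildHandlers(delivered);
        try
        {
            handlers("hello");
        }
        catch (InvalidOperationException)
        {
            // B threw, so C was never called.
        }
        return delivered;
    }

    // Invokes each handler separately, so one failure does not stop the others.
    public static List<string> FireOneByOne()
    {
        List<string> delivered = new List<string>();
        MessageDeliveredEventHandler handlers = BuildHandlers(delivered);
        foreach (MessageDeliveredEventHandler handler in handlers.GetInvocationList())
        {
            try
            {
                handler("hello");
            }
            catch (InvalidOperationException)
            {
                // A real server could remove the failed handler here.
            }
        }
        return delivered;
    }

    public static void Main()
    {
        Console.WriteLine("Direct: " + string.Join(", ", FireDirectly()));
        Console.WriteLine("One by one: " + string.Join(", ", FireOneByOne()));
    }
}
```

The per-handler loop isolates failures, but the calls are still made one after another, so a slow client still delays everyone after it.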
Summary
Under my conditions this scheme is completely unacceptable: it is slow and unreliable.
You can use it for short-lived scenarios that have few clients and where each client must be able to interrupt the event process.
Interface-based approach
The example is here. Open the “Server\Server.sln” solution file.
The known layer contains the event provider interface and the client receiver interface:
    /// <summary>
    /// Describes a callback called when a message is received.
    /// </summary>
    public interface IChatClient
    {
        /// <summary>
        /// Is called by the server when a message is accepted.
        /// </summary>
        /// <param name="message">A message.</param>
        object ReceiveMessage(string message);
    }

    /// <summary>
    /// ChatRoom provides common methods for chatting.
    /// </summary>
    public interface IChatRoom
    {
        /// <summary>
        /// Sends the message to all clients.
        /// </summary>
        /// <param name="message">Message to send.</param>
        void SendMessage(string message);

        /// <summary>
        /// Attaches a client.
        /// </summary>
        /// <param name="iChatClient">Receiver that will receive chat messages.</param>
        void AttachClient(IChatClient iChatClient);
    }
The IChatClient interface must be implemented by any object that wants to receive chat messages. The client class implements the IChatClient interface:
    namespace Client
    {
        class ChatClient : MarshalByRefObject, IChatClient
        {
            static void Main(string[] args)
            {
                // client attaches to the event
                IChatRoom iChatRoom = (IChatRoom) Activator.GetObject(typeof (IChatRoom),
                    "gtcp://127.0.0.1:8737/ChatRoom.rem");
                iChatRoom.AttachClient(new ChatClient());

                //... and asks user to enter a message.

                // Then fires the event
                iChatRoom.SendMessage(str);

                //...
            }
        }
    }
The server implements the IChatRoom interface and allows clients to attach. I keep clients in a hash table only because I want to remove failed receivers quickly.
I added additional comments to the snippet below.
    class ChatServer : MarshalByRefObject, IChatRoom
    {
        /// <summary>
        /// Contains entries of MBR uri => client MBR implementing IChatClient interface.
        /// </summary>
        static Hashtable _clients = new Hashtable();

        /// <summary>
        /// Attaches the client.
        /// </summary>
        /// <param name="iChatClient">Client to be attached.</param>
        public void AttachClient(IChatClient iChatClient)
        {
            if (iChatClient == null)
                return ;

            lock(_clients)
            {
                // I just register this receiver under MBR uri. So I can
                // find and perform an operation or remove it quickly
                // at any time I will need it.
                _clients[RemotingServices.GetObjectUri((MarshalByRefObject) iChatClient)] = iChatClient;
            }
        }

        /// <summary>
        /// To kick off the async call.
        /// </summary>
        public delegate object ReceiveMessageEventHandler(string message);

        /// <summary>
        /// Sends the message to all clients.
        /// </summary>
        /// <param name="message">Message to send.</param>
        public void SendMessage(string message)
        {
            lock(_clients)
            {
                Console.WriteLine("\"{0}\" message will be sent to all clients.", message);
                AsyncCallback asyncCallback = new AsyncCallback(OurAsyncCallbackHandler);

                foreach (DictionaryEntry entry in _clients)
                {
                    // get the next receiver
                    IChatClient iChatClient = (IChatClient) entry.Value;
                    ReceiveMessageEventHandler remoteAsyncDelegate =
                        new ReceiveMessageEventHandler(iChatClient.ReceiveMessage);

                    // make up the cookies for the async callback
                    AsyncCallBackData asyncCallBackData = new AsyncCallBackData();
                    asyncCallBackData.RemoteAsyncDelegate = remoteAsyncDelegate;
                    asyncCallBackData.MbrBeingCalled = (MarshalByRefObject) iChatClient;

                    // and initiate the call
                    IAsyncResult RemAr = remoteAsyncDelegate.BeginInvoke(message,
                        asyncCallback, asyncCallBackData);
                }
            }
        }

        // Called by .NET Remoting when async call is finished.
        public static void OurAsyncCallbackHandler(IAsyncResult ar)
        {
            AsyncCallBackData asyncCallBackData = (AsyncCallBackData) ar.AsyncState;

            try
            {
                object result = asyncCallBackData.RemoteAsyncDelegate.EndInvoke(ar);

                // the call is successfully finished and
                // we have call results here
            }
            catch(Exception ex)
            {
                // The call has failed.
                // You can analyze an exception
                // to understand the reason.
                // I just exclude the failed receiver here.
                Console.WriteLine("Client call failed: {0}.", ex.Message);
                lock(_clients)
                {
                    _clients.Remove(
                        RemotingServices.GetObjectUri(asyncCallBackData.MbrBeingCalled) );
                }
            }
        }
    }
The pros:
- All calls are made concurrently.
- Failed receivers do not affect other receivers.
- You can apply any policies to the failed receivers.
- You know the results of calls and you can gather ref and out parameters.
The cons:
- Much more complicated than the first scenario.
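The essence of this approach (one independent call per receiver, with failed receivers dropped) can be tried out in a single process. The sketch below is not the Remoting code above: it swaps delegate BeginInvoke for Task.Run, because asynchronous delegate invocation is not supported on current .NET runtimes, and it waits for the replies instead of using a callback. LocalClient, ChatRoomSketch, the string keys and FanOutDemo are hypothetical stand-ins for remote clients and their MBR URIs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IChatClient
{
    object ReceiveMessage(string message);
}

// Hypothetical in-process receiver standing in for a remote client.
public class LocalClient : IChatClient
{
    private readonly string _name;
    private readonly bool _fails;

    public LocalClient(string name, bool fails)
    {
        _name = name;
        _fails = fails;
    }

    public object ReceiveMessage(string message)
    {
        if (_fails)
            throw new InvalidOperationException(_name + " is unreachable");
        return _name + " received \"" + message + "\"";
    }
}

public class ChatRoomSketch
{
    private readonly ConcurrentDictionary<string, IChatClient> _clients =
        new ConcurrentDictionary<string, IChatClient>();

    public int ClientCount { get { return _clients.Count; } }

    public void AttachClient(string key, IChatClient client)
    {
        _clients[key] = client;
    }

    // Starts one call per receiver, then gathers replies and removes failed receivers.
    public List<object> SendMessage(string message)
    {
        var calls = new List<KeyValuePair<string, Task<object>>>();
        foreach (KeyValuePair<string, IChatClient> entry in _clients)
        {
            IChatClient client = entry.Value;
            Task<object> call = Task.Run<object>(() => client.ReceiveMessage(message));
            calls.Add(new KeyValuePair<string, Task<object>>(entry.Key, call));
        }

        var replies = new List<object>();
        foreach (KeyValuePair<string, Task<object>> call in calls)
        {
            try
            {
                replies.Add(call.Value.GetAwaiter().GetResult());
            }
            catch (Exception ex)
            {
                Console.WriteLine("Client call failed: {0}.", ex.Message);
                IChatClient removed;
                _clients.TryRemove(call.Key, out removed);
            }
        }
        return replies;
    }
}

public static class FanOutDemo
{
    public static void Main()
    {
        var room = new ChatRoomSketch();
        room.AttachClient("a", new LocalClient("a", false));
        room.AttachClient("b", new LocalClient("b", true));
        room.AttachClient("c", new LocalClient("c", false));

        List<object> replies = room.SendMessage("hi");
        Console.WriteLine("{0} replies, {1} clients left", replies.Count, room.ClientCount);
    }
}
```

Here the failing client "b" is removed after the first broadcast, while "a" and "c" still deliver their replies.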
Summary
This is exactly the pattern you should use if you need to implement an event and you use native channels. I did not implement attaching and detaching sponsors here, but you should definitely consider it if your clients do not hold references to their receivers.
Broadcast Engine
This approach looks like the previous one, but it is easier on the server side and has a completely different internal implementation. The sample is here.
Both the known layer and the client have exactly the same implementation as in the previous approach. The only difference is on the server.
The server constructs an instance of the Dispatcher class that contains a list of recipients:
    private static Dispatcher _dispatcher = new Dispatcher(typeof (IChatClient));
    private static IChatClient _caller;
To make the processing fully asynchronous, the server attaches a handler and switches on the async mode.
    static void Main(string[] args)
    {
        //...
        _dispatcher.BroadcastCallFinishedHandler +=
            new BroadcastCallFinishedHandler( ChatServer.BroadcastCallFinishedHandler );
        _dispatcher.CallIsAsync = true;
        _caller = (IChatClient) _dispatcher.TransparentProxy;

        //...
    }
Every time the client wants to receive messages, the server puts it into a dispatcher instance:
    /// <summary>
    /// Attaches the client.
    /// </summary>
    /// <param name="iChatClient">Client to attach.</param>
    public void AttachClient(IChatClient iChatClient)
    {
        if (iChatClient == null)
            return ;

        _dispatcher.Add((MarshalByRefObject) iChatClient);
    }
When the server wants to fire an event, it just calls a method on the provided proxy. This call will be automatically sent to all registered receivers:
    /// <summary>
    /// Sends message to all clients.
    /// </summary>
    /// <param name="message">Message to send.</param>
    public void SendMessage(string message)
    {
        Console.WriteLine("\"{0}\" message will be sent to all clients.", message);
        _caller.ReceiveMessage(message);
    }
In my sample I ignore call results. In any case, Dispatcher automatically excludes failed receivers after the 4th failure by default. If I wanted to process the results, I would write something like this:
    public static void BroadcastCallFinishedHandler(Dispatcher dispatcher,
        IMessage message, ResultCollector resultCollector)
    {
        lock(resultCollector)
        {
            foreach(DictionaryEntry entry in resultCollector.Successful)
            {
                IMethodReturnMessage iMethodReturnMessage = (IMethodReturnMessage) entry.Value;

                // here you get client responses
                // including out and ref parameters
                Console.WriteLine("Returned object = {0}",
                    iMethodReturnMessage.ReturnValue.ToString());
            }

            foreach(DictionaryEntry entry in resultCollector.Failed)
            {
                string mbrUri = (string) entry.Key;
                Exception ex = null;
                if (entry.Value is Exception)
                    ex = (Exception) entry.Value;
                else
                    ex = ((IMethodReturnMessage) entry.Value).Exception;
                MarshalByRefObject failedObject = dispatcher.FindObjectByUri(mbrUri);

                Console.WriteLine("Receiver {0} has failed. Error: {1}", mbrUri, ex.Message);
                // here you have failed MBR object (failedObject)
                // and Exception (ex)
            }
        }
    }
You have all results gathered in one place, so you can make any decisions here.
Broadcast Engine can also be run in synchronous mode. In this mode all invocations are still made concurrently, but the call waits until all clients reply or the timeout expires. Sometimes this is very useful, but it blocks the current thread until the call is finished. Take a look at the Programming Guide for further information.
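To make the trade-off of the synchronous mode concrete, here is a small local sketch of the same idea: start all calls concurrently, then block the calling thread until every receiver has replied or a timeout expires. It says nothing about how Broadcast Engine is implemented internally; SyncBroadcastSketch, the Func-based receivers and the timings are assumptions made for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SyncBroadcastSketch
{
    // Calls every receiver concurrently and blocks until all of them
    // reply or the timeout expires, whichever comes first.
    public static List<string> Broadcast(IEnumerable<Func<string>> receivers, TimeSpan timeout)
    {
        Task<string>[] calls = receivers.Select(r => Task.Run<string>(r)).ToArray();

        try
        {
            Task.WaitAll(calls, timeout);
        }
        catch (AggregateException)
        {
            // At least one receiver threw; successful replies are collected below.
        }

        // Only receivers that finished successfully in time contribute a reply.
        return calls.Where(c => c.Status == TaskStatus.RanToCompletion)
                    .Select(c => c.Result)
                    .ToList();
    }

    public static void Main()
    {
        var receivers = new Func<string>[]
        {
            () => "fast-1",
            () => { throw new InvalidOperationException("receiver failed"); },
            () => "fast-2",
            () => { Thread.Sleep(3000); return "slow"; }
        };

        List<string> replies = Broadcast(receivers, TimeSpan.FromMilliseconds(500));
        Console.WriteLine(string.Join(", ", replies));
    }
}
```

The caller is held for up to the full timeout by the slow receiver, which is exactly the cost of the synchronous mode described above.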
Summary
The pros:
- Easier to use than the second approach.
- All invocations are made concurrently.
- Failure of one receiver does not affect other receivers.
- Broadcast Engine automatically recognizes situations when the receiver is able to receive messages via a true broadcast channel. If receivers have not received a message via the true broadcast channel, Broadcast Engine resends it via the usual channel. Therefore, you can use IP multicasting with minimal effort.
- Broadcast Engine takes care of the sponsorship for the attached MBR receivers.
The cons:
- You still have to declare the well-known interface in order to implement events.
Broadcast Engine Filtering
In order to understand how event filtering works, let's change the task. Now clients choose a symbol and subscribe to the server’s events. The server generates a random string and sends it only to the clients whose symbols are in this string.
C# sample is here. VB.NET sample is here. You can modify the IP address of the server in the configuration file.
So, the known layer contains two interfaces:
    /// <summary>
    /// IStringProvider.
    /// </summary>
    public interface IStringProvider
    {
        /// <summary>
        /// Adds IStringReceiver to the receiver list.
        /// </summary>
        /// <param name="iStringReceiver">String receiver.</param>
        /// <param name="symbol">Symbol that must be in the string.</param>
        void Subscribe(IStringReceiver iStringReceiver, char symbol);
    }

    /// <summary>
    /// IStringReceiver.
    /// </summary>
    public interface IStringReceiver
    {
        /// <summary>
        /// Takes a string from the server.
        /// </summary>
        /// <param name="str">A string.</param>
        object ReceiveString(string str);
    }
The client implements the IStringReceiver interface, fetches a proxy for the IStringProvider object from the server, chooses a symbol, subscribes to events and waits for strings:
    /// <summary>
    /// ChatClient demonstrates simple client application.
    /// </summary>
    class ChatClient : MarshalByRefObject, IStringReceiver
    {
        /// <summary>
        /// Singleton instance.
        /// </summary>
        public static ChatClient Instance = new ChatClient();

        /// <summary>
        /// Chosen character.
        /// </summary>
        public static char chosenChar;

        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main(string[] args)
        {
            // . . .
            // Configuring Remoting environment...
            // . . .

            Random random = new Random();
            chosenChar = (char) ( (short) 'a' + random.Next(26) );
            Console.WriteLine(string.Format("'{0}' character has been chosen.", chosenChar));

            for(;;)
            {
                try
                {
                    IStringProvider iStringProvider = (IStringProvider)
                        Activator.GetObject(typeof(IStringProvider),
                            ConfigurationSettings.AppSettings["RemoteHostUri"] + "/StringProvider.rem");
                    iStringProvider.Subscribe( ChatClient.Instance, chosenChar );
                    Console.WriteLine("Subscribed. Press ENTER to exit.");
                    Console.ReadLine();
                    return ;
                }
                catch(Exception ex)
                {
                    // . . .
                }

                // . . .
            }
        }

        /// <summary>
        /// Takes a string from the server.
        /// </summary>
        /// <param name="str">A string.</param>
        public object ReceiveString(string str)
        {
            Console.WriteLine("String \"{0}\" has been received from the server.", str);
            return null;
        }
    }
Also the client detects whether the server has been restarted (the connection has been restored and the server has changed its URI) and resubscribes to the event:
    public static void GenuineChannelsEventHandler(object sender, GenuineEventArgs e)
    {
        Console.WriteLine("Global event: {0}\r\nUrl: {1}", e.EventType,
            e.HostInformation == null ? "<not specified>" : e.HostInformation.ToString());

        if (e.EventType == GenuineEventType.GeneralServerRestartDetected)
        {
            // server has been restarted so we have to register
            // our listener again
            IStringProvider iStringProvider = (IStringProvider)
                Activator.GetObject(typeof(IStringProvider),
                    ConfigurationSettings.AppSettings["RemoteHostUri"] + "/StringProvider.rem");

            iStringProvider.Subscribe( ChatClient.Instance, chosenChar );
        }
    }
You can terminate the server and then start it again. Clients will detect the restart, resubscribe to the event and continue receiving strings.
While subscribing a client, the server puts the client's symbol into the Client Session and specifies the Client Session as the tag object:
    /// <summary>
    /// Adds IStringReceiver to the receiver list.
    /// </summary>
    /// <param name="iStringReceiver">String receiver.</param>
    /// <param name="symbol">Symbol that must be in the string.</param>
    public void Subscribe(IStringReceiver iStringReceiver, char symbol)
    {
        // save symbol to client's session
        GenuineUtility.CurrentSession["symbol"] = symbol;
        // and subscribe the receiver
        this._dispatcher.Add((MarshalByRefObject) iStringReceiver,
            GenuineUtility.CurrentSession);

        Console.WriteLine(string.Format("Client with {0} symbol has been registered.", symbol));
    }
The filter implements the IMulticastFilter interface and takes the string being sent during creation.
Actually, it is not necessary to create a filter for each call, because you can fetch the string being sent directly from the call parameters. That is pretty dangerous, though: if you change the invocation parameters (or their order) or make several invocations instead of one, you will have to rewrite all such filters.
    public class StringFilter : IMulticastFilter
    {
        /// <summary>
        /// Constructs StringFilter.
        /// </summary>
        /// <param name="str"></param>
        public StringFilter(string str)
        {
            this._str = str;
        }

        private string _str;

        /// <summary>
        /// Returns receivers that should be called.
        /// </summary>
        /// <param name="cachedReceiverList">All registered receivers (read-only cached array).</param>
        /// <param name="iMessage">The call.</param>
        /// <returns>Receivers that will be called.</returns>
        public object[] GetReceivers(object[] cachedReceiverList, IMessage iMessage)
        {
            // get string immediately from the call
            // (shown for illustration only; the check below uses this._str)
            IMethodCallMessage iMethodCallMessage = (IMethodCallMessage) iMessage;
            string str = iMethodCallMessage.Args[0] as string;

            // construct result list that will contain filtered receivers
            object[] resultList = new object[cachedReceiverList.Length];
            int resultListPosition = 0;

            // go through all the receivers
            for ( int i = 0; i < cachedReceiverList.Length; i++ )
            {
                // get the next receiver
                ReceiverInfo receiverInfo = cachedReceiverList[i] as ReceiverInfo;
                if (receiverInfo == null)
                    continue;

                // obtain its session
                ISessionSupport session = (ISessionSupport) receiverInfo.Tag;
                // and add it if its symbol is in the string
                if ( this._str.IndexOf((char) session["symbol"]) > -1 )
                    resultList[ resultListPosition++ ] = receiverInfo;
            }

            return resultList;
        }
    }
Please note that I create the result array with the same size as the receiver list. Null entries are ignored, so I just leave them null. I go through the list of receivers, take the Client Session (I provided it as a tag while adding the receiver), fetch the client's symbol and check whether this symbol is in the string. If it is, I add this receiver to the result receiver list.
Never do anything with the provided cachedReceiverList parameter! It can be used by the dispatcher and other filters in different threads concurrently. Another array will be created after a receiver is added or removed.
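The reason the cached array must stay untouched is the copy-on-write scheme that the warning implies: readers share one published array, and every change publishes a fresh one. The sketch below illustrates that scheme and a filter that respects it. It is only an illustration under that assumption, not the Dispatcher's actual code; ReceiverListSketch and all of its members are hypothetical names.

```csharp
using System;

public class ReceiverListSketch
{
    private readonly object _sync = new object();
    private volatile object[] _cached = new object[0];

    // Readers get the currently published array and must treat it as read-only.
    public object[] CachedReceiverList { get { return _cached; } }

    // Writers never touch the published array: they build a new one and swap it in.
    public void Add(object receiver)
    {
        lock (_sync)
        {
            object[] current = _cached;
            object[] next = new object[current.Length + 1];
            Array.Copy(current, next, current.Length);
            next[current.Length] = receiver;
            _cached = next;
        }
    }

    // Filter in the style of GetReceivers: same-size result, unused slots stay null.
    public static object[] Filter(object[] cachedReceiverList, Predicate<object> wanted)
    {
        object[] resultList = new object[cachedReceiverList.Length];
        int position = 0;
        foreach (object receiver in cachedReceiverList)
        {
            if (receiver != null && wanted(receiver))
                resultList[position++] = receiver;
        }
        return resultList;
    }

    public static void Main()
    {
        var list = new ReceiverListSketch();
        list.Add("abc");
        list.Add("xyz");

        object[] snapshot = list.CachedReceiverList;
        list.Add("b");

        // The old snapshot is unaffected by the later Add.
        Console.WriteLine("{0} vs {1}", snapshot.Length, list.CachedReceiverList.Length);

        object[] filtered = Filter(list.CachedReceiverList,
            delegate(object r) { return ((string) r).IndexOf('b') > -1; });
        Console.WriteLine("{0}, {1}, {2}", filtered[0], filtered[1], filtered[2] == null);
    }
}
```

A filter that wrote into the shared array would corrupt the view of every other thread that still holds the same snapshot.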
When the server makes a call, it puts the filter into effect in the local context.
    // construct the filter
    StringFilter stringFilter = new StringFilter(str);

    using(new DispatcherFilterKeeper(stringFilter))
    {
        // all calls via any dispatcher will pass stringFilter
        // within this scope
        iStringReceiver.ReceiveString(str);
    }
You can change the client's symbol in the Client Session at any moment; the filter will start using it immediately. You can call other filters from your filter. Please refer to the Programming Guide for further information on Filter Contexts (Dispatcher Context, Thread Context or Local Place Context) and how to enable a filter in different contexts. A filter set in the Thread Context or the Local Place Context affects all calls made via any Dispatcher instance.
You can use a Symmetric Security Session for filtering messages in an IP multicasting environment. Just create an appropriate Security Session, send the symmetric keys to specific clients (I would recommend using the 256-bit Rijndael algorithm in the ECB cipher mode) and then send messages under this Security Session. All clients will receive a message, but only those specific clients will be able to decrypt it.
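The principle behind key-based filtering can be shown with the standard .NET cryptography classes alone. The sketch below does not use the Security Session API; it only demonstrates that a message encrypted with a shared 256-bit key can be read by a holder of that key. AES (the standardized form of Rijndael) is used in ECB mode only because that is the mode recommended above; note that ECB reveals repeated plaintext blocks. SharedKeySketch and its members are hypothetical names.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class SharedKeySketch
{
    // Generates a random 256-bit key to hand out to the chosen clients.
    public static byte[] NewKey()
    {
        using (Aes aes = Aes.Create())
        {
            aes.KeySize = 256;
            aes.GenerateKey();
            return aes.Key;
        }
    }

    public static byte[] Encrypt(string message, byte[] key)
    {
        return Transform(Encoding.UTF8.GetBytes(message), key, true);
    }

    public static string Decrypt(byte[] cipher, byte[] key)
    {
        return Encoding.UTF8.GetString(Transform(cipher, key, false));
    }

    private static byte[] Transform(byte[] data, byte[] key, bool encrypt)
    {
        using (Aes aes = Aes.Create())
        {
            aes.Mode = CipherMode.ECB;        // no IV is needed in this mode
            aes.Padding = PaddingMode.PKCS7;
            aes.Key = key;
            using (ICryptoTransform transform =
                encrypt ? aes.CreateEncryptor() : aes.CreateDecryptor())
            {
                return transform.TransformFinalBlock(data, 0, data.Length);
            }
        }
    }

    public static void Main()
    {
        byte[] key = NewKey();
        byte[] cipher = Encrypt("only for subscribers", key);

        // Every client receives the same bytes; only key holders can read them.
        Console.WriteLine(Decrypt(cipher, key));
    }
}
```

A client that never received the key sees only the ciphertext bytes and cannot recover the message.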
Summary
I recommend only two ways for implementing events: the second and the third approaches.
I've described IP multicasting in a separate article.