Introduction
Inspired by Microsoft’s Prism framework, I decided to implement its EventAggregator class as a learning exercise. The EventAggregator is a class within Prism that mediates between the publishers and the subscribers of an event. Imagine a platform on which arbitrary subscribers register the messages that they’re interested in. Next, imagine publishers placing messages on this same platform for those subscribers to respond to. What you have imagined is what I call a message bus. Instead of using the class name “EventAggregator”, I named my class MessageBus. This implementation requires the following parts:
- MessageBus
  - The message platform
- Observer
  - An object that specifies the messages it wants to be notified about, as well as a message handler
- Messages
  - Any literal value (typically a string)
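To make the three parts above concrete before the full listing, here is a minimal sketch of the idea. It is not the MessageBus from this post: TinyBus, TinyBusDemo and the message names are hypothetical, chosen only to show that a subscriber is notified solely about the message name it registered for.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal bus: maps a message name to the handlers registered for it.
public class TinyBus
{
    private readonly Dictionary<string, List<Action<object>>> _handlers =
        new Dictionary<string, List<Action<object>>>();

    public void Subscribe(string message, Action<object> handler)
    {
        List<Action<object>> handlers;
        if (!_handlers.TryGetValue(message, out handlers))
        {
            handlers = new List<Action<object>>();
            _handlers.Add(message, handlers);
        }
        handlers.Add(handler);
    }

    public void Publish(string message, object payload = null)
    {
        List<Action<object>> handlers;
        if (_handlers.TryGetValue(message, out handlers))
        {
            // Iterate over a copy so a handler may subscribe while being notified.
            foreach (var handler in handlers.ToArray())
            {
                handler(payload);
            }
        }
    }
}

public static class TinyBusDemo
{
    // Returns what each subscriber received, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        var bus = new TinyBus();

        bus.Subscribe("Message1", payload => log.Add("Subscriber1 got " + payload));
        bus.Subscribe("Message2", payload => log.Add("Subscriber2 got " + payload));

        bus.Publish("Message1", "hello");   // only the Message1 handler runs
        bus.Publish("Message3", "ignored"); // nobody subscribed to Message3
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The publisher never references a subscriber directly; the message name is the only thing the two sides share, which is where the decoupling comes from.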
In addition, I could have implemented a class called Subscription. An observer could use this class to manage its subscription to the MessageBus. I will show this in another post.
Demo
The following code demonstrates how to use the MessageBus for subscribing and publishing. Note that a publisher can also be a subscriber.
using Bizmonger.Patterns;
using Microsoft.VisualStudio.TestPlatform.UnitTestFramework;
using System.Diagnostics;

[TestClass]
public class Test1
{
    #region Testware
    const string SOME_SUBSCRIPTION = "SOME_EVENT";
    #endregion

    [TestMethod]
    public void MyTest()
    {
        // Note: the Register overloads used here are not part of the MessageBus listing below.
        MessageBus.Instance.Register(ParticipationType.PUBLISHER, SOME_SUBSCRIPTION, OnPublish);

        var payload = "subscriber_parameter";

        MessageBus.Instance.Register(ParticipationType.SUBSCRIBER, SOME_SUBSCRIPTION, payload, (obj) =>
        {
            Debug.WriteLine("Response to an event with " + obj);

            // The subscriber registers a publisher from inside its own handler.
            MessageBus.Instance.Register(ParticipationType.PUBLISHER, SOME_SUBSCRIPTION, OnPublish);
        });
    }

    private void OnPublish(object obj)
    {
        MessageBus.Instance.Publish(SOME_SUBSCRIPTION, string.Format("published_{0}", obj));
    }
}
MessageBus
A MessageBus is a platform that lets publishers and subscribers exchange messages while remaining decoupled from one another. Here’s a simple implementation of MessageBus:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

namespace Bizmonger.Patterns
{
    public class MessageBus
    {
        #region Singleton
        static MessageBus _messageBus = null;

        private MessageBus() { }

        public static MessageBus Instance
        {
            get
            {
                // Lazy creation; note that this is not thread-safe.
                if (_messageBus == null)
                {
                    _messageBus = new MessageBus();
                }
                return _messageBus;
            }
        }
        #endregion

        #region Members
        Dictionary<string, List<Observer>> _observers = new Dictionary<string, List<Observer>>();
        Dictionary<string, List<Observer>> _oneTimeObservers = new Dictionary<string, List<Observer>>();

        // Observers that subscribed or unsubscribed while a publication was in progress.
        Dictionary<string, List<Observer>> _waitingSubscribers = new Dictionary<string, List<Observer>>();
        Dictionary<string, List<Observer>> _waitingUnsubscribers = new Dictionary<string, List<Observer>>();

        int _publishingCount = 0;
        #endregion

        public void Subscribe(string subscription, Action<object> response)
        {
            Subscribe(subscription, response, _observers);
        }

        public void SubscribeFirstPublication(string subscription, Action<object> response)
        {
            Subscribe(subscription, response, _oneTimeObservers);
        }

        // Removes the observers of 'subscription' whose handler is 'response'.
        public int Unsubscribe(string subscription, Action<object> response)
        {
            var observersToUnsubscribe = Find(subscription)
                .Where(o => o.Respond == response)
                .ToList();

            Unsubscribe(observersToUnsubscribe);
            return observersToUnsubscribe.Count;
        }

        // Removes every observer of 'subscription'.
        public int Unsubscribe(string subscription)
        {
            var foundObservers = Find(subscription);

            Unsubscribe(foundObservers);
            return foundObservers.Count;
        }

        public void Publish(string subscription, object payload = null)
        {
            _publishingCount++;

            Publish(_observers, subscription, payload);
            Publish(_oneTimeObservers, subscription, payload);
            Publish(_waitingSubscribers, subscription, payload);

            // One-time observers are dropped after their first publication.
            _oneTimeObservers.Remove(subscription);
            _publishingCount--;

            if (_publishingCount == 0)
            {
                // Apply the unsubscriptions that were deferred during publication.
                var waiting = _waitingUnsubscribers.Values.SelectMany(o => o).ToList();
                _waitingUnsubscribers.Clear();
                waiting.ForEach(Remove);
            }
        }

        private void Publish(Dictionary<string, List<Observer>> observers, string subscription, object payload)
        {
            Debug.Assert(_publishingCount >= 0);

            List<Observer> foundObservers = null;
            observers.TryGetValue(subscription, out foundObservers);

            if (foundObservers != null)
            {
                // Iterate over a copy so that handlers can subscribe or unsubscribe safely.
                foundObservers.ToList().ForEach(o => o.Respond(payload));
            }
        }

        // Returns null when nothing is registered for 'subscription'.
        public IEnumerable<Observer> GetObservers(string subscription)
        {
            List<Observer> foundObservers = null;
            _observers.TryGetValue(subscription, out foundObservers);
            return foundObservers;
        }

        public void Clear()
        {
            _observers.Clear();
            _oneTimeObservers.Clear();
            _waitingSubscribers.Clear();
            _waitingUnsubscribers.Clear();
        }

        #region Helpers
        private void Subscribe(string subscription, Action<object> response, Dictionary<string, List<Observer>> observers)
        {
            Debug.Assert(_publishingCount >= 0);

            var observer = new Observer() { Subscription = subscription, Respond = response };

            if (_publishingCount == 0)
            {
                Add(subscription, observers, observer);
            }
            else
            {
                // A publication is in progress, so park the observer instead.
                Add(subscription, _waitingSubscribers, observer);
            }
        }

        private static void Add(string subscription, Dictionary<string, List<Observer>> observers, Observer observer)
        {
            List<Observer> foundObservers = null;
            var observersExist = observers.TryGetValue(subscription, out foundObservers);

            if (observersExist)
            {
                foundObservers.Add(observer);
            }
            else
            {
                foundObservers = new List<Observer>() { observer };
                observers.Add(subscription, foundObservers);
            }
        }

        // Collects the observers of a subscription from every registry without modifying them.
        private List<Observer> Find(string subscription)
        {
            var found = new List<Observer>();

            foreach (var registry in new[] { _observers, _oneTimeObservers, _waitingSubscribers })
            {
                List<Observer> observers = null;
                if (registry.TryGetValue(subscription, out observers))
                {
                    found.AddRange(observers);
                }
            }
            return found;
        }

        // Removes the observers immediately, or defers the removal while publishing.
        private void Unsubscribe(List<Observer> observers)
        {
            foreach (var observer in observers)
            {
                if (_publishingCount == 0)
                {
                    Remove(observer);
                }
                else
                {
                    Add(observer.Subscription, _waitingUnsubscribers, observer);
                }
            }
        }

        private void Remove(Observer observer)
        {
            foreach (var registry in new[] { _observers, _oneTimeObservers, _waitingSubscribers })
            {
                List<Observer> observers = null;
                if (registry.TryGetValue(observer.Subscription, out observers)
                    && observers.Remove(observer)
                    && observers.Count == 0)
                {
                    registry.Remove(observer.Subscription);
                }
            }
        }
        #endregion
    }
}
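The listing tracks _publishingCount and copies each observer list with ToList() before notifying. The following sketch illustrates the problem that this guards against. It is a standalone illustration, not part of MessageBus; SnapshotDemo and its method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotDemo
{
    // Notifies handlers straight from the live list. A handler that adds
    // another handler invalidates the enumerator, so the loop throws.
    public static bool LiveIterationThrows()
    {
        var handlers = new List<Action>();
        handlers.Add(() => handlers.Add(() => { }));

        try
        {
            foreach (var handler in handlers)
            {
                handler();
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Notifies handlers from a copy of the list. The handler added during
    // notification is not invoked in this pass, and nothing throws.
    public static int SnapshotIterationCount()
    {
        var handlers = new List<Action>();
        handlers.Add(() => handlers.Add(() => { }));

        var invoked = 0;
        foreach (var handler in handlers.ToList())
        {
            handler();
            invoked++;
        }
        return invoked;
    }

    public static void Main()
    {
        Console.WriteLine(LiveIterationThrows());
        Console.WriteLine(SnapshotIterationCount());
    }
}
```

Copying the list is the simplest way to let a handler subscribe or unsubscribe from inside its own callback. The cost is that such a change only takes effect on the next pass over the list.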
Observer
An observer is a subscriber to a message.
using System;

namespace Bizmonger.Patterns
{
    public class Observer
    {
        public string Subscription { get; set; }
        public Action<object> Respond { get; set; }
    }
}
ParticipationType
ParticipationType specifies whether an object participates as a subscriber or as a publisher.
namespace Bizmonger.Patterns
{
    public enum ParticipationType
    {
        SUBSCRIBER = 0,
        PUBLISHER
    }
}
Conclusion
In conclusion, I have described an implementation that can be leveraged to decouple a subscriber from a publisher of an event. This example did not include the Subscription class. It also did not include support for ensuring that an observer subscribes to a given message only once, which could be useful when a subscription is made within a method that may be invoked numerous times during the life cycle of an application. With that said, I have learned that subscriptions are best made within the constructor of an object so that duplicate registrations do not occur, even though the MessageBus could incorporate logic to prevent duplicate observers (e.g. by storing them in a HashSet). This example also does not guard against memory leaks: the bus holds a reference to every handler, so an observer that never unsubscribes stays alive for as long as the bus does. In the next article, I will show a version of MessageBus that accounts for some of these concerns.
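As a rough illustration of the HashSet idea mentioned above, here is a sketch of a bus that ignores duplicate registrations. It is not the version promised for the next article; DedupingBus, Counter and DedupingDemo are hypothetical names. It relies on the fact that two delegates created from the same method on the same object compare as equal.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical bus that stores handlers in a HashSet to reject duplicates.
public class DedupingBus
{
    private readonly Dictionary<string, HashSet<Action<object>>> _handlers =
        new Dictionary<string, HashSet<Action<object>>>();

    // Returns false when the same handler is already registered for the message.
    public bool Subscribe(string message, Action<object> handler)
    {
        HashSet<Action<object>> handlers;
        if (!_handlers.TryGetValue(message, out handlers))
        {
            handlers = new HashSet<Action<object>>();
            _handlers.Add(message, handlers);
        }
        return handlers.Add(handler);
    }

    public void Publish(string message, object payload = null)
    {
        HashSet<Action<object>> handlers;
        if (_handlers.TryGetValue(message, out handlers))
        {
            // Copy first so that a handler may subscribe while being notified.
            foreach (var handler in new List<Action<object>>(handlers))
            {
                handler(payload);
            }
        }
    }
}

public class Counter
{
    public int Count { get; private set; }

    public void OnMessage(object payload)
    {
        Count++;
    }
}

public static class DedupingDemo
{
    // Subscribes the same method twice and returns how often it was invoked.
    public static int Run()
    {
        var bus = new DedupingBus();
        var counter = new Counter();

        bus.Subscribe("SOME_EVENT", counter.OnMessage);
        bus.Subscribe("SOME_EVENT", counter.OnMessage); // duplicate, ignored

        bus.Publish("SOME_EVENT");
        return counter.Count;
    }

    public static void Main()
    {
        Console.WriteLine(DedupingDemo.Run());
    }
}
```

One caveat: this only catches duplicates when the handler is a method or a stored delegate. Two lambdas written separately are different methods and would both be registered, even if their bodies are identical.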
Code? How does Subscriber1 subscribe only to Message1, and Subscriber2 only to Message2?