C#: Decoupling a Subscriber from a Publisher of an Event

Introduction

Inspired by Microsoft’s Prism framework, I decided to implement its EventAggregator class as a learning exercise. The EventAggregator is a class within Prism that serves as a mediator between the publishers and subscribers of an event. Imagine a platform on which arbitrary subscribers register the messages they are interested in. Next, imagine publishers placing messages on this same platform for those subscribers to respond to. What you have imagined is what I call a message bus. Instead of using the class name “EventAggregator”, I decided to name my class MessageBus. This implementation requires the following classes:

  • MessageBus
    • The message platform
  • Observer
    • An object that specifies the messages it wants to be notified about, as well as a message handler
  • Messages
    • Any literal value (typically a string)
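
To make the idea of a message bus concrete before the full implementation, here is a minimal sketch of the mediator at its core: a dictionary that maps each message to the handlers interested in it. TinyBus, TinyBusDemo and the "ORDER_PLACED" message are hypothetical names used only for illustration; they are not part of the MessageBus shown later in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration only; not the MessageBus from this post.
public class TinyBus
{
    // Maps each message to the handlers interested in it.
    private readonly Dictionary<string, List<Action<object>>> _handlers =
        new Dictionary<string, List<Action<object>>>();

    public void Subscribe(string message, Action<object> handler)
    {
        List<Action<object>> handlers;
        if (!_handlers.TryGetValue(message, out handlers))
        {
            handlers = new List<Action<object>>();
            _handlers.Add(message, handlers);
        }

        handlers.Add(handler);
    }

    public void Publish(string message, object payload)
    {
        List<Action<object>> handlers;
        if (_handlers.TryGetValue(message, out handlers))
        {
            // Iterate over a copy so a handler may subscribe while being notified.
            foreach (var handler in handlers.ToArray())
            {
                handler(payload);
            }
        }
    }
}

public static class TinyBusDemo
{
    public static void Main()
    {
        var bus = new TinyBus();
        bus.Subscribe("ORDER_PLACED", payload => Console.WriteLine("Received: " + payload));

        // The publisher only knows the bus and the message name, never the subscriber.
        bus.Publish("ORDER_PLACED", "order #1");
    }
}
```

Neither side holds a reference to the other; both depend only on the bus and on an agreed message name, which is the decoupling this post is after.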

In addition, I could have also implemented a class called Subscription. This class could be used by an observer to manage its subscription to the MessageBus. I will show this in another post.

Demo

The following code demonstrates how to use the MessageBus for subscribing and publishing. Note that even a publisher can be a subscriber to requests.


using Bizmonger.Patterns;
using Microsoft.VisualStudio.TestPlatform.UnitTestFramework;
using System.Diagnostics;

[TestClass]
public class Test1
{
    #region Testware
    const string SOME_SUBSCRIPTION = "SOME_EVENT";
    #endregion

    [TestMethod]
    public void MyTest()
    {
        MessageBus.Instance.Register(ParticipationType.PUBLISHER, SOME_SUBSCRIPTION, OnPublish);

        var payload = "subscriber_parameter";
        MessageBus.Instance.Register(ParticipationType.SUBSCRIBER, SOME_SUBSCRIPTION, payload, (obj) =>
            {
                Debug.WriteLine("Response to an event with " + obj);
                MessageBus.Instance.Register(ParticipationType.PUBLISHER, SOME_SUBSCRIPTION, OnPublish);
            });
    }

    private void OnPublish(object obj)
    {
        MessageBus.Instance.Publish(SOME_SUBSCRIPTION, string.Format("published_{0}", obj));
    }
}

MessageBus

A MessageBus is a platform for publishers and subscribers to manage messages while being decoupled from one another. Here’s a simple implementation of MessageBus:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
 
namespace Bizmonger.Patterns
{
    public class MessageBus
    {
        #region Singleton
        static MessageBus _messageBus = null;
        private MessageBus() { }
 
        public static MessageBus Instance
        {
            get
            {
                if (_messageBus == null)
                {
                    _messageBus = new MessageBus();
                }
 
                return _messageBus;
            }
        }
        #endregion
 
        #region Members
        Dictionary<string, List<Observer>> _observers = new Dictionary<string, List<Observer>>();
        Dictionary<string, List<Observer>> _oneTimeObservers = new Dictionary<string, List<Observer>>();
        Dictionary<string, List<Observer>> _waitingSubscribers = new Dictionary<string, List<Observer>>();
        Dictionary<string, List<Observer>> _waitingUnsubscribers = new Dictionary<string, List<Observer>>();
 
        int _publishingCount = 0;
        #endregion
 
        public void Subscribe(string subscription, Action<object> response)
        {
            Subscribe(subscription, response, _observers);
        }
 
        public void SubscribeFirstPublication(string subscription, Action<object> response)
        {
            Subscribe(subscription, response, _oneTimeObservers);
        }
 
        public int Unsubscribe(string subscription, Action<object> response)
        {
            // Collect matches in a separate list so the registered lists are not mutated by the search.
            var observersToUnsubscribe = new List<Observer>();

            foreach (var observers in new[] { _observers, _oneTimeObservers, _waitingSubscribers })
            {
                List<Observer> found;
                if (observers.TryGetValue(subscription, out found))
                {
                    observersToUnsubscribe.AddRange(found.Where(o => o.Respond == response));
                }
            }

            if (_publishingCount == 0)
            {
                // Remove only the observers that use this handler, not the whole subscription.
                foreach (var observers in new[] { _observers, _oneTimeObservers, _waitingSubscribers })
                {
                    List<Observer> found;
                    if (observers.TryGetValue(subscription, out found))
                    {
                        found.RemoveAll(o => o.Respond == response);
                    }
                }
            }
            else
            {
                // A publication is in progress: record the observers in _waitingUnsubscribers.
                List<Observer> waiting;
                if (!_waitingUnsubscribers.TryGetValue(subscription, out waiting))
                {
                    waiting = new List<Observer>();
                    _waitingUnsubscribers.Add(subscription, waiting);
                }

                waiting.AddRange(observersToUnsubscribe);
            }

            return observersToUnsubscribe.Count;
        }
 
        public int Unsubscribe(string subscription)
        {
            // Gather every observer of this subscription; TryGetValue avoids a KeyNotFoundException.
            var foundObservers = new List<Observer>();

            foreach (var observers in new[] { _observers, _oneTimeObservers, _waitingSubscribers })
            {
                List<Observer> found;
                if (observers.TryGetValue(subscription, out found))
                {
                    foundObservers.AddRange(found);
                }
            }

            if (_publishingCount == 0)
            {
                _observers.Remove(subscription);
                _oneTimeObservers.Remove(subscription);
                _waitingSubscribers.Remove(subscription);
            }
            else
            {
                // A publication is in progress: record the observers in _waitingUnsubscribers.
                List<Observer> waiting;
                if (!_waitingUnsubscribers.TryGetValue(subscription, out waiting))
                {
                    waiting = new List<Observer>();
                    _waitingUnsubscribers.Add(subscription, waiting);
                }

                waiting.AddRange(foundObservers);
            }

            return foundObservers.Count;
        }
 
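        public void Publish(string subscription, object payload = null)
        {
            _publishingCount++;

            try
            {
                Publish(_observers, subscription, payload);
                Publish(_oneTimeObservers, subscription, payload);
                Publish(_waitingSubscribers, subscription, payload);

                // One-time observers have now seen their first publication.
                _oneTimeObservers.Remove(subscription);
            }
            finally
            {
                // Restore the counter even if a handler throws.
                _publishingCount--;
            }

            if (_publishingCount == 0)
            {
                // The outermost publication has finished: apply any recorded unsubscriptions, then forget them.
                foreach (var pending in _waitingUnsubscribers)
                {
                    foreach (var observers in new[] { _observers, _oneTimeObservers, _waitingSubscribers })
                    {
                        List<Observer> found;
                        if (observers.TryGetValue(pending.Key, out found))
                        {
                            found.RemoveAll(o => pending.Value.Contains(o));
                        }
                    }
                }

                _waitingUnsubscribers.Clear();
            }
        }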
 
        private void Publish(Dictionary<string, List<Observer>> observers, string subscription, object payload)
        {
            Debug.Assert(_publishingCount >= 0);
 
            List<Observer> foundObservers = null;
            observers.TryGetValue(subscription, out foundObservers);
 
            if (foundObservers != null)
            {
                foundObservers.ToList().ForEach(o => o.Respond(payload));
            }
        }
 
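        public IEnumerable<Observer> GetObservers(string subscription)
        {
            List<Observer> foundObservers = null;
            _observers.TryGetValue(subscription, out foundObservers);

            // Return an empty sequence rather than null when nothing is registered.
            return (IEnumerable<Observer>)foundObservers ?? Enumerable.Empty<Observer>();
        }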
 
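        public void Clear()
        {
            _observers.Clear();
            _oneTimeObservers.Clear();

            // Also drop observers that were parked while a publication was in progress.
            _waitingSubscribers.Clear();
            _waitingUnsubscribers.Clear();
        }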
 
        #region Helpers
        private void Subscribe(string subscription, Action<object> response, Dictionary<string, List<Observer>> observers)
        {
            Debug.Assert(_publishingCount >= 0);
 
            var observer = new Observer() { Subscription = subscription, Respond = response };
 
            if (_publishingCount == 0)
            {
                Add(subscription, observers, observer);
            }
            else
            {
                Add(subscription, _waitingSubscribers, observer);
            }
        }
 
        private static void Add(string subscription, Dictionary<string, List<Observer>> observers, Observer observer)
        {
            List<Observer> foundObservers = null;
            var observersExist = observers.TryGetValue(subscription, out foundObservers);
 
            if (observersExist)
            {
                foundObservers.Add(observer);
            }
            else
            {
                foundObservers = new List<Observer>() { observer };
                observers.Add(subscription, foundObservers);
            }
        }
        #endregion
    }
}
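
The _publishingCount field in the listing above is a re-entrancy guard: while a publication is in progress, new subscriptions are parked instead of being added to the list that is being notified. The following is a minimal sketch of that technique in isolation, reduced to a single message. DeferringBus, HandlerCount and _pendingAdds are hypothetical names, and the sketch is simplified on purpose: it promotes parked handlers once the outermost publication finishes, whereas the listing above keeps them in _waitingSubscribers and notifies them from there.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified bus for a single message; illustration only.
public class DeferringBus
{
    private readonly List<Action<object>> _handlers = new List<Action<object>>();
    private readonly List<Action<object>> _pendingAdds = new List<Action<object>>();
    private int _publishingCount = 0;

    public int HandlerCount { get { return _handlers.Count; } }

    public void Subscribe(Action<object> handler)
    {
        if (_publishingCount == 0)
        {
            _handlers.Add(handler);
        }
        else
        {
            // A publication is in progress: park the handler until it finishes.
            _pendingAdds.Add(handler);
        }
    }

    public void Publish(object payload)
    {
        _publishingCount++;

        try
        {
            // Safe to enumerate directly: Subscribe never touches _handlers while the counter is above zero.
            foreach (var handler in _handlers)
            {
                handler(payload);
            }
        }
        finally
        {
            _publishingCount--;
        }

        if (_publishingCount == 0)
        {
            // Outermost publication finished: apply the parked subscriptions.
            _handlers.AddRange(_pendingAdds);
            _pendingAdds.Clear();
        }
    }
}

public static class DeferringBusDemo
{
    public static void Main()
    {
        var bus = new DeferringBus();

        // This handler subscribes another handler every time it is notified.
        bus.Subscribe(_ => bus.Subscribe(p => Console.WriteLine("late handler got " + p)));

        bus.Publish("first");   // the late handler is parked, then added afterwards
        bus.Publish("second");  // the late handler added after "first" is notified now
    }
}
```

Using a counter rather than a boolean flag matters when a handler publishes from inside its own response: the parked subscriptions are applied only when the outermost Publish returns.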

Observer

An observer is a subscriber to a message.


using System;

namespace Bizmonger.Patterns
{
    public class Observer
    {
        public string Subscription { get; set; }
        public Action<object> Respond {get; set; }
    }
}

ParticipationType

ParticipationType specifies whether an object is a subscriber or a publisher.


namespace Bizmonger.Patterns
{
    public enum ParticipationType
    {
        SUBSCRIBER = 0,
        PUBLISHER
    }
}

Conclusion

In conclusion, I have described an implementation that can be leveraged to decouple a subscriber from a publisher of an event. This example did not include the Subscription class. It also did not include support for ensuring that a handler is subscribed to a message only once, which could be useful when a subscription is performed within a method that may be invoked numerous times during the life cycle of an application. With that said, I have learned that subscriptions are best performed within the constructor of an object so that duplicate registrations do not occur, even though the MessageBus could incorporate logic to prevent duplicate observers (e.g. a HashSet). This example also does not address memory leaks: the bus holds a strong reference to every handler, so a subscribed object stays alive until it unsubscribes. In the next article, I will show a version of MessageBus that accounts for some of these concerns.
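
As a rough illustration of the HashSet idea mentioned above, here is a minimal sketch of a bus that ignores a handler that is already subscribed to a message. DedupingBus and DedupingBusDemo are hypothetical names and are not the version promised for the next article. The sketch relies on delegate equality, so it assumes the handler is a named method or a stored delegate; two separately created closures over different state would not compare as equal.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; DedupingBus is not part of the code in this post.
public class DedupingBus
{
    // A HashSet refuses a handler that is already present for the message.
    private readonly Dictionary<string, HashSet<Action<object>>> _handlers =
        new Dictionary<string, HashSet<Action<object>>>();

    // Returns true if the handler was added, false if it was already subscribed.
    public bool Subscribe(string message, Action<object> handler)
    {
        HashSet<Action<object>> handlers;
        if (!_handlers.TryGetValue(message, out handlers))
        {
            handlers = new HashSet<Action<object>>();
            _handlers.Add(message, handlers);
        }

        return handlers.Add(handler);
    }

    public void Publish(string message, object payload)
    {
        HashSet<Action<object>> handlers;
        if (_handlers.TryGetValue(message, out handlers))
        {
            // Iterate over a copy so a handler may subscribe while being notified.
            foreach (var handler in new List<Action<object>>(handlers))
            {
                handler(payload);
            }
        }
    }
}

public static class DedupingBusDemo
{
    private static int _calls = 0;

    private static void OnMessage(object payload)
    {
        _calls++;
    }

    public static void Main()
    {
        var bus = new DedupingBus();

        // Simulates a method that subscribes each time it is invoked.
        for (var i = 0; i < 3; i++)
        {
            bus.Subscribe("SOME_EVENT", OnMessage);
        }

        bus.Publish("SOME_EVENT", null);
        Console.WriteLine(_calls); // the handler ran once despite three Subscribe calls
    }
}
```

Returning a bool from Subscribe is a design choice of this sketch: it lets a caller notice a duplicate registration without the bus having to throw.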
