MassTransit Distributor (Load Balancing)

MassTransit has had a distributor built in for some time now. I’ll try to explain what it does and how to use it.

What is the distributor? It provides the ability to load balance a given message type across multiple endpoints.

Distributor Setup

To set up the distributor, call UseDistributorFor(IEndpointFactory endpointFactory) on IServiceBusConfigurator while configuring the service bus. An instance of the endpoint factory is required.

To set up the worker(s), call ImplementDistributorWorker(Action consumeAction) on IServiceBusConfigurator while configuring the service bus.

Default IWorkerSelectionStrategy Implementation

The default worker selection strategy simply looks at the current message count in queue vs. the message count limits. It selects the worker with the smallest number of messages pending or in progress.

The data used to identify which worker is busy might be out of date. There is additional message traffic to keep this data in sync and it can fall behind when the system is under load.
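
The selection rule described above can be sketched in plain C#. This is only an illustration of "pick the worker with the fewest pending or in-progress messages": the WorkerInfo type, its members, and the rule that a worker at its limit is skipped are assumptions made for the example, not MassTransit's own types or its actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the per-worker data a distributor might track.
public class WorkerInfo
{
    public string Address { get; set; }
    public int Pending { get; set; }     // messages waiting in the worker's queue
    public int InProgress { get; set; }  // messages currently being consumed
    public int Limit { get; set; }       // most messages the worker will take on

    public int Load { get { return Pending + InProgress; } }
}

public static class LeastLoadedSelection
{
    // Returns the worker with the smallest load that is still under its limit,
    // or null when every worker is full.
    public static WorkerInfo Select(IEnumerable<WorkerInfo> workers)
    {
        return workers
            .Where(w => w.Load < w.Limit)
            .OrderBy(w => w.Load)
            .FirstOrDefault();
    }
}

public static class Program
{
    public static void Main()
    {
        var workers = new List<WorkerInfo>
        {
            new WorkerInfo { Address = "worker-a", Pending = 3, InProgress = 1, Limit = 10 },
            new WorkerInfo { Address = "worker-b", Pending = 0, InProgress = 2, Limit = 10 },
            new WorkerInfo { Address = "worker-c", Pending = 5, InProgress = 5, Limit = 10 }
        };

        WorkerInfo chosen = LeastLoadedSelection.Select(workers);
        Console.WriteLine(chosen == null ? "none" : chosen.Address); // worker-b
    }
}
```

Because the load figures can be stale, a real distributor has to accept that the worker chosen this way may already be busier than its last reported numbers suggest.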

Custom IWorkerSelectionStrategy

To use a custom strategy, call UseDistributorFor(IEndpointFactory endpointFactory, IWorkerSelectionStrategy workerSelectionStrategy) on IServiceBusConfigurator while configuring the service bus for the distributor.

This requires a custom implementation of the IWorkerSelectionStrategy interface. There are two methods to implement.

  • HasAvailableWorker - Using the list of candidate workers, indicates whether one is available
  • SelectWorker - Using the list of candidate workers, find the best candidate available
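
As a rough idea of what a custom strategy could look like, here is a round-robin sketch. The interface below is a stand-in that only mirrors the two responsibilities listed above; its name, its signatures, and the Candidate type are assumptions for illustration and should not be read as the real IWorkerSelectionStrategy definition.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical worker description; not MassTransit's own type.
public class Candidate
{
    public string Address { get; set; }
    public int InProgress { get; set; }
    public int Limit { get; set; }
}

// Stand-in interface with the same two responsibilities as the bullets above.
public interface ISelectionStrategySketch
{
    bool HasAvailableWorker(IEnumerable<Candidate> candidates);
    Candidate SelectWorker(IEnumerable<Candidate> candidates);
}

// Round-robin: rotate through the workers that still have capacity.
// Not thread-safe; a real strategy would need to guard _next.
public class RoundRobinStrategy : ISelectionStrategySketch
{
    private int _next;

    public bool HasAvailableWorker(IEnumerable<Candidate> candidates)
    {
        return candidates.Any(c => c.InProgress < c.Limit);
    }

    public Candidate SelectWorker(IEnumerable<Candidate> candidates)
    {
        List<Candidate> open = candidates.Where(c => c.InProgress < c.Limit).ToList();
        if (open.Count == 0)
            return null;

        int index = _next % open.Count;
        _next = index + 1; // remember where to continue on the next call
        return open[index];
    }
}

public static class Program
{
    public static void Main()
    {
        var candidates = new List<Candidate>
        {
            new Candidate { Address = "a", InProgress = 0, Limit = 5 },
            new Candidate { Address = "b", InProgress = 5, Limit = 5 }, // full, skipped
            new Candidate { Address = "c", InProgress = 1, Limit = 5 }
        };

        var strategy = new RoundRobinStrategy();
        for (int i = 0; i < 3; i++)
            Console.WriteLine(strategy.SelectWorker(candidates).Address); // a, c, a
    }
}
```

Round-robin ignores how busy each worker is beyond the full/not-full check, which makes it less sensitive to stale load data than a least-loaded rule, at the cost of a less even spread when message costs vary.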

Sample

Okay, so I know you’re now asking if I’ll get to any real code. There’s a sample in the MT solution Grid.Distributor that shows the basics of using the Distributor.

There are two parts: the activator (which sends commands) and the workers (which complete the commands). You can spin up multiple workers to work on the commands.

Topshelf 2.0 Released!

Topshelf 2.0 was released today! You can get it at Topshelf’s github.

2.0.1 was also released today! A bugfix release…

  • Shelving: renamed events no longer cause exceptions
  • Shelving: less likely for a service to start up then re-start again right away

There are a couple known issues though…

  • Some log4net log events seem to be squelched. This might be related to ilmerging we started doing for releases. When it’s run from source I don’t see this behavior but I am looking into it.
  • When shutting down you might see an exception about a pipe not being open. There are a lot of threads ‘doing’ stuff to keep everything going right. When shutting down it seems possible for them to not shut down in the right order. I have spent some time trying to pin it down exactly and I haven’t gotten it yet. I haven’t seen any negative side effects from it yet though.

Thanks to everyone who’s tried it out so far!

TopShelf 2.0

We’re working on making Topshelf 2.0 ready for release. I think we’re mostly feature complete for our 2.0 release, just working on fixing any leftover bugs.

So what is changing in Topshelf 2.0?

  • InIsolation is no longer an option when creating new services. Support for hosting multiple AppDomains has been moved into shelving. More on shelving to follow.
  • Command line arguments have changed a bit. See the website’s documentation on command line syntax for updates there.
  • The communication between all subsystems, such as “start up services”, is now all done via messages sent across channels. This allows us to easily move to sending messages across AppDomains.
  • Shelving! More on this below.

Shelving

So what is this new shelving? It turns Topshelf into a service host: when you drop files in a directory, it looks for an interface implementation and spins up a new AppDomain with that logical service isolated in it. If you update the files in that folder, the service will attempt to shut down cleanly and then restart with the new files. It’s pretty awesome, if I do say so myself.

Why is it awesome?

  1. Deploying services is now a dead simple xcopy, instead of stop service, uninstall, copy, install, start
  2. Configuration changes automagically restart the service when saved – similar to IIS
  3. Need to scale out a new service? Just drop a new directory in the Service folder and you can scale easily!

Here’s a simple example.

Now compare that to the sample service in the old style.

Shelved services are just DLL assemblies; Topshelf has a Host program that is the Windows service that logical services will run under. The second example is a full program that would be installed as a service.

Data Bound Collections in WPF

One of the joys of working with WPF is the data binding that can exist. At a couple of points I’ve wanted to bind a Dictionary as a list. Each item in the list would have a key, and only one item could exist for a given key. Thus I put together two parts that allowed me to do this. They also allow the bound data to be updated from a thread other than the UI one.

NotifyCollectionChangedBase implements the INotifyCollectionChanged interface so that the data bound elements in WPF can detect that an item has been updated.

The IKeyedObject just allows the KeyedCollectionBase to identify the key used in the internal dictionary.

KeyedCollectionBase implements IList to make the collection appear as a list to the rendering elements, but internally stores everything in a dictionary so only a single object with a given key exists.
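
The idea can be sketched roughly as follows. This is not the original implementation: the generic IKeyedObject shape, the KeyedCollectionSketch name, the use of a Reset notification for every change, and the use of SynchronizationContext to get back to the UI thread are all assumptions made to keep the example short. It also exposes only IEnumerable rather than the full IList described above.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;

// Lets the collection find the dictionary key of each item (assumed shape).
public interface IKeyedObject<TKey>
{
    TKey Key { get; }
}

// Dictionary-backed collection that raises change notifications.
public class KeyedCollectionSketch<TKey, TItem> : IEnumerable<TItem>, INotifyCollectionChanged
    where TItem : IKeyedObject<TKey>
{
    private readonly Dictionary<TKey, TItem> _items = new Dictionary<TKey, TItem>();
    private readonly object _gate = new object();

    // Captured on the constructing thread. When that is the WPF UI thread,
    // Post() hands the notification back to the dispatcher.
    private readonly SynchronizationContext _context = SynchronizationContext.Current;

    public event NotifyCollectionChangedEventHandler CollectionChanged;

    public int Count { get { lock (_gate) return _items.Count; } }

    // Adds the item, or replaces the existing item that has the same key.
    public void AddOrUpdate(TItem item)
    {
        lock (_gate) _items[item.Key] = item;
        RaiseReset();
    }

    public bool Remove(TKey key)
    {
        bool removed;
        lock (_gate) removed = _items.Remove(key);
        if (removed) RaiseReset();
        return removed;
    }

    // Enumerates a snapshot so background updates cannot break a running loop.
    public IEnumerator<TItem> GetEnumerator()
    {
        List<TItem> snapshot;
        lock (_gate) snapshot = new List<TItem>(_items.Values);
        return snapshot.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    private void RaiseReset()
    {
        NotifyCollectionChangedEventHandler handler = CollectionChanged;
        if (handler == null) return;

        var args = new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Reset);
        if (_context == null) handler(this, args);           // no UI context, raise directly
        else _context.Post(state => handler(this, args), null);
    }
}

// Hypothetical item type used only for the demo.
public class Quote : IKeyedObject<string>
{
    public string Symbol { get; set; }
    public decimal Price { get; set; }
    public string Key { get { return Symbol; } }
}

public static class Program
{
    public static void Main()
    {
        var quotes = new KeyedCollectionSketch<string, Quote>();
        quotes.AddOrUpdate(new Quote { Symbol = "ABC", Price = 10m });
        quotes.AddOrUpdate(new Quote { Symbol = "ABC", Price = 11m }); // same key, replaces
        Console.WriteLine(quotes.Count); // 1
    }
}
```

Raising Reset on every change is the bluntest option; it makes bound controls re-read the whole list, which is fine for small collections but wasteful for large ones.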

XAttribute and null values

It turns out that, for whatever reason, the constructor for XAttribute does not accept null for the value parameter. I think this makes sense, even though the XElement constructor will accept null for its value parameter, mostly because an element can still function without content; a null value for an attribute would mean no attribute at all, not attributeName="".
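
A minimal console sketch shows the difference between the two constructors. The element and attribute names are just placeholders.

```csharp
using System;
using System.Xml.Linq;

public static class Program
{
    public static void Main()
    {
        // XElement tolerates null content and yields an empty element.
        var element = new XElement("BO", (object)null);
        Console.WriteLine(element); // <BO />

        // XAttribute rejects a null value.
        try
        {
            new XAttribute("Name", (object)null);
        }
        catch (ArgumentNullException ex)
        {
            Console.WriteLine("XAttribute threw: " + ex.GetType().Name);
        }
    }
}
```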

The quick and dirty: there’s an easy solution, use an extension method.

    public static class AddXAttribute
    {
        public static XElement NewXAttribute(this XElement xa, XName name, Object value)
        {
            if (xa != null && value != null)
            {
                xa.Add(new XAttribute(name, value));
            }
            return xa;
        }
    }

Now to explain. Say we have a pretty generic BO for which we want to be able to generate some XML.

    public class BusinessObject
    {
        public int Id { get; set; }
        public String Name { get; set; }
        public String Description { get; set; }
    }

        private static String BuildXML(List<BusinessObject> data)
        {
            XDocument xDoc = null;
            try
            {
                xDoc = new XDocument(
                    new XElement("Root",
                        from d in data
                        select new XElement("BO",
                            new XAttribute("Id", d.Id),
                            new XAttribute("Name", d.Name),
                            new XAttribute("Desc", d.Description)
                        )
                    )
                );
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception caught: " + ex.Message);
                return String.Empty;
            }

            return xDoc.ToString();
        }

If we were to try to run the BuildXML method with a set of data that included a null in the Name or Description properties, an exception would occur: “Value cannot be null.” This seems like a pretty standard way of using LINQ expressions to build an XML document as simply as possible. We don’t really care about the nulls; we just want them handled. The answer is to use the NewXAttribute method from above as an extension method. The new BuildXML (call it BuildXML2 for now) would be very similar.

        private static String BuildXML2(List<BusinessObject> data)
        {
            XDocument xDoc = null;
            try
            {
                xDoc = new XDocument(
                    new XElement("Root",
                        from d in data
                        select (new XElement("BO"))
                            .NewXAttribute("Id", d.Id)
                            .NewXAttribute("Name", d.Name)
                            .NewXAttribute("Desc", d.Description)
                    )
                );
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception caught: " + ex.Message);
                return String.Empty;
            }

            return xDoc.ToString();
        }

You can still chain expressions together, so the resulting code is not terribly different from what you would expect. The one difference is that any child data has to be declared before you add the attributes. At least for the way I think of the data when I’m building it, that’s backwards.
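
To make the ordering point concrete, here is a small sketch that reuses the NewXAttribute helper from above. The Tag child elements are made up for the example; the children go into the XElement constructor first and the attributes are chained on afterwards.

```csharp
using System;
using System.Xml.Linq;

public static class AddXAttribute
{
    // Same null-skipping helper as shown earlier in the post.
    public static XElement NewXAttribute(this XElement xa, XName name, Object value)
    {
        if (xa != null && value != null)
        {
            xa.Add(new XAttribute(name, value));
        }
        return xa;
    }
}

public static class Program
{
    public static void Main()
    {
        // Children are declared first, attributes are chained on afterwards.
        XElement bo = new XElement("BO",
                new XElement("Tag", "first"),
                new XElement("Tag", "second"))
            .NewXAttribute("Id", 1)
            .NewXAttribute("Name", null); // null value, so no attribute is added

        Console.WriteLine(bo);
    }
}
```

The printed element carries only the Id attribute, followed by the two Tag children; the serialized XML still lists attributes before child content, so only the source code reads "backwards".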

I’ve included the entire sample below for completeness.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            var data = new List<BusinessObject>();

            // get some data
            data.Add(new BusinessObject()
            {
                Id = 1,
                Name = "One",
                Description = "First One"
            });

            data.Add(new BusinessObject()
            {
                Id = 2,
                Name = "Two",
                Description = "Second One"
            });

            data.Add(new BusinessObject()
            {
                Id = 3,
                Name = "Three",
                Description = "And the third."
            });

            String xml;

            Console.WriteLine("==== Standard =============");
            xml = BuildXML(data);
            Console.WriteLine(xml);
            Console.WriteLine("===========================");

            // we need some null data
            data[2].Description = null;

            Console.WriteLine("==== Standard w/ null data=");
            xml = BuildXML(data);
            Console.WriteLine(xml);
            Console.WriteLine("===========================");

            Console.WriteLine("==== New Extension ========");
            xml = BuildXML2(data);
            Console.WriteLine(xml);
            Console.WriteLine("===========================");

            Console.ReadLine();

        }

        private static String BuildXML(List<BusinessObject> data)
        {
            XDocument xDoc = null;
            try
            {
                xDoc = new XDocument(
                    new XElement("Root",
                        from d in data
                        select new XElement("BO",
                            new XAttribute("Id", d.Id),
                            new XAttribute("Name", d.Name),
                            new XAttribute("Desc", d.Description)
                        )
                    )
                );
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception caught: " + ex.Message);
                return String.Empty;
            }

            return xDoc.ToString();
        }

        private static String BuildXML2(List<BusinessObject> data)
        {
            XDocument xDoc = null;
            try
            {
                xDoc = new XDocument(
                    new XElement("Root",
                        from d in data
                        select (new XElement("BO"))
                            .NewXAttribute("Id", d.Id)
                            .NewXAttribute("Name", d.Name)
                            .NewXAttribute("Desc", d.Description)
                    )
                );
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception caught: " + ex.Message);
                return String.Empty;
            }

            return xDoc.ToString();
        }
    }

    public class BusinessObject
    {
        public int Id { get; set; }
        public String Name { get; set; }
        public String Description { get; set; }
    }

    public static class AddXAttribute
    {
        public static XElement NewXAttribute(this XElement xa, XName name, Object value)
        {
            if (xa != null && value != null)
            {
                xa.Add(new XAttribute(name, value));
            }
            return xa;
        }
    }
}

VirtualPathProvider for Assemblies

The VirtualPathProvider class provides some options when building websites. One of the most common is to be able to serve ASPX/ASCX requests from embedded resources in an assembly.

A good place to start is a CodeProject article that addresses this issue.

First, instead of using hardcoded assemblies/paths/etc., a few properties were added.

        #region Properties
        public String VirtualPath { get; set; }
        public String VirtualFileSystemAssembly { get; set; }
        public String VirtualFileSystemBaseNamespace { get; set; }
        #endregion

Also, instead of just assuming that a file exists in the assembly whenever the virtual paths match, the code checks whether the resource actually exists in the assembly. This matters because we might want to serve files from multiple assemblies on the same virtual path.

        private bool DoesExistInVirtualPath(string virtualPath)
        {
            if (!IsInVirtualPath(virtualPath))
                return false;

            Assembly assembly = GetAssemblyReference();

            if (assembly != null)
            {
                String[] resources = assembly.GetManifestResourceNames();
                String resourceName = ConstructAssemblyResourceName(virtualPath);

                if (resources != null && resources.Contains(resourceName, StringComparer.InvariantCultureIgnoreCase))
                    return true;
            }
            return false;
        }

First we check whether the path even matches; if it doesn’t, there is obviously no match. Then we attempt to get a reference to the desired assembly. Of course, if that fails, there’s no way to serve files from it. Lastly we check the manifest for resources and, if there are any, whether they include the file we’re looking for. Some file name mangling needs to occur (in ConstructAssemblyResourceName) because of the way the files are named as resources.

There are a couple of helper methods used in the above block; they are just there to keep things neat.

        private String ConstructAssemblyResourceName(string resourcePath)
        {
            // nix the base path
            String resourceName = resourcePath.Remove(0, VirtualPath.Length);
            // tack on the base namespace
            if (VirtualFileSystemBaseNamespace.Length > 0)
            {
                resourceName = VirtualFileSystemBaseNamespace + '/' + resourceName;
            }
            // replace directory separators with .'s
            resourceName = resourceName.Replace('/', '.');
            return resourceName;
        }

        private Assembly GetAssemblyReference()
        {
            if (assemblyReference != null)
                return assemblyReference;

            String binDir = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location);
            String assemblyName = Path.Combine(binDir, VirtualFileSystemAssembly);

            Assembly assembly;
            try
            {
                assembly = Assembly.LoadFile(assemblyName);
            }
            catch
            {
                return null;
            }

            // if it is a real assembly - keep it
            if (assembly != null)
                assemblyReference = assembly;

            return assembly;
        }
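
To trace the name mangling, the steps of ConstructAssemblyResourceName can be pulled into a standalone function that takes the settings as parameters. The paths and namespace below are made-up values, and the sketch assumes the virtual path is configured with a trailing slash.

```csharp
using System;

public static class ResourceNameDemo
{
    // Same steps as ConstructAssemblyResourceName, with the settings passed in.
    public static string ToResourceName(string virtualPath, string requestPath, string baseNamespace)
    {
        // nix the base path
        string resourceName = requestPath.Remove(0, virtualPath.Length);

        // tack on the base namespace
        if (baseNamespace.Length > 0)
            resourceName = baseNamespace + '/' + resourceName;

        // replace directory separators with .'s
        return resourceName.Replace('/', '.');
    }

    public static void Main()
    {
        // Hypothetical values; note the trailing slash on the virtual path.
        Console.WriteLine(ToResourceName("/Plugins/", "/Plugins/Admin/Users.aspx", "My.Web.Views"));
        // My.Web.Views.Admin.Users.aspx
    }
}
```

If the virtual path were given without the trailing slash, the leftover leading separator would produce a doubled dot (My.Web.Views..Admin.Users.aspx) and the lookup in the manifest would miss, so it is worth normalizing that setting.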

Download the source code: Legomaster.AssemblyPathProvider.