MSMQ and polling to receive messages?

Pure.Krome · Jul 29, 2009

I've got a Windows service that does some image conversion. It fires whenever a file in a particular folder is renamed (i.e. it uses a file watcher on rename events). This works great until a massive number of images is dumped (and renamed) in that folder; then the CPU redlines, and so on.

So, I was going to change my code to use MSMQ to queue all the files that need to be converted. Fine. Every time a file is renamed and the file watcher fires, I add a new message to the queue. Cool.

The problem is this: how do I grab one message at a time from the queue?

Do I need to make a timer object that polls the queue every xxx seconds? Or is there a way to constantly keep peeking at the first item in the queue? Once a message exists, I'd extract it, process it, then continue (which means: keep peeking until the world blows up).

I've wondered if I just need to put a while loop around the Receive method. Pseudocode is below (in Edit #2).

Anyone have any experience with this and have some suggestions?

Thanks kindly!

EDIT:

If WCF is the way to go, could someone provide some sample code instead?

EDIT 2:

Here's some pseudocode I was thinking of:

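using System.Messaging;
using System.Threading;

// Windows service start method.
protected override void OnStart(string[] args)
{
    // Some initialisation stuff...

    // Start polling the queue on a background thread. OnStart has to return
    // promptly, so the endless loop below must not run on the calling thread.
    Thread pollingThread = new Thread(StartPollingMSMQ);
    pollingThread.IsBackground = true;
    pollingThread.Start();

    // ....
}

private static void StartPollingMSMQ()
{
    // NOTE: This code should check if the queue exists, instead of just assuming it does.
    //       Left out for brevity.
    MessageQueue messageQueue = new MessageQueue(".\\Foo");

    while (true)
    {
        Message message;
        try
        {
            // This blocks for up to one second waiting for a message.
            message = messageQueue.Receive(new TimeSpan(0, 0, 1));
        }
        catch (MessageQueueException ex)
        {
            // Receive throws when the timeout expires and the queue is still empty.
            if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                continue;
            throw;
        }

        // Woot! We have something... now process it.
        DoStuffWithMessage(message);

        // Now repeat forever...
    }
}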

Answer

ViktorJ · Jul 29, 2009

If you use a local queue, you don't need WCF.

This is what my sample service looks like (a service class from a Windows service project):

using System.Messaging;
public partial class MQProcessTest1 : ServiceBase
{
    // Name (path) of the local private queue.
    private const string MqName = @".\Private$\test1";
    // Backing field for the static queue instance.
    private static MessageQueue _mq;
    // Lazy property through which the queue is accessed (it creates and opens the queue when first needed).
    private static MessageQueue mq
    {
        get
        {
            if (_mq == null)
            {
                if (!MessageQueue.Exists(MqName))
                    MessageQueue.Create(MqName);
                _mq = new MessageQueue(MqName, QueueAccessMode.ReceiveAndAdmin);
                _mq.Formatter = new BinaryMessageFormatter();
            }
            return _mq;
        }
    }

    // Constructor.
    public MQProcessTest1()
    {
        InitializeComponent();
        // Subscribe the handler that processes each received message.
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
    }

    // Handles a received message.
    private void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        // The queue that received the message.
        MessageQueue cmq = (MessageQueue)sender;
        try
        {
            // The received message (it has already been removed from the queue).
            Message msg = cmq.EndReceive(e.AsyncResult);
            // Process the message here.
        }
        catch
        {
            // NOTE: errors are swallowed so that receiving continues; log them in real code.
        }
        // Refresh the queue's properties in case anything changed (optional).
        cmq.Refresh();
        // Ask the MessageQueue for the next message; the handler fires again when one arrives.
        cmq.BeginReceive();
    }

    protected override void OnStart(string[] args)
    {
        // Start an asynchronous receive; the handler fires when a message arrives.
        if (mq != null)
            mq.BeginReceive();
        // Any additional logic for the case mq == null can go here.
    }

    protected override void OnStop()
    {
        // Close the MessageQueue when the service stops.
        if (mq != null)
            mq.Close();
    }
}
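
The service above only covers the receiving side. For the sending side described in the question (the file watcher adds a message for each renamed file), a minimal sketch might look like the following. It assumes the same private queue path and `BinaryMessageFormatter` as the service; the class name `RenameEnqueuer`, the message label, and the choice to send the full file path as the message body are illustrative assumptions, not something taken from the question or the answer.

```csharp
using System;
using System.IO;
using System.Messaging;

// Hypothetical helper: enqueues the path of every renamed file.
public sealed class RenameEnqueuer : IDisposable
{
    // Assumed to be the same queue the service reads from.
    private const string MqName = @".\Private$\test1";

    private readonly MessageQueue _queue;
    private readonly FileSystemWatcher _watcher;

    public RenameEnqueuer(string folder)
    {
        if (!MessageQueue.Exists(MqName))
            MessageQueue.Create(MqName);

        _queue = new MessageQueue(MqName, QueueAccessMode.Send);
        // Must match the formatter used on the receiving side.
        _queue.Formatter = new BinaryMessageFormatter();

        _watcher = new FileSystemWatcher(folder);
        _watcher.Renamed += OnRenamed;
        _watcher.EnableRaisingEvents = true;
    }

    private void OnRenamed(object sender, RenamedEventArgs e)
    {
        // Only the path is queued here; the expensive conversion
        // is left to the service that receives the message.
        _queue.Send(e.FullPath, "image-to-convert");
    }

    public void Dispose()
    {
        _watcher.Dispose();
        _queue.Dispose();
    }
}
```

With this sender, the receive handler should be able to read the path back with `(string)msg.Body`. Because the handler only calls `BeginReceive()` again after it has finished processing, messages are handled one at a time, which is what the question asks for.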