A random Twitterizer developer who emailed me was attempting to pull data for multiple users, all in background threads, but noticed that his requests started to be rejected by Twitter. I suggested that this was possibly a spam countermeasure in the Twitter API, and that he needed to throttle his requests. He had no idea how to do that. Seeing an excellent challenge, I jumped into Visual Studio and created the following console application.

  namespace ThrottledThreads
  {
      using System;
      using System.Collections.Generic;
      using System.ComponentModel;
      using System.Threading;

      /// <summary>
      /// Console demo that runs a large queue of <see cref="BackgroundWorker"/> instances
      /// while keeping at most <c>maxActiveThreads</c> of them running at any one time.
      /// </summary>
      class Program
      {
          // Configuration stuff
          // Total number of workers that are queued up front.
          static int maxThreads = 10000;
          // Upper bound on the number of workers running at the same time.
          static int maxActiveThreads = 4;
          // Exclusive upper bound, in milliseconds, of the simulated work duration.
          static int maxWaitTime = 10 * 1000;

          // When true, extra progress messages are written to the console.
          static bool verbose = false;

          // The rest doesn't need to be modified.
          // Number of workers started but not yet completed; guarded by currentlyActiveLock.
          public static int currentlyActive = 0;
          // Workers that have not been started yet; only touched by Main.
          public static Queue<BackgroundWorker> threadQueue = new Queue<BackgroundWorker>();
          // Signalled by the completion handler each time a worker finishes.
          public static AutoResetEvent resetEvent = new AutoResetEvent(false);
          static object currentlyActiveLock = new object();

          // System.Random instances are not safe for concurrent use, so every call
          // made from a worker goes through rndLock.
          static Random rnd = new Random();
          static object rndLock = new object();

          /// <summary>
          /// The entry point for the application. Queues <c>maxThreads</c> workers, starts them
          /// while never exceeding <c>maxActiveThreads</c> concurrently running workers, and
          /// returns only after every worker has reported completion.
          /// </summary>
          /// <param name="args">The command line arguments (unused).</param>
          static void Main(string[] args)
          {
              Console.WriteLine("[Main] Queueing threads");

              // Add the background threads to the queue.
              for (int i = 0; i < maxThreads; i++)
              {
                  BackgroundWorker worker = new BackgroundWorker();
                  worker.DoWork += new DoWorkEventHandler(backgroundWorker_DoWork);
                  worker.RunWorkerCompleted += new RunWorkerCompletedEventHandler(backgroundWorker_RunWorkerCompleted);

                  threadQueue.Enqueue(worker);
              }

              int threadNumber = 1;

              // A limit below one would never start anything and the loop below would wait forever.
              int activeLimit = Math.Max(1, maxActiveThreads);

              Console.WriteLine("[Main] Entering main loop.");

              // Start workers as slots free up, then keep waiting until the last one has completed.
              while (true)
              {
                  lock (currentlyActiveLock)
                  {
                      // Fill the free slots, but never start more than the limit and never
                      // dequeue from an empty queue.
                      while (currentlyActive < activeLimit && threadQueue.Count > 0)
                      {
                          Console.WriteLine("[Main] Running {0}", threadNumber);
                          currentlyActive++;
                          threadQueue.Dequeue().RunWorkerAsync(threadNumber);
                          threadNumber++;
                      }

                      // Done only when nothing is queued AND nothing is still running.
                      if (threadQueue.Count == 0 && currentlyActive == 0)
                      {
                          break;
                      }
                  }

                  if (verbose)
                  {
                      Console.WriteLine("[Main] Waiting ...");
                  }

                  // The completion handler decrements the counter before it signals, and the
                  // counter is re-read under the lock after every wake-up, so a signal that
                  // arrives before this call (or two signals that collapse into one) is harmless.
                  resetEvent.WaitOne();
              }

              Console.WriteLine("[Main] All threads completed");
              Console.WriteLine("Press any key to exit");
              Console.ReadKey();
          }

          /// <summary>
          /// Handles the DoWork event of the backgroundWorker control. Simulates work by sleeping
          /// for a random duration and hands the worker number back as the result.
          /// </summary>
          /// <param name="sender">The source of the event.</param>
          /// <param name="e">The <see cref="System.ComponentModel.DoWorkEventArgs"/> instance containing the event data.</param>
          private static void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
          {
              if (verbose)
              {
                  Console.WriteLine("[{0}] Starting", e.Argument);
              }

              int time;
              lock (rndLock)
              {
                  time = rnd.Next(500, maxWaitTime);
              }

              // This is where our actual work is done. For the example, we'll just sleep for a while.
              Thread.Sleep(time);

              Console.WriteLine("[{0}] Finished in {1:0.00}s", e.Argument, time / 1000d);

              e.Result = e.Argument;
          }


          /// <summary>
          /// Handles the RunWorkerCompleted event of the backgroundWorker control. Reports the
          /// outcome, releases the worker's slot and wakes the main loop.
          /// </summary>
          /// <param name="sender">The source of the event.</param>
          /// <param name="e">The <see cref="System.ComponentModel.RunWorkerCompletedEventArgs"/> instance containing the event data.</param>
          private static void backgroundWorker_RunWorkerCompleted(
              object sender, RunWorkerCompletedEventArgs e)
          {
              try
              {
                  // Reading e.Result throws when the worker failed, so check e.Error first.
                  if (e.Error != null)
                  {
                      Console.WriteLine("[Main] A worker failed: {0}", e.Error.Message);
                  }
                  else
                  {
                      Console.WriteLine("[{0}] Completed", e.Result);
                  }
              }
              finally
              {
                  // Release the slot only after reporting, so the main loop cannot announce
                  // overall completion before this worker's message has been written.
                  lock (currentlyActiveLock)
                  {
                      currentlyActive--;
                  }

                  // Signal that the thread has finished and a new thread can be processed.
                  // Always reached, even if reporting throws; otherwise Main would wait forever.
                  resetEvent.Set();
              }
          }
      }
  }

The code uses the generic Queue<T> class to queue a large set of BackgroundWorker objects. It will execute a set number of threads, then wait for a worker to complete. When a worker completes, it sends a signal to trigger a new worker to be executed from the queue. It’s pretty cool to watch.

References