.NET4 TaskScheduler使用指南

标签: T4 .net4 C#
发布时间: 2011/3/22 10:59:03

    一、自定义TaskScheduler

//自定义TaskScheduler
public class CustomTaskScheduler : TaskScheduler, IDisposable
{
//调用Task的线程
Thread[] _Threads;

//Task Collection
BlockingCollection<Task> _Tasks = new BlockingCollection<Task>();

int _ConcurrencyLevel;


//设置schedule并发
public CustomTaskScheduler(int concurrencyLevel)
{

_Threads = new Thread[concurrencyLevel];
this._ConcurrencyLevel = concurrencyLevel;


for (int i = 0; i < concurrencyLevel; i++)
{
_Threads[i] = new Thread(() =>
{
foreach (Task task in _Tasks.GetConsumingEnumerable())
this.TryExecuteTask(task);

});

_Threads[i].Start();
}

}

protected override void QueueTask(Task task)
{
_Tasks.Add(task);
}


protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{

if (_Threads.Contains(Thread.CurrentThread)) return TryExecuteTask(task);

return false;
}

public override int MaximumConcurrencyLevel
{
get
{
return _ConcurrencyLevel;
}
}


protected override IEnumerable<Task> GetScheduledTasks()
{
return _Tasks.ToArray();
}


public void Dispose()
{
this._Tasks.CompleteAdding();
foreach (Thread t in _Threads)
{
t.Join();
}

}
}

该scheduler代码很简单,重写相关System.Threading.Tasks.TaskScheduler类下的相关方法即可,代码中已给出相关注释。

   二、使用自定义的TaskScheduler

调用TaskScheduler代码:

 

List<string> listMsg = new List<string>() { "Task1", "Task2", "Task3", "Task4", "Task5", "Task6" };
List<Task> listTask = new List<Task>();

foreach (string msg in listMsg)
{
    Task myTask = new Task(obj => InvokeThread2((string)obj), msg, token);
    listTask.Add(myTask);
    myTask.Start(customTaskScheduler);
}

try
{
    //等待所有线程全部运行结束
    Task.WaitAll(listTask.ToArray());
}
catch (AggregateException ex)
{
    //.NET4 Task的统一异常处理机制
    foreach (Exception inner in ex.InnerExceptions)
    {
        Console.WriteLine("Exception type {0} from {1}",
        inner.GetType(), inner.Source);
    }
}

Console.ReadLine();

 

InvokeThread2 相关代码:

 

static void InvokeThread2(string msg)
{
    try
    {
        var x = Convert.ToInt32(msg.Replace("Task", "").Trim());
        Console.WriteLine(msg);
        Thread.Sleep(1000 * 5);
        Console.WriteLine("{0} ok", msg);
    }
    catch (Exception ex)
    {
        //如果有异常发生则取消正在排队的所有线程。
        tokenSource.Cancel();
        Exception exception = new Exception("error");
        exception.Source = msg;
        throw exception;
    }
}

 

 

以上代码运行效果如下:

接着在TaskScheduler调用代码中如果将第一行代码listMsg值修改成 List<string> listMsg = new List<string>() { "Task1", "Task2", "TaskA", "Task3", "Task4", "Task5", "Task6", "Task7", "Task8", "Task9" };这时候我们将得到以下结果:

这个运行结果重点要强调的地方为:后面这7个Exception'>exception。聪明的您或许已经看出来前6个exception属于没有执行的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9",而最后一个exception才是真正的发生异常的"TaskA"。这里主要用到了Task的统一异常处理机制AggregateException。 可以从运行结果得到:Task1,Task2,Task3执行成功了,但是TaskA发生了异常导致了后面排队的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9"都不会执行了。节省了系统资源,同时也提高了系统性能。

官方微信
官方QQ群
31647020