معرفی Async Parallel.ForEach در دات نت 6
اندازه‌ی قلم متن
تخمین مدت زمان مطالعه‌ی مطلب: چهار دقیقه

عموما زمانیکه می‌خواهیم تمام وظایف مدنظر، به صورت موازی اجرا شوند، آن‌ها را Task.WhenAll می‌کنیم. برای مثال 10 هزار درخواست HTTP را به صورت وظایفی، WhenAll می‌کنیم و ... در این حالت ... سرور ریموت، IP شما را خواهد بست! چون کنترلی بر روی تعداد وظیفه‌ی در حالت اجرای موازی وجود ندارد و یک چنین عملی، شبیه به یک حمله‌ی DDOS عمل می‌کند! برای مدیریت بهتر یک چنین مواردی، در دات نت 6 متدهای Parallel.ForEachAsync ارائه شده‌اند تا دیگر نیازی به استفاده از راه‌حل‌های ثالثی که عموما آنچنان بهینه هم نیستند، نباشد.
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask> body)
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
public static Task ForEachAsync<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask> body)
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, CancellationToken cancellationToken, Func<TSource, CancellationToken, ValueTask> body)
public static Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TSource, CancellationToken, ValueTask> body)
این مجموعه متدها از ValueTaskها بجای Taskها استفاده می‌کند تا سربار ایجاد Taskها در حلقه‌ها کاهش یابد. همچنین در اینجا degree of parallelism به صورت پیش‌فرض به تعداد هسته‌های سی‌پی تنظیم شده‌است (Environment.ProcessorCount)؛ چون عموما توسعه دهنده‌ها نمی‌دانند که چه عددی را باید برای آن انتخاب کنند. هر چند امکان تنظیم دستی آن‌ها هم وجود دارد (یکی از مهم‌ترین مشکلات کار با WhenAll).

یک مثال: در اینجا می‌خواهیم به صورت موازی، مشخصات کاربرانی از Github را توسط HttpClient دریافت کنیم. هر بار هم فقط می‌خواهیم سه وظیفه اجرا شوند و نه بیشتر
using System.Net.Http.Headers;
using System.Net.Http.Json;
 
var userHandlers = new []  { "users/VahidN", "users/shanselman", "users/jaredpar", "users/davidfowl" };
 
using HttpClient client = new()
{
    BaseAddress = new Uri("https://api.github.com"),
};
client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("DotNet", "6"));
 
ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = 3 };
 
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token); 
    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});
 
public class GitHubUser
{
    public string Name { get; set; }
    public string  Bio { get; set; }
}
در این مثال، نمونه‌ای از کارکرد متد جدید Parallel.ForEachAsync را مشاهده می‌کنید که اینبار، MaxDegreeOfParallelism آن قابل تنظیم است. یعنی با تنظیم فوق، هربار فقط سه وظیفه به صورت موازی اجرا خواهند شد. البته تنظیم آن به منهای یک، همان حالت WhenAll را سبب خواهد شد؛ یعنی محدودیتی وجود نخواهد داشت.
متد Parallel.ForEachAsync، آرایه‌ای را که باید بر روی آن کار کند، دریافت می‌کند. سپس تنظیمات اجرای موازی آن‌ها را هم مشخص می‌کنیم. در ادامه آن‌ها را در دسته‌های مشخصی، به صورت موازی بر اساس منطقی که مشخص می‌کنیم، اجرا خواهد کرد.


وضعیت امکان اجرای موازی متدهای async همزمان، تا پیش از دات نت 6

<List<T به همراه متد الحاقی ForEach است که می‌تواند یک <Action<T را بر روی المان‌های این لیست، اجرا کند و ... عموما زمانیکه به وظایف async می‌رسیم، به اشتباه مورد استفاده قرار می‌گیرد:
customers.ForEach(c => SendEmailAsync(c));
مثال فوق، با اجرای حلقه‌ی زیر تفاوتی ندارد:
foreach(var c in customers)
{
    SendEmailAsync(c); // the return task is ignored
}
یعنی یک عملیات async، بدون await فراخوانی شده‌است و تا پایان عملیات مدنظر، صبر نخواهد شد. حداقل مشکل آن این است که اگر در این بین استثنایی رخ دهد، هیچگاه متوجه آن نخواهید شد و حتی می‌تواند کل پروسه‌ی برنامه را خاتمه دهد. شاید عنوان کنید که می‌شود این مشکل را به صورت زیر حل کرد:
customers.ForEach(async c => await SendEmailAsync(c));
اما ... این روش هم تفاوتی با قبل ندارد. از این لحاظ که متد ForEach یک <Action<T را دریافت می‌کند که خروجی آن void است. یعنی در نهایت با راه حل دوم، فقط یک async void ایجاد می‌شود که باز هم قابلیت صبر کردن تا پایان عملیات را ندارد. نکته‌ی مهم اینجا است که اجرای موازی آن‌ها توسط متد Parallel.ForEach نیز دقیقا همین مشکل را دارد.
تنها راه حل پذیرفته‌ی شده‌ی چنین عمل async ای، فراخوانی آن‌ها به صورت متداول زیر و بدون استفاده از متد ForEach است:
foreach(var c in customers)
{
   await SendEmailAsync(c);
}
و یا Task.WhenAll کردن آن‌ها، با علم به این موضوع که MaxDegreeOfParallelism آن قابل کنترل نیست (حداقل به صورت استاندارد و بدون نیاز به کتابخانه‌های جانبی). برای مثال بجای نوشتن:
foreach(var o in orders)
{
    await ProcessOrderAsync(o);
}
می‌توان آن‌را به صورت زیر درآورد:
var tasks = orders.Select(o => ProcessOrderAsync(o)).ToList();
await Task.WhenAll(tasks);
در این حالت عملیات ProcessOrderAsync را تبدیل به لیستی از وظایف مدنظر کرده و به متد Task.WhenAll ارسال می‌کنیم تا به صورت موازی اجرا شوند. اما ... اگر 10 هزار Task وجود داشته باشند، کنترلی بر روی تعداد وظایف در حال اجرای موازی وجود نخواهد داشت و این مورد نه تنها سبب بالا رفتن کارآیی نخواهد شد، بلکه می‌تواند سرور را هم با اخلال پردازشی، به علت کمبود منابع در دسترس مواجه کند.

دات نت 6، هم کنترل MaxDegreeOfParallelism را میسر کرده‌است و هم اینکه اینبار نگارش async واقعی Parallel.ForEachAsync را ارائه داده‌است تا دیگر همانند حالت قبلی Parallel.ForEach، به async void‌ها و مشکلات مرتبط با آن‌ها نرسیم.