C# 8.0 - Async Streams
اندازه‌ی قلم متن
تخمین مدت زمان مطالعه‌ی مطلب: سه دقیقه

امکان تعریف نوع‌های شمارشی async در C# 8.0

فرض کنید قصد دارید یک متد async از نوع IEnumerable را که تعدادی yield return به تاخیر افتاده را به همراه دارد (yield return‌ها فقط زمانی اجرا می‌شوند که بر روی آن‌ها متدهایی مانند ToList و یا حلقه‌ی foreach اجرا شوند) و همچنین توسط await Task.Delay، دریافت اطلاعات به صورت async را نیز شبیه سازی می‌کند، تهیه کنید:
public struct Statement
{
    public int Id { get; }
    public string Description { get; }
    public Statement(int id, string description) => (Id, Description) = (id, description);
    public override string ToString() => Description;
}

public static async Task<IEnumerable<Statement>> GetStatements(bool error)
{
    if (error)
    {
       throw new Exception("Oops, we messed up 😬");
    }

    await Task.Delay(1000); //Simulate waiting for data to come through. 

    yield return new Statement(1, "C# is cool!");
    yield return new Statement(2, "C# orginally named COOL.");
    yield return new Statement(3, "More examples...");
}
این قطعه کد حتی در C# 8.0 نیز چنین خطای کامپایلری را به همراه دارد:
The body of 'AsyncStreams.GetStatements(bool)' cannot be an iterator block because
'Task<IEnumerable<AsyncStreams.Statement>>' is not an iterator interface type (CS1624)
عنوان می‌کند که برای دریافت اطلاعات متد GetStatements باید یک iterator تشکیل شود؛ اما Task IEnumerable از این نوع نیست.

برای رفع یک چنین مشکلی، اکنون در C# 8.0 می‌توان از اینترفیس جدید IAsyncEnumerable بجای Task IEnumerable استفاده کرد. به این ترتیب تنها تغییری که در قطعه کد فوق نیاز است، تغییر امضای آن به صورت زیر است:
static async IAsyncEnumerable<Statement> GetStatements(bool error)


امکان تعریف حلقه‌های async در C# 8.0

مرحله‌ی بعد، ایجاد حلقه‌ای بر روی متد GetStatements است. اکنون مشکل دیگری وجود دارد: حلقه‌ی foreach به خودی خود، یک حلقه‌ی synchronous است و اگر از آن برای کار با یک استریم async استفاده شود، هربار که اطلاعاتی از آن بازگشت داده می‌شود، پایان یک Task نیز گزارش داده خواهد شد که می‌توان سبب خاتمه‌ی حلقه شود. بنابراین انجام اینکار نیز پیش از C# 8.0 میسر نبود که اکنون با امکان تعریف await پیش از یک حلقه‌ی foreach، ممکن شده‌است:
static async IAsyncEnumerable<Statement> GetStatementsAsync(bool error)
{
    await foreach (var statement in GetStatements(error))
    {
      await Task.Delay(1000);
      yield return statement;
    }
}
تا پیش از C# 8.0، از واژه‌ی await تنها برای دریافت یک تک مقدار استفاده می‌شد؛ اما حالا می‌توان از آن برای دریافت استریمی از نتایج (async streams) نیز استفاده کرد.


اینترفیس IAsyncEnumerable چگونه تعریف شده‌است؟

 اینترفیس IAsyncEnumerable متد GetAsyncEnumerator را تعریف می‌کند که یک IAsyncEnumerator را بازگشت می‌دهد و آن نیز به همراه متد MoveNextAsync است. اگر دقت کنید در این حالت از نگارش async اینترفیس IDisposable به نام IAsyncDisposable استفاده کرده‌است:
using System.Threading;

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        T Current { get; }

        ValueTask<bool> MoveNextAsync();
    }
}

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}
اینترفیس‌های IAsyncDisposable و IAsyncEnumerator یک ValueTask را توسط متدهای DisposeAsync و MoveNextAsync بازگشت می‌دهند و این مورد به C# 7x باز می‌گردد که امکان await را نه تنها بر روی Task، بلکه بر روی هر نوعی که متد GetAwaiter را پیاده سازی می‌کند، میسر می‌کند و ValueTask نیز یکی از آن‌ها است. ValueTask به صورت یک نوع مقدار (value type) تعریف شده‌است؛ بجای نوع ارجاعی Task که سربار کمتری را به همراه دارد.


مثالی از IAsyncDisposable و روش Dispose خودکار آن

با معرفی IAsyncDisposable، اگر یک مثال ساده از پیاده سازی آن به صورت زیر باشد:
public class AwaitUsingTest : IAsyncDisposable
{
   public async ValueTask DisposeAsync()
   {
     await Task.CompletedTask;
   }

   public void Dummy() { }
}
روش فراخوانی using declaration بر روی آن به همراه واژه‌ی کلیدی await در C# 8.0، مانند مثال زیر است:
async Task FooBar()
{
   await using var test = new AwaitUsingTest();
   test.Dummy();
}
  • #
    ‫۵ سال و ۱ ماه قبل، پنجشنبه ۱۰ مرداد ۱۳۹۸، ساعت ۱۲:۴۷
    از زمان ارائه‌ی NET Core 3 Preview 7. به بعد، امکان تعریف یک چنین اکشن متدهایی در ASP.NET Core وجود دارد:
    [HttpGet]
    public IAsyncEnumerable<Product> Get()
        => productsRepository.GetAllProducts();
    • #
      ‫۴ سال و ۶ ماه قبل، شنبه ۳ اسفند ۱۳۹۸، ساعت ۱۱:۵۰
      یک نکته‌ی تکمیلی: محدودیت بافر IAsyncEnumerable
      اگر بیش از 8192 رکورد را به صورت IAsyncEnumerable بازگشت دهیم، خطای زیر ظاهر خواهد شد:
      ‘AsyncEnumerableReader’ reached the configured maximum size of the buffer when enumerating a value of type ‘<type>’. 
      This limit is in place to prevent infinite streams of ‘IAsyncEnumerable<>’ from continuing indefinitely. 
      If this is not a programming mistake, consider ways to reduce the collection size, or consider manually converting ‘<type>’ into a list rather than increasing the limit.

      برای تنظیم یا تغییر آن می‌توان از خاصیت MvcOptions.MaxIAsyncEnumerableBufferLimit در برنامه‌های ASP.NET Core استفاده کرد.
    • #
      ‫۴ سال و ۶ ماه قبل، شنبه ۳ اسفند ۱۳۹۸، ساعت ۱۲:۰۰
      یک نکته‌ی تکمیلی: محدود کردن IAsyncEnumerable به یک فضای نام
      ممکن است در برنامه‌های NET Core 3x. به خطای مبهم بودن محل دریافت IAsyncEnumerable بر بخورید:
      The type ‘IAsyncEnumerable’ exists in both ‘System.Interactive.Async, Version=3.2.0.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263’
      and ‘System.Runtime, Version=4.2.2.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a’
      برای رفع آن نیاز است ارجاع IAsyncEnumerable را از System.Interactive.Async در فایل csproj، حذف کنید:
      <Target Name="AddAssemblyAliasToReactiveAsync" AfterTargets="ResolveAssemblyReferences">
          <ItemGroup>
              <ReferencePath Condition=" '%(FileName)' == 'System.Interactive.Async' ">
                  <Aliases>reactive</Aliases>
              </ReferencePath>
          </ItemGroup>
      </Target>
    • #
      ‫۴ سال و ۶ ماه قبل، شنبه ۳ اسفند ۱۳۹۸، ساعت ۱۲:۰۶
      یک نکته‌ی تکمیلی: بافر شدن کل نتیجه‌ی IAsyncEnumerable در ASP.NET Core 3x
      در اکشن متدهایی مانند:
      [HttpGet]
      public IAsyncEnumerable<Product> Get()
          => productsRepository.GetAllProducts();
      به علت نبود هیچ نوع JSON serializer ای با پشتیبانی از IAsyncEnumerable (در زمان نگارش این نکته)، کل نتیجه بافر شده و سپس بازگشت داده می‌شود و response stream در کار نیست.
  • #
    ‫۱ سال و ۶ ماه قبل، شنبه ۸ بهمن ۱۴۰۱، ساعت ۱۴:۳۰
    یک نکته‌ی تکمیلی: استفاده از IAsyncEnumerable در جهت ایجاد وب سرویس‌های REST با قابلیت Stream

     مقدمه
    در Net Core 3. نوع‌های جدیدی با عنوان‌های <IA­syncEnumerable<T>,IAsync­Enumerator<T> در فضای نام System.Collections.Gener­ic معرفی شدند. همانطور که مشخص است این نوع‌های جدید کاملا با نوع‌های synchronous خود هم پوشانی دارند و مفاهیم قبلی را به پیاده سازی میکنند.

    نوع <IAsync­Enumerable<T متد GetAsyncEnumerator را معرفی میکند تا عملیات enumeration را به صورت async انجام دهد و در خروجی این متد، نوع <IAsyncEnumerator<T را برگشت میدهد؛ به‌طوریکه این نوع disposable و دو عضو MoveNextAsync و Current را در خود دارد. اولی برای رسیدن به مقدار بعدی و دومی برای دریافت مقدار فعلی استفاده می‌شود. این در حالی است که MoveNextAsync بجای برگشت دادن یک bool یک <ValueTask<bool را برگشت می‌دهد. همچنین این متد، مقدار CancelationToken را همانند سایر فرآیندهایی که به صورت async تعریف می‌شوند، به صورت اختیاری از ورودی دریافت میکند، تا در صورت لزوم، عملیات جاری را کنسل کند. از طرفی به دلیل اینکه IAsyncEnumerator اینترفیس IAsyncDisposable را پیاده سازی میکند، متد DisposeAsync را نیز در اختیار دارد به‌طوریکه بجای void یک ValueTask را برگشت میدهد.


    نحوه استفاده از IAsyncEnumerable 
    static async IAsyncEnumerable<int> RangeAsync(int start, int count)
    {
      for (int i = 0; i < count; i++)
      {
        await Task.Delay(i);
        yield return start + i;
      }
    }
    برای استفاده از این نوع در نهایت باید از عبارت yield return استفاده کرد. تا مقدار برگشتی مشخص شده در IAsyncEnumerable که در این مثال int است برگشت داده شود. در صورت استفاده نشدن از yield، خطای cannot return value from an iterator داده می‌شود.

    پیاده سازی سمت سرور  

    در قسمت قبل سعی بر این بود تا با این نوع جدید آشنا شویم. در این قسمت تلاش میکنیم تا با استفاده از این نوع یک وب سرویس stream را ایجاد کنیم .

    ایجاد یک وب سرویس بدون خروجی IAsyncEnumerable

    در مرحله اول، یک وب سرویس REST را بدون استفاده از IAsyncEnumerable ایجاد می‌کنیم تا متوجه مشکلات آن شویم و سپس در مرحله بعدی همین وب سرویس را با نوع IAsyncEnumerable  بازنویسی میکنیم.
        [ApiController]
        [Route("[controller]")]
        public class CustomerController : ControllerBase
        {
            private readonly IDictionary<int, Customer> _customers;
            private void FillCustomerFromMemory(int countOfCustomer)
            {
                for (int CustomerId = 1; CustomerId <= countOfCustomer; CustomerId++)
                {
                    _customers.Add(key: CustomerId, new Customer($"name_{CustomerId}", CustomerId));
                }
            }
            public CustomerController()
            {
                _customers = new Dictionary<int, Customer>();
                FillCustomerFromMemory(countOfCustomer : 100);
            }
            [HttpGet]
            public async Task<IEnumerable<Customer>> Get()
            {
                var output = new List<Customer>();
                while (_customers.Any(_ => _.Key % 10 == 0))
                {
                    var customer = _customers.First(_ => _.Key % 10 == 0);
                    output.Add(new Customer(customer.Value.Name, customer.Key));
                    await Task.Delay(500);
                    _customers.Remove(customer);
                }
                return output;
            }
    
            public class Customer
            {
                public int Id { get; private set; }
    
                public string Name { get; private set; }
                public Customer(string name, int id)
                {
                    Name = name;
                    Id = id;
                }
            }
        }
    در صورت اجرای این تکه کد و فراخوانی وب سرویس موجود بعد از بارگذاری کامل دیتا، خروجی به کاربر برگشت داده می‌شود. این در حالی است که ممکن است کاربر فقط به بخشی از این دیتا نیاز داشته باشد؛ برای مثال شاید صرفا به Id با مقدار ۸۰ نیاز داشته باشد، اما مجبور است تا بارگذاری کل دیتا صبر کند. برای رفع این مشکل وب سرویس موجود را مجدد باز نویسی میکنیم.

    ایجاد یک وب سرویس با خروجی IAsyncEnumerable

      [HttpGet]
            public async IAsyncEnumerable<Customer> Get()
            {
                while (_customers.Any(_ => _.Key % 10 == 0))
                {
                    var customer = _customers.First(_ => _.Key % 10 == 0);
                    yield return new Customer(customer.Value.Name, customer.Key);
                    _customers.Remove(customer);
                    await Task.Delay(500);
                }
            }
    این بار به محض اینکه یک دیتا ساخته شد، برگشت داده می‌شود و منتظر تمام دیتا نیستیم. این برگه برنده استفاده از IAsyncEnumerable , yield return است چرا که با ترکیب این دو میتوان وب سرویسی با قابلیت stream را ایجاد کرد. از طرفی حجم payload نیز کمتر شده‌است، چرا که هر بار صرفا یک بلاک مشخص از دیتا را به کلاینت ارسال میکنیم.

    تا اینجا سمت سرور را به صورت stream پیاده سازی کردیم. در قسمت بعدی سمت کلاینت را نیز پیاده سازی میکنیم تا دیتا را همانطور که سرور، قسمت به قسمت ارسال میکند، کلاینت نیز آن را به شکل تک قسمتی دریافت کند.

    پیاده سازی سمت کلاینت

    در قسمت قبل تلاش کردیم تا یک وب سرویس با قابلیت stream را پیاده سازی کنیم. حال در این بخش کد کلاینت را به صورتی ایجاد میکنیم تا هر سری صرفا یک بلاک ارسال شده توسط سرور را دریافت و آن را Deserialize کند. برای این کار از کتابخانه Newtonsoft.Json استفاده میکنیم.
    const int TARGET = 80;
    var _httpClient = new HttpClient();
    using (var response = await _httpClient.GetAsync(
        "https://localhost:7284/customer",
         HttpCompletionOption.ResponseHeadersRead))
    {
        var stream = await response.Content.ReadAsStreamAsync();
    
        var _jsonSerializerSettings = new JsonSerializerSettings();
        var _serializer = Newtonsoft.Json.JsonSerializer.Create(_jsonSerializerSettings);
    
        using TextReader textReader = new StreamReader(stream);
        using JsonReader jsonReader = new JsonTextReader(textReader);
    
        await using (stream.ConfigureAwait(false))
        {
            await jsonReader.ReadAsync().ConfigureAwait(false);
            while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
                   jsonReader.TokenType != JsonToken.EndArray)
            {
                Customer customer = _serializer!.Deserialize<Customer>(jsonReader);
                if (customer.Id == TARGET)
                {
                    Console.WriteLine(customer.Id + " : " + customer.Name);
                    break;
                }
            }
        }
    }
    همانطورکه در کد بالا مشخص است، ابتدا یک درخواست Get را به آدرس وب سرویس زده و برای اینکه متجوجه شویم به انتهای لیست داده‌ها رسیدیم از jsonReader.TokenType != JsonToken.EndArray استفاده میکنیم. با این کار در صورتی که به ] نرسیده باشیم، باید عملیات خواندن از stream ادامه داشته باشد و هر سری بلاک جاری را Deserialize  میکنیم و در آخر در صورتیکه آیتم مورد نظر را دریافت کردیم، با دستور break از حلقه دریافت بلاک‌ها خارج می‌شویم.

     
    استفاده از CancelationToken در جهت استفاده بهینه از منابع

    تا اینجا به هدفی که انتظار داشتیم رسیدیم؛ به این شکل که یک وب سرویس را ایجاد کردیم تا اطلاعات را به صورت بخش بخش ارسال کند و کلاینتی ساختیم تا این اطلاعات را دریافت کند و در صورتیکه اطلاعات مورد نظر را دریافت کرد، به کار خواندن از وب سرویس خاتمه دهد. برای اینکه متوجه اهمیت CanclationToken  شویم دو سناریو زیر را با هم بررسی میکنیم :

    سناریو اول - قطع کردن ارتباط توسط کلاینت

    فرض کنید به هر دلیلی، برای مثال خطای داخلی برنامه‌ی کلاینت و یا بسته شدن مرورگر، ارتباط کلاینت با سرور قطع شود. در این صورت سرور از این ماجرا خبردار نمی‌شود و به کار خود جهت ارسال اطلاعات ادامه می‌دهد. همانطور که گفته شد، کلاینت به هر دلیلی از دریافت اطلاعات منصرف شده و یا به خطا خورده. پس فرستادن اطلاعات هیچ کاربردی ندارد و سرور در هر مرحله ای از ارسال که باشد، باید به کار خود خاتمه دهد.

    برای برطرف کردن مشکل، این سناریو کد سمت سرور را مجدد باز نویسی میکنیم : 
    [HttpGet]
            public async IAsyncEnumerable<Customer> Get(CancellationToken cancellationToken)
            {
                while (!cancellationToken.IsCancellationRequested && _customers.Any(_ => _.Key % 10 == 0))
                {
                    var customer = _customers.First(_ => _.Key % 10 == 0);
                    yield return new Customer(customer.Value.Name, customer.Key);
                    _customers.Remove(customer);
                    await Task.Delay(500,cancellationToken);
                }
            }
    در کد بالا صرفا یک CancelationToken به ورودی متد اضافه شده و از آن در جهت اطمینان از اتصال کلاینت استفاده شده، به طوری که در حلقه اصلی ارسال اطلاعات شرط cancellationToken.IsCancellationRequested را چک میکند تا کاربر به دلایل مختلفی از دریافت اطلاعات منصرف نشده باشد و در صورت لغو کاربر، سرور به کار خود خاتمه میدهد

    سناریو دوم-دستیابی کلاینت به اطلاعات مورد نظر

    کلاینت در صورتیکه به اطلاعات مورد نظر از طریق وب سرویس دسترسی پیدا کرد، دیگر تمایلی به ادامه خواندن از جریان داده یا stream را ندارد و از حلقه خواندن اطلاعات خارج می‌شود. اما سرور همچنان درگیر ارسال اطلاعات است. برای رفع این مشکل کد سمت کلاینت را بازنویسی میکنیم: 
    const int TARGET = 80;
    var _httpClient = new HttpClient();
    var _cancelationTokenSource = new CancellationTokenSource();
    
    using (var response = await _httpClient.GetAsync(
        "https://localhost:7284/customer",
         HttpCompletionOption.ResponseHeadersRead,
         _cancelationTokenSource.Token))
    {
        var stream = await response.Content.ReadAsStreamAsync(_cancelationTokenSource.Token);
    
        var _jsonSerializerSettings = new JsonSerializerSettings();
        var _serializer = Newtonsoft.Json.JsonSerializer.Create(_jsonSerializerSettings);
    
        using TextReader textReader = new StreamReader(stream);
        using JsonReader jsonReader = new JsonTextReader(textReader);
    
        await using (stream.ConfigureAwait(false))
        {
            await jsonReader.ReadAsync(_cancelationTokenSource.Token).ConfigureAwait(false);
            while (await jsonReader.ReadAsync(_cancelationTokenSource.Token).ConfigureAwait(false) &&
                   jsonReader.TokenType != JsonToken.EndArray)
            {
                Customer customer = _serializer!.Deserialize<Customer>(jsonReader);
                if (customer.Id == TARGET)
                {
                    Console.WriteLine(customer.Id + " : " + customer.Name);
                    _cancelationTokenSource.Cancel();
                    break;
                }
            }
        }
    }

    منابع :

    https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8

    https://code-maze.com/csharp-async-enumerable-yield

    Github Link : https://github.com/Ershad95/Stream_REST_API