‫۱ سال و ۶ ماه قبل، شنبه ۸ بهمن ۱۴۰۱، ساعت ۱۴:۳۰
یک نکته‌ی تکمیلی: استفاده از IAsyncEnumerable در جهت ایجاد وب سرویس‌های REST با قابلیت Stream

 مقدمه
در Net Core 3. نوع‌های جدیدی با عنوان‌های <IA­syncEnumerable<T>,IAsync­Enumerator<T> در فضای نام System.Collections.Gener­ic معرفی شدند. همانطور که مشخص است این نوع‌های جدید کاملا با نوع‌های synchronous خود هم پوشانی دارند و مفاهیم قبلی را به پیاده سازی میکنند.

نوع <IAsync­Enumerable<T متد GetAsyncEnumerator را معرفی میکند تا عملیات enumeration را به صورت async انجام دهد و در خروجی این متد، نوع <IAsyncEnumerator<T را برگشت میدهد؛ به‌طوریکه این نوع disposable و دو عضو MoveNextAsync و Current را در خود دارد. اولی برای رسیدن به مقدار بعدی و دومی برای دریافت مقدار فعلی استفاده می‌شود. این در حالی است که MoveNextAsync بجای برگشت دادن یک bool یک <ValueTask<bool را برگشت می‌دهد. همچنین این متد، مقدار CancelationToken را همانند سایر فرآیندهایی که به صورت async تعریف می‌شوند، به صورت اختیاری از ورودی دریافت میکند، تا در صورت لزوم، عملیات جاری را کنسل کند. از طرفی به دلیل اینکه IAsyncEnumerator اینترفیس IAsyncDisposable را پیاده سازی میکند، متد DisposeAsync را نیز در اختیار دارد به‌طوریکه بجای void یک ValueTask را برگشت میدهد.


نحوه استفاده از IAsyncEnumerable 
static async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
  for (int i = 0; i < count; i++)
  {
    await Task.Delay(i);
    yield return start + i;
  }
}
برای استفاده از این نوع در نهایت باید از عبارت yield return استفاده کرد. تا مقدار برگشتی مشخص شده در IAsyncEnumerable که در این مثال int است برگشت داده شود. در صورت استفاده نشدن از yield، خطای cannot return value from an iterator داده می‌شود.

پیاده سازی سمت سرور  

در قسمت قبل سعی بر این بود تا با این نوع جدید آشنا شویم. در این قسمت تلاش میکنیم تا با استفاده از این نوع یک وب سرویس stream را ایجاد کنیم .

ایجاد یک وب سرویس بدون خروجی IAsyncEnumerable

در مرحله اول، یک وب سرویس REST را بدون استفاده از IAsyncEnumerable ایجاد می‌کنیم تا متوجه مشکلات آن شویم و سپس در مرحله بعدی همین وب سرویس را با نوع IAsyncEnumerable  بازنویسی میکنیم.
    [ApiController]
    [Route("[controller]")]
    public class CustomerController : ControllerBase
    {
        private readonly IDictionary<int, Customer> _customers;
        private void FillCustomerFromMemory(int countOfCustomer)
        {
            for (int CustomerId = 1; CustomerId <= countOfCustomer; CustomerId++)
            {
                _customers.Add(key: CustomerId, new Customer($"name_{CustomerId}", CustomerId));
            }
        }
        public CustomerController()
        {
            _customers = new Dictionary<int, Customer>();
            FillCustomerFromMemory(countOfCustomer : 100);
        }
        [HttpGet]
        public async Task<IEnumerable<Customer>> Get()
        {
            var output = new List<Customer>();
            while (_customers.Any(_ => _.Key % 10 == 0))
            {
                var customer = _customers.First(_ => _.Key % 10 == 0);
                output.Add(new Customer(customer.Value.Name, customer.Key));
                await Task.Delay(500);
                _customers.Remove(customer);
            }
            return output;
        }

        public class Customer
        {
            public int Id { get; private set; }

            public string Name { get; private set; }
            public Customer(string name, int id)
            {
                Name = name;
                Id = id;
            }
        }
    }
در صورت اجرای این تکه کد و فراخوانی وب سرویس موجود بعد از بارگذاری کامل دیتا، خروجی به کاربر برگشت داده می‌شود. این در حالی است که ممکن است کاربر فقط به بخشی از این دیتا نیاز داشته باشد؛ برای مثال شاید صرفا به Id با مقدار ۸۰ نیاز داشته باشد، اما مجبور است تا بارگذاری کل دیتا صبر کند. برای رفع این مشکل وب سرویس موجود را مجدد باز نویسی میکنیم.

ایجاد یک وب سرویس با خروجی IAsyncEnumerable

  [HttpGet]
        public async IAsyncEnumerable<Customer> Get()
        {
            while (_customers.Any(_ => _.Key % 10 == 0))
            {
                var customer = _customers.First(_ => _.Key % 10 == 0);
                yield return new Customer(customer.Value.Name, customer.Key);
                _customers.Remove(customer);
                await Task.Delay(500);
            }
        }
این بار به محض اینکه یک دیتا ساخته شد، برگشت داده می‌شود و منتظر تمام دیتا نیستیم. این برگه برنده استفاده از IAsyncEnumerable , yield return است چرا که با ترکیب این دو میتوان وب سرویسی با قابلیت stream را ایجاد کرد. از طرفی حجم payload نیز کمتر شده‌است، چرا که هر بار صرفا یک بلاک مشخص از دیتا را به کلاینت ارسال میکنیم.

تا اینجا سمت سرور را به صورت stream پیاده سازی کردیم. در قسمت بعدی سمت کلاینت را نیز پیاده سازی میکنیم تا دیتا را همانطور که سرور، قسمت به قسمت ارسال میکند، کلاینت نیز آن را به شکل تک قسمتی دریافت کند.

پیاده سازی سمت کلاینت

در قسمت قبل تلاش کردیم تا یک وب سرویس با قابلیت stream را پیاده سازی کنیم. حال در این بخش کد کلاینت را به صورتی ایجاد میکنیم تا هر سری صرفا یک بلاک ارسال شده توسط سرور را دریافت و آن را Deserialize کند. برای این کار از کتابخانه Newtonsoft.Json استفاده میکنیم.
const int TARGET = 80;
var _httpClient = new HttpClient();
using (var response = await _httpClient.GetAsync(
    "https://localhost:7284/customer",
     HttpCompletionOption.ResponseHeadersRead))
{
    var stream = await response.Content.ReadAsStreamAsync();

    var _jsonSerializerSettings = new JsonSerializerSettings();
    var _serializer = Newtonsoft.Json.JsonSerializer.Create(_jsonSerializerSettings);

    using TextReader textReader = new StreamReader(stream);
    using JsonReader jsonReader = new JsonTextReader(textReader);

    await using (stream.ConfigureAwait(false))
    {
        await jsonReader.ReadAsync().ConfigureAwait(false);
        while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
               jsonReader.TokenType != JsonToken.EndArray)
        {
            Customer customer = _serializer!.Deserialize<Customer>(jsonReader);
            if (customer.Id == TARGET)
            {
                Console.WriteLine(customer.Id + " : " + customer.Name);
                break;
            }
        }
    }
}
همانطورکه در کد بالا مشخص است، ابتدا یک درخواست Get را به آدرس وب سرویس زده و برای اینکه متجوجه شویم به انتهای لیست داده‌ها رسیدیم از jsonReader.TokenType != JsonToken.EndArray استفاده میکنیم. با این کار در صورتی که به ] نرسیده باشیم، باید عملیات خواندن از stream ادامه داشته باشد و هر سری بلاک جاری را Deserialize  میکنیم و در آخر در صورتیکه آیتم مورد نظر را دریافت کردیم، با دستور break از حلقه دریافت بلاک‌ها خارج می‌شویم.

 
استفاده از CancelationToken در جهت استفاده بهینه از منابع

تا اینجا به هدفی که انتظار داشتیم رسیدیم؛ به این شکل که یک وب سرویس را ایجاد کردیم تا اطلاعات را به صورت بخش بخش ارسال کند و کلاینتی ساختیم تا این اطلاعات را دریافت کند و در صورتیکه اطلاعات مورد نظر را دریافت کرد، به کار خواندن از وب سرویس خاتمه دهد. برای اینکه متوجه اهمیت CanclationToken  شویم دو سناریو زیر را با هم بررسی میکنیم :

سناریو اول - قطع کردن ارتباط توسط کلاینت

فرض کنید به هر دلیلی، برای مثال خطای داخلی برنامه‌ی کلاینت و یا بسته شدن مرورگر، ارتباط کلاینت با سرور قطع شود. در این صورت سرور از این ماجرا خبردار نمی‌شود و به کار خود جهت ارسال اطلاعات ادامه می‌دهد. همانطور که گفته شد، کلاینت به هر دلیلی از دریافت اطلاعات منصرف شده و یا به خطا خورده. پس فرستادن اطلاعات هیچ کاربردی ندارد و سرور در هر مرحله ای از ارسال که باشد، باید به کار خود خاتمه دهد.

برای برطرف کردن مشکل، این سناریو کد سمت سرور را مجدد باز نویسی میکنیم : 
[HttpGet]
        public async IAsyncEnumerable<Customer> Get(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested && _customers.Any(_ => _.Key % 10 == 0))
            {
                var customer = _customers.First(_ => _.Key % 10 == 0);
                yield return new Customer(customer.Value.Name, customer.Key);
                _customers.Remove(customer);
                await Task.Delay(500,cancellationToken);
            }
        }
در کد بالا صرفا یک CancelationToken به ورودی متد اضافه شده و از آن در جهت اطمینان از اتصال کلاینت استفاده شده، به طوری که در حلقه اصلی ارسال اطلاعات شرط cancellationToken.IsCancellationRequested را چک میکند تا کاربر به دلایل مختلفی از دریافت اطلاعات منصرف نشده باشد و در صورت لغو کاربر، سرور به کار خود خاتمه میدهد

سناریو دوم-دستیابی کلاینت به اطلاعات مورد نظر

کلاینت در صورتیکه به اطلاعات مورد نظر از طریق وب سرویس دسترسی پیدا کرد، دیگر تمایلی به ادامه خواندن از جریان داده یا stream را ندارد و از حلقه خواندن اطلاعات خارج می‌شود. اما سرور همچنان درگیر ارسال اطلاعات است. برای رفع این مشکل کد سمت کلاینت را بازنویسی میکنیم: 
const int TARGET = 80;
var _httpClient = new HttpClient();
var _cancelationTokenSource = new CancellationTokenSource();

using (var response = await _httpClient.GetAsync(
    "https://localhost:7284/customer",
     HttpCompletionOption.ResponseHeadersRead,
     _cancelationTokenSource.Token))
{
    var stream = await response.Content.ReadAsStreamAsync(_cancelationTokenSource.Token);

    var _jsonSerializerSettings = new JsonSerializerSettings();
    var _serializer = Newtonsoft.Json.JsonSerializer.Create(_jsonSerializerSettings);

    using TextReader textReader = new StreamReader(stream);
    using JsonReader jsonReader = new JsonTextReader(textReader);

    await using (stream.ConfigureAwait(false))
    {
        await jsonReader.ReadAsync(_cancelationTokenSource.Token).ConfigureAwait(false);
        while (await jsonReader.ReadAsync(_cancelationTokenSource.Token).ConfigureAwait(false) &&
               jsonReader.TokenType != JsonToken.EndArray)
        {
            Customer customer = _serializer!.Deserialize<Customer>(jsonReader);
            if (customer.Id == TARGET)
            {
                Console.WriteLine(customer.Id + " : " + customer.Name);
                _cancelationTokenSource.Cancel();
                break;
            }
        }
    }
}

منابع :

https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8

https://code-maze.com/csharp-async-enumerable-yield

Github Link : https://github.com/Ershad95/Stream_REST_API