یک نکتهی تکمیلی: استفاده از IAsyncEnumerable در جهت ایجاد وب سرویسهای REST با قابلیت Stream
مقدمه
در Net Core 3. نوعهای جدیدی با عنوانهای <IAsyncEnumerable<T>,IAsyncEnumerator<T> در فضای نام System.Collections.Generic معرفی شدند. همانطور که مشخص است این نوعهای جدید کاملا با نوعهای synchronous خود هم پوشانی دارند و مفاهیم قبلی را به پیاده سازی میکنند.
نوع <IAsyncEnumerable<T متد GetAsyncEnumerator را معرفی میکند تا عملیات enumeration را به صورت async انجام دهد و در خروجی این متد، نوع <IAsyncEnumerator<T را برگشت میدهد؛ بهطوریکه این نوع disposable و دو عضو MoveNextAsync و Current را در خود دارد. اولی برای رسیدن به مقدار بعدی و دومی برای دریافت مقدار فعلی استفاده میشود. این در حالی است که MoveNextAsync بجای برگشت دادن یک bool یک <ValueTask<bool را برگشت میدهد. همچنین این متد، مقدار CancelationToken را همانند سایر فرآیندهایی که به صورت async تعریف میشوند، به صورت اختیاری از ورودی دریافت میکند، تا در صورت لزوم، عملیات جاری را کنسل کند. از طرفی به دلیل اینکه IAsyncEnumerator اینترفیس IAsyncDisposable را پیاده سازی میکند، متد DisposeAsync را نیز در اختیار دارد بهطوریکه بجای void یک ValueTask را برگشت میدهد.
نحوه استفاده از IAsyncEnumerable
static async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
for (int i = 0; i < count; i++)
{
await Task.Delay(i);
yield return start + i;
}
}
برای استفاده از این نوع در نهایت باید از عبارت yield return استفاده کرد. تا مقدار برگشتی مشخص شده در IAsyncEnumerable که در این مثال int است برگشت داده شود. در صورت استفاده نشدن از yield، خطای cannot return value from an iterator داده میشود.
پیاده سازی سمت سرور
در قسمت قبل سعی بر این بود تا با این نوع جدید آشنا شویم. در این قسمت تلاش میکنیم تا با استفاده از این نوع یک وب سرویس stream را ایجاد کنیم .
ایجاد یک وب سرویس بدون خروجی IAsyncEnumerable
در مرحله اول، یک وب سرویس REST را بدون استفاده از IAsyncEnumerable ایجاد میکنیم تا متوجه مشکلات آن شویم و سپس در مرحله بعدی همین وب سرویس را با نوع IAsyncEnumerable بازنویسی میکنیم.
[ApiController]
[Route("[controller]")]
public class CustomerController : ControllerBase
{
private readonly IDictionary<int, Customer> _customers;
private void FillCustomerFromMemory(int countOfCustomer)
{
for (int CustomerId = 1; CustomerId <= countOfCustomer; CustomerId++)
{
_customers.Add(key: CustomerId, new Customer($"name_{CustomerId}", CustomerId));
}
}
public CustomerController()
{
_customers = new Dictionary<int, Customer>();
FillCustomerFromMemory(countOfCustomer : 100);
}
[HttpGet]
public async Task<IEnumerable<Customer>> Get()
{
var output = new List<Customer>();
while (_customers.Any(_ => _.Key % 10 == 0))
{
var customer = _customers.First(_ => _.Key % 10 == 0);
output.Add(new Customer(customer.Value.Name, customer.Key));
await Task.Delay(500);
_customers.Remove(customer);
}
return output;
}
public class Customer
{
public int Id { get; private set; }
public string Name { get; private set; }
public Customer(string name, int id)
{
Name = name;
Id = id;
}
}
}
در صورت اجرای این تکه کد و فراخوانی وب سرویس موجود بعد از بارگذاری کامل دیتا، خروجی به کاربر برگشت داده میشود. این در حالی است که ممکن است کاربر فقط به بخشی از این دیتا نیاز داشته باشد؛ برای مثال شاید صرفا به Id با مقدار ۸۰ نیاز داشته باشد، اما مجبور است تا بارگذاری کل دیتا صبر کند. برای رفع این مشکل وب سرویس موجود را مجدد باز نویسی میکنیم.
ایجاد یک وب سرویس با خروجی IAsyncEnumerable [HttpGet]
public async IAsyncEnumerable<Customer> Get()
{
while (_customers.Any(_ => _.Key % 10 == 0))
{
var customer = _customers.First(_ => _.Key % 10 == 0);
yield return new Customer(customer.Value.Name, customer.Key);
_customers.Remove(customer);
await Task.Delay(500);
}
}
این بار به محض اینکه یک دیتا ساخته شد، برگشت داده میشود و منتظر تمام دیتا نیستیم. این برگه برنده استفاده از IAsyncEnumerable , yield return است چرا که با ترکیب این دو میتوان وب سرویسی با قابلیت stream را ایجاد کرد. از طرفی حجم payload نیز کمتر شدهاست، چرا که هر بار صرفا یک بلاک مشخص از دیتا را به کلاینت ارسال میکنیم.
تا اینجا سمت سرور را به صورت stream پیاده سازی کردیم. در قسمت بعدی سمت کلاینت را نیز پیاده سازی میکنیم تا دیتا را همانطور که سرور، قسمت به قسمت ارسال میکند، کلاینت نیز آن را به شکل تک قسمتی دریافت کند.
پیاده سازی سمت کلاینت
در قسمت قبل تلاش کردیم تا یک وب سرویس با قابلیت stream را پیاده سازی کنیم. حال در این بخش کد کلاینت را به صورتی ایجاد میکنیم تا هر سری صرفا یک بلاک ارسال شده توسط سرور را دریافت و آن را Deserialize کند. برای این کار از کتابخانه Newtonsoft.Json استفاده میکنیم.
const int TARGET = 80;
var _httpClient = new HttpClient();
using (var response = await _httpClient.GetAsync(
"https://localhost:7284/customer",
HttpCompletionOption.ResponseHeadersRead))
{
var stream = await response.Content.ReadAsStreamAsync();
var _jsonSerializerSettings = new JsonSerializerSettings();
var _serializer = Newtonsoft.Json.JsonSerializer.Create(_jsonSerializerSettings);
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
await jsonReader.ReadAsync().ConfigureAwait(false);
while (await jsonReader.ReadAsync().ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Customer customer = _serializer!.Deserialize<Customer>(jsonReader);
if (customer.Id == TARGET)
{
Console.WriteLine(customer.Id + " : " + customer.Name);
break;
}
}
}
}
همانطورکه در کد بالا مشخص است، ابتدا یک درخواست Get را به آدرس وب سرویس زده و برای اینکه متجوجه شویم به انتهای لیست دادهها رسیدیم از jsonReader.TokenType != JsonToken.EndArray استفاده میکنیم. با این کار در صورتی که به ] نرسیده باشیم، باید عملیات خواندن از stream ادامه داشته باشد و هر سری بلاک جاری را Deserialize میکنیم و در آخر در صورتیکه آیتم مورد نظر را دریافت کردیم، با دستور break از حلقه دریافت بلاکها خارج میشویم.
استفاده از CancelationToken در جهت استفاده بهینه از منابع
تا اینجا به هدفی که انتظار داشتیم رسیدیم؛ به این شکل که یک وب سرویس را ایجاد کردیم تا اطلاعات را به صورت بخش بخش ارسال کند و کلاینتی ساختیم تا این اطلاعات را دریافت کند و در صورتیکه اطلاعات مورد نظر را دریافت کرد، به کار خواندن از وب سرویس خاتمه دهد. برای اینکه متوجه اهمیت CanclationToken شویم دو سناریو زیر را با هم بررسی میکنیم :
سناریو اول - قطع کردن ارتباط توسط کلاینت
فرض کنید به هر دلیلی، برای مثال خطای داخلی برنامهی کلاینت و یا بسته شدن مرورگر، ارتباط کلاینت با سرور قطع شود. در این صورت سرور از این ماجرا خبردار نمیشود و به کار خود جهت ارسال اطلاعات ادامه میدهد. همانطور که گفته شد، کلاینت به هر دلیلی از دریافت اطلاعات منصرف شده و یا به خطا خورده. پس فرستادن اطلاعات هیچ کاربردی ندارد و سرور در هر مرحله ای از ارسال که باشد، باید به کار خود خاتمه دهد.
برای برطرف کردن مشکل، این سناریو کد سمت سرور را مجدد باز نویسی میکنیم :
[HttpGet]
public async IAsyncEnumerable<Customer> Get(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested && _customers.Any(_ => _.Key % 10 == 0))
{
var customer = _customers.First(_ => _.Key % 10 == 0);
yield return new Customer(customer.Value.Name, customer.Key);
_customers.Remove(customer);
await Task.Delay(500,cancellationToken);
}
}
در کد بالا صرفا یک CancelationToken به ورودی متد اضافه شده و از آن در جهت اطمینان از اتصال کلاینت استفاده شده، به طوری که در حلقه اصلی ارسال اطلاعات شرط cancellationToken.IsCancellationRequested را چک میکند تا کاربر به دلایل مختلفی از دریافت اطلاعات منصرف نشده باشد و در صورت لغو کاربر، سرور به کار خود خاتمه میدهد
سناریو دوم-دستیابی کلاینت به اطلاعات مورد نظر
کلاینت در صورتیکه به اطلاعات مورد نظر از طریق وب سرویس دسترسی پیدا کرد، دیگر تمایلی به ادامه خواندن از جریان داده یا stream را ندارد و از حلقه خواندن اطلاعات خارج میشود. اما سرور همچنان درگیر ارسال اطلاعات است. برای رفع این مشکل کد سمت کلاینت را بازنویسی میکنیم:
const int TARGET = 80;
var _httpClient = new HttpClient();
var _cancelationTokenSource = new CancellationTokenSource();
using (var response = await _httpClient.GetAsync(
"https://localhost:7284/customer",
HttpCompletionOption.ResponseHeadersRead,
_cancelationTokenSource.Token))
{
var stream = await response.Content.ReadAsStreamAsync(_cancelationTokenSource.Token);
var _jsonSerializerSettings = new JsonSerializerSettings();
var _serializer = Newtonsoft.Json.JsonSerializer.Create(_jsonSerializerSettings);
using TextReader textReader = new StreamReader(stream);
using JsonReader jsonReader = new JsonTextReader(textReader);
await using (stream.ConfigureAwait(false))
{
await jsonReader.ReadAsync(_cancelationTokenSource.Token).ConfigureAwait(false);
while (await jsonReader.ReadAsync(_cancelationTokenSource.Token).ConfigureAwait(false) &&
jsonReader.TokenType != JsonToken.EndArray)
{
Customer customer = _serializer!.Deserialize<Customer>(jsonReader);
if (customer.Id == TARGET)
{
Console.WriteLine(customer.Id + " : " + customer.Name);
_cancelationTokenSource.Cancel();
break;
}
}
}
}
منابع :
https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8 https://code-maze.com/csharp-async-enumerable-yield
Github Link :
https://github.com/Ershad95/Stream_REST_API