این افزونه جزو موفقترین کتابخانههای دات نتی مایکروسافت در سالهای اخیر به شما میرود؛ تا حدی که معادلهای بسیاری از آن برای زبانهای دیگر مانند Java، JavaScript، Python، CPP و غیره نیز تهیه شدهاند.
استفاده از Rx به همراه یک کوئری LINQ
یک برنامهی کنسول جدید را ایجاد کنید. سپس برای نصب کتابخانهی Rx، دستور ذیل را در کنسول پاورشل نیوگت اجرا نمائید:
PM> Install-Package Rx-Main
<?xml version="1.0" encoding="utf-8"?> <packages> <package id="Rx-Core" version="2.2.4" targetFramework="net45" /> <package id="Rx-Interfaces" version="2.2.4" targetFramework="net45" /> <package id="Rx-Linq" version="2.2.4" targetFramework="net45" /> <package id="Rx-Main" version="2.2.4" targetFramework="net45" /> <package id="Rx-PlatformServices" version="2.2.4" targetFramework="net45" /> </packages>
using System; using System.Linq; namespace Rx01 { class Program { static void Main(string[] args) { var query = Enumerable.Range(1, 5).Select(number => number); foreach (var number in query) { Console.WriteLine(number); } finished(); } private static void finished() { Console.WriteLine("Done!"); } } }
اکنون اگر بخواهیم همین عملیات را توسط Rx انجام دهیم، به شکل زیر خواهد بود:
using System; using System.Linq; using System.Reactive.Linq; namespace Rx01 { class Program { static void Main(string[] args) { var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(); observableQuery.Subscribe(onNext: number => Console.WriteLine(number), onCompleted: () => finished()); } private static void finished() { Console.WriteLine("Done!"); } } }
1 2 3 4 5 Done!
observableQuery.Subscribe(Console.WriteLine, finished);
در این مثال ساده صرفا یک Syntax دیگر را نسبت به حلقهی foreach متداول مشاهده کردیم که اندکی فشردهتر است. در هر دو حالت نیز عملیات انجام شده در تردجاری صورت گرفتهاند. اما قابلیتها و ارزشهای واقعی Rx زمانی آشکار خواهند شد که پردازش موازی و پردازش در تردهای دیگر را در آن فعال کنیم.
الگوی Observer
Rx پیاده سازی کنندهی الگوی طراحی شیءگرایی به نام Observer است. برای توضیح آن یک لامپ و سوئیچ برق را درنظر بگیرید. زمانیکه لامپ مشاهده میکند سوئیچ برق در حالت روشن قرار گرفتهاست، روشن خواهد شد و برعکس. در اینجا به سوئیچ، subject و به لامپ، observer گفته میشود. هر زمان که حالت سوئیچ تغییر میکند، از طریق یک callback، وضعیت خود را به observer اعلام خواهد کرد. علت استفاده از callbackها، ارائه راهحلهای عمومی است تا بتواند با انواع و اقسام اشیاء کار کند. به این ترتیب هر بار که شیء observer از نوع متفاوتی تعریف میشود (مثلا بجای لامپ یک خودرو قرار گیرد)، نیازی نخواهد بود تا subject را تغییر داد.
در Rx دو اینترفیس معادل observer و subject تعریف شدهاند. در اینجا اینترفیس IObserver معادل observer است و اینترفیس IObservable معادل subject میباشد:
class Subject : IObservable<int> { public IDisposable Subscribe(IObserver<int> observer) { } }
class Observer : IObserver<int> { public void OnCompleted() { } public void OnError(Exception error) { } public void OnNext(int value) { } }
مجموعههای Observable کلید کار با Rx هستند. در مثال قبل ملاحظه کردیم که با استفاده از متد الحاقی ToObservable بر روی یک کوئری LINQ و یا هر نوع IEnumerable ایی، میتوان یک مجموعهی Observable را ایجاد کرد. خروجی کوئری حاصل از آن به صورت خودکار اینترفیس IObservable را پیاده سازی میکند که دارای یک متد به نام Subscribe است.
در متد Subscribe کاری که به صورت خودکار صورت خواهد گرفت، ایجاد یک حلقهی foreach بر روی مجموعهی مورد آنالیز و سپس فراخوانی متد OnNext کلاس پیاده سازی کنندهی IObserver به ازای هر آیتم موجود در مجموعه است (فراخوانی observer.OnNext). در پایان کار هم فقط return this در اینجا صورت خواهد گرفت. در حین پردازش حلقه، اگر خطایی رخ دهد، متد observer.OnError انجام میشود.
در مثال قبل،کوئری LINQ نوشته شده، خروجی از نوع IObservable ندارد. به کمک متد الحاقی ToObservable:
public static System.IObservable<TSource> ToObservable<TSource>( this System.Collections.Generic.IEnumerable<TSource> source, System.Reactive.Concurrency.IScheduler scheduler)
البته استفاده از متد Subscribe به نحوی که در مثال قبل ذکر شد، خلاصه شدهی الگوی Observer است. اگر بخواهیم دقیقا مانند الگو عمل کنیم، چنین شکلی را خواهد داشت:
var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(); var observer = Observer.Create<int>(onNext: number => Console.WriteLine(number)); observableQuery.Subscribe(observer);
پردازش نتایج یک کوئری LINQ در تردی دیگر توسط Rx
برای اجرای نتایج متد Subscribe در یک ترد جدید، میتوان پارامتر scheduler متد ToObservable را مقدار دهی کرد:
using System; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; namespace Rx01 { class Program { static void Main(string[] args) { Console.WriteLine("Thread-Id: {0}", Thread.CurrentThread.ManagedThreadId); var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(scheduler: NewThreadScheduler.Default); observableQuery.Subscribe(onNext: number => { Console.WriteLine("number: {0}, on Thread-id: {1}", number, Thread.CurrentThread.ManagedThreadId); }, onCompleted: () => finished()); } private static void finished() { Console.WriteLine("Done!"); } } }
Thread-Id: 1 number: 1, on Thread-id: 3 number: 2, on Thread-id: 3 number: 3, on Thread-id: 3 number: 4, on Thread-id: 3 number: 5, on Thread-id: 3 Done!
NewThreadScheduler.Default در فضای نام System.Reactive.Concurrency واقع شدهاست.
یک نکته
در نگارشهای آغازین Rx، مقدار scheduler را میشد معادل Scheduler.NewThread نیز قرار داد که در نگارشهای جدید منسوخ شده درنظر گرفته شده و به زودی حذف خواهد شد. معادلهای جدید آن اکنون NewThreadScheduler.Default، ThreadPoolScheduler.Default و امثال آن هستند.
مدیریت خاتمهی اعمال انجام شدهی در تردهای دیگر توسط Rx
یکی از مواردی که حین اجرای نتیجهی callbackهای پردازش شدهی در تردهای دیگر نیاز است بدانیم، زمان خاتمهی کار آنها است. برای نمونه در مثال قبل، نمایش Done پس از پایان تمام callbacks انجام شدهاست. فرض کنید، callback پایان عملیات را حذف کرده و متد finished را پس از فراخوانی متد observableQuery.Subscribe قرار دهیم:
observableQuery.Subscribe(onNext: number => { Console.WriteLine("number: {0}, on Thread-id: {1}", number, Thread.CurrentThread.ManagedThreadId); }/*, onCompleted: () => finished()*/); finished();
Thread-Id: 1 number: 1, on Thread-id: 3 Done! number: 2, on Thread-id: 3 number: 3, on Thread-id: 3 number: 4, on Thread-id: 3 number: 5, on Thread-id: 3
مدیریت استثناهای رخ داده در حین پردازش مجموعههای واکنشگرا
متد Subscribe دارای چندین overload است. تا اینجا نمونهای که دارای پارامترهای onNext و onCompleted بودند را بررسی کردیم. اگر بخواهیم مدیریت استثناءها را نیز در اینجا اضافه کنیم، فقط کافی است از overload دیگر آن که دارای پارامتر onError است، استفاده نمائیم:
observableQuery.Subscribe( onNext: number => Console.WriteLine(number), onError: exception => Console.WriteLine(exception.Message), onCompleted: () => finished());
مدیریت ترد اجرای نتایج حاصل از Rx در یک برنامهی دسکتاپ WPF یا WinForms
تا اینجا مشاهده کردیم که اجرای callbackهای observer در یک ترد دیگر، به سادگی تنظیم پارامتر scheduler متد ToObservable است. اما در برنامههای دسکتاپ برای به روز رسانی عناصر رابط کاربری، حتما باید در تردی قرار داشته باشیم که آن رابط کاربری در آن ایجاد شدهاست یا به عبارتی در ترد اصلی برنامه؛ در غیر اینصورت برنامه کرش خواهد کرد. مدیریت این مساله نیز در Rx بسیار سادهاست. ابتدا نیاز است بستهی Rx-WPF را نصب کرد:
PM> Install-Package Rx-WPF
observableQuery.ObserveOn(DispatcherScheduler.Current).Subscribe(...)
observableQuery.ObserveOnDispatcher().Subscribe(...)
و یا اگر از WinForms استفاده میکنید، ابتدا بستهی Rx خاص آنرا نصب کنید:
PM> Install-Package Rx-WinForms
observableQuery.ObserveOn(SynchronizationContext.Current).Subscribe(...)
یک نکته
در Rx فرض میشود که کوئری شما زمانبر است و callbackهای مشاهدهگر سریع عمل میکنند. بنابراین هدف از callbackهای آن، پردازشهای سنگین نیست. جهت آزمایش این مساله، اینبار query ابتدایی برنامه را به شکل ذیل تغییر دهید که در آن بازگشت زمانبر یک سری داده شبیه سازی شدهاند.
var query = Enumerable.Range(1, 5).Select(number => { Thread.Sleep(250); return number; });