Reactive extensions یا به صورت خلاصه Rx ،کتابخانهی سورس باز تهیه شدهای توسط مایکروسافت است که اگر بخواهیم آنرا به سادهترین شکل ممکن تعریف کنیم، معنای Linq to events را میدهد و امکان مدیریت تعاملهای پیچیدهی async را به صورت declaratively فراهم میکند. هدف آن بسط فضای نام System.Linq و تبدیل نتایج یک کوئری LINQ به یک مجموعهی Observable است؛ به همراه مدیریت مسایل همزمانی آن.
این افزونه جزو موفقترین کتابخانههای دات نتی مایکروسافت در سالهای اخیر به شما میرود؛ تا حدی که معادلهای بسیاری از آن برای زبانهای دیگر مانند Java، JavaScript، Python، CPP و غیره نیز تهیه شدهاند.
استفاده از Rx به همراه یک کوئری LINQ
یک برنامهی کنسول جدید را ایجاد کنید. سپس برای نصب کتابخانهی Rx، دستور ذیل را در کنسول پاورشل نیوگت اجرا نمائید:
نصب آن از طریق نیوگت، به صورت خودکار کلیه وابستگیهای مرتبط با آنرا نیز به پروژهی جاری اضافه میکند:
سپس متد Main این برنامه را به نحو ذیل تغییر دهید:
در اینجا یک سری عملیات متداول را مشاهده میکنید. بازهای از اعداد توسط متد Enumerable.Range ایجاد شده و سپس به کمک یک حلقه، تمام آیتمهای آن نمایش داده میشوند. همچنین در پایان کار نیز یک متد دیگر فراخوانی شدهاست.
اکنون اگر بخواهیم همین عملیات را توسط Rx انجام دهیم، به شکل زیر خواهد بود:
ابتدا نیاز است تا کوئری متداول LINQ را تبدیل به نمونهی Observable آن کرد. اینکار را توسط متد الحاقی ToObservable که در فضای نام System.Reactive.Linq تعریف شدهاست، انجام میدهیم. به این ترتیب، هر زمانیکه که عددی به query اضافه میشود، با استفاده از متد Subscribe میتوان تغییرات آنرا تحت کنترل قرار داد. برای مثال در اینجا هربار که عددی در بازهی 1 تا 5 تولید میشود، یکبار پارامتر onNext اجرا خواهد شد. برای نمونه در مثال فوق، از نتیجهی آن برای نمایش مقدار دریافتی، استفاده شدهاست. سپس توسط پارامتر اختیاری onCompleted، در پایان کار، یک متد خاص را میتوان فراخوانی کرد. خروجی برنامه در این حالت نیز به صورت ذیل است:
البته اگر قصد خلاصه نویسی داشته باشیم، سطر آخر متد Main، با سطر ذیل یکی است:
در این مثال ساده صرفا یک Syntax دیگر را نسبت به حلقهی foreach متداول مشاهده کردیم که اندکی فشردهتر است. در هر دو حالت نیز عملیات انجام شده در تردجاری صورت گرفتهاند. اما قابلیتها و ارزشهای واقعی Rx زمانی آشکار خواهند شد که پردازش موازی و پردازش در تردهای دیگر را در آن فعال کنیم.
الگوی Observer
Rx پیاده سازی کنندهی الگوی طراحی شیءگرایی به نام Observer است. برای توضیح آن یک لامپ و سوئیچ برق را درنظر بگیرید. زمانیکه لامپ مشاهده میکند سوئیچ برق در حالت روشن قرار گرفتهاست، روشن خواهد شد و برعکس. در اینجا به سوئیچ، subject و به لامپ، observer گفته میشود. هر زمان که حالت سوئیچ تغییر میکند، از طریق یک callback، وضعیت خود را به observer اعلام خواهد کرد. علت استفاده از callbackها، ارائه راهحلهای عمومی است تا بتواند با انواع و اقسام اشیاء کار کند. به این ترتیب هر بار که شیء observer از نوع متفاوتی تعریف میشود (مثلا بجای لامپ یک خودرو قرار گیرد)، نیازی نخواهد بود تا subject را تغییر داد.
در Rx دو اینترفیس معادل observer و subject تعریف شدهاند. در اینجا اینترفیس IObserver معادل observer است و اینترفیس IObservable معادل subject میباشد:
کار متد Subscribe، اتصال به Observer است و برای این حالت نیاز به کلاسی دارد که اینترفیس IObserver را پیاده سازی کند.
در اینجا OnCompleted زمانی اجرا میشود که پردازش مجموعهای از اعداد int پایان یافته باشد. OnError در زمان وقوع استثنایی اجرا میشود و OnNext به ازای هر عدد موجود در مجموعهی در حال پردازش، یکبار اجرا میشود. البته نیازی به پیاده سازی صریح این اینترفیس نیست و توسط متد توکار Observer.Create میتوان به همین نتیجه رسید.
مجموعههای Observable کلید کار با Rx هستند. در مثال قبل ملاحظه کردیم که با استفاده از متد الحاقی ToObservable بر روی یک کوئری LINQ و یا هر نوع IEnumerable ایی، میتوان یک مجموعهی Observable را ایجاد کرد. خروجی کوئری حاصل از آن به صورت خودکار اینترفیس IObservable را پیاده سازی میکند که دارای یک متد به نام Subscribe است.
در متد Subscribe کاری که به صورت خودکار صورت خواهد گرفت، ایجاد یک حلقهی foreach بر روی مجموعهی مورد آنالیز و سپس فراخوانی متد OnNext کلاس پیاده سازی کنندهی IObserver به ازای هر آیتم موجود در مجموعه است (فراخوانی observer.OnNext). در پایان کار هم فقط return this در اینجا صورت خواهد گرفت. در حین پردازش حلقه، اگر خطایی رخ دهد، متد observer.OnError انجام میشود.
در مثال قبل،کوئری LINQ نوشته شده، خروجی از نوع IObservable ندارد. به کمک متد الحاقی ToObservable:
به صورت خودکار، IEnumerable حاصل از کوئری LINQ را تبدیل به یک IObservable کردهایم. به این ترتیب اکنون کوئری LINQ ما همانند سوئیچ برق عمل میکند و با تغییر آیتمهای موجود در آن، مشاهدهگرهایی که به آن متصل شدهاند (از طریق فراخوانی متد Subscribe)، امکان دریافت سیگنالهای تغییر وضعیت آنرا خواهند داشت.
البته استفاده از متد Subscribe به نحوی که در مثال قبل ذکر شد، خلاصه شدهی الگوی Observer است. اگر بخواهیم دقیقا مانند الگو عمل کنیم، چنین شکلی را خواهد داشت:
ابتدا توسط متد ToObservable یک IObservable (سوئیچ) را ایجاد کردهایم. سپس توسط کلاس Observer موجود در فضای نام System.Reactive، یک IObserver (لامپ) را ایجاد کردهایم. کار اتصال سوئیچ به لامپ در متد Subscribe انجام میشود. اکنون هر زمانیکه تغییری در وضعیت observableQuery حاصل شود، سیگنالی را به observer ارسال میکند. در اینجا callbacks کار مدیریت observer را انجام میدهند.
پردازش نتایج یک کوئری LINQ در تردی دیگر توسط Rx
برای اجرای نتایج متد Subscribe در یک ترد جدید، میتوان پارامتر scheduler متد ToObservable را مقدار دهی کرد:
خروجی این مثال به نحو ذیل است:
پیش از آغاز کار و در متد Main، ترد آی دی ثبت شده مساوی 1 است. سپس هربار که callback متد Subscribe فراخوانی شدهاست، ملاحظه میکنید که ترد آی دی آن مساوی عدد 3 است. به این معنا که کلیه نتایج در یک ترد مشخص دیگر پردازش شدهاند.
NewThreadScheduler.Default در فضای نام System.Reactive.Concurrency واقع شدهاست.
یک نکته
در نگارشهای آغازین Rx، مقدار scheduler را میشد معادل Scheduler.NewThread نیز قرار داد که در نگارشهای جدید منسوخ شده درنظر گرفته شده و به زودی حذف خواهد شد. معادلهای جدید آن اکنون NewThreadScheduler.Default، ThreadPoolScheduler.Default و امثال آن هستند.
مدیریت خاتمهی اعمال انجام شدهی در تردهای دیگر توسط Rx
یکی از مواردی که حین اجرای نتیجهی callbackهای پردازش شدهی در تردهای دیگر نیاز است بدانیم، زمان خاتمهی کار آنها است. برای نمونه در مثال قبل، نمایش Done پس از پایان تمام callbacks انجام شدهاست. فرض کنید، callback پایان عملیات را حذف کرده و متد finished را پس از فراخوانی متد observableQuery.Subscribe قرار دهیم:
اینبار اگر برنامه را اجرا کنیم به خروجی ذیل خواهیم رسید:
این خروجی بدین معنا است که متد observableQuery.Subscribeدر حین اجرا شدن در تردی دیگر، صبر نخواهد کرد تا عملیات مرتبط با آن خاتمه یابد و سپس سطر بعدی را اجرا کند. بنابراین برای حل این مشکل، تنها کافی است به آن اعلام کنیم که پس از پایان عملیات، onCompleted را اجرا کن.
مدیریت استثناهای رخ داده در حین پردازش مجموعههای واکنشگرا
متد Subscribe دارای چندین overload است. تا اینجا نمونهای که دارای پارامترهای onNext و onCompleted بودند را بررسی کردیم. اگر بخواهیم مدیریت استثناءها را نیز در اینجا اضافه کنیم، فقط کافی است از overload دیگر آن که دارای پارامتر onError است، استفاده نمائیم:
اگر callback پارامتر onError اجرا شود، دیگر به onCompleted نخواهیم رسید. همچنین دیگر onNext ایی نیز اجرا نخواهد شد.
مدیریت ترد اجرای نتایج حاصل از Rx در یک برنامهی دسکتاپ WPF یا WinForms
تا اینجا مشاهده کردیم که اجرای callbackهای observer در یک ترد دیگر، به سادگی تنظیم پارامتر scheduler متد ToObservable است. اما در برنامههای دسکتاپ برای به روز رسانی عناصر رابط کاربری، حتما باید در تردی قرار داشته باشیم که آن رابط کاربری در آن ایجاد شدهاست یا به عبارتی در ترد اصلی برنامه؛ در غیر اینصورت برنامه کرش خواهد کرد. مدیریت این مساله نیز در Rx بسیار سادهاست. ابتدا نیاز است بستهی Rx-WPF را نصب کرد:
سپس توسط متد ObserveOn میتوان مشخص کرد که نتیجهی عملیات باید بر روی کدام ترد اجرا شود:
روش دیگر آن استفاده از متد ObserveOnDispatcher میباشد:
بنابراین مشخص سازی پارامتر scheduler متد ToObservable، به معنای اجرای query آن در یک ترد دیگر و استفاده از متد ObserveOn، به معنای مشخص سازی ترد اجرای callbackهای مشاهدهگر است.
و یا اگر از WinForms استفاده میکنید، ابتدا بستهی Rx خاص آنرا نصب کنید:
و سپس ترد اجرای callbackها را SynchronizationContext.Current مشخص نمائید:
یک نکته
در Rx فرض میشود که کوئری شما زمانبر است و callbackهای مشاهدهگر سریع عمل میکنند. بنابراین هدف از callbackهای آن، پردازشهای سنگین نیست. جهت آزمایش این مساله، اینبار query ابتدایی برنامه را به شکل ذیل تغییر دهید که در آن بازگشت زمانبر یک سری داده شبیه سازی شدهاند.
سپس با استفاده از متد ToObservable، ترد دیگری را برای اجرای واقعی آن مشخص کنید تا در حین اجرای آن برنامه در حالت هنگ به نظر نرسد و سپس نمایش آنرا به کمک متد ObserveOn، بر روی ترد اصلی برنامه انجام دهید.
این افزونه جزو موفقترین کتابخانههای دات نتی مایکروسافت در سالهای اخیر به شما میرود؛ تا حدی که معادلهای بسیاری از آن برای زبانهای دیگر مانند Java، JavaScript، Python، CPP و غیره نیز تهیه شدهاند.
استفاده از Rx به همراه یک کوئری LINQ
یک برنامهی کنسول جدید را ایجاد کنید. سپس برای نصب کتابخانهی Rx، دستور ذیل را در کنسول پاورشل نیوگت اجرا نمائید:
PM> Install-Package Rx-Main
<?xml version="1.0" encoding="utf-8"?> <packages> <package id="Rx-Core" version="2.2.4" targetFramework="net45" /> <package id="Rx-Interfaces" version="2.2.4" targetFramework="net45" /> <package id="Rx-Linq" version="2.2.4" targetFramework="net45" /> <package id="Rx-Main" version="2.2.4" targetFramework="net45" /> <package id="Rx-PlatformServices" version="2.2.4" targetFramework="net45" /> </packages>
using System; using System.Linq; namespace Rx01 { class Program { static void Main(string[] args) { var query = Enumerable.Range(1, 5).Select(number => number); foreach (var number in query) { Console.WriteLine(number); } finished(); } private static void finished() { Console.WriteLine("Done!"); } } }
اکنون اگر بخواهیم همین عملیات را توسط Rx انجام دهیم، به شکل زیر خواهد بود:
using System; using System.Linq; using System.Reactive.Linq; namespace Rx01 { class Program { static void Main(string[] args) { var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(); observableQuery.Subscribe(onNext: number => Console.WriteLine(number), onCompleted: () => finished()); } private static void finished() { Console.WriteLine("Done!"); } } }
1 2 3 4 5 Done!
observableQuery.Subscribe(Console.WriteLine, finished);
در این مثال ساده صرفا یک Syntax دیگر را نسبت به حلقهی foreach متداول مشاهده کردیم که اندکی فشردهتر است. در هر دو حالت نیز عملیات انجام شده در تردجاری صورت گرفتهاند. اما قابلیتها و ارزشهای واقعی Rx زمانی آشکار خواهند شد که پردازش موازی و پردازش در تردهای دیگر را در آن فعال کنیم.
الگوی Observer
Rx پیاده سازی کنندهی الگوی طراحی شیءگرایی به نام Observer است. برای توضیح آن یک لامپ و سوئیچ برق را درنظر بگیرید. زمانیکه لامپ مشاهده میکند سوئیچ برق در حالت روشن قرار گرفتهاست، روشن خواهد شد و برعکس. در اینجا به سوئیچ، subject و به لامپ، observer گفته میشود. هر زمان که حالت سوئیچ تغییر میکند، از طریق یک callback، وضعیت خود را به observer اعلام خواهد کرد. علت استفاده از callbackها، ارائه راهحلهای عمومی است تا بتواند با انواع و اقسام اشیاء کار کند. به این ترتیب هر بار که شیء observer از نوع متفاوتی تعریف میشود (مثلا بجای لامپ یک خودرو قرار گیرد)، نیازی نخواهد بود تا subject را تغییر داد.
در Rx دو اینترفیس معادل observer و subject تعریف شدهاند. در اینجا اینترفیس IObserver معادل observer است و اینترفیس IObservable معادل subject میباشد:
class Subject : IObservable<int> { public IDisposable Subscribe(IObserver<int> observer) { } }
class Observer : IObserver<int> { public void OnCompleted() { } public void OnError(Exception error) { } public void OnNext(int value) { } }
مجموعههای Observable کلید کار با Rx هستند. در مثال قبل ملاحظه کردیم که با استفاده از متد الحاقی ToObservable بر روی یک کوئری LINQ و یا هر نوع IEnumerable ایی، میتوان یک مجموعهی Observable را ایجاد کرد. خروجی کوئری حاصل از آن به صورت خودکار اینترفیس IObservable را پیاده سازی میکند که دارای یک متد به نام Subscribe است.
در متد Subscribe کاری که به صورت خودکار صورت خواهد گرفت، ایجاد یک حلقهی foreach بر روی مجموعهی مورد آنالیز و سپس فراخوانی متد OnNext کلاس پیاده سازی کنندهی IObserver به ازای هر آیتم موجود در مجموعه است (فراخوانی observer.OnNext). در پایان کار هم فقط return this در اینجا صورت خواهد گرفت. در حین پردازش حلقه، اگر خطایی رخ دهد، متد observer.OnError انجام میشود.
در مثال قبل،کوئری LINQ نوشته شده، خروجی از نوع IObservable ندارد. به کمک متد الحاقی ToObservable:
public static System.IObservable<TSource> ToObservable<TSource>( this System.Collections.Generic.IEnumerable<TSource> source, System.Reactive.Concurrency.IScheduler scheduler)
البته استفاده از متد Subscribe به نحوی که در مثال قبل ذکر شد، خلاصه شدهی الگوی Observer است. اگر بخواهیم دقیقا مانند الگو عمل کنیم، چنین شکلی را خواهد داشت:
var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(); var observer = Observer.Create<int>(onNext: number => Console.WriteLine(number)); observableQuery.Subscribe(observer);
پردازش نتایج یک کوئری LINQ در تردی دیگر توسط Rx
برای اجرای نتایج متد Subscribe در یک ترد جدید، میتوان پارامتر scheduler متد ToObservable را مقدار دهی کرد:
using System; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; namespace Rx01 { class Program { static void Main(string[] args) { Console.WriteLine("Thread-Id: {0}", Thread.CurrentThread.ManagedThreadId); var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(scheduler: NewThreadScheduler.Default); observableQuery.Subscribe(onNext: number => { Console.WriteLine("number: {0}, on Thread-id: {1}", number, Thread.CurrentThread.ManagedThreadId); }, onCompleted: () => finished()); } private static void finished() { Console.WriteLine("Done!"); } } }
Thread-Id: 1 number: 1, on Thread-id: 3 number: 2, on Thread-id: 3 number: 3, on Thread-id: 3 number: 4, on Thread-id: 3 number: 5, on Thread-id: 3 Done!
NewThreadScheduler.Default در فضای نام System.Reactive.Concurrency واقع شدهاست.
یک نکته
در نگارشهای آغازین Rx، مقدار scheduler را میشد معادل Scheduler.NewThread نیز قرار داد که در نگارشهای جدید منسوخ شده درنظر گرفته شده و به زودی حذف خواهد شد. معادلهای جدید آن اکنون NewThreadScheduler.Default، ThreadPoolScheduler.Default و امثال آن هستند.
مدیریت خاتمهی اعمال انجام شدهی در تردهای دیگر توسط Rx
یکی از مواردی که حین اجرای نتیجهی callbackهای پردازش شدهی در تردهای دیگر نیاز است بدانیم، زمان خاتمهی کار آنها است. برای نمونه در مثال قبل، نمایش Done پس از پایان تمام callbacks انجام شدهاست. فرض کنید، callback پایان عملیات را حذف کرده و متد finished را پس از فراخوانی متد observableQuery.Subscribe قرار دهیم:
observableQuery.Subscribe(onNext: number => { Console.WriteLine("number: {0}, on Thread-id: {1}", number, Thread.CurrentThread.ManagedThreadId); }/*, onCompleted: () => finished()*/); finished();
Thread-Id: 1 number: 1, on Thread-id: 3 Done! number: 2, on Thread-id: 3 number: 3, on Thread-id: 3 number: 4, on Thread-id: 3 number: 5, on Thread-id: 3
مدیریت استثناهای رخ داده در حین پردازش مجموعههای واکنشگرا
متد Subscribe دارای چندین overload است. تا اینجا نمونهای که دارای پارامترهای onNext و onCompleted بودند را بررسی کردیم. اگر بخواهیم مدیریت استثناءها را نیز در اینجا اضافه کنیم، فقط کافی است از overload دیگر آن که دارای پارامتر onError است، استفاده نمائیم:
observableQuery.Subscribe( onNext: number => Console.WriteLine(number), onError: exception => Console.WriteLine(exception.Message), onCompleted: () => finished());
مدیریت ترد اجرای نتایج حاصل از Rx در یک برنامهی دسکتاپ WPF یا WinForms
تا اینجا مشاهده کردیم که اجرای callbackهای observer در یک ترد دیگر، به سادگی تنظیم پارامتر scheduler متد ToObservable است. اما در برنامههای دسکتاپ برای به روز رسانی عناصر رابط کاربری، حتما باید در تردی قرار داشته باشیم که آن رابط کاربری در آن ایجاد شدهاست یا به عبارتی در ترد اصلی برنامه؛ در غیر اینصورت برنامه کرش خواهد کرد. مدیریت این مساله نیز در Rx بسیار سادهاست. ابتدا نیاز است بستهی Rx-WPF را نصب کرد:
PM> Install-Package Rx-WPF
observableQuery.ObserveOn(DispatcherScheduler.Current).Subscribe(...)
observableQuery.ObserveOnDispatcher().Subscribe(...)
و یا اگر از WinForms استفاده میکنید، ابتدا بستهی Rx خاص آنرا نصب کنید:
PM> Install-Package Rx-WinForms
observableQuery.ObserveOn(SynchronizationContext.Current).Subscribe(...)
یک نکته
در Rx فرض میشود که کوئری شما زمانبر است و callbackهای مشاهدهگر سریع عمل میکنند. بنابراین هدف از callbackهای آن، پردازشهای سنگین نیست. جهت آزمایش این مساله، اینبار query ابتدایی برنامه را به شکل ذیل تغییر دهید که در آن بازگشت زمانبر یک سری داده شبیه سازی شدهاند.
var query = Enumerable.Range(1, 5).Select(number => { Thread.Sleep(250); return number; });