روشهای متفاوت ایجاد توالی (sequence) در Rx
الف) استفاده از متدهای Factory
1) Observable.Create
نمونهای از استفاده از آنرا در مطلب «معرفی Reactive extensions» مشاهده کردید.
var query = Enumerable.Range(1, 5).Select(number => number); var observableQuery = query.ToObservable(); var observer = Observer.Create<int>(onNext: number => Console.WriteLine(number)); observableQuery.Subscribe(observer);
البته در این مثال فقط delegate مربوط به onNext را ملاحظه میکند. توسط سایر overloadهای آن امکان ذکر delegateهای OnError/OnCompleted نیز وجود دارد.
2) Observable.Return
برای ایجاد یک خروجی Observable از یک مقدار مشخص، میتوان از متد جنریک Observable.Return استفاده کرد. برای مثال:
var observableValue1 = Observable.Return("Value"); var observableValue2 = Observable.Return(2);
public static IObservable<T> Return<T>(T value) { return Observable.Create<T>(o => { o.OnNext(value); o.OnCompleted(); return Disposable.Empty; }); }
var observableValue1 = Observable.Return<string>("Value"); var observableValue2 = Observable.Return<int>(2);
3) Observable.Empty
برای بازگشت یک توالی خالی که تنها کار اطلاع رسانی onCompleted را انجام میدهد.
var emptyObservable = Observable.Empty<string>();
public static IObservable<T> Empty<T>() { return Observable.Create<T>(o => { o.OnCompleted(); return Disposable.Empty; }); }
4) Observable.Never
برای بازگشت یک توالی بدون قابلیت اطلاع رسانی و notification
var neverObservable = Observable.Never<string>();
public static IObservable<T> Never<T>() { return Observable.Create<T>(o => { return Disposable.Empty; }); }
5) Observable.Throw
برای ایجاد یک توالی که صرفا کار اطلاع رسانی OnError را توسط استثنای معرفی شده به آن انجام میدهد.
var throwObservable = Observable.Throw<string>(new Exception());
public static IObservable<T> Throws<T>(Exception exception) { return Observable.Create<T>(o => { o.OnError(exception); return Disposable.Empty; }); }
6) توسط Observable.Range
به سادگی میتوان بازهی Observable ایی را ایجاد کرد:
var range = Observable.Range(10, 15); range.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
7) Observable.Generate
اگر بخواهیم عملیات Observable.Range را پیاده سازی کنیم، میتوان از متد Observable.Generate استفاده کرد:
public static IObservable<int> Range(int start, int count) { var max = start + count; return Observable.Generate( initialState: start, condition: value => value < max, iterate: value => value + 1, resultSelector: value => value); }
8) Observable.Interval
عموما از انواع و اقسام تایمرهای موجود در دات نت مانند System.Timers.Timer ، System.Threading.Timer و System.Windows.Threading.DispatcherTimer برای ایجاد یک توالی از رخدادها استفاده میشود. تمام اینها را به سادگی میتوان توسط متد Observable.Interval، که قابل انتقال به تمام پلتفرمهایی است که Rx برای آنها تهیه شدهاست، جایگزین کرد:
var interval = Observable.Interval(period: TimeSpan.FromMilliseconds(250)); interval.Subscribe(Console.WriteLine, () => Console.WriteLine("completed"));
Overload دوم این متد، امکان معرفی scheduler و اجرای بر روی تردی دیگر را نیز میسر میکند.
9) Observable.Timer
تفاوت Observable.Timer با Observable.Interval در مفهوم پارامتر ارسالی به آنها است:
var timer = Observable.Timer(dueTime: TimeSpan.FromSeconds(1)); timer.Subscribe(Console.WriteLine, () => Console.WriteLine("completed"));
خروجی Observable.Interval مثال زده شده به نحو زیر است و خاتمهای ندارد:
0
1
2
3
4
5
اما خروجی Observable.Timer به نحو ذیل بوده و پس از یک ثانیه، خاتمه مییابد:
0
completed
متد Observable.Timer دارای هفت overload متفاوت است که توسط آنها dueTime (مدت زمان صبر کردن تا تولید اولین خروجی)، period (کار Observable.Timer را به صورت متوالی در بازهی زمانی مشخص شده تکرار میکند) و scheduler (تعیین ترد اجرایی عملیات) قابل مقدار دهی هستند.
اگر میخواهید Observable.Timer بلافاصله شروع به کار کند، مقدار dueTime آنرا مساوی TimeSpan.Zero قرار دهید. به این ترتیب یک Observable.Interval را به وجود آوردهاید که بلافاصله شروع به کار کرده است و تا مدت زمان مشخص شدهای جهت اجرای اولین callback خود صبر نمیکند.
ب) تبدیلگرهایی که خروجی IObservable ایجاد میکنند
برای تبدیل مدلهای برنامه نویسی Async قدیمی دات نت مانند APM، رخدادها و امثال آن به معادلهای Rx، متدهای الحاقی خاصی تهیه شدهاند.
1) تبدیل delegates به معادل Observable
متد Observable.Start، امکان تبدیل یک Func یا Action زمانبر را به یک توالی observable میسر میکند. در این حالت به صورت پیش فرض، پردازش عملیات بر روی یکی از تردهای ThreadPool انجام میشود.
static void StartAction() { var start = Observable.Start(() => { Console.Write("Observable.Start"); for (int i = 0; i < 10; i++) { Thread.Sleep(100); Console.Write("."); } }); start.Subscribe( onNext: unit => Console.WriteLine("published"), onCompleted: () => Console.WriteLine("completed")); } static void StartFunc() { var start = Observable.Start(() => { Console.Write("Observable.Start"); for (int i = 0; i < 10; i++) { Thread.Sleep(100); Console.Write("."); } return "value"; }); start.Subscribe( onNext: Console.WriteLine, onCompleted: () => Console.WriteLine("completed")); }
زمانیکه از Func استفاده میشود، تابع یک خروجی را ارائه داده و سپس توالی خاتمه مییابد. اگر از Action استفاده شود، نوع Observable بازگشت داده شده از نوع Unit است که در برنامه نویسی functional معادل void است و هدف از آن مشخص سازی پایان عملیات Action میباشد. Unit دارای مقداری نبوده و صرفا سبب اجرای اطلاع رسانی OnNext میشود.
تفاوت مهم Observable.Start و Observable.Return در این است که Observable.Start مقدار تابع را به صورت تنبل (lazily) پردازش میکند، اما Observable.Return پردازش حریصانهای (eagrly) را به همراه خواهد داشت. به این ترتیب Observable.Start بسیار شبیه به یک Task (پردازشهای غیرهمزمان) عمل میکند.
در اینجا شاید این سؤال مطرح شود که استفاده از قابلیتهای Async سیشارپ 5 برای اینگونه کارها مناسب است یا Rx؟ قابلیتهای Async بیشتر به اعمال مخصوص IO bound مانند کار با شبکه، دریافت فایل از اینترنت، کار با یک بانک اطلاعاتی خارج از مرزهای سیستم، مرتبط میشوند؛ اما اعمال CPU bound مانند محاسبات سنگین حاصل از توالیهای observable را به خوبی میتوان توسط Rx مدیریت کرد.
2) تبدیل Events به معادل Observable
دات نت از روزهای اول خود به همراه یک event driven programming model بودهاست. Rx متدهایی را برای دریافت یک رخداد و تبدیل آن به یک توالی Observable ارائه دادهاست. برای نمونه ObservableCollection زیر را درنظر بگیرید
var items = new System.Collections.ObjectModel.ObservableCollection<string> { "Item1", "Item2", "Item3" };
items.CollectionChanged += (sender, ea) => { if (ea.Action == NotifyCollectionChangedAction.Remove) { foreach (var oldItem in ea.OldItems.Cast<string>()) { Console.WriteLine("Removed {0}", oldItem); } } };
var removals = Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs> ( addHandler: handler => items.CollectionChanged += handler, removeHandler: handler => items.CollectionChanged -= handler ) .Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Remove) .SelectMany(c => c.EventArgs.OldItems.Cast<string>()); var disposable = removals.Subscribe(onNext: item => Console.WriteLine("Removed {0}", item));
items.Remove("Item1");
3) تبدیل Task به معادل Observable
متد ToObservable واقع در فضای نام System.Reactive.Threading.Tasks را بر روی یک Task نیز میتوان فراخوانی کرد:
var task = Task.Factory.StartNew(() => "Test"); var source = task.ToObservable(); source.Subscribe(Console.WriteLine, () => Console.WriteLine("completed"));
4) تبدیل IEnumerable به معادل Observable
با این مورد تاکنون آشنا شدهاید. فقط کافی است متد ToObservable را بر روی یک IEnumerable، جهت تهیه خروجی Observable فراخوانی کرد.
5) تبدیل APM به معادل Observable
APM یا Asynchronous programming model، همان روش کار با متدهای Async با نامهای BeginXXX و EndXXX است که از نگارشهای آغازین دات نت به همراه آن بودهاند. کار کردن با آن مشکل است و مدیریت آن به همراه پراکندگیهای بسیاری جهت کار با callbacks آن است. برای تبدیل این نوع روش برنامه نویسی به روش Rx نیز متدهایی پیش بینی شدهاست؛ مانند Observable.FromAsyncPattern.
یک نکته
کتابخانهای به نام Rxx بسیاری از این محصور کنندهها را تهیه کردهاست:
http://Rxx.codeplex.com
ابتدا بستهی نیوگت آنرا نصب کنید:
PM> Install-Package Rxx
using (new FileStream("file.txt", FileMode.Open) .ReadToEndObservable() .Subscribe(x => Console.WriteLine(x.Length))) { Console.ReadKey(); }