پیاده سازی CQRS توسط MediatR - قسمت پنجم
اندازه‌ی قلم متن
تخمین مدت زمان مطالعه‌ی مطلب: چهار دقیقه

کدهای این قسمت به‌روزرسانی شده و از این ریپازیتوری قابل دسترسی است.


Event Sourcing

در این قسمت قصد داریم تا اطلاعات Command‌های خود را بعد از Process، داخل یک دیتابیس Append-Only ذخیره کنیم. با استفاده از این روش میتوانیم بفهمیم در یک تاریخ مشخص، با چه ورودی‌هایی ( Request )، چه جواب ( Response ) ای در آن لحظه از برنامه برگشت داده شده‌است.


برای پیاده سازی Event Sourcing از دیتابیس EventStore که سورس آن نیز در گیتهاب قابل دسترسی است، استفاده خواهیم کرد. توجه داشته باشید که شما میتوانید از دیتابیس‌های دیگری مثل Elasticsearch, Redis و ... به‌منظور دیتابیس Event Store خود استفاده کنید و محدود به EventStore نیستید.

ما برای راه اندازی دیتابیس EventStore در این قسمت، از Docker استفاده خواهیم کرد. آموزش Docker قبلا طی مقالاتی (2 , 1) در سایت قرار گرفته‌است و در این مقاله به تکرار نحوه استفاده از آن نخواهیم پرداخت.

با استفاده از دستور زیر، EventStore را از روی Docker Hub که Registry پیشفرض است، Pull و اجرا میکنیم و پورت‌های 2113 و 1113 آن را به بیرون Expose میکنیم تا داخل برنامه خود، از آن‌ها استفاده کنیم:
docker run --name eventstore-node -d -p 2113:2113 -p 1113:1113 eventstore/eventstore

EventStore دارای پنل ادمینی است که از طریق http://localhost:2113 قابل دسترسی است. Username پیشفرض آن برابر با admin و کلمه عبور آن برابر با changeit است.

بعد از لاگین در پنل ادمین، با چنین Dashboard ای مواجه خواهید شد و نشان از این دارد که EventStore به‌درستی اجرا شده است:



برای استفاده از EventStore داخل برنامه خود، مانند دیگر دیتابیس‌ها، Client موجود آن را برای #C، از NuGet نصب میکنیم:
Install-Package EventStore.Client

سپس کلاسی بنام EventStoreDbContext ایجاد و منطق ارتباط با EventStore را داخل آن قرار میدهیم :
public class EventStoreDbContext : IEventStoreDbContext
{
    public async Task<IEventStoreConnection> GetConnection()
    {
        IEventStoreConnection connection = EventStoreConnection.Create(
            new IPEndPoint(IPAddress.Loopback, 1113),
            nameof(MediatrTutorial));

        await connection.ConnectAsync();

        return connection;
    }

    public async Task AppendToStreamAsync(params EventData[] events)
    {
        const string appName = nameof(MediatrTutorial);
        IEventStoreConnection connection = await GetConnection();

        await connection.AppendToStreamAsync(appName, ExpectedVersion.Any, events);
    }
}

همانطور که می‌بینید، با استفاده از IP 1113 که در بالاتر با استفاده از Docker آن را Expose کرده بودیم، به EventStore متصل شده‌ایم. همچنین برای متد AppendToStreamAsync خود EventStore ، یک Facade نوشته‌ایم که نحوه کار با آن را برایمان راحت‌تر کرده‌است.

با توجه به اینکه EventStore در Documentation خود بیان کرده که Thread-Safe است، در DI Container خود، EventStoreDbContext را بصورت Singleton ثبت و Register میکنیم و در طول عمر برنامه، یک instance از آن خواهیم داشت:
services.AddSingleton<IEventStoreDbContext, EventStoreDbContext>();

قصد داریم Request هایی را که از نوع Command هستند، همراه با Response آن‌ها داخل EventStore ذخیره کنیم. برای تشخیص Query/Command بودن یک Request ، از نام آنها استفاده خواهیم کرد. همانطور که در قسمت‌های قبل گفتیم ، Command‌ها باید با ذکر "Command" در پایان نامشان همراه باشند.

این یک Convention در برنامه ماست که باید رعایت شود. ( Convention Over Configuration )



مانند Behavior‌های قبلی، یک Behavior جدید را بنام EventLoggerBehavior ایجاد و از IPipelineBehavior ارث بری کرده و EventStoreDbContext خود را به آن Inject میکنیم:
public class EventLoggerBehavior<TRequest, TResponse> :
   IPipelineBehavior<TRequest, TResponse>
{
    readonly IEventStoreDbContext _eventStoreDbContext;

    public EventLoggerBehavior(IEventStoreDbContext eventStoreDbContext)
    {
        _eventStoreDbContext = eventStoreDbContext;
    }

    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
    {
        TResponse response = await next();

        string requestName = request.ToString();

        // Commands convention
        if (requestName.EndsWith("Command"))
        {
            Type requestType = request.GetType();
            string commandName = requestType.Name;

            var data = new Dictionary<string, object>
            {
                {
                    "request", request
                },
                {
                    "response", response
                }
            };

            string jsonData = JsonConvert.SerializeObject(data);
            byte[] dataBytes = Encoding.UTF8.GetBytes(jsonData);

            EventData eventData = new EventData(eventId: Guid.NewGuid(),
                type: commandName,
                isJson: true,
                data: dataBytes,
                metadata: null); 

            await _eventStoreDbContext.AppendToStreamAsync(eventData);
        }

        return response;
    }
}

با استفاده از این Behavior، فقط Request هایی را که Command هستند و State برنامه را تغییر میدهند، داخل EventStore ذخیره میکنیم. اکنون کافیست تا این Behavior را داخل DI Container خود اضافه کنیم :
services.AddScoped(typeof(IPipelineBehavior<,>), typeof(EventLoggerBehavior<,>));

اگر برنامه را اجرا و یکی از Command‌ها را مانند CreateCustomerCommand، با استفاده از api/Customers <= POST فراخوانی کنید، Request و Response شما با Type آن Command و همراه با DateTime ای که این Request رخ داده‌است، داخل EventStore ذخیره خواهد شد که در Admin Panel مربوط به EventStore، در تب Stream Browser قابل مشاهده است :



نامگذاری این بخش به Stream، بدلیل این است که ما جریان و تاریخچه‌ای از وقایع بوجود آمده در سیستم را داریم که با استفاده از آن‌ها میتوانیم به وضعیت جاری و نحوه رسیدن به این State دست پیدا کنیم.
  • #
    ‫۴ سال و ۴ ماه قبل، چهارشنبه ۱۰ اردیبهشت ۱۳۹۹، ساعت ۱۴:۰۸
    اگر سرور دیتابیس Event Store استاپ شده باشد یا ارتباطش قطع شده باشد. چه اتفاقی می‌افتد؟ ایا ادامه کد اجرا شده و Client پاسخ را دریافت میکند؟ یا خطای 500 رخ میدهد؟