首页 资讯 > > 正文

聊聊MassTransit——Consumer Saga(译) 当前热讯

来源:博客园 发布日期:2023-05-29 14:29:59 分享到:


(资料图片仅供参考)

原文地址:Consumer Sagas

consumer saga是一个由CorrelationId标识的类,它定义了由saga repository持久化的状态。除了状态之外,还可以向saga类添加接口,定义由saga处理的事件。这种状态和行为在单个类中的组合就是一个consumer saga。在下面的示例中,定义了由SubmitOrder消息发起的order saga。

Interfaces

InitiatedBy

public record SubmitOrder :    CorrelatedBy{    public Guid CorrelationId { get; init; }    public DateTime OrderDate { get; init; }}public class OrderSaga :    ISaga,    InitiatedBy{    public Guid CorrelationId { get; set; }    public DateTime? SubmitDate { get; set; }    public DateTime? AcceptDate { get; set; }    public async Task Consume(ConsumeContext context)    {        SubmitDate = context.Message.OrderDate;    }}

当Saga的接收端点接收到SubmitOrder消息时,将使用CorrelationId属性来确定是否存在具有该CorrelationId的现有Saga实例。如果没有找到现有的实例,repository将创建一个新的saga实例,并在新实例上调用Consume方法。在Consume方法完成之后,repository保存新创建的实例。

Orchestrates

要定义由现有的saga实例(如OrderAccepted)编排的事件,需要指定一个额外的接口和方法。

public record OrderAccepted :    CorrelatedBy{    public Guid CorrelationId { get; init; }    public DateTime Timestamp { get; init; }}public class OrderSaga :    ISaga,    InitiatedBy,    Orchestrates,{    public Guid CorrelationId { get; set; }    public DateTime? SubmitDate { get; set; }    public DateTime? AcceptDate { get; set; }    public async Task Consume(ConsumeContext context) {...}    public async Task Consume(ConsumeContext context)    {        AcceptDate = context.Message.Timestamp;    }}

InitiatedByOrOrchestrates

要定义一个可以启动一个新的或编排一个现有的saga实例(如OrderInvoiced)的事件,需要指定一个额外的接口和方法。

public record OrderInvoiced :    CorrelatedBy{    public Guid CorrelationId { get; init; }    public DateTime Timestamp { get; init; }    public decimal Amount { get; init; }}public class OrderPaymentSaga :    ISaga,    InitiatedByOrOrchestrates{    public Guid CorrelationId { get; set; }    public DateTime? InvoiceDate { get; set; }    public decimal? Amount { get; set; }    public async Task Consume(ConsumeContext context)    {        InvoiceDate = context.Message.Timestamp;        Amount = context.Message.Amount;    }}

Observes

要定义由未实现CorrelatedBy接口(如OrderShipped)的现有saga实例观察到的事件,需要指定一个额外的接口和方法。

public record OrderShipped{    public Guid OrderId { get; init; }    public DateTime ShipDate { get; init; }}public class OrderSaga :    ISaga,    InitiatedBy,    Orchestrates,    Observes{    public Guid CorrelationId { get; set; }    public DateTime? SubmitDate { get; set; }    public DateTime? AcceptDate { get; set; }    public DateTime? ShipDate { get; set; }    public async Task Consume(ConsumeContext context) {...}    public async Task Consume(ConsumeContext context) {...}    public async Task Consume(ConsumeContext context)    {        ShipDate = context.Message.ShipDate;    }    public Expression> CorrelationExpression =>        (saga,message) => saga.CorrelationId == message.OrderId;}

Configuration

要在配置MassTransit时添加一个saga,请使用如下所示的AddSaga方法

services.AddMassTransit(x =>{    x.AddSaga()        .InMemoryRepository();});

关键词:

x 广告

河北印发出台通用机场布局规划(2021-2030年)

到2030年,全省形成以A类通用机场为主体、B类通用机场为补充,功能完善、覆盖广泛的通用机场体系,全省通用机场达到23个。其中,到2025年全

复原民国旧菜单 一批“消失的名菜”重现羊城

  中新网广州12月5日电 (记者 程景伟)“粤宴中国·消失的名菜”活动4日晚在广州博物馆镇海楼广场举行,一批业已失传或十分罕见的传统粤

青海再度“双清零”:战“疫”催生定点救治医院反思与成长

  中新网西宁12月5日电 题:青海再度“双清零”:战“疫”催生定点救治医院反思与成长  作者 潘雨洁  全面停诊、四下无人;火线冲

世界海拔最高高铁客运站山丹马场站运营

  中新网兰州12月5日电 (记者 杨艳敏)记者从中国铁路兰州局集团有限公司获悉,12月5日10时29分随着嘉峪关南至西安北D2696次动车组列车

千年古都洛阳为何要建青年友好型城市?

  中新网洛阳12月5日电 题:千年古都洛阳为何要建青年友好型城市?  记者 肖开霖 李贵刚  千年古都洛阳日前公布《洛阳市建设青年

甘肃万余河长公示牌拥有“电子身份证” 局地启“千里眼”治水

  中新网兰州12月5日电 (记者 冯志军)记者5日从甘肃省水利厅获悉,今年以来,甘肃全面推动河长公示牌信息化建设,为全省河流换发“电子

满洲里市向呼伦贝尔市“手递手”异地转运3批次隔离人员

  (抗击新冠肺炎)满洲里市向呼伦贝尔市“手递手”异地转运3批次隔离人员  中新网呼伦贝尔12月5日电 (记者 张玮)5日,内蒙古自治区呼

2021年度法治人物沈云如:让群众过上“有身份的生活”

  中新网杭州12月5日电 题:2021年度法治人物沈云如:让群众过上“有身份的生活”  作者 郭其钰 张先登  行程10余万公里,为辖区3

x 广告

Copyright   2015-2032 华西海洋网版权所有  备案号:京ICP备2022016840号-35   联系邮箱: 920 891 263@qq.com