聊聊MassTransit——Consumer Saga(译) 当前热讯
来源:博客园
发布日期:2023-05-29 14:29:59
(资料图片仅供参考)
原文地址:Consumer Sagas
consumer saga是一个由CorrelationId标识的类,它定义了由saga repository持久化的状态。除了状态之外,还可以向saga类添加接口,定义由saga处理的事件。这种状态和行为在单个类中的组合就是一个consumer saga。在下面的示例中,定义了由SubmitOrder消息发起的order saga。
Interfaces
InitiatedBy
public record SubmitOrder : CorrelatedBy{ public Guid CorrelationId { get; init; } public DateTime OrderDate { get; init; }}public class OrderSaga : ISaga, InitiatedBy{ public Guid CorrelationId { get; set; } public DateTime? SubmitDate { get; set; } public DateTime? AcceptDate { get; set; } public async Task Consume(ConsumeContext context) { SubmitDate = context.Message.OrderDate; }}
当Saga的接收端点接收到SubmitOrder消息时,将使用CorrelationId属性来确定是否存在具有该CorrelationId的现有Saga实例。如果没有找到现有的实例,repository将创建一个新的saga实例,并在新实例上调用Consume方法。在Consume方法完成之后,repository保存新创建的实例。
Orchestrates
要定义由现有的saga实例(如OrderAccepted)编排的事件,需要指定一个额外的接口和方法。
public record OrderAccepted : CorrelatedBy{ public Guid CorrelationId { get; init; } public DateTime Timestamp { get; init; }}public class OrderSaga : ISaga, InitiatedBy, Orchestrates,{ public Guid CorrelationId { get; set; } public DateTime? SubmitDate { get; set; } public DateTime? AcceptDate { get; set; } public async Task Consume(ConsumeContext context) {...} public async Task Consume(ConsumeContext context) { AcceptDate = context.Message.Timestamp; }}
InitiatedByOrOrchestrates
要定义一个可以启动一个新的或编排一个现有的saga实例(如OrderInvoiced)的事件,需要指定一个额外的接口和方法。
public record OrderInvoiced : CorrelatedBy{ public Guid CorrelationId { get; init; } public DateTime Timestamp { get; init; } public decimal Amount { get; init; }}public class OrderPaymentSaga : ISaga, InitiatedByOrOrchestrates{ public Guid CorrelationId { get; set; } public DateTime? InvoiceDate { get; set; } public decimal? Amount { get; set; } public async Task Consume(ConsumeContext context) { InvoiceDate = context.Message.Timestamp; Amount = context.Message.Amount; }}
Observes
要定义由未实现CorrelatedBy接口(如OrderShipped)的现有saga实例观察到的事件,需要指定一个额外的接口和方法。
public record OrderShipped{ public Guid OrderId { get; init; } public DateTime ShipDate { get; init; }}public class OrderSaga : ISaga, InitiatedBy, Orchestrates, Observes{ public Guid CorrelationId { get; set; } public DateTime? SubmitDate { get; set; } public DateTime? AcceptDate { get; set; } public DateTime? ShipDate { get; set; } public async Task Consume(ConsumeContext context) {...} public async Task Consume(ConsumeContext context) {...} public async Task Consume(ConsumeContext context) { ShipDate = context.Message.ShipDate; } public Expression> CorrelationExpression => (saga,message) => saga.CorrelationId == message.OrderId;}
Configuration
要在配置MassTransit时添加一个saga,请使用如下所示的AddSaga方法
services.AddMassTransit(x =>{ x.AddSaga() .InMemoryRepository();});
关键词: