1.工作原理
CAP(Consistent,Available,Partition - tolerant)框架在.NET 中的工作原理。CAP 是一个用于.NET 的分布式事务解决方案,主要用于实现微服务架构中的数据一致性(实现分布式事务的最终一致性),其工作原理如下:
发布 / 订阅模型
-
CAP 基于发布 / 订阅模式来处理消息。当一个应用程序需要执行一个可能涉及多个微服务的数据操作时,它会将相关的操作封装成一个消息,并将其发布到消息队列中。其他对该消息感兴趣的微服务则会订阅这个消息队列,当消息到达时,订阅者会接收并处理消息。
本地事务与消息持久化
-
在发布消息的过程中,CAP 使用了本地事务来确保消息的可靠性。当一个服务执行一个业务操作(如数据库插入、更新等)时,它会将这个操作与消息的发布放在同一个本地事务中。如果业务操作成功,那么消息也会被成功持久化到数据库的Published表中;如果业务操作失败,那么整个事务回滚,消息也不会被发布。这样就保证了消息与业务数据的一致性。
消息发送与重试机制
-
消息持久化后,CAP 会尝试将消息发送到消息队列(如 RabbitMQ、Kafka 等)。如果消息发送失败,CAP 会根据预设的重试策略进行重试。重试机制可以确保在消息队列暂时不可用或出现网络故障等情况下,消息不会丢失,最终能够成功发送到消息队列。
消费者接收与处理消息
-
订阅了消息队列的消费者服务会从队列中接收消息,并根据消息的内容执行相应的业务逻辑。在处理消息时,消费者也会使用本地事务来确保消息处理的原子性。如果消息处理成功,消费者会将消息标记为已处理,并从Received表中删除相关记录;如果处理失败,消费者可以根据具体情况选择重试或进行其他处理方式。
分布式事务一致性保证
-
通过上述的发布 / 订阅模型、本地事务与消息持久化、消息发送与重试机制以及消费者接收与处理消息等一系列操作,CAP 框架实现了在分布式系统中跨多个微服务的数据一致性。即使在出现网络分区、服务故障等情况下,CAP 也能够通过消息的持久化和重试等机制,保证数据在最终一致性的前提下进行可靠的传输和处理。
CAP 框架通过巧妙地结合本地事务、消息队列和重试机制等技术,为.NET 微服务架构提供了一种高效、可靠的分布式事务解决方案,帮助开发者在复杂的分布式环境中轻松实现数据的一致性和可靠性。
2.作用
实现分布式事务的最终一致性
- 在分布式系统中,保证数据一致性颇具挑战,如电商场景里用户下单需同时更新库存、订单和支付信息,若服务失败易致数据不一致。CAP 不采用两阶段提交(2PC) ,而是用本地消息表 + MQ(如 RabbitMQ、Kafka 等消息队列 )的异步确保方式。具体过程为:
- 业务操作与事件发布:服务完成操作后,发布含操作类型、数据标识等信息的事件到消息队列。
- 事件存储与确认:发布的事件存于指定介质以防丢失,CAP 等待事件处理确认。
- 事件处理与重试机制:其他服务接收事件并处理,成功则发送确认,失败则 CAP 按设定策略重试,直至成功或达最大重试次数。为避免重复处理致数据不一致,处理逻辑需保证幂等性。
- 数据一致性检查与补偿:必要时,CAP 定期检查数据一致性,通过补偿机制修复不一致问题。
- 充当高可用的 EventBus(事件总线)
- 实现了 EventBus 的发布 / 订阅功能,借助本地消息表对消息持久化。当消息队列宕机或连接失败,消息也不会丢失。比如在复杂业务系统中,各微服务可通过 CAP 实现的 EventBus 进行通信。一个服务发布事件(如订单创建事件 ),其他订阅该事件的服务(如库存服务、物流服务 )就能收到通知并执行相应操作(如扣减库存、安排发货 ) 。
- 实际使用时,需引用相关包(基本包、消息层包、数据库包 ) ,配置本地消息记录库,生产者通过 _capBus.PublishAsync 方法发布消息,消费者在 Controller 或服务层通过 [CapSubscribe] 特性标记方法来订阅处理消息 。 此外,在应用中要注意确保幂等性、合理配置重试策略、搭建监控与报警机制以及进行性能调优 。
3.Published 和Received 表作用
在.NET CAP(一个用于解决微服务或分布式系统中分布式事务问题的开源项目 )中:
Published 表作用
-
消息暂存:当生产者服务执行本地事务(如创建订单 )时,会将相关事件消息(如订单创建事件 )插入到该表,此时消息状态为未发布。这是为了确保消息和本地业务事务的原子性,即要么两者都成功执行,要么都失败回滚,避免消息丢失。
-
发布状态记录:CAP 会扫描该表,将未发布消息发送到消息队列(如 RabbitMQ、Kafka ) 。消息成功发送后,会更新该表中消息的状态为已发布,用于记录消息的发布过程和状态。
-
保障数据一致性:通过该表对消息的存储和管理,为实现分布式事务的最终一致性提供支持。比如在分布式系统中,不同服务通过消息进行交互,该表可确保消息按正确顺序、在合适时机被发送到消息队列,进而被消费者服务接收处理,保证各服务间数据状态的一致。
-
故障恢复依据:当系统出现故障(如消息队列短暂不可用 ) ,重启后可依据该表中未成功发布的消息记录,进行重试发送操作,恢复消息传递流程,保障系统可靠运行。
Received 表作用
-
消息接收记录:消费者从消息队列接收消息后,该表用于记录已接收的消息。记录内容包括消息的唯一标识、消息体、接收时间、消息来源等相关信息,方便对消息的消费情况进行跟踪和审计。
-
处理重复消费:通过记录已处理消息的相关信息(如消息 ID ) ,帮助消费者判断接收的消息是否已被处理过,以实现幂等消费。当消费者接收到消息时,会检查该表中是否已有相同标识的消息记录,若存在则可避免重复处理,确保即使在网络抖动、消息重发等情况下,也不会因重复消费导致数据不一致等问题。
-
监控与排查:开发人员可通过查看该表,了解消息的接收和处理情况,排查消息消费过程中出现的问题。比如,若发现某类消息一直未被正确处理,可根据表中记录进一步分析是消息格式问题、消费者服务异常还是其他原因,便于定位和解决问题,保障分布式系统的正常运行。
4.CapSubscribe的作用
[CapSubscribe]是.NET CAP 框架中的一个特性(Attribute)1。
作用
标识事件处理方法:用于标记那些需要处理特定事件的方法,告诉 CAP 框架当有相关事件发布时,需要调用这些被标记的方法来进行处理1。
实现消息订阅:使服务能够订阅由其他服务发布的事件消息,从而实现微服务之间基于事件的异步通信和交互,以保证各服务间的数据同步和业务流程的协同1。
使用方法
在 Controller 中使用:如果处理事件的方法在 Controller 中,直接在 Action 方法上添加[CapSubscribe]特性即可。例如:
csharp[Route("api/[controller]")][ApiController]public class MyController : ControllerBase{[CapSubscribe("xxx.services.show.time")]public IActionResult HandleShowTime(DateTime time){// 处理接收到的消息,例如记录日志、更新数据库等操作Console.WriteLine($"Received time: {time}");return Ok();}}
在非 Controller 类中使用:若订阅方法不在 Controller 中,订阅的类需要继承ICapSubscribe接口。例如:
csharppublic class MySubscriberService : ICapSubscribe{[CapSubscribe("xxx.services.show.time")]public void HandleShowTime(DateTime time){// 处理接收到的消息Console.WriteLine($"Received time: {time}");}}然后在Startup.cs中的ConfigureServices方法中注入该服务类:csharpservices.AddTransient<MySubscriberService>();
指定参数:
Name参数:用于指定订阅的消息名称,对应通过_capBus.PublishAsync("name", ...)发布消息时指定的名称。在不同的消息队列中,它对应不同的项,如在 RabbitMQ 中对应路由键(routing key),在 Kafka 中对应主题(topic)等。
Group参数:可选参数,用于将订阅者放置在一个单独的消费者组中。具有相同名称但不同组的订阅者都会接收消息;而同一组内具有相同名称的订阅者只有一个会接收消息。
GroupConcurrent参数:可选参数,用于设置订阅者并发执行的并行度。如果不指定Group参数,CAP 会自动使用Name的值创建一个组。需要注意的是,该设置仅适用于新消息,重试的消息不受并发限制。
在使用[CapSubscribe]前,需在项目中引入 CAP 相关的 NuGet 包,并在Startup.cs中进行 CAP 的配置,包括选择消息队列(如x.UseRabbitMQ("ConnectionStrings"))和存储介质(如x.UseSqlServer("数据库连接字符串"))
5.案例说明
以下是一个完整的示例,展示了如何使用 CAP、SQL Server 和 RabbitMQ 来实现下单服务器和订单处理服务器之间的协作,完成订单处理流程。
项目结构和整体流程概述
-
下单服务器(OrderPlacingServer):一个.NET Core Web API 项目,负责接收用户的下单请求,执行本地订单数据插入操作,并通过 CAP 发布订单消息到消息队列。
-
订单处理服务器(OrderProcessingServer):一个.NET Core Console 应用项目,订阅来自消息队列的订单消息,处理订单逻辑并更新订单状态。
-
共享项目(OrderShared):用于存放共享的实体类和数据库上下文,两个服务器项目都会引用该项目。
整体流程如下:
-
用户在下单服务器发起下单请求。
-
下单服务器执行本地事务插入订单数据到 SQL Server 数据库,并通过 CAP 将订单消息发布到 RabbitMQ 队列。
-
订单处理服务器从 RabbitMQ 队列接收订单消息,执行订单处理逻辑,更新订单状态并保存到 SQL Server 数据库。
详细代码及解释
1. OrderShared 项目
csharp
// Order.cs
using System;
using Microsoft.EntityFrameworkCore;namespace OrderShared
{public class Order{public int OrderId { get; set; }public string CustomerName { get; set; }public decimal TotalAmount { get; set; }public string Status { get; set; } = "Placed"; // 初始状态为已下单}public class OrderDbContext : DbContext{public OrderDbContext(DbContextOptions<OrderDbContext> options) : base(options){}public DbSet<Order> Orders { get; set; }protected override void OnModelCreating(ModelBuilder modelBuilder){// 可以在这里配置实体关系等base.OnModelCreating(modelBuilder);}}
}
解释:
-
Order
类定义了订单的实体结构,包含订单 ID、客户名称、总金额和订单状态等属性。 -
OrderDbContext
是继承自DbContext
的数据库上下文类,用于与 SQL Server 数据库进行交互,定义了Orders
属性来操作Order
实体的集合。
2. OrderPlacingServer 项目(.NET Core Web API)
csharp
// Startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using DotNetCore.CAP;
using OrderShared;namespace OrderPlacingServer
{public class Startup{public Startup(IConfiguration configuration){Configuration = configuration;}public IConfiguration Configuration { get; }public void ConfigureServices(IServiceCollection services){// 配置SQL Server数据库连接services.AddDbContext<OrderDbContext>(options =>options.UseSqlServer(Configuration.GetConnectionString("OrderDbConnection")));// 配置CAPservices.AddCap(x =>{// 使用SQL Server作为存储x.UseEntityFramework<OrderDbContext>();// 使用RabbitMQ作为消息队列x.UseRabbitMQ(Configuration.GetConnectionString("RabbitMQConnection"));// 设置重试次数和间隔x.FailedRetryCount = 3;x.FailedRetryInterval = 5000;});services.AddControllers();}public void Configure(IApplicationBuilder app, IWebHostEnvironment env){if (env.IsDevelopment()){app.UseDeveloperExceptionPage();}app.UseRouting();app.UseAuthorization();app.UseEndpoints(endpoints =>{endpoints.MapControllers();});// 注册CAPapp.UseCap();}}
}// OrdersController.cs
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using DotNetCore.CAP;
using OrderShared;namespace OrderPlacingServer.Controllers
{[Route("api/[controller]")][ApiController]public class OrdersController : ControllerBase{private readonly ICapPublisher _capPublisher;public OrdersController(ICapPublisher capPublisher){_capPublisher = capPublisher;}[HttpPost]public async Task<IActionResult> PlaceOrder(Order order){using (var scope = HttpContext.RequestServices.CreateScope()){var dbContext = scope.ServiceProvider.GetRequiredService<OrderDbContext>();// 开启本地事务using (var transaction = dbContext.Database.BeginTransaction(_capPublisher)){try{// 插入订单数据到本地数据库dbContext.Orders.Add(order);await dbContext.SaveChangesAsync();// 发布订单消息到CAPawait _capPublisher.PublishAsync("order.process", order);// 提交事务transaction.Commit();return Ok("Order placed successfully");}catch (Exception ex){// 回滚事务transaction.Rollback();return BadRequest($"Failed to place order: {ex.Message}");}}}}}
}
解释:
-
Startup.cs
:配置了 SQL Server 数据库连接,使用OrderDbContext
来操作数据库。配置 CAP,指定使用 SQL Server 作为消息存储,RabbitMQ 作为消息队列,并设置了重试策略。在Configure
方法中注册了 CAP 中间件。 -
OrdersController.cs
:PlaceOrder
方法接收订单数据,通过ICapPublisher
发布订单消息到名为order.process
的主题。在本地事务中,先插入订单数据到数据库,然后发布消息,如果操作成功则提交事务,失败则回滚事务。
3. OrderProcessingServer 项目(.NET Core Console 应用)
csharp
// Program.cs
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using DotNetCore.CAP;
using OrderShared;namespace OrderProcessingServer
{class Program{static async Task Main(string[] args){var host = Host.CreateDefaultBuilder(args).ConfigureServices((hostContext, services) =>{// 配置SQL Server数据库连接services.AddDbContext<OrderDbContext>(options =>options.UseSqlServer(hostContext.Configuration.GetConnectionString("OrderDbConnection")));// 配置CAPservices.AddCap(x =>{x.UseEntityFramework<OrderDbContext>();x.UseRabbitMQ(hostContext.Configuration.GetConnectionString("RabbitMQConnection"));});// 注册订单消费者服务services.AddHostedService<OrderConsumerHostedService>();}).Build();await host.RunAsync();}}// OrderConsumer.csusing System.Threading.Tasks;using DotNetCore.CAP;using OrderShared;public class OrderConsumer : ICapSubscribe{private readonly OrderDbContext _dbContext;public OrderConsumer(OrderDbContext dbContext){_dbContext = dbContext;}[CapSubscribe("order.process")]public async Task ProcessOrder(Order order){try{// 处理订单逻辑,例如更新订单状态为已处理order.Status = "Processed";_dbContext.Orders.Update(order);await _dbContext.SaveChangesAsync();System.Console.WriteLine($"Processed order: {order.OrderId}");}catch (Exception ex){System.Console.WriteLine($"Failed to process order: {ex.Message}");throw; // 让CAP进行重试}}}// OrderConsumerHostedService.csusing System.Threading;using System.Threading.Tasks;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using DotNetCore.CAP;public class OrderConsumerHostedService : BackgroundService{private readonly ICapPublisher _capPublisher;private readonly ILogger<OrderConsumerHostedService> _logger;public OrderConsumerHostedService(ICapPublisher capPublisher, ILogger<OrderConsumerHostedService> logger){_capPublisher = capPublisher;_logger = logger;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){// 这里可以做一些定时任务相关的操作,目前只是简单等待await Task.Delay(1000, stoppingToken);}}}
}
解释:
-
Program.cs
:创建了一个.NET Core 主机,配置了 SQL Server 数据库连接和 CAP。注册了OrderConsumerHostedService
,用于在后台运行应用并处理消息。 -
OrderConsumer.cs
:OrderConsumer
类实现了ICapSubscribe
接口,ProcessOrder
方法被[CapSubscribe("order.process")]
标记,用于处理接收到的订单消息。在方法中更新订单状态并保存到数据库,如果处理失败则抛出异常让 CAP 进行重试。 -
OrderConsumerHostedService.cs
:OrderConsumerHostedService
继承自BackgroundService
,在ExecuteAsync
方法中可以实现一些定时任务逻辑,目前只是简单地等待,确保应用持续运行以接收和处理消息。
配置和注意事项
-
配置文件:在两个项目的
appsettings.json
中配置 SQL Server 数据库连接字符串和 RabbitMQ 连接字符串。
json
{"ConnectionStrings": {"OrderDbConnection": "Data Source=YOUR_SERVER_NAME;Initial Catalog=YOUR_DATABASE_NAME;User ID=YOUR_USERNAME;Password=YOUR_PASSWORD","RabbitMQConnection": "host=YOUR_RABBITMQ_HOST;port=YOUR_RABBITMQ_PORT;user=YOUR_USERNAME;password=YOUR_PASSWORD"}
}
-
幂等性:订单处理方法
ProcessOrder
应保证幂等性,避免重复处理导致数据不一致。 -
异常处理:合理处理异常,确保消息能够正确重试或进行其他处理。
-
数据一致性:下单服务器的本地事务和消息发布应保证原子性,确保订单数据和消息状态的一致性。
-
消息队列健康:确保 RabbitMQ 服务正常运行,网络连接稳定,避免消息丢失或积压。
通过以上代码和配置,可以实现下单服务器和订单处理服务器之间基于 CAP、SQL Server 和 RabbitMQ 的订单处理流程。