Skip to content

.NET Core 使用 CAP 实现分布式事务

🏷️ .NET Core CAP

原本是由于 TransactionScope 暂时在 .NET Core 中无法使用(见 这里),所以想找一个替代的方案,然后就查到了 CAP 项目。

先把结论写在这里:CAP 并不是用来实现数据库的分布式事务的,和 TransactionScope 的功能完全不同。

虽然不能用来实现我的需求,但其功能还是常见的需求(发布/订阅模式),使用起来也很方便。

而且查都查了,也就把调查过程记录了下来。

安装所需要的包

因为项目使用的是 .NET Core 2.1DotNetCore.CAP 的最新版需要依赖 2.2,所以这里使用的是 DotNetCore.CAP 2.3.1 版。

powershell
Install-Package DotNetCore.CAP -Version 2.3.1

CAP 支持 RabbitMQKafkaAzureService 作为消息队列。

powershell
Install-Package DotNetCore.CAP.RabbitMQ -Version 2.3.1

CAP 的事件日志存储支持 SqlServerMySqlPostgreSqlMongoDB (需要 MongoDB 4.0+ cluster)作为事件日志存储。
所谓事件日志,个人理解其类似于消息队列中的消息。这里使用上述四种数据库来存储并确保该消息被消费并成功消费结束。
而且其消费并不保证只被执行一次,即消息接受的接口可能会被调用多次。这点也类似于消息队列的消费。所以也就需要开发者保证消费处理的幂等性。

powershell
Install-Package DotNetCore.CAP.SqlServer -Version 2.3.1

如果需要安装其他的消息队列或数据库,请参考 https://github.com/dotnetcore/CAP

配置

Startup.csConfigureServices 方法中配置 CAP。

任务日志这里使用 ADO.NET(ORM 框架使用的是修改过的 FluentData)和 EntityFramework ,更多类型请参考 GitHub

AddCap 配置数据库的意义在于创建用于存储 CAP 事件日志的表(Cap.PublishedCap.Received,其中 Cap 是 Schema,可以通过 o.Schema 属性配置)。如果对应的数据库中没有该表,则在 _capBus.Publish 发布消息时报 对象名 'Cap.Published' 无效 的错误。

csharp
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
    services.Configure<CookiePolicyOptions>(options =>
    {
        // This lambda determines whether user consent for non-essential cookies is needed for a given request.
        options.CheckConsentNeeded = context => true;
        options.MinimumSameSitePolicy = SameSiteMode.None;
    });

    services.AddDbContext<Models.TestA.TEST_AContext>(); //Options, If you are using EF as the ORM
    services.AddDbContext<Models.TestB.TEST_BContext>(); //Options, If you are using EF as the ORM

    services.AddCap(x =>
    {
        // If you are using EF, you need to add the configuration:
        x.UseEntityFramework<Models.TestA.TEST_AContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! C
        x.UseEntityFramework<Models.TestB.TEST_BContext>(); //Options, Notice: You don't need to config x.UseSqlServer(""") again! C

        // If you are using ADO.NET, choose to add configuration you needed:
        x.UseSqlServer(o =>
        {
            // sqlserverOptions.ConnectionString
            o.ConnectionString = Config.CONNECTION_STRING_SQL_SERVER_A;
        });

        x.UseSqlServer(o =>
        {
            // sqlserverOptions.ConnectionString
            o.ConnectionString = Config.CONNECTION_STRING_SQL_SERVER_B;
        });

        // CAP support RabbitMQ,Kafka,AzureService as the MQ, choose to add configuration you needed:
        x.UseRabbitMQ(o => {
            // rabbitmq options.
            o.HostName = Config.RABBIT_MQ_HOSTNAME;
            o.UserName = Config.RABBIT_MQ_USERNAME;
            o.Password = Config.RABBIT_MQ_PASSWORD;
        });
    });

    services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);

    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new Info { Title = "My API", Version = "v1" });
    });
}

另外这里还配置了 SwaggerUI,方便调用接口。如需使用,先安装 Swashbuckle.AspNetCore 包。

powershell
Install-Package Swashbuckle.AspNetCore -Version 4.0.1

使用 EF & CAP

另外如果使用 EF 需要安装如下包:

powershell
Install-Package Microsoft.EntityFrameworkCore.SqlServer -Version 2.1.11
Install-Package Microsoft.EntityFrameworkCore.Tools -Version 2.1.11
Install-Package Microsoft.EntityFrameworkCore.SqlServer.Design -Version 1.1.6

如何生成 EF 对应的 DbContext 和 Models 请参考 【.NET Core】Entity Framework Core

这里使用了两个数据库 TEST_A 和 TEST_B,分别有 TableA 和 TableB。
下面的代码测试了在一个库中新增数据,然后通知 CAP,在之后的处理中在另一个库中再新增一条数据。

csharp
/// <summary>
/// [分布式事务测试][EF]
/// </summary>
[Route("api/test/scope/ef")]
[ApiController]
public class EFScopeTestController : ControllerBase
{
    Models.TestA.TEST_AContext _dbContextA;

    Models.TestB.TEST_BContext _dbContextB;

    private readonly ICapPublisher _capBus;

    public EFScopeTestController(ICapPublisher capPublisher, Models.TestA.TEST_AContext dbContextA, Models.TestB.TEST_BContext dbContextB)
    {
        _capBus = capPublisher;
        _dbContextA = dbContextA;
        _dbContextB = dbContextB;
    }

    /// <summary>
    /// [分布式事务测试][EF] 成功
    /// </summary>
    /// <returns></returns>
    [HttpPost("success-a")]
    public void EFSuccess()
    {
        using (var trans = _dbContextA.Database.BeginTransaction(_capBus, autoCommit: false))
        {
            //your business logic code
            _dbContextA.TableA.Add(new Models.TestA.TableA()
            {
                Guid = Guid.NewGuid(),
                Name = "EF 测试数据 A",
                CreateTime = DateTime.Now,
                ModifyTime = DateTime.Now,
            });
            _dbContextA.SaveChanges();

            _capBus.Publish("test.scope.ef.success.a.check", DateTime.Now);

            trans.Commit();
        }
    }

    /// <summary>
    /// [分布式事务测试][EF]Check
    /// </summary>
    /// <param name="datetime"></param>
    /// <param name="dbContextB"></param>
    [HttpPost("check-message-a")]
    [CapSubscribe("test.scope.ef.success.a.check")]
    public void CheckTestAReceivedMessage(DateTime datetime)
    {
        Console.WriteLine(datetime);

        using (var trans = _dbContextB.Database.BeginTransaction())
        {
            //your business logic code
            _dbContextB.TableB.Add(new Models.TestB.TableB()
            {
                Guid = Guid.NewGuid(),
                Name = "EF 测试 Check 数据 B",
                CreateTime = DateTime.Now,
                ModifyTime = DateTime.Now,
            });
            _dbContextB.SaveChanges();

            trans.Commit();
        }
    }

    /// <summary>
    /// [分布式事务测试][EF] 成功
    /// </summary>
    /// <returns></returns>
    [HttpPost("success-b")]
    public void EFSuccessB()
    {
        using (var trans = _dbContextB.Database.BeginTransaction(_capBus, autoCommit: false))
        {
            //your business logic code
            _dbContextB.TableB.Add(new Models.TestB.TableB()
            {
                Guid = Guid.NewGuid(),
                Name = "EF 测试数据 B",
                CreateTime = DateTime.Now,
                ModifyTime = DateTime.Now,
            });
            _dbContextB.SaveChanges();

            _capBus.Publish("test.scope.ef.success.b.check", DateTime.Now);

            trans.Commit();
        }
    }

    /// <summary>
    /// [分布式事务测试][EF]Check
    /// </summary>
    /// <param name="datetime"></param>
    /// <param name="dbContextB"></param>
    [HttpPost("check-message-b")]
    [CapSubscribe("test.scope.ef.success.b.check")]
    public void CheckTestBReceivedMessage(DateTime datetime)
    {
        Console.WriteLine(datetime);

        using (var trans = _dbContextA.Database.BeginTransaction())
        {
            //your business logic code
            _dbContextA.TableA.Add(new Models.TestA.TableA()
            {
                Guid = Guid.NewGuid(),
                Name = "EF 测试 Check 数据 A",
                CreateTime = DateTime.Now,
                ModifyTime = DateTime.Now,
            });
            _dbContextA.SaveChanges();

            trans.Commit();
        }
    }
}

这里测试了两种情况,分别是先在 TEST_A 和 TEST_B 库新增数据,结果 Cap.PublishedCap.Received 中的事件日志和想象中的并不一样。
原以为会分别在 TEST_A 和 TEST_B 中的 Cap.PublishedCap.Received 表中各有一条数据,实际结果是 TEST_A.Cap.Received 中没有数据,而 TEST_B.Cap.Received 中有两条数据。

TEST_A.Cap.Published

IdNameContentRetriesAddedExpiresAtStatusName
1137912972419485696test.scope.ef.success.a.check{"Id":"5cfdc3a8039e53669cffb77b","Timestamp":"2019-06-10T10:42:48.6969171+08:00","Content":"2019/6/10 10:42:48","CallbackName":null}02019-06-10 10:42:48.6800000NULLScheduled

TEST_A.Cap.Received

IdNameGroupContentRetriesAddedExpiresAtStatusName

TEST_B.Cap.Published

IdNameContentRetriesAddedExpiresAtStatusName
1137912987879690240test.scope.ef.success.b.check{"Id":"5cfdc3ac039e53669cffb77d","Timestamp":"2019-06-10T10:42:52.3689171+08:00","Content":"2019/6/10 10:42:52","CallbackName":null}02019-06-10 10:42:52.37000002019-06-11 10:42:52.3830000Succeeded

TEST_B.Cap.Received

IdNameGroupContentRetriesAddedExpiresAtStatusName
1137912972738252800test.scope.ef.success.a.checkcap.queue.ds_dnc_cap{"Id":"5cfdc3a8039e53669cffb77b","Timestamp":"2019-06-10T10:42:48.6969171+08:00","Content":"2019/6/10 10:42:48","CallbackName":null}02019-06-10 10:42:48.76000002019-06-11 10:42:48.8530000Succeeded
1137912987955187712test.scope.ef.success.b.checkcap.queue.ds_dnc_cap{"Id":"5cfdc3ac039e53669cffb77d","Timestamp":"2019-06-10T10:42:52.3689171+08:00","Content":"2019/6/10 10:42:52","CallbackName":null}02019-06-10 10:42:52.38700002019-06-11 10:42:52.4000000Succeeded

ADO.NET (FluentData) & CAP

csharp
/// <summary>
/// [分布式事务测试][ADO.NET]
/// </summary>
[Route("api/test/scope/adodotnet")]
[ApiController]
public class AdoDotNetScopeTestController : ControllerBase
{
    private readonly ICapPublisher _capBus;

    public AdoDotNetScopeTestController(ICapPublisher capPublisher)
    {
        _capBus = capPublisher;
    }

    /// <summary>
    /// [分布式事务测试][ADO.NET] 成功
    /// </summary>
    /// <returns></returns>
    [HttpPost("success")]
    public void Success()
    {
        using (var db_wirte = new DbContext().ConnectionString(Config.CONNECTION_STRING_SQL_SERVER_A, new SqlServerProvider()).UseTransactionScope(_capBus))
        {
            db_wirte.Insert("Table_A", new TestModel()
            {
                Guid = Guid.NewGuid(),
                Name = "ADO.NET 测试数据 A",
                CreateTime = DateTime.Now,
                ModifyTime = DateTime.Now,
            }).AutoMap().Execute();

            _capBus.Publish("test.scope.adodotnet.success.check", DateTime.Now);

            db_wirte.Commit();
        }
    }

    /// <summary>
    /// [分布式事务测试][ADO.NET]Check
    /// </summary>
    /// <param name="datetime"></param>
    [HttpPost("check-messagee")]
    [CapSubscribe("test.scope.adodotnet.success.check")]
    public void CheckReceivedMessage(DateTime datetime)
    {
        Console.WriteLine(datetime);

        using (var db_prod = new DbContext().ConnectionString(Config.CONNECTION_STRING_SQL_SERVER_B, new SqlServerProvider()).UseTransactionScope(_capBus))
        {
            db_prod.Insert("Table_B", new TestModel()
            {
                Guid = Guid.NewGuid(),
                Name = "ADO.NET 测试 Check 数据 B",
                CreateTime = DateTime.Now,
                ModifyTime = DateTime.Now,
            }).AutoMap().Execute();

            db_prod.Commit();
        }
    }
}

IDbContext.cs 中增加 UseTransactionScope 接口定义。

csharp
IDbContext UseTransactionScope(ICapPublisher capBus);

Transactions.cs 中增加该接口的实现:

csharp
public IDbContext UseTransactionScope(ICapPublisher capBus)
{
    Data.UseTransaction = true;
    Data.CapBus = capBus;
    return this;
}

修改 ExecuteQueryHandler.csPrepareDbCommand 方法中开启事务的代码。

csharp
if (_command.Data.Context.Data.UseTransaction)
{
    if(_command.Data.Context.Data.Transaction == null)
    {
        if (_command.Data.Context.Data.CapBus != null)
        {
            _command.Data.Context.Data.Transaction = _command.Data.Context.Data.Connection.BeginTransaction(_command.Data.Context.Data.CapBus, autoCommit: false);
        }
        else
        {
            _command.Data.Context.Data.Transaction = _command.Data.Context.Data.Connection.BeginTransaction((System.Data.IsolationLevel)_command.Data.Context.Data.IsolationLevel);
        }

    }
    _command.Data.InnerCommand.Transaction = _command.Data.Context.Data.Transaction;
}

回调函数

_capBus.Publish 方法可以通过 callbackName 参数指定回调函数,在事件成功结束后,会执行回调的事件(回调事件不支持参数)。

不过在事件失败的情况下,并不会执行回调的事件。下面就是一个事件失败的示例代码,回调事件永远不会被执行。

csharp
/// <summary>
/// [分布式事务测试][EF] 失败
/// </summary>
/// <returns></returns>
[HttpPost("fail-a")]
public void EFFail()
{
    using (var trans = _dbContextA.Database.BeginTransaction(_capBus, autoCommit: false))
    {
        //your business logic code
        var model = new Models.TestA.TableA()
        {
            Guid = Guid.NewGuid(),
            Name = "EF 失败测试数据 A",
            CreateTime = DateTime.Now,
            ModifyTime = DateTime.Now,
        };
        _dbContextA.TableA.Add(model);
        _dbContextA.SaveChanges();

        _capBus.Publish("test.scope.ef.fail.a.check", model.Guid, "test.scope.ef.fail.callback.a.check");

        trans.Commit();
    }
}

/// <summary>
/// [分布式事务测试][EF]Check Fail
/// </summary>
/// <param name="guid"></param>
/// <param name="dbContextB"></param>
[HttpPost("check-fail-message-a")]
[CapSubscribe("test.scope.ef.fail.a.check")]
public void CheckTestAReceivedFailMessage(Guid guid)
{
    Console.WriteLine(guid);

    throw new Exception("There is a exception at check-fail-message-a process.");
}

/// <summary>
/// [分布式事务测试][EF]Check Fail Callback
/// </summary>
/// <param name="guid"></param>
/// <param name="dbContextB"></param>
[HttpPost("check-fail-callback-message-a")]
[CapSubscribe("test.scope.ef.fail.callback.a.check")]
public void CheckTestAReceivedFailCallbackMessage()
{
    Console.WriteLine($"fail callback {DateTime.Now}");
}

感想

原本是由于 TransactionScope 暂时在 .NET Core 中无法使用(见 这里),所以想找一个替代的方案。

可惜通过上面的示例可以看出,CAP 和 TransactionScope 实现的目标是完全不同的。

TransactionScope 的目标是保证分布式数据的一致性和原子性,而 CAP 则是通过事务日志来确保后续处理一定会被调用,但并不保证其执行成功,也不能保证数据的原子性。

如果 CAP 能分别设置成功和失败的回调且支持参数的话,可扩展性应该会更好些。

综上,个人认为 CAP 是发布/订阅模式的 .NET Core 易用版,简化了开发者的配置和使用,但和数据库的分布式事务是完全不同的东西。