新闻中心

如何用 Redis Streams 构建 .NET 事件存储?

2025-10-10
浏览次数:
返回列表
使用 Redis Streams 可构建高性能 .NET 事件存储,支持按聚合根分隔离事件流、版本控制与消费者组分发。1. 以 events:{id} 为 Key 存储事件流,每条消息含 type、data、timestamp 和 version 字段。2. 利用 StackExchange.Redis 写入和读取事件,通过 StreamAddAsync 和 StreamRangeAsync 操作数据。3. 创建消费者组实现事件广播,StreamReadGroupAsync 拉取消息并用 StreamAcknowledgeAsync 确认处理,确保至少一次交付。4. 使用独立版本键或 Lua 脚本实现乐观锁,防止并发写冲突。该方案具备持久化、高吞吐与水平扩展能力,适用于事件溯源架构。

如何用 redis streams 构建 .net 事件存储?

用 Redis Streams 构建 .NET 事件存储是一种轻量、高性能的实现方式,特别适合需要持久化事件流并支持消费者组处理的场景。Redis Streams 天然支持消息持久化、多消费者组、消息确认机制,非常适合做事件溯源(Event Sourcing)的基础存储。

1. 设计事件存储的基本结构

事件存储的核心是将领域事件按聚合根(Aggregate Root)分类,追加写入事件流。在 Redis 中,可以用 Stream Key 表示一个聚合实例的事件流,例如:

events:order-12345

每个事件作为一条消息写入该 Stream,包含事件类型、时间戳和序列化后的数据。

每条消息字段建议包括:

  • type: 事件类型(如 OrderCreated)
  • data: JSON 序列化的事件内容
  • timestamp: 发生时间
  • version: 聚合版本号,用于乐观并发控制

2. 在 .NET 中使用 StackExchange.Redis 操作 Streams

安装 NuGet 包:

Install-Package StackExchange.Redis

连接 Redis 并写入事件:

var redis = ConnectionMultiplexer.Connect("localhost");
var db = redis.GetDatabase();

var streamKey = "events:order-12345";
var eventId = await db.StreamAddAsync(streamKey, new NameValueEntry[]
{
    new ("type", "OrderCreated"),
    new ("data", JsonSerializer.Serialize(new { OrderId = "12345", Amount = 100 })),
    new ("timestamp", DateTime.UtcNow.ToString("o")),
    new ("version", "1")
});

读取某个聚合的所有事件:

AdMaker AI AdMaker AI

从0到爆款高转化AI广告生成器

AdMaker AI 65 查看详情 AdMaker AI
var entries = await db.StreamRangeAsync(streamKey, null, null);
var events = entries.Select(entry => new {
    Type = entry["type"],
    Data = entry["data"],
    Version = int.Parse(entry["version"])
}).ToList();

3. 支持消费者组处理事件(用于事件分发)

如果需要将事件通知给多个服务,可以创建消费者组:

// 创建消费者组(首次执行)
await db.StreamCreateConsumerGroupAsync(streamKey, "payments-service", "$");

// 拉取未处理的消息
var messages = await db.StreamReadGroupAsync(streamKey, "payments-service", "consumer-1", 10, ">");
foreach (var message in messages)
{
    // 处理业务逻辑
    Console.WriteLine($"Received: {message["type"]}");

    // 确认消息处理完成
    await db.StreamAcknowledgeAsync(streamKey, "payments-service", message.Id);
}

这种模式支持水平扩展,多个消费者可并行处理不同消息,同时保证每条消息只被组内一个消费者处理。

4. 添加版本控制与并发检查

写入事件前应检查当前版本,防止并发冲突:

可通过 StreamLength 或存储一个单独的版本键(如 version:order-12345)来管理版本号。

写入时先获取当前长度或版本:

var currentVersion = await db.StringGetAsync($"version:{aggregateId}");
if (expectedVersion != (long)currentVersion)
    throw new ConcurrencyException();

// 写入事件
await db.StreamAddAsync(streamKey, entries);

// 更新版本
await db.StringSetAsync($"version:{aggregateId}", expectedVersion + 1);

也可通过 Lua 脚本原子化操作:校验版本 + 写入事件 + 自增版本。

基本上就这些。Redis Streams 提供了足够基础的能力,配合 .NET 的序列化和异步处理,可以快速搭建一个高效、可靠的事件存储。关键是设计好 Key 结构、版本控制和消费者组的使用策略。

以上就是如何用 Redis Streams 构建 .NET 事件存储?的详细内容,更多请关注其它相关文章!


# 命令行  # 荣县营销推广公司电话  # 新手网站建设  # 兰州seo网站优化工具技巧  # 智能网站推广价格  # 网站的建设背景  # 快速建设网站步骤  # 网站推广公司的优势  # 江西建设项目公示网站  # 黑龙江推广营销哪家好  # 石家庄企业营销推广  # 可以用  # 首次  # 有哪些  # 是一种  # redis  # 高性能  # 序列化  # 多个  # 如何用  # 每条  # gate  # red  # .net  # stream  # ai  # mac  # edge  # json  # js 


相关栏目: 【 行业资讯67740 】 【 技术百科0 】 【 网络运营39195


相关推荐: linux如何跳回命令行界面  每日推荐电声音乐软件有哪些  春运抢票还用取票吗  怎么用win7系统盘重装系统  单片机是怎么计时的  如何看固态硬盘型号  苹果16日发售哪些机型  j*a数组怎么保存类  固态硬盘如何保存  debug中如何用n命令命名程序文件名  春运抢票可以抢几张  在遥控器中power是什么意思  怎么在typescript定义集合  固态硬盘如何启动  calm是什么意思  红米手机怎么设置变成5G手机  vivo手机nfc功能是什么意思  j*a怎么创建json数组  开机如何运行dos命令提示符  进口超级维特拉三门版power是什么意思  市盈率当中17A 18E是什么意思  bored是什么意思  显示器上power键是什么意思  苹果16讲解有哪些功能  夸克是什么空间单位  typescript文件怎么打开  单片机*计步器怎么用  怎么在项目中使用typescript  夸克还原排版是什么意思  typescript书籍哪个好  j*a数组逆序怎么写  360n5锁屏壁纸怎么设置  2026年将会大爆发的15个新科技  如何利用固态硬盘  春运抢票可以抢几次啊  破太岁是什么意思  固态硬盘如何备份  征信不好如何快速恢复 征信不好快速恢复的方法  单片机怎么发送can 信号  j*a map数组怎么用  ai文件里无法找到链接文件要怎么解决步骤  华硕k20ce怎么装win7  如何加装固态硬盘  vfp 命令窗口如何实现换行  选哪个折叠屏手机好用  苹果16送哪些配件  夸克网盘是什么都有吗  linux如何使用db2命令  单片机怎么储存和显示  j*a怎么用数组缓存 

搜索