V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
drymonfidelia
V2EX  ›  .NET

同一程序内的消息队列,这样用单例的简易实现比 redis 的 Pub-Sub 效率高吗?如果不实现 Unsubscribe,有内存泄漏风险吗?

  •  
  •   drymonfidelia · 29 天前 · 1017 次点击
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    
    public class BasicMessageBus
    {
        private static readonly BasicMessageBus _instance = new();
        public static BasicMessageBus Inst => _instance;
    
        private readonly Subject<object> _messages = new();
        public IObservable<T> Subscribe<T>() => _messages.OfType<T>();
        public void Send(object message) => _messages.OnNext(message);
    }
    
    3 条回复    2024-11-29 19:41:30 +08:00
    hez2010
        1
    hez2010  
       25 天前
    没有看到哪里有内存泄露的风险。
    调用 Subscribe 的方法所在的对象如果被回收了那对 _messages 的引用也就自动没了,除非你是在哪个具有 static 生命周期的对象中调用了 Subscribe 。
    coder001
        2
    coder001  
       22 天前
    进程内队列? 为啥不用 Channel

    https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.channels.channel?view=net-8.0


    另外,如果是事件总线,可以考虑引入泛型之类的花样类型匹配订阅筛选器

    这是自用的事件总线实现,目前大规模用在工作生产环境和玩具项目,未发现明显性能瓶颈


    IEventBus.cs

    ```
    public interface IEventBus
    {
    bool Subscript<T>(Action<T> callBack);

    bool UnSubscript<T>(Action<T> callBack);

    bool Publish<T>();

    bool Publish<T>(T obj);
    }
    ```

    AnyPublishEvent.cs

    ```
    /// <summary>
    /// 任何事件发布,用于统计或通配
    /// </summary>
    [DisplayName("*")]
    public record AnyPublishEvent(Type Type, object? Obj);
    ```

    InProcessEventBusBase.cs

    ```
    public abstract class InProcessEventBusBase(ILogger<InProcessEventBusBase> logger) : IEventBus
    {
    private readonly Dictionary<Type, HashSet<Delegate>> _dicTypeToHandlers = [];

    public bool Subscript<T>(Action<T> callBack)
    {
    var type = typeof(T);
    lock (_dicTypeToHandlers)
    {
    if (!_dicTypeToHandlers.TryGetValue(type, out var handlers))
    {
    handlers = _dicTypeToHandlers[type] = [];
    }

    return handlers.Add(callBack); // 忽略重复
    }
    }

    public bool UnSubscript<T>(Action<T> callBack)
    {
    lock (_dicTypeToHandlers)
    {
    if (_dicTypeToHandlers.TryGetValue(typeof(T), out var handlers))
    {
    var unSubscript = handlers.Remove(callBack);

    if (handlers.Count == 0) _dicTypeToHandlers.Remove(typeof(T));

    return unSubscript;
    }

    return false;
    }
    }

    public bool Publish<T>()
    {
    PublishInternal(new AnyPublishEvent(typeof(T), default));
    return PublishInternal<T?>(default);
    }

    public bool Publish<T>(T obj)
    {
    PublishInternal(new AnyPublishEvent(typeof(T), obj));
    return PublishInternal(obj);
    }

    private bool PublishInternal<T>(T eventValue)
    {
    var type = typeof(T);

    Delegate[] subscripts;
    lock (_dicTypeToHandlers)
    {
    if (!_dicTypeToHandlers.TryGetValue(type, out var handlers)) return false;
    subscripts = [.. handlers];
    }

    foreach (var del in subscripts)
    {
    try
    {
    ((Action<T>)del)(eventValue);
    }
    catch (Exception e)
    {
    logger.LogError(e, nameof(Publish));
    }
    }

    return true;
    }
    }
    ```
    coder001
        3
    coder001  
       22 天前
    (看来似乎回帖没有代码格式支持,而且 gist 连接展开的特性似乎也没了,凑合看吧🌚)
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1261 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 17:48 · PVG 01:48 · LAX 09:48 · JFK 12:48
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.