之前發過一篇帖子 應用.Net+Consul維護RabbitMq的高可用性,然后最近老大問我當初我這么搞是抽的什么想法- -然后順便貼了兩行C#代碼:
var factory = new ConnectionFactory() { UserName = "username", Password = "password", AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; Connection = factory.CreateConnection(new string[3] { "ip1", "ip2", "ip3" });
AutomaticRecoveryEnabled的作用就是斷線重連,假如當前的連接斷開了,連接不會釋放
TopologyRecoveryEnabled 重連后恢復當前的工作進程,比如channel、queue、發布的消息進度等。
看上去其實卻是比較方便的,為了狡辯我當場列出了我設計的方案的主要優點:
1.rabbitmq網關設計的時候一方面是為了讓客戶端能夠保證建立與master節點的連接,直連master性能較高。
2.通過配置隊列名+VirthHost可以獲取隊列信息,Consul可以起到配置中心的作用,可配置性高一些(其實此處是狡辯。。)
3.RabbitMQ與Master隊列建立的連接在發生故障時會第一時間查詢網關獲取新的Mater隊列信息進行重連(當然代碼內部也有重試機制,不會隨便就更換連接的),相比於AutomaticRecoveryEnabled效率更高一些。
4.客戶端SDK內部實現定時更新隊列連接,發現Master節點更換時重新建立連接
看上去我設計的方案還是有點意義的(其實是真懶得改代碼)
不過此處有個問題就是既然創建連接時的參數可以提供多個IP的集合,假如RabbitMQ提供的客戶端SDK內部實現的更好,那我狡辯什么不也完戲了嗎。。。於是假裝下載了下客戶端sdk代碼掃了掃,此處以C#代碼為例,代碼還是比較簡單的。github地址
直接定位ConnectionFactory類找CreateConnection()方法

/// <summary> /// Create a connection using a list of hostnames using the configured port. /// By default each hostname is tried in a random order until a successful connection is /// found or the list is exhausted using the DefaultEndpointResolver. /// The selection behaviour can be overriden by configuring the EndpointResolverFactory. /// </summary> /// <param name="hostnames"> /// List of hostnames to use for the initial /// connection and recovery. /// </param> /// <returns>Open connection</returns> /// <exception cref="BrokerUnreachableException"> /// When no hostname was reachable. /// </exception> public IConnection CreateConnection(IList<string> hostnames) { return CreateConnection(hostnames, null); } /// <summary> /// Create a connection using a list of hostnames using the configured port. /// By default each endpoint is tried in a random order until a successful connection is /// found or the list is exhausted. /// The selection behaviour can be overriden by configuring the EndpointResolverFactory. /// </summary> /// <param name="hostnames"> /// List of hostnames to use for the initial /// connection and recovery. /// </param> /// <param name="clientProvidedName"> /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot /// be used as a connection identifier, e.g. in HTTP API requests. /// This value is supposed to be human-readable. /// </param> /// <returns>Open connection</returns> /// <exception cref="BrokerUnreachableException"> /// When no hostname was reachable. /// </exception> public IConnection CreateConnection(IList<string> hostnames, String clientProvidedName) { var endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, this.Port, this.Ssl)); return CreateConnection(new DefaultEndpointResolver(endpoints), clientProvidedName); } /// <summary> /// Create a connection using a list of endpoints. By default each endpoint will be tried /// in a random order until a successful connection is found or the list is exhausted. /// The selection behaviour can be overriden by configuring the EndpointResolverFactory. /// </summary> /// <param name="endpoints"> /// List of endpoints to use for the initial /// connection and recovery. /// </param> /// <returns>Open connection</returns> /// <exception cref="BrokerUnreachableException"> /// When no hostname was reachable. /// </exception> public IConnection CreateConnection(IList<AmqpTcpEndpoint> endpoints) { return CreateConnection(new DefaultEndpointResolver(endpoints), null); } /// <summary> /// Create a connection using an IEndpointResolver. /// </summary> /// <param name="endpointResolver"> /// The endpointResolver that returns the endpoints to use for the connection attempt. /// </param> /// <param name="clientProvidedName"> /// Application-specific connection name, will be displayed in the management UI /// if RabbitMQ server supports it. This value doesn't have to be unique and cannot /// be used as a connection identifier, e.g. in HTTP API requests. /// This value is supposed to be human-readable. /// </param> /// <returns>Open connection</returns> /// <exception cref="BrokerUnreachableException"> /// When no hostname was reachable. /// </exception> public IConnection CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName) { IConnection conn; try { if (AutomaticRecoveryEnabled) { var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName); autorecoveringConnection.Init(endpointResolver); conn = autorecoveringConnection; } else { IProtocol protocol = Protocols.DefaultProtocol; conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName); } } catch (Exception e) { throw new BrokerUnreachableException(e); } return conn; }
代碼比較直觀,第二個方法的時候把傳入的字符串集合轉換成了AmqpTcpEndpoint集合,AmqpTcpEndpoint里包含隊列綁定信息,包含默認ip,端口,SSL配置信息,RabbitMQ的Amqp協議信息,初始化Socket連接的協議類型(一個AddressFamily枚舉),當然都是可配置的(除了ip貌似都喜歡使用默認的)。
然后就倒主要部分了。
if (AutomaticRecoveryEnabled) { var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName); autorecoveringConnection.Init(endpointResolver); conn = autorecoveringConnection; } else { IProtocol protocol = Protocols.DefaultProtocol; conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName); }
由於主要想看看傳入多個連接是不是可以智能的選擇master連接,所以此處直接看else里的代碼就行了。。。(其實一樣。。)
protocol.CreateConnection方法調用時傳入的第三個參數endpointResolver.SelectOne(this.CreateFrameHandler)作用其實就是選擇連接然后扔到CreateFrameHandler委托里作為參數進行Socket初始化的。看一下SelectOne這個擴展方法即可。。看了兩行終於可以繼續狡辯了。。
public static T SelectOne<T>(this IEndpointResolver resolver, Func<AmqpTcpEndpoint, T> selector) { var t = default(T); Exception exception = null; foreach(var ep in resolver.All()) { try { t = selector(ep); if(t.Equals(default(T)) == false) { return t; } } catch (Exception e) { exception = e; } } if(Object.Equals(t, default(T)) && exception != null) { throw exception; } return t; }
這個foreach還是“比較好的”,能夠建立一個Socket連接就愉快的返回了!,不過這個resolver.All()里是不是還有玄機呢!看了一眼。。終於放心了。。

public class DefaultEndpointResolver : IEndpointResolver { private List<AmqpTcpEndpoint> endpoints; private Random rnd = new Random(); public DefaultEndpointResolver (IEnumerable<AmqpTcpEndpoint> tcpEndpoints) { this.endpoints = tcpEndpoints.ToList(); } public IEnumerable<AmqpTcpEndpoint> All() { return endpoints.OrderBy(item => rnd.Next()); }
就是隨機排序一下。。不過這么看自己實現下IEndpointResolver接口改個選擇master隊列的策略也是不錯的。