第二篇 連接池
連接池配置,請前往Thrift搭建分布式微服務(一)
下面要介紹的其實不是單一的連接池,應該說是連接池集合。因為它要管理多個Tcp Socket連接節點,每個服務節點都有設置了自己的最大激活連接數、最大空閑連接數、最小空閑連接數、等待連接時間。
1 internal class ServiceTransportPool 2 { 3 public ServiceConfig ServiceConfig { get; set; } 4 5 public ConcurrentStack<TTransport> TransportPool { get; set; } 6 7 public AutoResetEvent ResetEvent { get; set; } 8 9 public int ActivedTransportCount { get; set; }
internal object sync_obeject = new object();
10 }
一個ServiceTransportPool類對應一個服務配置,一個服務配置對應一個服務節點。連接池集合應具有下列成員:
1 internal class ThriftFactory 2 { 3 private static volatile List<ServiceTransportPool> transport_pools; 4 5 private static object sync_obj = new object(); 6 7 private static IThriftFactoryMonitor monitor = new ThriftFactoryMonitor(); 8 }
transport_pools實現了對服務節點的管理,monitor 用來監控連接池的狀態,如上一篇所講,等待連接超時怎么通知連接池管理者,就用monitor來實現。這里monitor有個默認的實現,后面再講。
初始化連接池集合:
1 static ThriftFactory() 2 { 3 if (transport_pools == null || transport_pools.Count == 0) 4 { 5 lock (sync_obj) 6 { 7 if (transport_pools == null || transport_pools.Count == 0) 8 { 9 var services = ConfigHelper.GetServiceConfigs(); 10 transport_pools = new List<ServiceTransportPool>(services.Count); 11 foreach (var service in services) 12 { 13 ServiceTransportPool stp = new ServiceTransportPool() 14 { 15 ServiceConfig = service, 16 TransportPool = new ConcurrentStack<TTransport>(), 17 ResetEvent = new AutoResetEvent(false), 18 ActivedTransportCount = 0 19 }; 20 transport_pools.Add(stp); 21 } 22 } 23 } 24 } 25 }
如何向連接池借出一個Socket連接:
1 public static TTransport BorrowInstance(string serviceName) 2 { 3 var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault(); 4 if (transpool == null) 5 { 6 throw new ThriftException(string.Format("There Is No Service Named \"{0}\"", serviceName)); 7 } 8 9 TTransport transport; 10 lock (transpool.sync_obeject) 11 { 12 if (transpool.TransportPool.Count() == 0) 13 { 14 if (transpool.ActivedTransportCount == transpool.ServiceConfig.MaxActive) 15 { 16 bool result = transpool.ResetEvent.WaitOne(transpool.ServiceConfig.WaitingTimeout); 17 if (!result) 18 { 19 monitor.TimeoutNotify(transpool.ServiceConfig.Name, transpool.ServiceConfig.WaitingTimeout); 20 } 21 } 22 else 23 { 24 transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig)); 25 } 26 } 27 28 if (!transpool.TransportPool.TryPop(out transport)) 29 { 30 throw new ThriftException("Connection Pool Exception"); 31 } 32 33 transpool.ActivedTransportCount++; 34 35 if (transpool.TransportPool.Count() < transpool.ServiceConfig.MinIdle && transpool.ActivedTransportCount < transpool.ServiceConfig.MaxActive) 36 { 37 transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig)); 38 } 39 } 40 if (!transport.IsOpen) 41 { 42 transport.Open(); 43 } 44 Monitor(); 45 return transport; 46 }
當實際激活的連接數達到服務節點配置的最大激活連接數,獲取Socket連接的請求就將處於等待狀態,超過等待時間設置,使用監視器方法monitor.TimeoutNotify()去通知管理者。連接池空閑的連接小於最小空閑連接數設置,每次請求連接都會建立一個新的連接放到池子里。
歸還連接:
1 public static void ReturnInstance(string serviceName,TTransport transport) 2 { 3 var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault(); 4 if (transpool == null) 5 { 6 throw new ThriftException("Connection Pool Exception"); 7 } 8 if (transpool.TransportPool.Count() == transpool.ServiceConfig.MaxIdle) 9 { 10 transport.Flush(); 11 if (transport.IsOpen) 12 { 13 transport.Close(); 14 } 15 transport.Dispose(); 16 } 17 else 18 { 19 lock (transpool.sync_obeject) 20 { 21 if (transport.IsOpen) 22 { 23 transport.Close(); 24 } 25 transpool.TransportPool.Push(transport); 26 transpool.ActivedTransportCount--; 27 transpool.ResetEvent.Set(); 28 } 29 } 30 Monitor(); 31 }
當連接池最大空閑連接達到了服務節點設置的最大空閑連接數時,歸還的連接將被銷毀。借出連接和歸還連接兩段代碼里都有一個Monitor()方法,此方法監控連接池連接的使用情況:
/// <summary> /// 監控連接池狀態 /// </summary> private static void Monitor() { List<Tuple<string, int, int>> tuples = new List<Tuple<string, int, int>>(transport_pools.Count); foreach(var transpool in transport_pools) { Tuple<string, int, int> tuple = new Tuple<string, int, int>(transpool.ServiceConfig.Name, transpool.TransportPool.Count(), transpool.ActivedTransportCount); tuples.Add(tuple); } monitor.Monitor(tuples); }
此方法將每個服務連接池的空閑連接數、和激活的連接數傳給前面提到的監視器。在連接等待超時和拿到連接池的運行參數時,最終進行什么動作還是由開發者去實現的。繼承下面的接口,開發者可以自定義監視器。
1 public interface IThriftFactoryMonitor 2 { 3 /// <summary> 4 /// 監控連接池運行狀態 5 /// </summary> 6 /// <param name="tuple">元組集合,第一個元素表示服務名稱、第二個元素表示空閑連接數量、第三個元素表示激活連接數量</param> 7 void Monitor(List<Tuple<string,int,int>> tuples); 8 9 /// <summary> 10 /// 等待連接超時 11 /// </summary> 12 void TimeoutNotify(string serviceName,int timeOut); 13 }
默認的監視器只是將連接池的運行狀態記錄到控制台:
1 /// <summary> 2 /// 默認連接池狀態監控類 3 /// </summary> 4 public class ThriftFactoryMonitor : IThriftFactoryMonitor 5 { 6 public virtual void Monitor(List<Tuple<string, int, int>> tuples) 7 { 8 foreach (var t in tuples) 9 { 10 Console.WriteLine(string.Format("{0}連接池,空閑連接數量:{1},激活連接數量:{2}", t.Item1, t.Item2, t.Item3)); 11 } 12 } 13 14 public virtual void TimeoutNotify(string serviceName, int timeOut) 15 { 16 Console.WriteLine(string.Format("{0}連接池等待連接超時{1}", serviceName, timeOut)); 17 } 18 }
開發者自己實現的監視器,如何能被連接池使用,其實在一開始的連接池初始化里,還有一段使用反射來初始化開發者定義的監視器的代碼。開發者只需在上一篇介紹的Thrift.config里配置MonitorType,其他的就交給框架處理:
1 static ThriftFactory() 2 { 3 ...... 4 if(!string.IsNullOrWhiteSpace(ConfigHelper.ThriftConfig.MonitorType)) 5 { 6 monitor = Invoker.CreateInstance(Type.GetType(ConfigHelper.ThriftConfig.MonitorType)) as IThriftFactoryMonitor; 7 if (monitor == null) 8 { 9 throw new ThriftException(string.Format("There Is No Monitor Implement Which Type Is \"{0}\"", ConfigHelper.ThriftConfig.MonitorType)); 10 } 11 } 12 }
通過連接池與服務節點建立了Socket連接,下一篇將介紹客戶端如何使用建立的Socket連接與服務端通信。
Thrift微服務代碼下載Thrift.Utility