Thrift搭建分布式微服務(二)


第二篇 連接池

    連接池配置,請前往Thrift搭建分布式微服務(一)

    下面要介紹的其實不是單一的連接池,應該說是連接池集合。因為它要管理多個Tcp Socket連接節點,每個服務節點都有設置了自己的最大激活連接數、最大空閑連接數、最小空閑連接數、等待連接時間。

 1     internal class ServiceTransportPool
 2     {
 3         public ServiceConfig ServiceConfig { get; set; }
 4 
 5         public ConcurrentStack<TTransport> TransportPool { get; set; }
 6 
 7         public AutoResetEvent ResetEvent { get; set; }
 8 
 9         public int ActivedTransportCount { get; set; }
     
     internal object sync_obeject = new object();
10 }

  

        一個ServiceTransportPool類對應一個服務配置,一個服務配置對應一個服務節點。連接池集合應具有下列成員:

1     internal class ThriftFactory
2     {
3         private static volatile List<ServiceTransportPool> transport_pools;
4         
5         private static object sync_obj = new object();
6 
7         private static IThriftFactoryMonitor monitor = new ThriftFactoryMonitor();
8     }

        transport_pools實現了對服務節點的管理,monitor 用來監控連接池的狀態,如上一篇所講,等待連接超時怎么通知連接池管理者,就用monitor來實現。這里monitor有個默認的實現,后面再講。

        初始化連接池集合:

 1         static ThriftFactory()
 2         {
 3             if (transport_pools == null || transport_pools.Count == 0)
 4             {
 5                 lock (sync_obj)
 6                 {
 7                     if (transport_pools == null || transport_pools.Count == 0)
 8                     {
 9                         var services = ConfigHelper.GetServiceConfigs();
10                         transport_pools = new List<ServiceTransportPool>(services.Count);
11                         foreach (var service in services)
12                         {
13                             ServiceTransportPool stp = new ServiceTransportPool()
14                             {
15                                 ServiceConfig = service,
16                                 TransportPool = new ConcurrentStack<TTransport>(),
17                                 ResetEvent = new AutoResetEvent(false),
18                                 ActivedTransportCount = 0
19                             };
20                             transport_pools.Add(stp);
21                         }
22                     }
23                 }
24             }
25        }

 

   如何向連接池借出一個Socket連接:

 1         public static TTransport BorrowInstance(string serviceName)
 2         {
 3             var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault();
 4             if (transpool == null)
 5             {
 6                 throw new ThriftException(string.Format("There Is No Service Named \"{0}\"", serviceName));
 7             }
 8 
 9             TTransport transport;
10             lock (transpool.sync_obeject)
11             {
12                 if (transpool.TransportPool.Count() == 0)
13                 {
14                     if (transpool.ActivedTransportCount == transpool.ServiceConfig.MaxActive)
15                     {
16                         bool result = transpool.ResetEvent.WaitOne(transpool.ServiceConfig.WaitingTimeout);
17                         if (!result)
18                         {
19                             monitor.TimeoutNotify(transpool.ServiceConfig.Name, transpool.ServiceConfig.WaitingTimeout);
20                         }
21                     }
22                     else
23                     {
24                         transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig));
25                     }
26                 }
27 
28                 if (!transpool.TransportPool.TryPop(out transport))
29                 {
30                     throw new ThriftException("Connection Pool Exception");
31                 }
32 
33                 transpool.ActivedTransportCount++;
34 
35                 if (transpool.TransportPool.Count() < transpool.ServiceConfig.MinIdle && transpool.ActivedTransportCount < transpool.ServiceConfig.MaxActive)
36                 {
37                     transpool.TransportPool.Push(CreateTransport(transpool.ServiceConfig));
38                 }
39             }
40             if (!transport.IsOpen)
41             {
42                 transport.Open();
43             }
44             Monitor();
45             return transport;
46         }

        當實際激活的連接數達到服務節點配置的最大激活連接數,獲取Socket連接的請求就將處於等待狀態,超過等待時間設置,使用監視器方法monitor.TimeoutNotify()去通知管理者。連接池空閑的連接小於最小空閑連接數設置,每次請求連接都會建立一個新的連接放到池子里。

        歸還連接:

 1         public static void ReturnInstance(string serviceName,TTransport transport)
 2         {
 3             var transpool = (from tp in transport_pools where tp.ServiceConfig.Name.ToUpper() == serviceName.ToUpper() select tp).FirstOrDefault();
 4             if (transpool == null)
 5             {
 6                 throw new ThriftException("Connection Pool Exception");
 7             }
 8             if (transpool.TransportPool.Count() == transpool.ServiceConfig.MaxIdle)
 9             {
10                 transport.Flush();
11                 if (transport.IsOpen)
12                 {
13                     transport.Close();
14                 }
15                 transport.Dispose();
16             }
17             else
18             {
19                 lock (transpool.sync_obeject)
20                 {
21                     if (transport.IsOpen)
22                     {
23                         transport.Close();
24                     }
25                     transpool.TransportPool.Push(transport);
26                     transpool.ActivedTransportCount--;
27                     transpool.ResetEvent.Set();
28                 }
29             }
30             Monitor();
31         }

        當連接池最大空閑連接達到了服務節點設置的最大空閑連接數時,歸還的連接將被銷毀。借出連接和歸還連接兩段代碼里都有一個Monitor()方法,此方法監控連接池連接的使用情況:

        /// <summary>
        /// 監控連接池狀態
        /// </summary>
        private static void Monitor()
        {
            List<Tuple<string, int, int>> tuples = new List<Tuple<string, int, int>>(transport_pools.Count);
            foreach(var transpool in transport_pools)
            {
                Tuple<string, int, int> tuple = new Tuple<string, int, int>(transpool.ServiceConfig.Name, transpool.TransportPool.Count(), transpool.ActivedTransportCount);
                tuples.Add(tuple);
            }
            monitor.Monitor(tuples);
        }

       此方法將每個服務連接池的空閑連接數、和激活的連接數傳給前面提到的監視器。在連接等待超時和拿到連接池的運行參數時,最終進行什么動作還是由開發者去實現的。繼承下面的接口,開發者可以自定義監視器。

 1     public interface IThriftFactoryMonitor
 2     {
 3         /// <summary>
 4         /// 監控連接池運行狀態
 5         /// </summary>
 6         /// <param name="tuple">元組集合,第一個元素表示服務名稱、第二個元素表示空閑連接數量、第三個元素表示激活連接數量</param>
 7         void Monitor(List<Tuple<string,int,int>> tuples);
 8 
 9         /// <summary>
10         /// 等待連接超時
11         /// </summary>
12         void TimeoutNotify(string serviceName,int timeOut);
13     }

       默認的監視器只是將連接池的運行狀態記錄到控制台:

 1     /// <summary>
 2     /// 默認連接池狀態監控類
 3     /// </summary>
 4     public class ThriftFactoryMonitor : IThriftFactoryMonitor
 5     {
 6         public virtual void Monitor(List<Tuple<string, int, int>> tuples)
 7         {
 8             foreach (var t in tuples)
 9             {
10                 Console.WriteLine(string.Format("{0}連接池,空閑連接數量:{1},激活連接數量:{2}", t.Item1, t.Item2, t.Item3));
11             }
12         }
13 
14         public virtual void TimeoutNotify(string serviceName, int timeOut)
15         {
16             Console.WriteLine(string.Format("{0}連接池等待連接超時{1}", serviceName, timeOut));
17         }
18     }

    開發者自己實現的監視器,如何能被連接池使用,其實在一開始的連接池初始化里,還有一段使用反射來初始化開發者定義的監視器的代碼。開發者只需在上一篇介紹的Thrift.config里配置MonitorType,其他的就交給框架處理:

 1         static ThriftFactory()
 2         {
 3             ......
 4             if(!string.IsNullOrWhiteSpace(ConfigHelper.ThriftConfig.MonitorType))
 5             {
 6                 monitor = Invoker.CreateInstance(Type.GetType(ConfigHelper.ThriftConfig.MonitorType)) as IThriftFactoryMonitor;
 7                 if (monitor == null)
 8                 {
 9                     throw new ThriftException(string.Format("There Is No Monitor Implement Which Type Is  \"{0}\"", ConfigHelper.ThriftConfig.MonitorType));
10                 }
11             }
12         }

   通過連接池與服務節點建立了Socket連接,下一篇將介紹客戶端如何使用建立的Socket連接與服務端通信。

Thrift微服務代碼下載Thrift.Utility

 

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM