//Server: TProtocolFactory ProtocolFactory = new TBinaryProtocol.Factory(true, true); TTransportFactory TransportFactory = new TFramedTransport.Factory(); serverTransport = new TServerSocket(this._port, 0, false); ThriftService.Processor processor = new ThriftService.Processor(new ThriftServiceImpl(_frm)); TMultiplexedProcessor multiplex = new TMultiplexedProcessor(); multiplex.RegisterProcessor("RTDListenerService", processor); server = new TThreadPoolServer(processor, serverTransport, TransportFactory, ProtocolFactory); //Client: TTransport trans = new TSocket("10.232.158.49", 8083); TProtocol Protocol = new TBinaryProtocol(trans, true, true); TMultiplexedProtocol multiplex = new TMultiplexedProtocol(Protocol, "RTDListenerService"); ThriftService.Iface client = new ThriftService.Client(multiplex); client.RquestThriftNoReply(null);
/* * Thrift的RPC調用的一個完整流程: * 首先是通過Thrift的編譯器生成的客戶端,將調用信息(方法名,參數信息)以指定的協議進行封裝, * 而傳輸層TTransport是對協議層的封裝進行處理(比如封裝成幀frame),並通過網絡發送出去。 * 服務端這邊流程跟客戶端相反,收到客戶端發過來的數據后,首先經過傳輸層對傳過來的數據進行處理, * 然后使用特定的協議(跟客戶端是一一對應的)進行解析,然后再通過生成的Processor調用用戶編寫的代碼, * 如果有返回值的話,返回值以逆向的順序,即通過協議層封裝,然后傳輸層處理對數據進行發送, * 到了客戶端那邊就是對服務端返回的數據進行處理,使用特定協議進行解析,然后得到一個調用個的結果。 * * 基本類型 * bool:布爾值(true或者false) * byte:8位的有符號字節(byte類型) * i16:16位的有符號整數(short類型) * i32:32位的有符號整數(int類型) * i64:64位的有符號長整型(long類型) * double:一個64位的浮點數(double類型) * string: 一個utf8編碼的字符串文本(String) * * 集合類型 * list:一個有序的元素列表。元素可以重復。 * set:一個無序的元素集合,集合中元素不能重復。 * map:一個鍵值對的數據結構,相當於Java中的HashMap。 * * 異常類型Exceptions * Thrift的異常類型,除了是繼承於靜態異常基類以外,其他的跟struct是類似的。表示的是一個異常對象。 * * 服務類型Services Server: 4.服務層, 整合上述組件, 提供網絡模型(單線程/多線程/事件驅動), 最終形成真正的服務. * Thrift 的service類型相當於定義一個面向對象編程的一個接口。Thrift的編譯器會根據這個接口定義來生成服務端和客戶端的接口實現代碼。 * * Thrift的傳輸格式(協議層)1.protocol: 協議層, 定義數據傳輸格式,可以為二進制或者XML等 * TBinaryProtocol: 二進制格式。效率顯然高於文本格式 * TCompactProtocol:壓縮格式。在二進制基礎上進一步壓縮。 * TJSONProtocol:JSON格式。 * TSimpleJSONProtocol:提供JSON只寫協議(缺少元數據信息),生成的文件很容易用過腳本語言解析。 * TDebugProtocol:使用易懂的刻度文本格式,以便於調試。 * * Thrift的數據傳輸方式(傳輸層) 2. Transport: 傳輸層,定義數據傳輸方式,可以為TCP/IP傳輸,內存共享或者文件共享等 * TSocket:阻塞式socket。 * TFramedTransport:以frame為單位進行傳輸,非阻塞式服務中使用。 * TFileTransport:以文件形式進行傳輸。 * TMemoryTransport:將內存用於I/O,Java是現實內部實際使用了簡單的ByteArrayOutputStream。 * TZlibTransport:使用zlib進行壓縮,與其他傳輸方式聯合使用。當前無java實現。 * * Thrift的服務模型 Processor: 3. 處理層, 這部分由定義的idl來生成, 封裝了協議輸入輸出流, 並委托給用戶實現的handler進行處理. * TSimpleServer: 簡單的單線程服務模型,常用於測試。只在一個單獨的線程中以阻塞I/O的方式來提供服務。所以它只能服務一個客戶端連接,其他所有客戶端在被服務器端接受之前都只能等待。 * TNonblockingServer: 它使用了非阻塞式I/O,使用了java.nio.channels.Selector,通過調用select(),它使得程序阻塞在多個連接上,而不是單一的一個連接上。TNonblockingServer處理這些連接的時候,要么接受它,要么從它那讀數據,要么把數據寫到它那里,然后再次調用select()來等待下一個准備好的可用的連接。通用這種方式,server可同時服務多個客戶端,而不會出現一個客戶端把其他客戶端全部“餓死”的情況。缺點是所有消息是被調用select()方法的同一個線程處理的,服務端同一時間只會處理一個消息,並沒有實現並行處理。 * THsHaServer:(半同步半異步server) 針對TNonblockingServer存在的問題,THsHaServer應運而生。它使用一個單獨的線程專門負責I/O,同樣使用java.nio.channels.Selector,通過調用select()。然后再利用一個獨立的worker線程池來處理消息。只要有空閑的worker線程,消息就會被立即處理,因此多條消息能被並行處理。效率進一步得到了提高。 * TThreadedSelectorServer: 它與THsHaServer的主要區別在於,TThreadedSelectorServer允許你用多個線程來處理網絡I/O。它維護了兩個線程池,一個用來處理網絡I/O,另一個用來進行請求的處理。 * TThreadPoolServer: 它使用的是一種多線程服務模型,使用標准的阻塞式I/O。它會使用一個單獨的線程來接收連接。一旦接受了一個連接,它就會被放入ThreadPoolExecutor中的一個worker線程里處理。worker線程被綁定到特定的客戶端連接上,直到它關閉。一旦連接關閉,該worker線程就又回到了線程池中。 這意味着,如果有1萬個並發的客戶端連接,你就需要運行1萬個線程。所以它對系統資源的消耗不像其他類型的server一樣那么“友好”。此外,如果客戶端數量超過了線程池中的最大線程數,在有一個worker線程可用之前,請求將被一直阻塞在那里。 如果提前知道了將要連接到服務器上的客戶端數量,並且不介意運行大量線程的話,TThreadPoolServer可能是個很好的選擇 * * cmd命令: * thrift-0.9.1.exe -help * thrift-0.9.1.exe -r -gen java data.thrift * thrift-0.9.1.exe -r -gen py data.thrift * thrift-0.9.1.exe -r -gen csharp data.thrift */ using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.Threading; using System.Threading.Tasks; using System.Collections; using Thrift.Transport; using Thrift.Server; using Thrift.Protocol; namespace TestThriftProject { public partial class FrmTest : Form { public FrmTest() { InitializeComponent(); } private void button1_Click(object sender, EventArgs e) { if (textBox1.Text != null) { TTransport transport = new TSocket("127.0.0.1", 8090, 3000); TProtocol protocol = new TBinaryProtocol(transport);//協議要和服務端一致 HelloWorldService.Client client = new HelloWorldService.Client(protocol); try { transport.Open(); String result = client.SayHello(textBox1.Text.Trim()); Console.WriteLine("Thrift client result =: " + result); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } finally { if (transport != null) { transport.Close(); } } } } private void FrmTest_Load(object sender, EventArgs e) { new Task(new Action(() => { try { TServerSocket serverTransport = new TServerSocket(8090, 0, false); HelloWorldService.Processor processor = new HelloWorldService.Processor(new HelloWroldImpl()); TServer server = new TSimpleServer(processor, serverTransport); server.Serve(); } catch (Exception ex) { Console.WriteLine(ex.Message); Console.WriteLine(ex.StackTrace); } })).Start(); } } }
using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace TestThriftProject { public class HelloWroldImpl : HelloWorldService.Iface { public string SayHello(string username) { return string.Format("{0:yyyy/MM/dd hh:mm:ss} hello: {1}", DateTime.Now, username); } } }