C++ Thrift Client 與 Flume Thrift Source 對接


項目需要C++代碼與flume對接,進而將日志寫入HDFS。
flume原生為java代碼,原先的解決方案是通過JNI調用flume java方法。
但是由於一來對jni的調用效率的擔心,二來C++調用JNI需要照顧local reference和GC的問題,被搞得頭痛了。
一怒之下,重寫代碼了,使用C++與遠端的JAVA Flume對接。

在協議的選擇上,AVRO C++雖然也有apache的開源項目,但是目前只支持讀寫文件,而不能使用RPC。
故使用了thrift與遠端Flume thrift source通信。
以下是一些實現的具體方法:

 

Flume thrift 協議准備:

1. 安裝thrift,安裝flume。
2. 下載flume的source包,解壓。
3. 在解壓目錄flume-ng-sdk/src/main/thrift下,存在文件flume.thrift。
    這是flume的thrift協議規則文件,執行命令:thrift -r --gen cpp ./flume.thrift
    在./gen-cpp/目錄下會生成一些.h .cpp文件,之后C++代碼中需要調用其中的方法進行RPC通信。

    注意:如果是在windows/visual studio下開發,由於ERROR與關鍵詞沖突,需要對flume.thrift修改。

    enum Status {
    OK,
    FAILED,
    FERROR, //原先是ERROR,修改為FERROR。
    UNKNOWN
    }

    如果是linux下工作,則無需改動任何代碼。

 

C++ 代碼實現:

1. include一些頭文件,以及使用thrift的名字空間

1 #include <thrift/protocol/TBinaryProtocol.h>
2 #include <thrift/protocol/TCompactProtocol.h>
3 #include <thrift/transport/TSocket.h>
4 #include <thrift/transport/TTransportUtils.h>
5 
6 using namespace std;
7 using namespace apache::thrift;
8 using namespace apache::thrift::protocol;
9 using namespace apache::thrift::transport;

2. 創建thrift的socket, transport, protocol, client.

1 class ThriftClient{
2     private:
3     /* Thrift protocol needings... */
4     boost::shared_ptr<TTransport> socket;
5     boost::shared_ptr<TTransport> transport;
6     boost::shared_ptr<TProtocol> protocol;
7     ThriftSourceProtocolClient* pClient;
8 }
1     ThriftClient::ThriftClient(std::string inIpAddress, std::string inPort):
2         socket(new TSocket(inIpAddress.c_str(), atoi(inPort.c_str()))),
3         transport(new TFramedTransport(socket)),
4         protocol(new TCompactProtocol(transport))
5     {
6         pClient = new ThriftSourceProtocolClient(protocol);
7     }

 

3. 與遠端flume thrift source通信: 

 1       bool ThriftClient::sendEvent(const Event* event)
 2       {
 3           //build the head
 4           std::map<std::string, std::string>  headers;
 5           std::ostringstream timeBuffer;
 6           timeBuffer << event->timestamp <<"000";
 7           headers.insert(std::make_pair("timestamp", timeBuffer.str()));
 8           headers.insert(std::make_pair("appId", appId));
 9           //build the body
10           std::ostringstream  osBody;
11           osBody << *event;
12           std::string sBody = osBody.str();
13  
14           ThriftFlumeEvent tfEvent;
15           tfEvent.__set_body(sBody);
16           tfEvent.__set_headers(headers);
17           if(!transport->isOpen())
18           {
19               transport->open();
20           }
21           Status::type res=pClient->append(tfEvent);
22           if(res == Status::OK)
23           {
24               return true;
25           }
26           else
27           {
28               printf("WARNING: send event via thrift failed, return code:%d\n",res);
29               return false;
30           }
31       }

 

其他注意點:

1. append方法用來發送一條event:

Status::type ThriftSourceProtocolClient::append(const ThriftFlumeEvent& event)

  同時還有一個方法appendBatch用來一次發送多個event:

Status::type ThriftSourceProtocolClient::appendBatch(const std::vector<ThriftFlumeEvent> & events)

 

2. 無論是append還是appendBatch方法,都是阻塞方法。

3. 可能可以通過send_append,send_appendBatch來發送無需確認成功的event(未測試)。

void ThriftSourceProtocolClient::send_append(const ThriftFlumeEvent& event)
void ThriftSourceProtocolClient::send_appendBatch(const std::vector<ThriftFlumeEvent> & events)

4. Thrift只提供協議RPC功能,並沒有提供flume的channel功能,以及多個source的情況下的load balance功能。這些都需要自己實現。

 

關於性能:

測試環境: vmware+ubuntu,i3-4150 CPU, 配置1G內存,雙核CPU。

在本機配置兩個flume thrift source(load balance),網絡回環。

在每條event大約50字符的情況下,可以達到16000條每秒的吞吐量,此時CPU被耗盡(兩個flume thrift source大約占用30%CPU)。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM