項目需要C++代碼與flume對接,進而將日志寫入HDFS。
flume原生為java代碼,原先的解決方案是通過JNI調用flume java方法。
但是由於一來對jni的調用效率的擔心,二來C++調用JNI需要照顧local reference和GC的問題,被搞得頭痛了。
一怒之下,重寫代碼了,使用C++與遠端的JAVA Flume對接。
在協議的選擇上,AVRO C++雖然也有apache的開源項目,但是目前只支持讀寫文件,而不能使用RPC。
故使用了thrift與遠端Flume thrift source通信。
以下是一些實現的具體方法:
Flume thrift 協議准備:
1. 安裝thrift,安裝flume。
2. 下載flume的source包,解壓。
3. 在解壓目錄flume-ng-sdk/src/main/thrift下,存在文件flume.thrift。
這是flume的thrift協議規則文件,執行命令:thrift -r --gen cpp ./flume.thrift
在./gen-cpp/目錄下會生成一些.h .cpp文件,之后C++代碼中需要調用其中的方法進行RPC通信。
注意:如果是在windows/visual studio下開發,由於ERROR與關鍵詞沖突,需要對flume.thrift修改。
enum Status {
OK,
FAILED,
FERROR, //原先是ERROR,修改為FERROR。
UNKNOWN
}
如果是linux下工作,則無需改動任何代碼。
C++ 代碼實現:
1. include一些頭文件,以及使用thrift的名字空間
1 #include <thrift/protocol/TBinaryProtocol.h> 2 #include <thrift/protocol/TCompactProtocol.h> 3 #include <thrift/transport/TSocket.h> 4 #include <thrift/transport/TTransportUtils.h> 5 6 using namespace std; 7 using namespace apache::thrift; 8 using namespace apache::thrift::protocol; 9 using namespace apache::thrift::transport;
2. 創建thrift的socket, transport, protocol, client.
1 class ThriftClient{ 2 private: 3 /* Thrift protocol needings... */ 4 boost::shared_ptr<TTransport> socket; 5 boost::shared_ptr<TTransport> transport; 6 boost::shared_ptr<TProtocol> protocol; 7 ThriftSourceProtocolClient* pClient; 8 }
1 ThriftClient::ThriftClient(std::string inIpAddress, std::string inPort): 2 socket(new TSocket(inIpAddress.c_str(), atoi(inPort.c_str()))), 3 transport(new TFramedTransport(socket)), 4 protocol(new TCompactProtocol(transport)) 5 { 6 pClient = new ThriftSourceProtocolClient(protocol); 7 }
3. 與遠端flume thrift source通信:
1 bool ThriftClient::sendEvent(const Event* event) 2 { 3 //build the head 4 std::map<std::string, std::string> headers; 5 std::ostringstream timeBuffer; 6 timeBuffer << event->timestamp <<"000"; 7 headers.insert(std::make_pair("timestamp", timeBuffer.str())); 8 headers.insert(std::make_pair("appId", appId)); 9 //build the body 10 std::ostringstream osBody; 11 osBody << *event; 12 std::string sBody = osBody.str(); 13 14 ThriftFlumeEvent tfEvent; 15 tfEvent.__set_body(sBody); 16 tfEvent.__set_headers(headers); 17 if(!transport->isOpen()) 18 { 19 transport->open(); 20 } 21 Status::type res=pClient->append(tfEvent); 22 if(res == Status::OK) 23 { 24 return true; 25 } 26 else 27 { 28 printf("WARNING: send event via thrift failed, return code:%d\n",res); 29 return false; 30 } 31 }
其他注意點:
1. append方法用來發送一條event:
Status::type ThriftSourceProtocolClient::append(const ThriftFlumeEvent& event)
同時還有一個方法appendBatch用來一次發送多個event:
Status::type ThriftSourceProtocolClient::appendBatch(const std::vector<ThriftFlumeEvent> & events)
2. 無論是append還是appendBatch方法,都是阻塞方法。
3. 可能可以通過send_append,send_appendBatch來發送無需確認成功的event(未測試)。
void ThriftSourceProtocolClient::send_append(const ThriftFlumeEvent& event)
void ThriftSourceProtocolClient::send_appendBatch(const std::vector<ThriftFlumeEvent> & events)
4. Thrift只提供協議RPC功能,並沒有提供flume的channel功能,以及多個source的情況下的load balance功能。這些都需要自己實現。
關於性能:
測試環境: vmware+ubuntu,i3-4150 CPU, 配置1G內存,雙核CPU。
在本機配置兩個flume thrift source(load balance),網絡回環。
在每條event大約50字符的情況下,可以達到16000條每秒的吞吐量,此時CPU被耗盡(兩個flume thrift source大約占用30%CPU)。
