git地址在這里:
https://github.com/Lazyshot/go-hbase
這是一個使用go操作hbase的行為。
分析scan行為
如何使用scan看下面這個例子,偽代碼如下:
func scan(phone string, start time.Time, end time.Time) ([]Loc, error) {
...
client := hbase.NewClient(zks, "/hbase")
client.SetLogLevel("DEBUG")
scan := client.Scan(table)
scan.StartRow = []byte(phone + strconv.Itoa(int(end.Unix())))
scan.StopRow = []byte(phone + strconv.Itoa(int(start.Unix())))
var locs []Loc
scan.Map(func(ret *hbase.ResultRow) {
var loc Loc
for _, v := range ret.Columns {
switch v.ColumnName {
case "lbs:phone":
loc.Phone = v.Value.String()
case "lbs:lat":
loc.Lat = v.Value.String()
...
}
}
locs = append(locs, loc)
})
return locs, nil
}
首先是NewClient, 返回的結構是hbase.Client, 這個結構代表的是與hbase服務端交互的客戶端實體。
這里沒有什么好看的,倒是有一點要注意,在NewClient的時候,里面的zkRootReginPath是寫死的,就是說hbase在zk中的地址是固定的。當然這個也是默認的。
func NewClient(zkHosts []string, zkRoot string) *Client {
cl := &Client{
zkHosts: zkHosts,
zkRoot: zkRoot,
zkRootRegionPath: "/meta-region-server",
servers: make(map[string]*connection),
cachedRegionLocations: make(map[string]map[string]*regionInfo),
prefetched: make(map[string]bool),
maxRetries: max_action_retries,
}
cl.initZk()
return cl
}
下面是client.Scan
client.Scan
返回的是
func newScan(table []byte, client *Client) *Scan {
return &Scan{
client: client,
table: table,
nextStartRow: nil,
families: make([][]byte, 0),
qualifiers: make([][][]byte, 0),
numCached: 100,
closed: false,
timeRange: nil,
}
}
scan結構:
type Scan struct {
client *Client
id uint64
table []byte
StartRow []byte
StopRow []byte
families [][]byte
qualifiers [][][]byte
nextStartRow []byte
numCached int
closed bool
//for filters
timeRange *TimeRange
location *regionInfo
server *connection
}
設置了開始位置,結束位置,就可以進行Map操作了。
func (s *Scan) Map(f func(*ResultRow)) {
for {
results := s.next()
if results == nil {
break
}
for _, v := range results {
f(v)
if s.closed {
return
}
}
}
}
這個map的參數是一個函數f,沒有返回值。框架的行為就是一個大循環,不斷調用s.next(),注意,這里s.next返回回來的result可能是由多條,然后把這個多條數據每條進行一次實際的函數調用。結束循環有兩個方法,一個是next中再也取不到數據(數據已經取完了)。還有一個是s.closed唄設置為true。
s.next()
func (s *Scan) next() []*ResultRow {
startRow := s.nextStartRow
if startRow == nil {
startRow = s.StartRow
}
return s.getData(startRow)
}
這里其實是把startRow不斷往前推進,但是每次從startRow獲取多少數據呢?需要看getData
getData
最核心的流程如下:
func (s *Scan) getData(nextStart []byte) []*ResultRow {
...
server, location := s.getServerAndLocation(s.table, nextStart)
req := &proto.ScanRequest{
Region: &proto.RegionSpecifier{
Type: proto.RegionSpecifier_REGION_NAME.Enum(),
Value: []byte(location.name),
},
NumberOfRows: pb.Uint32(uint32(s.numCached)),
Scan: &proto.Scan{},
}
...
cl := newCall(req)
server.call(cl)
...
select {
case msg := <-cl.responseCh:
return s.processResponse(msg)
}
}
這里看到有一個s.numCached, 我們猜測這個是用來指定一次call請求調用回多少條數據的。
看call函數
func newCall(request pb.Message) *call {
var responseBuffer pb.Message
var methodName string
switch request.(type) {
...
case *proto.ScanRequest:
responseBuffer = &proto.ScanResponse{}
methodName = "Scan"
...
}
return &call{
methodName: methodName,
request: request,
responseBuffer: responseBuffer,
responseCh: make(chan pb.Message, 1),
}
}
type call struct {
id uint32
methodName string
request pb.Message
responseBuffer pb.Message
responseCh chan pb.Message
}
可以看出,這個call是一個有responseBuffer的實際調用者。
下面看server.Call
至於這里的server, 我們不看代碼流程了,只需要知道最后他返回的是connection這么個結構
type connection struct {
connstr string
id int
name string
socket net.Conn
in *inputStream
calls map[int]*call
callId *atomicCounter
isMaster bool
}
創建是使用函數newConnection調用
func newConnection(connstr string, isMaster bool) (*connection, error) {
id := connectionIds.IncrAndGet()
log.Debug("Connecting to server[id=%d] [%s]", id, connstr)
socket, err := net.Dial("tcp", connstr)
if err != nil {
return nil, err
}
c := &connection{
connstr: connstr,
id: id,
name: fmt.Sprintf("connection(%s) id: %d", connstr, id),
socket: socket,
in: newInputStream(socket),
calls: make(map[int]*call),
callId: newAtomicCounter(),
isMaster: isMaster,
}
err = c.init()
if err != nil {
return nil, err
}
log.Debug("Initiated connection [id=%d] [%s]", id, connstr)
return c, nil
}
好,那么實際上就是調用connection.call(request *call)
func (c *connection) call(request *call) error {
id := c.callId.IncrAndGet()
rh := &proto.RequestHeader{
CallId: pb.Uint32(uint32(id)),
MethodName: pb.String(request.methodName),
RequestParam: pb.Bool(true),
}
request.setid(uint32(id))
bfrh := newOutputBuffer()
err := bfrh.WritePBMessage(rh)
...
bfr := newOutputBuffer()
err = bfr.WritePBMessage(request.request)
...
buf := newOutputBuffer()
buf.writeDelimitedBuffers(bfrh, bfr)
c.calls[id] = request
n, err := c.socket.Write(buf.Bytes())
...
}
邏輯就是先把requestHeader壓入,再壓入request.request
call只是完成了請求轉換成byte傳輸到hbase服務端,在什么地方進行消息回收呢?
回到NewConnection的方法,里面有個connection.init()
func (c *connection) init() error {
err := c.writeHead()
if err != nil {
return err
}
err = c.writeConnectionHeader()
if err != nil {
return err
}
go c.processMessages()
return nil
}
這里go c.processMessage()
func (c *connection) processMessages() {
for {
msgs := c.in.processData()
if msgs == nil || len(msgs) == 0 || len(msgs[0]) == 0 {
continue
}
var rh proto.ResponseHeader
err := pb.Unmarshal(msgs[0], &rh)
if err != nil {
panic(err)
}
callId := rh.GetCallId()
call, ok := c.calls[int(callId)]
delete(c.calls, int(callId))
exception := rh.GetException()
if exception != nil {
call.complete(fmt.Errorf("Exception returned: %s\n%s", exception.GetExceptionClassName(), exception.GetStackTrace()), nil)
} else if len(msgs) == 2 {
call.complete(nil, msgs[1])
}
}
}
這里將它簡化下:
func (c *connection) processMessages() {
for {
msgs := c.in.processData()
call.complete(nil, msgs[1])
}
}
c.in.processData
是在input_stream.go中
func (in *inputStream) processData() [][]byte {
nBytesExpecting, err := in.readInt32()
...
if nBytesExpecting > 0 {
buf, err := in.readN(nBytesExpecting)
if err != nil && err == io.EOF {
panic("Unexpected closed socket")
}
payloads := in.processMessage(buf)
if len(payloads) > 0 {
return payloads
}
}
return nil
}
先讀取出一個int值,這個int值判斷后面還有多少個bytes,再將后面的bytes讀取進入到buf中,進行input_stream的processMessage處理。
我們這里還看到並沒有執行我們map中定義的匿名方法。只是把消息解析出來了而已。
call.complete
func (c *call) complete(err error, response []byte) {
...
err2 := pb.Unmarshal(response, c.responseBuffer)
...
c.responseCh <- c.responseBuffer
}
這個函數有用的也就這兩句話把responseBuffer里面的內容通過管道傳遞給responseCh
這里就看到getData的時候,被堵塞的地方
select {
case msg := <-cl.responseCh:
return s.processResponse(msg)
}
那么這里就有把獲取到的responseCh的消息進行processResponse處理。
func (s *Scan) processResponse(response pb.Message) []*ResultRow {
...
results := res.GetResults()
n := len(results)
...
s.closeScan(s.server, s.location, s.id)
...
tbr := make([]*ResultRow, n)
for i, v := range results {
tbr[i] = newResultRow(v)
}
return tbr
}
這個函數並沒有什么特別的行為,只是進行ResultRow的組裝。
好吧,這個包有個地方可以優化,這個go-hbase的scan的時候,numCached默認是100,這個對於hbase來說太小了,完全可以調整大點,到2000~10000之間,你會發現scan的性能提升杠杠的。