Hadoop Streaming programming demo (Python version)


Our big data team works on data quality evaluation. The automated QA and monitoring platform is built with Django, and the MR jobs are also implemented in Python. (We later ran into an ORC compression problem that we could not solve in Python, so the jobs are being rewritten in Java.)

Here is an example of writing an MR job in Python.

To quote the documentation: Hadoop Streaming is a utility that ships with Hadoop; it lets users run any executable or script file as the Mapper and the Reducer.
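
For orientation, the canonical minimal example from the Hadoop Streaming documentation simply runs standard Unix tools as the mapper and reducer (the input/output paths below are placeholders):

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc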

 

1. First, some background: our data is stored in Hive. The table structure is as follows:

We will parse the table metadata and merge it with the data on HDFS to make processing easier. The partition keys here are year/month/day.

hive (gulfstream_ods)> desc g_order;
OK
col_name        data_type       comment
order_id                bigint                  order id
driver_id               bigint                  driver id, 0 until a driver grabs the order
driver_phone            string                  driver phone number
passenger_id            bigint                  passenger id
passenger_phone         string                  passenger phone number
car_id                  int                     pick-up vehicle id
area                    int                     city id
district                string                  district code within the city
type                    int                     order type: 0 real-time, 1 reservation
current_lng             decimal(19,6)           passenger longitude when the order was placed
current_lat             decimal(19,6)           passenger latitude when the order was placed
starting_name           string                  pick-up point name
starting_lng            decimal(19,6)           pick-up point longitude
starting_lat            decimal(19,6)           pick-up point latitude
dest_name               string                  destination name
dest_lng                decimal(19,6)           destination longitude
dest_lat                decimal(19,6)           destination latitude
driver_start_distance   int                     road distance from driver to pick-up point, in meters
start_dest_distance     int                     road distance from pick-up point to destination, in meters
departure_time          string                  departure time (reservation time for reservation orders, order time for real-time orders)
strive_time             string                  time the order was successfully grabbed
consult_time            string                  negotiation time
arrive_time             string                  time the driver tapped 'I have arrived'
setoncar_time           string                  passenger boarding time (not used for now)
begin_charge_time       string                  time the driver tapped 'start charging'
finish_time             string                  completion time
year                    string
month                   string
day                     string

# Partition Information
# col_name              data_type               comment

year                    string
month                   string
day                     string

 

2. Parsing the metadata

This is the metadata-parsing step. We then serialize the metadata and save it to the file desc.gulfstream_ods.g_order; this config file is uploaded to the Hadoop cluster together with the MR scripts.

import subprocess
from subprocess import Popen


def desc_table(db, table):
    # Run `hive -e "desc db.table"` and capture its output.
    process = Popen('hive -e "desc %s.%s"' % (db, table),
            shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        # Stop at the blank line that separates the columns from the partition section.
        if not line or len(line.split()) < 2:
            break
        if is_column:
            # The first line is the header: col_name, data_type, comment.
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        # One dict per column, e.g. {'col_name': 'order_id', 'data_type': 'bigint', ...}
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)

    return structure_list
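
The snippet above only parses the desc output; the serialization step described in the text is not shown. A minimal sketch of it, assuming JSON serialization (to match the deserialize() used by the mapper) and the desc.<db>.<table> naming convention, could look like this:

import json

def dump_table_desc(db, table):
    # Hypothetical helper: write the parsed structure to desc.<db>.<table>,
    # the file later shipped to the cluster via -file in the streaming job.
    structure_list = desc_table(db, table)
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))

dump_table_desc('gulfstream_ods', 'g_order')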

 

3. The Hadoop Streaming launch script.

#!/bin/bash
source /etc/profile
source ~/.bash_profile

# Hadoop home directory
echo "HADOOP_HOME: "$HADOOP_HOME
HADOOP="$HADOOP_HOME/bin/hadoop"

DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo $DB--$TABLE--$YEAR--$MONTH--$DAY

if [ "$DB" = "gulfstream_ods" ]
then
DB_NAME="gulfstream"
else
DB_NAME=$DB
fi
TABLE_NAME=$TABLE

# input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# marker file name
input_mark="_SUCCESS"
echo $input_path
# output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path
# performance / capacity parameters
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
# job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"

$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-jobconf mapred.job.name="$job_name" \
-jobconf mapred.job.queue.name=$queue_name \
-jobconf mapred.map.tasks=$map_num \
-jobconf mapred.reduce.tasks=$reducer_num \
-jobconf mapred.map.capacity=$capacity_mapper \
-jobconf mapred.reduce.capacity=$capacity_reducer \
-input $input_path \
-output $output_path \
-file ./mapper.py \
-file ./reducer.py \
-file ./utils.py \
-file ./"desc.${DB}.${TABLE_NAME}" \
-mapper "$mapper" \
-reducer "$reducer"
if [ $? -ne 0 ]; then
echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run faild"
fi
$HADOOP fs -touchz "${output_path}/$output_mark"
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
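
Assuming the script above is saved as run_streaming.sh (the file name is my choice), it takes the database, table, and the three partition values as positional arguments, for example (the date is a placeholder):

sh run_streaming.sh gulfstream_ods g_order 2017 03 01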

 

4. This is an advanced take on WordCount. The first feature counts orders per area; the second counts orders per hour across a day.

mapper script

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# Match the fields against the table metadata and yield one dict per record
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are partition keys and are not part of the data
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]


# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

def mapper_plugin_1(item, mapper_result):
    # In practice the key could be different appkeys; it decides which reducer a
    # record is routed to -- records with the same route go to the same reducer.
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)

reducer script

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) < 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
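
Before submitting to the cluster, the mapper/reducer pair can be sanity-checked locally by simulating the shuffle with sort. The sample file name below is a placeholder, and the desc.gulfstream_ods.g_order file must be present in the working directory:

cat sample_g_order.txt \
    | python mapper.py gulfstream_ods g_order \
    | sort -k1,1 \
    | python reducer.py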

 

5. The previous version suffered from a slow reduce phase, for two reasons. First, because of how the route keys are designed, all records with the same route go to a single reducer, so that reducer is overloaded and performance drops. Second, the cluster runs on virtual machines, whose performance is poor to begin with. The improved version below addresses this by pre-aggregating the data in the mapper, which relieves the reducer's computational load.

mapper script

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')


# Match the fields against the table metadata and yield one dict per record
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # The last 3 columns (year/month/day) are partition keys and are not part of the data
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item


def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]


# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)


def mapper_plugin_1(item, mapper_result):
    # In practice the key could be different appkeys; it decides which reducer a
    # record is routed to -- records with the same route go to the same reducer.
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # aggregate totals on the mapper side
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass


def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # aggregate totals on the mapper side
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) < 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None


def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)


if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)

reducer script

#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter


def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)


def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])


def reducer_total(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            # group by the aggregation key (area_district) carried in the record
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            # group by the aggregation key (day_hour) carried in the record
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass


def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''


def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []


def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]


def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)


if __name__ == '__main__':
    main()
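
A design note: the mapper-side pre-aggregation above is essentially a hand-rolled combiner. Hadoop Streaming can also run a combiner process explicitly via the -combiner option; a sketch of what that would look like in the section 3 command is below. A combiner must consume and emit the same key<TAB>value format as the mapper, so the section 4 reducer.py cannot be reused as-is; the combiner.py referenced here is hypothetical and not part of the original post.

$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
    ... \
    -mapper "$mapper" \
    -combiner "python combiner.py" \
    -reducer "$reducer" \
    -file ./combiner.py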

 

Problems encountered:

1. The DiskSpace quota of /user/bigdata/qa is exceeded

This error appeared after the reduce phase finished. It means the disk quota of that HDFS path is used up; freeing space under the path (or raising the quota) resolves it.
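
Two commands that help in this situation (the path is the one from the error message; the directory to delete is a placeholder): check the configured quota and current usage, then remove stale output, e.g.:

hdfs dfs -count -q /user/bigdata/qa
hdfs dfs -rm -r -skipTrash /user/bigdata/qa/<old_output_dir>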

 

