結合docker做flask+kafka數據接口與壓力測試


一、需求

需要做實時數據接入的接口、數據最終要寫入庫,要做到高並發,數據的完整,不丟失數據。

二、技術選型

1.因為只是做簡單的接口,不需要復雜功能,所以決定用flask這個簡單的python框架(因為做運維的作者只會python所以只能在python框架里找);

2.要做到數據的實時性,考慮到數據落地入庫可能io會延時比較大,所以決定數據通過接口先寫入消息隊列中間件kafka

為什么用kafka因為kafka數據是順序寫文件,效率還可以,主要是的寫入文件可以保證自定義時間內的數據不丟失;kafka可以做集群提高性能;kafka支持同一個group下多個消費程序對同一個topic處理;如果聽不懂請自己學習kafka相關知識

3.因為考慮到后期的快速部署與遷移問題,所以決定結合docker來做。(主要是為了裝逼,再有就是回顧一下docker知識

三、原理圖

Sample Flowchart Template

條件可以的話,zookeeper+kafka也可以做成docker鏡像,這里這樣做是作者的服務器資源有限;也方便后面壓力測試,所以各個程序都獨立開

 

四、操作過程

1.安裝配置kafka

(1)、在服務器上先安裝java環境yum install  java-1.8.0-openjdk* –y

(2)、下載kafka包kafka_2.11-2.1.1.tgz;解壓到指定目錄下;

cd /data/kafka_2.11-2.1.1/目錄

bin/zookeeper-server-start.sh –daemon config/zookeeper.properties &

配置kafka配置文件 config/server.properties開啟listeners=PLAINTEXT://:9092(kafka監聽0.0.0.0:9092端口不開啟默認是localhost:9092);添加delete.topic.enable=true(允許指定刪除topic;默認為flase;kafka_2.11-2.1.1/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic xxxx);log.retention.hours=168(數據保留時間默認為168小時);log.segment.bytes=1073741824(最大segment數據文件大小為1G;單位字節);

開啟kafka服務

bin/kafka-server-start.sh config/server.properties &

如果安裝了nohup可以用nohup bin/kafka-server-start.sh config/server.properties & 開啟不會在退出終端時退出程序。

(3)、netstat –tpln看到9092和2181都已經監聽起來就OK再檢查selinux是否關閉,iptables是否放通

 

2.制做nginx+uwsgi+flask的docker鏡像

(1)、下載centos6.6的docker基礎境像到docker;運行后安裝nginx和python3(安裝python3下載python3的tar包到服務器解壓;創建目錄/usr/local/python3;進入python3解壓目錄./configure --prefix=/usr/local/python3;make && make install ; ln –s /usr/local/python3/bin/python3 /usr/bin;ln –s /usr/local/python3/bin/pip3 /usr/bin)

(2)、安裝uwsgi;pip3 install uwsgi;ln –s /usr/local/python3/bin/uwsgi /usr/bin;

(3)、安裝flask;pip3 install flask;

(4)、安裝python3 kafka相關包(pip3 install kakfa-python;pip3 install kafka [這個包方便在運行kafka-python下KafkaProducer方法時有錯誤話有報錯輸出,不然有問題是沒有報錯的 坑死了])

(5)、創建目錄/data/webapi

(6)、在/etc/hosts下加上kafka服務器的hostname的解析;不然服務器是無法發消息到kafka的大坑原因:是從zookeeper獲取broker的meta信息時候返回的不是IP而是hostname

(7)、配置啟動nginx和uwsgi腳本startnguw.sh如下:

#!/bin/bash
sh /root/nginx_ops.sh start
nohup uwsgi /data/webapi/app.ini &
tail -f /dev/null

注意nginx作者是腳本安裝的,安裝好就有nginx_ops.sh的啟動腳本;app.ini是uwsgi的配置文件;tail –f /dev/null這個是方便用docker-compose啟動加的不加用docker-compose啟動的話就會有直重啟docker

(8).配置/usr/local/nginx/config/vhosts/webapi.conf如下:

server {
        listen 4000;
        server_name 0.0.0.0;
 
        location / {
                include uwsgi_params;
                uwsgi_pass unix:/data/webapi/webapi.sock;
        }

}

(這里指定端口是4000;uwsgi是用的webapi.sock套接字,也可以用ip加端口具體看app.ini的uwsgi的配置文件是怎么配置的;)

以上配置好就可以通過docker commit –p 原docker名字 新docker名字保存為新的包含新安裝內容的docker鏡像;如果有阿里cr.console.aliyun.com或hub.docker賬號的可以命名為自己賬號下的倉庫名加版本號上傳。

作者已經做好了;地址為 registry.cn-shenzhen.aliyuncs.com/wuxiaozy/nguwsgi:v0.2

 

3.在centos7服務器上配置docker-compose.yml通過docker-compose運行容器

(1)、在centos7上安裝docker-compose

(2)、創建目錄/usr/local/nguwsgi

(3)、創建文件docker-compose.yml內容如下:

version: '2'
services:
  nguw:
    image: registry.cn-shenzhen.aliyuncs.com/wuxiaozy/nguwsgi:v0.2
    container_name: nguwsgi01
    restart: always
    dns_search: .
    cap_add:
      - ALL
    volumes:
      - /data/webapi:/data/webapi
    ports:
      - 4000:4000
    networks:
      - luntan
    command:
      - /bin/bash
      - -c
      - |
        /bin/bash /root/startnguw.sh
networks:
  luntan:
    external: false

注意:volumes為把centos7下的/data/webapi目錄映射到nguwsgi01這個docker容器的/data/webapi目錄前提是容器要有/data/webapi這個目錄;port表示把docker容器的4000端口映射成本機的4000端口

(4)、在centos7的/data/webapi目錄下上傳flask項目和app.ini uwsgi的配置文件

uwsgi配置文件如下:

[uwsgi]
base_dir = /data/webapi
chdir = /data/webapi
wsgi-file = myflask.py
callable = app
 
socket = %(base_dir)/webapi.sock
chmod-socket = 666
 
processes = 4
threads = 10
 
master = true
daemonize = %(base_dir)/chat.log
pidfile  = %(base_dir)/chat.pid

(注意myflask.py是flask的主啟動文件;socker這個配置和nginx下webapi.conf配置的 uwsgi_pass有關,這里配置是webapi.sock話nginx配置也是webapi.sock;process和threads表示uwsgi開多少個進程,每個進程開多少個線程和uwgi的性能配置有關)

 

flask內容如下:

#from gevent import monkey
#monkey.patch_all()

from flask import Flask,render_template,request
from kafka import KafkaProducer
import json


#from gevent.pywsgi import WSGIServer


app = Flask(__name__)


@app.route('/')
def hello_world():
    return 'Hello World!'

@app.route('/hello')
@app.route('/hello/<name>')
def hello(name=None):
    return render_template('hello.html',name=name)

@app.route('/user/<username>')
def show_user_profile(username):
    return "hello Mr %s"%(username)


@app.route('/financial_pro',methods=['GET','POST'])
def financial_pro():
    if request.method == 'POST':
        data = request.form
        producer = KafkaProducer(bootstrap_servers=['192.168.32.56:9092'],value_serializer=lambda v: json.dumps(v).encode("utf-8"))
        response = producer.send('financial_pro',data)
        producer.flush()
        print(response)
        return "OK"
    else:
        return "Methods Error"

if __name__ == '__main__':
    app.run(host="0.0.0.0",port=4000,threaded=True)
    #http_server = WSGIServer(('0.0.0.0',4000),app)
    #http_server.serve_forever()

注意:/;/hello;/user/name;這些是測試網頁;financial_pro是接口;在安裝了flask的python3環境下python3 myflask.py是可以開啟web服務的只是性能差高並發下會的數據丟失;也可以用gevent+flask開啟性能也沒有nginx+uwsgi高

(5)、開啟nguwsgi01這個docker容器

cd /usr/local/nguwsgi目錄下動行 docker-compose up –d(如果docker-compose.yml文件名字不叫這個就要用docker-compose –f xxxx.yml up –d)

 

4.編寫handler程序從kafka讀取數據實現數據的落地;

內容如下:(python3編寫需要pip3 install kafka-python)

from kafka import KafkaConsumer
import json
from multiprocessing import Pool
import time
import threading


##定義參數
#程序處理的接口
handler_API = ["identity_pro","financial_pro","internet_pro","social_pro","trip_pro","communication_pro"]
#不同接口處理程序group_id
gids = {"identity_pro":"ide_g","financial_pro":"fin_g","internet_pro":"int_g","social_pro":"soc_g","trip_pro":"tri_g","communication_pro":"com_g"}
#不同group_id下消費程序的數量
xf = {"identity_pro":1,"financial_pro":1,"internet_pro":1,"social_pro":1,"trip_pro":1,"communication_pro":1}
#kafka服務器地址和端口
kafka_servers=["192.168.32.56:9092"]
#各接口數據字段數
check_data = {"identity_pro":7,"financial_pro":4,"internet_pro":5,"social_pro":3,"trip_pro":3,"communication_pro":4}

class Handler(object):
    def __init__(self,handler_API,gids,xf,kafka_servers,check_data):
        self.hanapi = handler_API
        self.gids = gids
        self.xf = xf
        self.kafka_servers = kafka_servers
        self.check_data = check_data

    def conumers(self,api_name,kfksers):
        gid = self.gids[api_name]
        # eraliest為從最早的偏移量開始
        #con = KafkaConsumer(api_name,bootstrap_servers=[kfksers],auto_offset_reset="earliest",value_deserializer=json.loads)
        #auto_offset_reset默認為latest
        con = KafkaConsumer(api_name, bootstrap_servers=[kfksers], auto_offset_reset="latest",value_deserializer=json.loads)
        Tm = time.strftime("%Y-%m-%d %H:%M:%S")
        for message in con:
            print("[%s] %s:%d:%d:key:%s"%(Tm,message.topic,message.partition,message.offset,message.key))
            data = message.value
            if int(self.check_data[api_name]) == len(data.keys()):
                print(data)
                with open("d:/%s.txt"%api_name,"a") as F:
                    F.write("[%s]--%s\n"%(Tm,data))
                F.close()
            else:
                print("%s--數據字段不符合要求!!!"%data)
                with open("d:/%s.txt"%api_name,"a") as F:
                    F.write("[%s]--%s--%s"%(Tm,data,"數據字段數不符合要求!!!\n"))
                F.close()

    def conumers_thread_num(self,api_name,kfksers):
        thread_num = int(self.xf[api_name])
        for i in range(thread_num):
            t = threading.Thread(target=conumers_wrapper,args=(self,api_name,kfksers))
            t.setDaemon(True)
            t.start()
            t.join()

    def multirun(self):
        p = Pool(5)
        for i in range(int(len(self.hanapi))):
            api_name = self.hanapi[i]
            kfksers = ",".join(self.kafka_servers)
            print("開啟子進程%s"%i)
            p.apply_async(conumers_thread_num_wrapper,args=(self,api_name,kfksers))
        print('等待所有添加的進程運行完畢。。。')
        p.close()
        p.join()


def conumers_wrapper(cls_instance,api_name,kfksers):
    return cls_instance.conumers(api_name,kfksers)

def conumers_thread_num_wrapper(cls_instance,api_name,kfksers):
    return cls_instance.conumers_thread_num(api_name,kfksers)

if __name__ == "__main__":
    Hd = Handler(handler_API,gids,xf,kafka_servers,check_data)
    Hd.multirun()

注意:腳本通過修改參數可以支持kafka集群和kafka 同一goup_id下多consumer消費進程;)

 

5.在服務器centos6.5 ip:192.168.32.183上ab壓測

安裝httpd-ab;yum install httpd-ab

壓測命令:

time ab -c 100 -n 10000 -T 'application/x-www-form-urlencoded' -p /tmp/wx20191010.txt http://192.168.32.78:4000/financial_pro

/tmp/wx20191010.txt內容為test01=111&test02=222&test03=ccc&test04=ddd

(-c 100是100個並發; –n 10000是10000次請求;-T http頭;-p是post數據文件)

測試結果:

python3 myflask.py的情況下 100個並發10000次請求;cpu 70%;耗時1m49秒;有大約500~600個數據丟失;

nginx+uwsgi+flask下(四個進程processes=4;4線程threads=4);cpu 30%;耗時1m30秒;沒有數據丟失;

nginx+uwsgi+flask下(四個進程processes=4;4線程threads=10);cpu 60%;耗時49秒;沒有數據丟失;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM