利用python3.5 構建流媒體后台音視頻切換的服務端程序


#!/usr/bin/env python3.5.0
# -*- coding:utf8 -*-
import os,sys,socket,hashlib,time,select,threading,configparser
import pymssql
rootdir =os.path.abspath(sys.argv[0])
rootdir =os.path.dirname(rootdir) +"/"
cf =configparser.ConfigParser()
if os.path.exists(rootdir +'srs_server.conf'):
    cf.read(rootdir +'srs_server.conf')
else:
    # 如果文件不存在
    f = open(rootdir + "/srs_server.conf","w")
    f.write("")
    f.close()
    # 讀取配置文件
    cf.read(rootdir +'srs_server.conf')
    cf.add_section("srs")
    cf.set("srs","config","1")
    # 寫回配置文件
    cf.write(open(rootdir +'srs_server.conf',"w"))
def decrypt(s,key=2):
    """
    解密方法
    :param key:
    :param s:
    :return:
    """
    c = bytearray(str(s).encode("gbk"))
    n = len(c) # 計算 b 的字節數
    if n % 2 != 0 :
        return ""
    n = n // 2
    b = bytearray(n)
    j = 0
    for i in range(0, n):
        c1 = c[j]
        c2 = c[j+1]
        j = j+2
        c1 = c1 - 65
        c2 = c2 - 65
        b2 = c2*16 + c1
        b1 = b2^ key
        b[i]= b1
    try:
        return b.decode("gbk")
    except:
        return "failed"
# 實例化數據庫類
class MSSQL:
    def __init__(self,host,user,pwd,db,port =1433):
        self.host = host
        self.port = port
        self.user = user
        self.pwd = pwd
        self.db = db
    def __GetConnect(self):
        try:
            if not self.db:
                print("沒有設置數據庫信息")
            self.conn = pymssql.connect(host=self.host,port=self.port,user=self.user,password=self.pwd,database=self.db,charset="utf8")
            cur = self.conn.cursor()
            if not cur:
                print("連接數據庫失敗!")
            else:
                return cur
        except Exception as e:
            print("連接數據庫失敗,%s"%e)
    def ExecQuery(self,sql):
        cur = self.__GetConnect()
        cur.execute(sql)
        resList = cur.fetchall()
        # 查詢完畢后必須關閉連接
        self.conn.close()
        return resList

    def ExecNonQuery(self,sql):
        cur = self.__GetConnect()
        cur.execute(sql)
        self.conn.commit()
        self.conn.close()
# 查詢數據庫相關信息情況
# 更新SQL數據庫相關信息情況
def updatesql(load=0,status=0,item=0):
    # 加載配置文件
    cf = configparser.ConfigParser()
    if os.path.exists(rootdir+"srs_server.conf"):
        try:
            cf.read(rootdir+"srs_server.conf")
        except Exception as c:
            print(c)
    else:
        print("加載srs_server.conf配置文件失敗!")
    # 實例化MSSQL操作類
    host = cf.get("HOST","host")
    ms = MSSQL(host=cf.get("DB","ip"),user=decrypt(cf.get("DB","username")),pwd=decrypt(cf.get("DB","password")),db=cf.get("DB","db"),port=int(cf.get("DB","port")))
    try:
        # 更新該服務器所有記錄
        sql = "update sys_load set item = '%s',sysload='%s',status='%s' where servername ='%s'"%(item, load, status, host)
        ms.ExecNonQuery(sql)
    except:
        pass

def selectdata(conn):
    conf = cf.get("srs","config")
    if conf == "1":
        conn.send(bytes("當前為音頻模式!","utf-8"))
    elif conf == "2":
        conn.send(bytes("當前為視頻模式!","utf-8"))
def updatesound(conn):
    with  open(rootdir +'srs.conf',"r") as f:
        data = f.readlines()
        for i,line in enumerate(data):
            # 去除制表符,換行符
            line = line.replace('\t','').replace('\n','').replace(' ','')
            if line =="#allowpublish220.168.91.254;":
                data[i] ="\tallow\t\tpublish\t\t220.168.91.254;\t\n"
            elif line == "#forward10.27.68.150;":
                data[i] = "\tforward\t\t10.27.68.150;\t\t \n"
            elif line == "transcodelive/livestream{":
                f.seek(i+1)
                data[i+1] = "\tenabled\t\ton;\n"
    with open(rootdir +'srs.conf',"w") as xf:
        xf.writelines(data)
        xf.close()
    status = os.system("service srs restart")
    if status == 0:
        cf.read(rootdir +'srs_server.conf')
        cf.set("srs","config","1")
        cf.write(open(rootdir +'srs_server.conf',"w"))
        updatesql(item=0,load=1,status=0)
        conn.send(bytes("當前已成功切換為音頻模式","utf-8"))
    else:
        conn.send(bytes("切換為音頻模式失敗,請聯系IT部!","utf-8"))
def updatevideo(conn):
    with open(rootdir +'srs.conf',"r") as f:
        data = f.readlines()
        for i,line in enumerate(data):
            # 去除制表符,換行符
            line = line.replace('\t','').replace('\n','').replace(' ','')
            if line =="allowpublish220.168.91.254;":
                data[i] ="\t#allow\t\tpublish\t\t220.168.91.254;\t\n"
            elif line == "forward10.27.68.150;":
                data[i] = "\t#forward\t10.27.68.150;\n"
            elif line == "transcodelive/livestream{":
                # 下一行
                f.seek(i+1)
                data[i+1] = "\tenabled\t\toff;\n"
    with open(rootdir +'srs.conf',"w") as xf:
        xf.writelines(data)
        xf.close()
    status = os.system("service srs restart")
    if status == 0:
        cf.read(rootdir +'srs_server.conf')
        cf.set("srs","config","2")
        cf.write(open(rootdir +'srs_server.conf',"w"))
        updatesql(item=1,load=3,status=1)
        conn.send(bytes("當前已成功切換為視頻模式","utf-8"))
    else:
        conn.send(bytes("切換視頻模式失敗,請聯系IT部!","utf-8"))
def crash_recovery(conn):
    os.system("service srs stop")
    # 強制還原配置文件
    os.system("yes|cp -fr srs_bak.conf srs.conf")
    status = os.system("service srs start")
    if status == 0:
        cf.read(rootdir +'srs_server.conf')
        cf.set("srs","config","1")
        cf.write(open(rootdir +'srs_server.conf',"w"))
        updatesql(item=0,load=1,status=0)
        conn.send(bytes("修復音頻模式成功","utf-8"))
    else:
        conn.send(bytes("修復音頻模式失敗,請與IT部聯系!","utf-8"))
# 業務執行函數
def operation(conn):
    content = """
    請輸入數字選項進行操作:
    1、查詢當前直播頻道
    2、切換頻道為音頻直播
    3、切換頻道為視頻直播
    4、故障修復(默認修復為音頻模式)
    ***********************
    """
    conn.send(bytes(content,"utf-8"))
    while True:
        # 接收數據
        data = conn.recv(1000)
        test =data.decode()
        if int(test) == 1:
            selectdata(conn)
        elif int(test) ==2:
            updatesound(conn)
        elif int(test) ==3:
            updatevideo(conn)
        elif int(test) ==4:
            crash_recovery(conn)
# SOCKET主進程模塊
def process(conn,addr):
    try:
        i = 0
        # 認證失敗允許重試3次
        while i < 3:
            flage = False
            # 接收客戶端連接請求信息
            info = conn.recv(1000)
            # 實例化加密函數
            hash = hashlib.sha512()
            hash.update("a123456789B".encode("utf-8"))  # KEY=a123456789B
            hash_pwd = hash.hexdigest()
            if info.decode() == hash_pwd:
                hash_pwd = ""
                info = ""
                if addr[0] in ["192.168.1.252","192.168.1.190","127.0.0.1"]:
                    flage = True
            # 接收用戶及密碼信息
            while flage:
                operation(conn)
            else:
                # 登陸失敗,發送給客戶端重新驗證
                i += 1
                conn.send(bytes("error","utf8"))
            if i > 2:
                # 主動關閉連接
                conn.close()
                time.sleep(25)
    except Exception as e:
        conn.close()
        time.sleep(15)
# SOCKET服務模塊
def sock_server():
    '''
    啟動服務器端,開啟線程監聽
    :return:
    '''
    server = socket.socket()
    server_ip ="localhost"
    server_port = 1888
    server.bind((server_ip,server_port))
    server.listen(10)
    while True:
        r,w,e = select.select([server,], [], [], 1)
        for i,server in enumerate(r):
            time.sleep(1)
            conn,addr = server.accept()
            # 創建線程
            t = threading.Thread(target=process, args=(conn, addr))
            # 啟動線程
            t.start()
if __name__ =="__main__":
    sock_server()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM