優酷服務端


服務端項目目錄

conf

import os


BASE_DIR = os.path.dirname(os.path.dirname(__file__))
BASE_MOVIE_DIR = os.path.join(BASE_DIR,'movie_dir')
settings.py

db

from orm_pool.orm import Models, StringField, IntegerField


class User(Models):
    table_name = 'user'
    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    password = StringField('password')
    is_locked = IntegerField('is_locked', default=0)
    is_vip = IntegerField('is_vip', default=0)
    user_type = StringField('user_type')
    register_time = StringField('register_time')


class Movie(Models):
    table_name = 'movie'
    id = IntegerField('id', primary_key=True)
    name = StringField('name',column_type='varchar(64)')
    path = StringField('path')
    is_free = IntegerField('is_free', default=1)
    is_delete = IntegerField('is_delete', default=0)
    create_time = StringField('create_time')
    user_id = IntegerField('user_id')
    file_md5 = StringField('file_md5')


class Notice(Models):
    table_name = 'notice'
    id = IntegerField('id', primary_key=True)
    name = StringField('name')
    content = StringField('content')
    user_id = IntegerField('user_id')
    create_time = StringField('create_time')


class DownloadRecord(Models):
    table_name = 'download_record'
    id = IntegerField('id', primary_key=True)
    user_id = IntegerField('user_id')
    movie_id = IntegerField('movie_id')
    create_time = StringField('create_time')
modles.py

interface

from db import models
from datetime import datetime
from lib import common
from conf import settings
import os


@common.login_auth
def release_notice(recv_dic,conn):
    title = recv_dic.get('title')
    content = recv_dic.get('content')
    user_id = recv_dic.get('user_id')
    obj = models.Notice(name=title,content=content,user_id=user_id,create_time=str(datetime.now()))
    obj.save()
    back_dic = {'flag':True,'msg':'發布公告成功'}
    common.send_back(conn,back_dic)


@common.login_auth
def check_movie(recv_dic,conn):
    movie_data = models.Movie.select(file_md5=recv_dic['file_md5'])
    if movie_data:
        back_dic = {'flag':False,'msg':'該電影已存在'}
    else:
        back_dic = {'flag':True,'msg':'該電影不存在,可以上傳'}
    common.send_back(conn,back_dic)


@common.login_auth
def upload_movie(recv_dic,conn):
    # 這里為了避免上傳的視頻名字是一樣的但是內容不一樣,所以文件名應該盡量取的唯一
    file_name = common.get_session(recv_dic.get('file_name'))+recv_dic.get('file_name')
    file_path = os.path.join(settings.BASE_MOVIE_DIR,file_name)
    recv_size = 0
    with open(file_path,'wb') as f:
        while recv_size < recv_dic['file_size']:
            recv_data = conn.recv(1024)
            f.write(recv_data)
            recv_size += len(recv_data)
    # 調用orm完成保存操作
    movie_obj = models.Movie(name=file_name,path=file_path,is_free=recv_dic.get("is_free"),is_delete=0,
                             create_time = common.get_nowtime(),user_id = recv_dic.get('user_id'),
                             file_md5=recv_dic.get("file_md5")
                             )
    movie_obj.save()
    back_dic = {'flag':True,'msg':'上傳成功'}
    common.send_back(conn,back_dic)


@common.login_auth
def delete_movie(recv_dic,conn):
    # 刪除電影並不是真的將電影數據刪除 而僅僅是將標示該電影是否刪除的字段改為1
    movie_list = models.Movie.select(id=recv_dic.get("delete_movie_id"))
    movie_obj = movie_list[0]
    movie_obj.is_delete = 1
    movie_obj.update()
    back_dic = {'flag':True,'msg':'刪除成功'}
    common.send_back(conn,back_dic)
admin_interface
from lib import common
from db import models
import os


@common.login_auth
def get_movie_list(recv_dic,conn):
    movie_list = models.Movie.select()
    if movie_list:
        back_movie_list = []
        for movie in movie_list:
            # 篩選出所有沒有被刪除的電影
            if not movie.is_delete:
                # 根據用戶想要查詢的電影類型進行划分
                if recv_dic['movie_type'] == 'all':
                    # 只要不是刪除的電影都要  數據格式[[電影名,是否免費,數據id],[...],[...]]
                    back_movie_list.append([movie.name, '免費' if movie.is_free else '收費', movie.id])
                elif recv_dic['movie_type'] == 'free':
                    if movie.is_free:
                        back_movie_list.append([movie.name, '免費', movie.id])
                else:
                    if not movie.is_free:
                        back_movie_list.append([movie.name, '收費', movie.id])
        # 校驗電影列表是否為空
        if not back_movie_list:
            back_dic = {"flag":False,'msg':"暫無符合要求的電影"}
        else:
            back_dic = {'flag':True,'movie_list':back_movie_list}
    else:
        back_dic = {"flag":True,'msg':'暫無電影'}
    common.send_back(conn,back_dic)


@common.login_auth
def buy_member(recv_dic, conn):
    user_list = models.User.select(id=recv_dic['user_id'])
    user_obj = user_list[0]
    user_obj.is_vip = 1
    user_obj.update()
    back_dic = {'flag': True, 'msg': '購買會員成功'}
    common.send_back(conn,back_dic)


@common.login_auth
def download_movie(recv_dic,conn):
    movie_list = models.Movie.select(id=recv_dic.get('movie_id'))
    if movie_list:
        movie_obj = movie_list[0]
        user_obj = models.User.select(id=recv_dic['user_id'])[0]
        # 下載免費視頻需要判斷當前用戶是否是vip會員,如果不是客戶端需要等待30秒才能繼續下載
        wait_time = 0
        if recv_dic['movie_type'] == 'free':
            if user_obj.is_vip:
                wait_time = 0
            else:
                wait_time = 30
        back_dic = {'flag':True,"file_name":movie_obj.name,'file_size':os.path.getsize(movie_obj.path),'wait_time':wait_time}
        # 將下載記錄記錄到專門存放記錄的表中
        down_record = models.DownloadRecord(user_id=user_obj.id,movie_id=movie_obj.id,create_time=common.get_nowtime())
        down_record.save()
        common.send_back(conn,back_dic)

        # 打開文件傳輸給客戶端
        with open(movie_obj.path,'rb') as f:
            for line in f:
                conn.send(line)
    else:
        back_dic = {'flag':False,'msg':'該電影不存在'}
        common.send_back(conn,back_dic)



@common.login_auth
def check_download_record(recv_dic,conn):
    record_list = models.DownloadRecord.select(user_id=recv_dic['user_id'])
    back_record = []
    if record_list:
        for record in record_list:
            # 查出電影對應的電影名
            movie_obj = models.Movie.select(id=record.movie_id)[0]
            back_record.append(movie_obj.name)
        back_dic = {"flag":True,'record':back_record}
    else:
        back_dic = {'flag':False,'msg':"暫無觀影記錄"}
    common.send_back(conn,back_dic)


@common.login_auth
def check_notice(recv_dic,conn):
    notice_list = check_notice_by_count(count=None)
    if notice_list:
        back_dic = {'flag': True, 'notice_list': notice_list}
    else:
        back_dic = {'flag': False, 'msg': '暫無公告'}

    common.send_back(conn, back_dic)


# 定義專門用來獲取公告的方法
def check_notice_by_count(count=None):
    notice_list = models.Notice.select()
    back_notice_list = []
    if notice_list:  # 不為空,繼續查詢,為空直接返回false
        if not count:
            for notice in notice_list:
                back_notice_list.append({notice.name: notice.content})
        else:  # 查一條
            notice_list = sorted(notice_list, key=lambda notice: notice.create_time,reverse=True)
            back_notice_list.append({notice_list[0].name: notice_list[0].content})
        return back_notice_list
    else:
        return False
user_interface
from lib import common
from db import models
from datetime import datetime
from TcpServer import user_data
from interface import user_interface


def register(recv_dic, conn):
    name = recv_dic.get('name')
    user_data = models.User.select(name=name)
    if user_data:
        back_dic = {"flag": False, 'msg': '用戶名已存在'}
    else:
        password = recv_dic.get('password')
        user_type = recv_dic.get('user_type')
        user_obj = models.User(name=name, password=password, user_type=user_type, is_locked=0, is_vip=0,
                               register_time="%s" % datetime.now())
        user_obj.save()
        back_dic = {'flag': True, 'msg': '注冊成功'}
    common.send_back(conn, back_dic)


def login(recv_dic, conn):
    name = recv_dic.get("name")
    user_list = models.User.select(name=name)
    if user_list:
        user_obj = user_list[0]
        if user_obj.user_type == recv_dic.get("user_type"):
            if user_obj.password == recv_dic.get('password'):
                back_dic = {'flag': True, 'msg': '登陸成功','is_vip':user_obj.is_vip}
                session = common.get_session(user_obj.name)
                back_dic['session'] = session
                # 這里需要存儲用戶的session以及user_id 客戶端登陸成功后只會攜帶session過來
                # 我們除了校驗他的合法身份之外,也應該存該用戶數據庫對於的主鍵值,方便后期獲取該用戶數據進行相應的修改
                user_data.mutex.acquire()
                user_data.live_user[recv_dic['addr']] = [session, user_obj.id]
                user_data.mutex.release()
                # 普通用戶在登陸之后強制打印最新一條公告內容
                if recv_dic['user_type'] == 'user':
                    last_notice = user_interface.check_notice_by_count(1)
                    back_dic['last_notice'] = last_notice
            else:
                back_dic = {'flag': False, 'msg': "密碼錯誤"}
        else:
            back_dic = {'flag':False,'msg':'用戶類型不正確'}
    else:
        back_dic = {'flag': False, 'msg': '用戶不存在'}
    common.send_back(conn, back_dic)
common_interface

lib

import json
import hashlib
import time
from functools import wraps
from TcpServer import user_data
import struct


def send_back(conn,back_dic):
    back_bytes = json.dumps(back_dic).encode('utf-8')
    header = struct.pack('i',len(back_bytes))
    conn.send(header)
    conn.send(back_bytes)


def get_session(name):
    # 為了保證生成的隨機字符串是獨一無二的這里用cpu執行時間加鹽
    md = hashlib.md5()
    md.update(str(time.clock()).encode('utf-8'))
    md.update(name.encode('utf-8'))
    return md.hexdigest()


def login_auth(func):
    @wraps(func)
    def inner(*args,**kwargs):
        # args=(recv_dic,conn)
        for values in user_data.live_user.values():
            if args[0]['session'] == values[0]:
                # 如果當前用戶存在且登陸 將該用戶id放入recv_dic中
                args[0]['user_id'] = values[1]
                break
        if args[0].get("user_id"):
            func(*args,**kwargs)
        else:
            back_dic = {'flag':False,'msg':'請先登陸'}
            send_back(args[1],back_dic)
    return inner


def get_nowtime():
    now_time = time.strftime('%Y-%m-%d %X')
    return now_time
common

orm_pool

from DBUtils.PooledDB import PooledDB
import pymysql

POOL = PooledDB(
    creator=pymysql,  # 使用鏈接數據庫的模塊
    maxconnections=6,  # 連接池允許的最大連接數,0和None表示不限制連接數
    mincached=2,  # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
    maxcached=5,  # 鏈接池中最多閑置的鏈接,0和None不限制
    maxshared=3,
    # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
    blocking=True,  # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
    maxusage=None,  # 一個鏈接最多被重復使用的次數,None表示無限制
    setsession=[],  # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='12345678',
    database='youku',
    charset='utf8',
    autocommit='True'
)
db_pool
import pymysql
from orm_pool.db_pool import POOL

class Mysql(object):
    def __init__(self):
        self.conn = POOL.connection()
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

    def close_db(self):
        self.cursor.close()
        self.conn.close()

    def select(self, sql, args=None):
        self.cursor.execute(sql, args)
        res = self.cursor.fetchall()
        return res

    def execute(self, sql, args):
        try:
            self.cursor.execute(sql, args)
        except BaseException as e:
            print(e)


if __name__ == '__main__':
    ms = Mysql()
    res = ms.select('select * from class where cid=%s',1)
    print(res)
mysql_pool
from orm_pool.mysql_pool import Mysql


class Field(object):
    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.column_type = column_type
        self.primary_key = primary_key
        self.default = default


class StringField(Field):
    def __init__(self, name, column_type='varchar(32)', primary_key=False, default=None):
        super().__init__(name, column_type, primary_key, default)


class IntegerField(Field):
    def __init__(self, name, column_type='int', primary_key=False, default=0):
        super().__init__(name, column_type, primary_key, default)


class ModelMetaClass(type):
    def __new__(cls, name, bases, attrs):
        if name == 'Models':
            return type.__new__(cls, name, bases, attrs)
        table_name = attrs.get('table_name', name)  # 獲取表名,如果用戶沒有自定義那么就用類名
        primary_key = None
        mappings = {}
        for k, v in attrs.items():
            if isinstance(v, Field):  # 判斷是否我們自定義的屬性table_name,id,name等...
                mappings[k] = v
                if v.primary_key:
                    if primary_key:
                        raise TypeError("一張表有且只有一個主鍵")
                    primary_key = k  # 判斷是否是主鍵
        # 將單個單個的自定義的字段屬性從attrs中
        for k in mappings.keys():
            attrs.pop(k)
        if not primary_key:
            raise TypeError('沒有設置主鍵')
        # 將表的三個特性表名,主鍵,字段付給類的屬性
        attrs['table_name'] = table_name
        attrs['primary_key'] = primary_key
        attrs['mappings'] = mappings
        return type.__new__(cls, name, bases, attrs)


class Models(dict, metaclass=ModelMetaClass):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, item):
        # 字典對象獲取值
        return self.get(item, '沒有該鍵值對!')

    def __setattr__(self, key, value):
        # 字段對象設置值
        self[key] = value

    @classmethod
    def select(cls, **kwargs):
        ms = Mysql()
        if not kwargs:
            # 第一種查詢可能:select * from userinfo;
            sql = "select * from %s" % cls.table_name
            res = ms.select(sql)  # 結果肯定是[{},{},{}]列表套字典
        else:
            # 第二種查詢可能:select * from userinfo where id=%s或name=%s或password=%s...;
            key = list(kwargs.keys())[0]  # 這里我們規定過濾條件只能有一個,所以這里只取一個
            value = kwargs.get(key)
            sql = "select * from %s where %s=?" % (cls.table_name, key)
            sql = sql.replace('?', '%s')
            res = ms.select(sql, value)  # 結果肯定是[{},{},{}]列表套字典
        if res:
            # 列表推導式獲取到一個個字典,**打散成name='jason',password='123'形式,傳入類中完成實例化
            return [cls(**r) for r in res]  # 返回結果為[obj1,obj2,obj3...]

    def update(self):
        ms = Mysql()
        # update user set name='jason',password='123' where id=1;  更新操作where后統一就用主鍵
        fields = []
        pr = None
        args = []
        for k,v in self.mappings.items():
            if v.primary_key:
                pr = getattr(self,v.name,v.default)
            else:
                fields.append(v.name+'=?')
                args.append(getattr(self,v.name,v.default))
        sql = "update %s set %s where %s=%s"%(self.table_name,','.join(fields),self.primary_key,pr)
        # 上句的拼接結果update user set name=?,password=? where id=1
        sql = sql.replace('?','%s')
        ms.execute(sql,args)

    def save(self):
        ms = Mysql()
        # insert into user(name,password) values('jason','123')
        fields = []
        args = []
        values = []
        for k,v in self.mappings.items():
            if not v.primary_key:
                fields.append(v.name)
                args.append('?')
                values.append(getattr(self,v.name,v.default))
        sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args))
        # sql = "insert into user(name,password) values(?,?)"
        sql = sql.replace('?','%s')
        ms.execute(sql,values)
orm

orm_singleton

TcpServer

import socket
import json
from db import models
from datetime import datetime
from lib import common
from interface import admin_interface,user_interface,common_interface
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from TcpServer import user_data
import struct

# 對於報錯消息是Address already in use
from socket import SOL_SOCKET,SO_REUSEADDR



pool = ThreadPoolExecutor(20)
mutex = Lock()
# 將生成的鎖放到user_data,避免出現交叉導入的現象,這里導了common_interface,common_interface中又要導這里
user_data.mutex = mutex


func_dic = {
    'register':common_interface.register,
    'login':common_interface.login,
    'release_notice':admin_interface.release_notice,
    'check_movie':admin_interface.check_movie,
    'upload_movie':admin_interface.upload_movie,
    'get_movie_list':user_interface.get_movie_list,
    'delete_movie':admin_interface.delete_movie,
    'buy_member':user_interface.buy_member,
    'download_movie':user_interface.download_movie,
    'check_download_record':user_interface.check_download_record,
    'check_notice':user_interface.check_notice
}

def get_server():
    server = socket.socket()
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # 在bind之前設置,避免服務重啟時避免報Address already in use
    server.bind(('127.0.0.1',8082))
    server.listen(5)
    while True:
        conn,addr = server.accept()
        pool.submit(working,conn,addr)





def working(conn,addr):
    while True:
        try:
            recv_header = conn.recv(4)
            recv_bytes = conn.recv(struct.unpack('i',recv_header)[0])
            recv_dic = json.loads(recv_bytes.decode('utf-8'))
            # 在執行分發方法之前,將當前用戶的addr也放入接受的到的字典中
            recv_dic['addr'] = str(addr)  # 由於addr不是字符串,這里手動轉一下,后面需要拿它作為字典的key
            dispatch(recv_dic,conn)
        except Exception as e:
            print(e)
            conn.close()
            user_data.mutex.acquire()
            user_data.live_user.pop(str(addr))
            user_data.mutex.release()
            break


def dispatch(recv_dic,conn):
    if recv_dic['type'] in func_dic:
        func_dic.get(recv_dic['type'])(recv_dic,conn)
    else:
        back_dic = {'flag':False,'msg':'請求不合法!'}
        common.send_back(conn,back_dic)
tcpserver
live_user = {}
mutex = None
user_data

 

import os
import sys


BASE_DIR = os.path.dirname(__file__)
sys.path.append(BASE_DIR)
from TcpServer import tcpserver


if __name__ == '__main__':
    tcpserver.get_server()
start.py

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM