服務端項目目錄
conf

import os BASE_DIR = os.path.dirname(os.path.dirname(__file__)) BASE_MOVIE_DIR = os.path.join(BASE_DIR,'movie_dir')
db

from orm_pool.orm import Models, StringField, IntegerField class User(Models): table_name = 'user' id = IntegerField('id', primary_key=True) name = StringField('name') password = StringField('password') is_locked = IntegerField('is_locked', default=0) is_vip = IntegerField('is_vip', default=0) user_type = StringField('user_type') register_time = StringField('register_time') class Movie(Models): table_name = 'movie' id = IntegerField('id', primary_key=True) name = StringField('name',column_type='varchar(64)') path = StringField('path') is_free = IntegerField('is_free', default=1) is_delete = IntegerField('is_delete', default=0) create_time = StringField('create_time') user_id = IntegerField('user_id') file_md5 = StringField('file_md5') class Notice(Models): table_name = 'notice' id = IntegerField('id', primary_key=True) name = StringField('name') content = StringField('content') user_id = IntegerField('user_id') create_time = StringField('create_time') class DownloadRecord(Models): table_name = 'download_record' id = IntegerField('id', primary_key=True) user_id = IntegerField('user_id') movie_id = IntegerField('movie_id') create_time = StringField('create_time')
interface

from db import models from datetime import datetime from lib import common from conf import settings import os @common.login_auth def release_notice(recv_dic,conn): title = recv_dic.get('title') content = recv_dic.get('content') user_id = recv_dic.get('user_id') obj = models.Notice(name=title,content=content,user_id=user_id,create_time=str(datetime.now())) obj.save() back_dic = {'flag':True,'msg':'發布公告成功'} common.send_back(conn,back_dic) @common.login_auth def check_movie(recv_dic,conn): movie_data = models.Movie.select(file_md5=recv_dic['file_md5']) if movie_data: back_dic = {'flag':False,'msg':'該電影已存在'} else: back_dic = {'flag':True,'msg':'該電影不存在,可以上傳'} common.send_back(conn,back_dic) @common.login_auth def upload_movie(recv_dic,conn): # 這里為了避免上傳的視頻名字是一樣的但是內容不一樣,所以文件名應該盡量取的唯一 file_name = common.get_session(recv_dic.get('file_name'))+recv_dic.get('file_name') file_path = os.path.join(settings.BASE_MOVIE_DIR,file_name) recv_size = 0 with open(file_path,'wb') as f: while recv_size < recv_dic['file_size']: recv_data = conn.recv(1024) f.write(recv_data) recv_size += len(recv_data) # 調用orm完成保存操作 movie_obj = models.Movie(name=file_name,path=file_path,is_free=recv_dic.get("is_free"),is_delete=0, create_time = common.get_nowtime(),user_id = recv_dic.get('user_id'), file_md5=recv_dic.get("file_md5") ) movie_obj.save() back_dic = {'flag':True,'msg':'上傳成功'} common.send_back(conn,back_dic) @common.login_auth def delete_movie(recv_dic,conn): # 刪除電影並不是真的將電影數據刪除 而僅僅是將標示該電影是否刪除的字段改為1 movie_list = models.Movie.select(id=recv_dic.get("delete_movie_id")) movie_obj = movie_list[0] movie_obj.is_delete = 1 movie_obj.update() back_dic = {'flag':True,'msg':'刪除成功'} common.send_back(conn,back_dic)

from lib import common from db import models import os @common.login_auth def get_movie_list(recv_dic,conn): movie_list = models.Movie.select() if movie_list: back_movie_list = [] for movie in movie_list: # 篩選出所有沒有被刪除的電影 if not movie.is_delete: # 根據用戶想要查詢的電影類型進行划分 if recv_dic['movie_type'] == 'all': # 只要不是刪除的電影都要 數據格式[[電影名,是否免費,數據id],[...],[...]] back_movie_list.append([movie.name, '免費' if movie.is_free else '收費', movie.id]) elif recv_dic['movie_type'] == 'free': if movie.is_free: back_movie_list.append([movie.name, '免費', movie.id]) else: if not movie.is_free: back_movie_list.append([movie.name, '收費', movie.id]) # 校驗電影列表是否為空 if not back_movie_list: back_dic = {"flag":False,'msg':"暫無符合要求的電影"} else: back_dic = {'flag':True,'movie_list':back_movie_list} else: back_dic = {"flag":True,'msg':'暫無電影'} common.send_back(conn,back_dic) @common.login_auth def buy_member(recv_dic, conn): user_list = models.User.select(id=recv_dic['user_id']) user_obj = user_list[0] user_obj.is_vip = 1 user_obj.update() back_dic = {'flag': True, 'msg': '購買會員成功'} common.send_back(conn,back_dic) @common.login_auth def download_movie(recv_dic,conn): movie_list = models.Movie.select(id=recv_dic.get('movie_id')) if movie_list: movie_obj = movie_list[0] user_obj = models.User.select(id=recv_dic['user_id'])[0] # 下載免費視頻需要判斷當前用戶是否是vip會員,如果不是客戶端需要等待30秒才能繼續下載 wait_time = 0 if recv_dic['movie_type'] == 'free': if user_obj.is_vip: wait_time = 0 else: wait_time = 30 back_dic = {'flag':True,"file_name":movie_obj.name,'file_size':os.path.getsize(movie_obj.path),'wait_time':wait_time} # 將下載記錄記錄到專門存放記錄的表中 down_record = models.DownloadRecord(user_id=user_obj.id,movie_id=movie_obj.id,create_time=common.get_nowtime()) down_record.save() common.send_back(conn,back_dic) # 打開文件傳輸給客戶端 with open(movie_obj.path,'rb') as f: for line in f: conn.send(line) else: back_dic = {'flag':False,'msg':'該電影不存在'} common.send_back(conn,back_dic) @common.login_auth def check_download_record(recv_dic,conn): record_list = models.DownloadRecord.select(user_id=recv_dic['user_id']) back_record = [] if record_list: for record in record_list: # 查出電影對應的電影名 movie_obj = models.Movie.select(id=record.movie_id)[0] back_record.append(movie_obj.name) back_dic = {"flag":True,'record':back_record} else: back_dic = {'flag':False,'msg':"暫無觀影記錄"} common.send_back(conn,back_dic) @common.login_auth def check_notice(recv_dic,conn): notice_list = check_notice_by_count(count=None) if notice_list: back_dic = {'flag': True, 'notice_list': notice_list} else: back_dic = {'flag': False, 'msg': '暫無公告'} common.send_back(conn, back_dic) # 定義專門用來獲取公告的方法 def check_notice_by_count(count=None): notice_list = models.Notice.select() back_notice_list = [] if notice_list: # 不為空,繼續查詢,為空直接返回false if not count: for notice in notice_list: back_notice_list.append({notice.name: notice.content}) else: # 查一條 notice_list = sorted(notice_list, key=lambda notice: notice.create_time,reverse=True) back_notice_list.append({notice_list[0].name: notice_list[0].content}) return back_notice_list else: return False

from lib import common from db import models from datetime import datetime from TcpServer import user_data from interface import user_interface def register(recv_dic, conn): name = recv_dic.get('name') user_data = models.User.select(name=name) if user_data: back_dic = {"flag": False, 'msg': '用戶名已存在'} else: password = recv_dic.get('password') user_type = recv_dic.get('user_type') user_obj = models.User(name=name, password=password, user_type=user_type, is_locked=0, is_vip=0, register_time="%s" % datetime.now()) user_obj.save() back_dic = {'flag': True, 'msg': '注冊成功'} common.send_back(conn, back_dic) def login(recv_dic, conn): name = recv_dic.get("name") user_list = models.User.select(name=name) if user_list: user_obj = user_list[0] if user_obj.user_type == recv_dic.get("user_type"): if user_obj.password == recv_dic.get('password'): back_dic = {'flag': True, 'msg': '登陸成功','is_vip':user_obj.is_vip} session = common.get_session(user_obj.name) back_dic['session'] = session # 這里需要存儲用戶的session以及user_id 客戶端登陸成功后只會攜帶session過來 # 我們除了校驗他的合法身份之外,也應該存該用戶數據庫對於的主鍵值,方便后期獲取該用戶數據進行相應的修改 user_data.mutex.acquire() user_data.live_user[recv_dic['addr']] = [session, user_obj.id] user_data.mutex.release() # 普通用戶在登陸之后強制打印最新一條公告內容 if recv_dic['user_type'] == 'user': last_notice = user_interface.check_notice_by_count(1) back_dic['last_notice'] = last_notice else: back_dic = {'flag': False, 'msg': "密碼錯誤"} else: back_dic = {'flag':False,'msg':'用戶類型不正確'} else: back_dic = {'flag': False, 'msg': '用戶不存在'} common.send_back(conn, back_dic)
lib

import json import hashlib import time from functools import wraps from TcpServer import user_data import struct def send_back(conn,back_dic): back_bytes = json.dumps(back_dic).encode('utf-8') header = struct.pack('i',len(back_bytes)) conn.send(header) conn.send(back_bytes) def get_session(name): # 為了保證生成的隨機字符串是獨一無二的這里用cpu執行時間加鹽 md = hashlib.md5() md.update(str(time.clock()).encode('utf-8')) md.update(name.encode('utf-8')) return md.hexdigest() def login_auth(func): @wraps(func) def inner(*args,**kwargs): # args=(recv_dic,conn) for values in user_data.live_user.values(): if args[0]['session'] == values[0]: # 如果當前用戶存在且登陸 將該用戶id放入recv_dic中 args[0]['user_id'] = values[1] break if args[0].get("user_id"): func(*args,**kwargs) else: back_dic = {'flag':False,'msg':'請先登陸'} send_back(args[1],back_dic) return inner def get_nowtime(): now_time = time.strftime('%Y-%m-%d %X') return now_time
orm_pool

from DBUtils.PooledDB import PooledDB import pymysql POOL = PooledDB( creator=pymysql, # 使用鏈接數據庫的模塊 maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數 mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建 maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制 maxshared=3, # 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。 blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯 maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制 setsession=[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='12345678', database='youku', charset='utf8', autocommit='True' )

import pymysql from orm_pool.db_pool import POOL class Mysql(object): def __init__(self): self.conn = POOL.connection() self.cursor = self.conn.cursor(pymysql.cursors.DictCursor) def close_db(self): self.cursor.close() self.conn.close() def select(self, sql, args=None): self.cursor.execute(sql, args) res = self.cursor.fetchall() return res def execute(self, sql, args): try: self.cursor.execute(sql, args) except BaseException as e: print(e) if __name__ == '__main__': ms = Mysql() res = ms.select('select * from class where cid=%s',1) print(res)

from orm_pool.mysql_pool import Mysql class Field(object): def __init__(self, name, column_type, primary_key, default): self.name = name self.column_type = column_type self.primary_key = primary_key self.default = default class StringField(Field): def __init__(self, name, column_type='varchar(32)', primary_key=False, default=None): super().__init__(name, column_type, primary_key, default) class IntegerField(Field): def __init__(self, name, column_type='int', primary_key=False, default=0): super().__init__(name, column_type, primary_key, default) class ModelMetaClass(type): def __new__(cls, name, bases, attrs): if name == 'Models': return type.__new__(cls, name, bases, attrs) table_name = attrs.get('table_name', name) # 獲取表名,如果用戶沒有自定義那么就用類名 primary_key = None mappings = {} for k, v in attrs.items(): if isinstance(v, Field): # 判斷是否我們自定義的屬性table_name,id,name等... mappings[k] = v if v.primary_key: if primary_key: raise TypeError("一張表有且只有一個主鍵") primary_key = k # 判斷是否是主鍵 # 將單個單個的自定義的字段屬性從attrs中 for k in mappings.keys(): attrs.pop(k) if not primary_key: raise TypeError('沒有設置主鍵') # 將表的三個特性表名,主鍵,字段付給類的屬性 attrs['table_name'] = table_name attrs['primary_key'] = primary_key attrs['mappings'] = mappings return type.__new__(cls, name, bases, attrs) class Models(dict, metaclass=ModelMetaClass): def __init__(self, **kwargs): super().__init__(**kwargs) def __getattr__(self, item): # 字典對象獲取值 return self.get(item, '沒有該鍵值對!') def __setattr__(self, key, value): # 字段對象設置值 self[key] = value @classmethod def select(cls, **kwargs): ms = Mysql() if not kwargs: # 第一種查詢可能:select * from userinfo; sql = "select * from %s" % cls.table_name res = ms.select(sql) # 結果肯定是[{},{},{}]列表套字典 else: # 第二種查詢可能:select * from userinfo where id=%s或name=%s或password=%s...; key = list(kwargs.keys())[0] # 這里我們規定過濾條件只能有一個,所以這里只取一個 value = kwargs.get(key) sql = "select * from %s where %s=?" % (cls.table_name, key) sql = sql.replace('?', '%s') res = ms.select(sql, value) # 結果肯定是[{},{},{}]列表套字典 if res: # 列表推導式獲取到一個個字典,**打散成name='jason',password='123'形式,傳入類中完成實例化 return [cls(**r) for r in res] # 返回結果為[obj1,obj2,obj3...] def update(self): ms = Mysql() # update user set name='jason',password='123' where id=1; 更新操作where后統一就用主鍵 fields = [] pr = None args = [] for k,v in self.mappings.items(): if v.primary_key: pr = getattr(self,v.name,v.default) else: fields.append(v.name+'=?') args.append(getattr(self,v.name,v.default)) sql = "update %s set %s where %s=%s"%(self.table_name,','.join(fields),self.primary_key,pr) # 上句的拼接結果update user set name=?,password=? where id=1 sql = sql.replace('?','%s') ms.execute(sql,args) def save(self): ms = Mysql() # insert into user(name,password) values('jason','123') fields = [] args = [] values = [] for k,v in self.mappings.items(): if not v.primary_key: fields.append(v.name) args.append('?') values.append(getattr(self,v.name,v.default)) sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args)) # sql = "insert into user(name,password) values(?,?)" sql = sql.replace('?','%s') ms.execute(sql,values)
orm_singleton
TcpServer

import socket import json from db import models from datetime import datetime from lib import common from interface import admin_interface,user_interface,common_interface from concurrent.futures import ThreadPoolExecutor from threading import Lock from TcpServer import user_data import struct # 對於報錯消息是Address already in use from socket import SOL_SOCKET,SO_REUSEADDR pool = ThreadPoolExecutor(20) mutex = Lock() # 將生成的鎖放到user_data,避免出現交叉導入的現象,這里導了common_interface,common_interface中又要導這里 user_data.mutex = mutex func_dic = { 'register':common_interface.register, 'login':common_interface.login, 'release_notice':admin_interface.release_notice, 'check_movie':admin_interface.check_movie, 'upload_movie':admin_interface.upload_movie, 'get_movie_list':user_interface.get_movie_list, 'delete_movie':admin_interface.delete_movie, 'buy_member':user_interface.buy_member, 'download_movie':user_interface.download_movie, 'check_download_record':user_interface.check_download_record, 'check_notice':user_interface.check_notice } def get_server(): server = socket.socket() server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) # 在bind之前設置,避免服務重啟時避免報Address already in use server.bind(('127.0.0.1',8082)) server.listen(5) while True: conn,addr = server.accept() pool.submit(working,conn,addr) def working(conn,addr): while True: try: recv_header = conn.recv(4) recv_bytes = conn.recv(struct.unpack('i',recv_header)[0]) recv_dic = json.loads(recv_bytes.decode('utf-8')) # 在執行分發方法之前,將當前用戶的addr也放入接受的到的字典中 recv_dic['addr'] = str(addr) # 由於addr不是字符串,這里手動轉一下,后面需要拿它作為字典的key dispatch(recv_dic,conn) except Exception as e: print(e) conn.close() user_data.mutex.acquire() user_data.live_user.pop(str(addr)) user_data.mutex.release() break def dispatch(recv_dic,conn): if recv_dic['type'] in func_dic: func_dic.get(recv_dic['type'])(recv_dic,conn) else: back_dic = {'flag':False,'msg':'請求不合法!'} common.send_back(conn,back_dic)

live_user = {}
mutex = None

import os import sys BASE_DIR = os.path.dirname(__file__) sys.path.append(BASE_DIR) from TcpServer import tcpserver if __name__ == '__main__': tcpserver.get_server()