一、項目總結三步驟
- 項目生命周期為基准線、分析要有層次感、不要想到什么說什么。
- 這條基准線上,負責的是哪一塊,做了什么。
- 舉例說明項目中遇到的問題及怎么解決的。
二、項目需求分析
管理員
1 注冊
2 登錄
3 上傳視頻
4 刪除視頻
5 發布公告
用戶
1 注冊
2 登錄
3 沖會員
4 查看視頻
5 下載免費視頻
6 下載收費視頻
7 查看觀影記錄
8 查看公告
三、搭建框架
層級結構:客戶端 服務端 數據庫
客戶端:
基於tcp連接的套接字程序
管理員視圖
注冊、登錄、上傳視頻、刪除視頻、發布公告
用戶視圖
注冊、登錄、購買vip、查看視頻、下載免費視頻、下載收費視頻、查看下載記錄、查看公告
服務端:
tcpserver:基於多線程實現並發的套接字通信 解決粘包問題
interface:admin_interface、user_interface、common_interface
models類和ORM框架:models類中的四張表繼承ORM框架中的基類model
數據庫:
創建四張表:user、movie、notice、download_record
四、ORM框架分析
# 優點:讓一個不懂數據庫操作的小白也能夠簡單快速操作數據庫實現相應功能
# 缺點:sql封裝固定,不利於sql查詢優化
# 對象關系映射
# 類 >>> 數據庫的表
# 對象 >>> 表的一條條的記錄
# 對象獲取屬性或方法 >>> 記錄的字段對應的值
# 一張表有字段,字段又有字段名,字段類型,字段是否是主鍵,字段的默認值
class Field(object):
pass
# 為了在定義的時候更加方便 通過繼承Field定義具體的字段類型
class StringField(Field):
pass
class IntegerField(Field):
pass
class Models(dict):
pass
def __getattr__(self,item):
return self.get(item)
def __setattr__(self,key,value)
self[key] = value
# 查詢
def select(self,**kwargs):
# select * from userinfo
# select * from userinfo where id = 1
# 新增
def save(self):
# insert into userinfo(name,password) values('jason','123')
# 修改:是基於已經存在了的數據進行修改操作
def update(self):
# update userinfo set name='jason',password='234' where id = 1
"""
(******)
hasattr
getattr
setattr
"""
# 元類攔截類的創建過程 使它具備表的特性
class ModelsMetaClass(type):
def __new__(cls,class_name,class_bases,class_attrs):
# 只攔截模型表的創建表
if class_name == 'Models':
return type.__new__(cls,class_name,calss_bases,class_attrs)
table_name = class_attrs.get('table_name',class_name)
primary_key = None
mappings = {}
for k,v in class_attrs.items():
if isinstance(v,Field):
mappings[k] = v
if v.primary:
if primary_key:
raise TypeError('主鍵重復')
primary_key = v.name
for k in mappings.keys():
class_attrs.pop(k)
if not primary_key:
raise TypeError('必須要有一個主鍵')
class_attrs['table_name'] = table_name
class_attrs['primary_key'] = primary_key
class_attrs['mappings'] = mappings
return type.__new__(cls,class_name,calss_bases,class_attrs)
五、數據庫設計
表結構定義好以后,數據怎么操作,邏輯代碼怎么實現就清晰了
用戶表:包含管理員和普通用戶信息 user--->id ,name ,password ,is_locked ,is_vip , user_type,register_time
電影表:movie---->id,name,, path,is_free, is_delete,user_id,create_time,file_md5
公告表:notice--->id,title,content,create_time,user_id
下載記錄表:download_record---->id,user_id,movie_id,create_time
create table user(
id int auto_increment primary key,
name varchar(255),
password varchar(255),
is_locked int not null default 0,
is_vip int not null default 0,
user_type varchar(255),
register_time varchar(255)
)engine=Innodb charset='utf8';
create table movie(
id int auto_increment primary key,
name varchar(64),
path varchar(255),
is_free int not null default 1,
is_delete int not null default 0,
create_time varchar(255),
file_md5 varchar(255),
user_id int
)engine=Innodb charset='utf8';
create table notice(
id int auto_increment primary key,
title varchar(255),
content varchar(255),
user_id int,
create_time varchar(255)
)engine=Innodb charset='utf8';
create table download_record(
id int auto_increment primary key,
user_id int,
movie_id int,
create_time varchar(255)
)engine=Innodb charset='utf8';
六、項目中各個功能模塊分析
管理員:
1、注冊功能
客戶端
1-1、選擇每個功能之前都需要都需要需要連接上服務器,即需要一個socket對象,每個函數傳一個client
1-2、密碼在傳遞過程中不能是明文吧,需要加密,可選擇hashlib中md5加密,定義一個公共方法咯
1-3、定義一個發送和接收的公共方法、這里要注意的是在這個方法有一個關鍵字形參、用於傳輸文件,默認為None
1-4、考慮一個問題,發送的字典中要包含哪些數據、對於注冊這個問題,包含服務器端用於標識的type的功能類型、
用戶名、密碼(要加密)、還有用戶類型"user_type"(admin或者user)這里是admin類型
1-5、接收得到的字典back_dic又包含那些數據,常見的就flag和msg,后續的功能中有返回列表類型的
服務端
1-6、首先就是基於多線程實現並發的套接字程序,子線程working函數中會先接收到客戶端發來的字典(用到json、struct模塊)
1-7、有個問題是有什么便利的方法將接收到的字典recv_dic 和與客戶端建立連接的socket對象conn 交給接口層中相應的功能進
行操作數據庫,那就定義一個分發函數dispatch(recv_dic,conn),然后判斷recv_dic["type]類型和全局func_dic字典中進行
比對,去執行與之對應的函數,如果傳過來的類型不存在func_dic字典中,那就自定義一個字典back_dic(包含flag和msg數據)
調用服務端公共發送數據方法返回給客戶端
1-8、咱們不知不覺就來到了服務端注冊接口了,意味着可以操作數據庫啦,就需要用到ORM框架和db目錄中models模塊中與表一一對應
的類、這四個類都是根據事先在數據庫中定義好的字段進行創建的,不要寫錯了,字段和類型。這四個類都繼承了ORM框架的基類
modle,所以可是直接點就可以調用ORM框架中基類中方法,select方法是類方法,得到的是一個列表套對象,還有save方法,用於保存
,還有一個update方法用於更新,那咱們回過頭來
1-9、注冊功能拿到的recv_dic中可以拿到注冊的用戶名,得到用戶名后使用user_data = models.User.select(name=name )進行判斷要注冊的
用戶是否存在,若果存在老規矩back_dic(flag為False,msg為注冊失敗)返回去,不存在那咋整,還能咋整保存到數據庫user表中唄,那
怎么保存呀,name,password,user_type,is_locked和is_vip都有默認值,register_time注冊時間的話寫個方法 time.strftime("%Y-%m-%d %X")
這樣不就全搞定了,什么數據都拿到了,那就用models.User()把這些數據搞進去創建得到一個對象,對象調用save方法進行方法就ojbk了,不急還有
要記得通知客戶端,老規矩back_dic字典,調用公共發送方法,注冊大功告成
登錄
客戶端
2-1、在注冊功能該項目的總體框架都已經打通了任督二脈,我的乖乖,那登錄功能需要考慮一個問題,客戶端如果登陸成功,是不是需要標記一下登陸狀態
,老規矩在全局定義一個字典,把返回的字典中一個session存到全局字典cookie中,解決了ojbk,
2-2、發送字典send_dic中type類型修改為login,密碼的話照樣發送密文,然后over了
服務器
2-3、還記得tcpserver模塊中的全局func_dic字典嗎?強大的地方來了,剛剛只是寫了一個注冊的映射接口,現在來了一個login類型,那咋整,就往里加一個
login的映射方法,還可以直接拿到recv_dic和conn,任督二脈打通了就是強,哦還有注冊和登錄都是管理員和普通用戶的公共方法,所以放到common_interface
中,其實放哪都一樣只要能找到就行啦 哈哈
2-4、你要登陸,邏輯點在哪里,首先我要判斷你這貨存不存在呀,不存在登陸個屁呀,淡定淡定,哈哈,上面說過select方法得到的是列表,別給老子忘了,列表里面
放的是一個個對象,models中User類調用select方法根據name=recv_dic["name"]得到user_list,如果user_list存在,那就取零號位就拿到user_obj用戶對象
2-5、拿到user_obj對象點表中的字段屬性判斷其類型和接收的recv_dic字典中類型和密碼是否一致,一致的話便可以得到一個back_dic字典了,老規矩包含flag和msg
2-6、重點來了,這里可能有帶你繞,請無關人員速速離開,要返回的back_dic字典中需要添加一個session添加到字典中,這個session是用戶登陸成功之后生成的一個
隨機字符串,咱這里也是用hashlib,這里要保證生成的字符串是唯一的,這里需要加鹽,加一個當前cpu執行代碼的時間 time.clock()
2-7、,服務端怎么校驗用戶的登陸問題,考慮兩個問題,第一個問題服務端需要保存session,第二個問題當用戶退出之后將該用戶對應的鍵值刪除?
那我們如何判斷用戶走了,運行到哪一段代碼就標記用戶走了呢,我們可不可以通過addr就可以定位哪一個用戶斷開了,找到當前用對應的數據刪除,數據保存形式
{‘addr’:[session,user_id]} 將這個東西存在哪里呢,可以放在全局,但我們這里把他存到Tcpsever目錄下user_data模塊中live_user['addr’']=[session,user_id]
那問題又來,怎么拿到add,第一種思路給每一個函數都添加addr參數,但是這個addr參數只是login函數用到,其他函數都沒用到,這樣第一種思路很不合理,第二種思路
可以通過working中接收到的recv_dic字典添加recv_dic["addr"] = str(addr) 再傳給每一個函數,在login函數中user_data.live_user[recv_dic["addr"]] = [session,user_obj.id]
有考慮一個問題,因為多線程要操作公共數據user_data中的live_user字典,就會出現數據錯亂,所以要加鎖,那這個鎖在那里產生呢?我們要在tcpsever全局中產生mutex = Lock()
在這里產生,但是不能在這里用,因為會出現循環導入問題,tcpserver導入common_interface,在common_interface中又用到tcpserver中的鎖,相互導入就出現循環導入,解決辦法,
將鎖保存到user_data中 user_data.mutex = mutex,在login中給user_data.live_user[recv_dic["addr"]] = [session,user_obj.id]加鎖,直接導入user_data就可以使用到鎖啦
還沒完在tcpserver中 用戶退出(try...except.(下面的執行的代碼就表示其中一個線程斷開)..)就要刪除user_data.live_user.pop(str(addr)) ,這里也是公共方法需要
加鎖user_data.mutex.acquire()和user_data.mutex.release()
2-8、下面的功能都需要先登錄才能操作,這里來個裝飾器功能:校驗客戶端發過來的隨機字符串,如果有這個隨機字符串那就正常執行函數,如果沒有返回請先登錄的提示,意味着客戶端
發送的字典要帶着session過來,裝飾器inner(*args,**kwargs)中args=(recv_dic,conn) kwargs={} 拿到客戶端發過來的隨機字符串與服務器的數據進行比對 vlues=[session,user_id]
for vlues in user_data.live_user.vlues(): if args[0].get("session") == v[0]:將對應的user_id放入recv_dic中,以便后續使用args[0]["user_id"]=vlues[1] break
以上for循環不一定能找到,for循環只是單單的判斷session,然后將user_id放到接收字典recv_dic中,那被裝飾的函數到底執不執行,if args[0].get("user_id"): func(*args,**kwargs)
else: back_dic ={"flag"False,"msg":"請先登錄"} 然后調用返回函數send_back(back_dic,args[1])
3、上傳視頻
客戶端
3-1、查看有哪些影片需要上傳的,即獲取所有視頻
3-2、判斷影片是否存在才能上傳,那應該怎么判斷是個問題,我們能不能對上傳的視頻文件進行hashlib,自定義被hash的數據可以在文件開頭,1/3,2/3,末尾-10然后得到md5值
發送字典類型"check_movie",包含"session","file_md5",得到字典back_dic,如果視頻不存在那要輸入is_free,是否免費,然后在發字典send_dic,該字典類型為"upload_movie",還包含
"session"、"file_name"、 "file_size"、"file_md5",這里調用公共收發方法是要給文件file傳參了,把上傳文件路徑傳過去
服務端
3-3、還記得tcpserver模塊中的全局func_dic字典嗎?加上"check_movie"和"upload_movie"映射,映射函數全都加上裝飾器
3-4、"check_movie"比較簡單,只是查看要上傳視頻的file_md5是否在數據庫,注意數據庫中存的只是文件地址而已,不是真實的視頻文件
3-5、這里為了避免上傳的視頻名字是一樣的但是內容不一樣,所以文件名應該盡量取的唯一,所以給傳來的file_name加上一個隨機字符串,就直接調用之前定義的 get_session方法即可
3-6、這里要拼接文件的存放路徑了,根據file_size循環寫入文件
3-7、生成一個 movie_obj 電影對象,調用save方法保存,然后返回back_dic說明上傳成功
4、刪除視頻
客戶端
4-1、先查詢出所有沒有被刪除的電影列表,即send_dic字典中"type"為'get_movie_list' 和'movie_type'為"all",返回的電影列表可以全部是收費,全部是免費,收費免費都有,這里需要注意的是獲取所有視頻列表考
慮的不周全,如果單從管理員角度要獲得所有視頻不考慮用戶獲取收費或者免費的視頻,會出現一些代碼冗余,所以在獲取所有視頻這個功能要判斷傳過來的的movie_type是all、free、charge
4-2、拿到所有視頻列表movie_list,該列表的格式[電影名稱,是否免費收費,電影id]發送字典send_dic中"type"為"delete_movie"和delete_movie_id'為movie_list[choice-1][2]
服務端
4-3、還記得tcpserver模塊中的全局func_dic字典嗎?加上'get_movie_list'和"delete_movie"映射,映射函數全都加上裝飾器
4-4、刪除電影不是真的刪除,只是找到每一個電影對象,然后點is_delete屬性改為1即可,所以get_movie_list方法會先獲得所有對象列表,遍歷列表得到每一個對象,對每一個對象的is_delete屬性進行判斷,注意還要判斷
ecv_dic['movie_type'],這里是“all”類型,滿足的全部添加到一個返回的列表中back_movie_list,然后返回給客戶端
4-5、delete_movie方法的話 movie_list = models.Movie.select(id=recv_dic.get("delete_movie_id"))然后對列表去索引得到一個電影對象,然后修改movie_obj.is_delete,然后調用update()方法更新,然后返回back_cic
5、發布公告
客戶端
5-1 公告包含title和content 發送的字典send_dic包含"type"為"release_notice"、"session"、"title"、"content"
服務端、
5-2、這里需要知道接受的字典recv_dic是包含user_id字段的,要寫入表notice時用到
5-3、也是創建表notice對象,然后調用save方法保存
普通用戶
1、注冊
直接調用公共注冊方法
2、登錄
直接調用公共登錄,在全局添加user_dic中保存session和is_vip
3、購買會員
客戶端
3-1、判斷全局user_dic['is_vip']可知道是否是會員
3-2、如果不是的話,讓用戶選擇是否購買會員,購買的話最后要修改全局
服務端
3-3、根據recv_dic["user_id"]判斷是哪一個用戶要購買會員,得到的對象點is_vip屬性修改為1,調用update(0方法保存
4、查看所有視頻
客戶端
4-1、發送字典send_dic里面的type為'get_movie_list','movie_type為'all'
服務器
4-2、直接調用之前寫好的get_movie_list方法即可 這和管理員中刪除視頻就先獲取所有視頻
5、下載免費電影
客戶端
5-1、先列出所有免費電影,和上個功能差不多,只是'movie_type'改為'free'
5-2、再發送字典send_dic中'type'為'download_movie' 'movie_id'為movie_list[choice-1][2]
5-3、接受得到的字典back_dic中有一個wait_time 打印可能是0或者30秒 拼接下載的路徑,循環寫入文件
服務端
5-4、id=recv_dic.get('movie_id')來得到電影列表movie_list,然后索引取值得到電影對象
5-5 id=recv_dic['user_id']來得到用戶列表索引取得用戶對象user_obj
5-6、下載電影的話先判斷使用是否是vip,vip的話不需要等待30秒 不是的話需要等待30秒
5-7、更新下載記錄到down_record表中
5-8、循環發送文件
5-9、發送字典back_dic
6、下載收費電影
客戶端
6-1、針對普通用戶和vip用戶下載收費視頻收費標准不一樣(5元 10元)
6-2、發送字典send_dic 中還是'get_movie_list'但是電影類型為收費'movie_type':'charge'
6-3、剩下功能和下載免費電影差不多
服務器
同上
7、查看下載記錄
客戶端
7-1、發送字典send_dic 中的類型'check_download_record'
7-2、接受字典back_dic進行判斷即可
服務端
7-3、還記得tcpserver模塊中的全局func_dic字典嗎?加上'check_download_record'的映射方法
7-4、要查看下載記錄 先根據用戶id得到一個記錄列表,循環該列表得到的是每一個記錄對象
7-5、根據每一個對象點movie_id 和電影id判斷得到電影列表,索引取值得到各個對象
7-6、把每一個對象的名字添加到一個自定義的列表中,用於返回給客戶端
8、查看公告
客戶端
8-1、發送字典send_dic 中的類型'check_notice'
8-2、接受字典back_dic進行判斷即可
服務端
8-3、還記得tcpserver模塊中的全局func_dic字典嗎?加上'check_notice'的映射方法
8-4、Notice類調用select方法得到公告列表
8-5、列表存在的話 遍歷該列表得到每一個對象,返回字典中保存對象點title,點content進行返回
七、項目中遇到的問題及怎么解決的
- 校驗登陸問題(服務端必須校驗,客戶端無所謂)
- 獲取所有視頻列表考慮的不周全,如果單從管理員角度要獲得所有視頻不考慮用戶獲取收費或者免費的視頻,會出現一些代碼冗余,所以在獲取所有視頻這個功能要判斷傳過來的的movie_type是all、free、charge
- 服務端怎樣標識客戶端問題:cookie保存到客戶端、session保存到服務器user_data文件中
- 從客戶端到數據庫一頓操作打通以后遇到最多的問題有字段打錯了
八、客戶端代碼框架
8.1 conf
8.1.1 setting
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
UPDATE_MOVIE = os.path.join(BASE_DIR, 'update_movie')
DOWNLOAD_MOVIE_DIR = os.path.join(BASE_DIR, 'download_movie')
8.2 core
8.2.1 src
from core import admin, user
func_dic = {"1": admin.admin_view, "2": user.user_view}
def run():
while True:
print("""
1、管理員視圖
2、普通用戶視圖
q、退出
""")
choice = input("please choice your number>>:").strip()
if choice == 'q': break
if choice not in func_dic:
print("choice err not in range")
continue
func_dic.get(choice)()
8.2.2 admin
import os
from Tcpclient import tcpclient
from lib import common
from conf import setting
user_info = {
"session": None
}
def register(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
re_password = input("please agan input your password>>:").strip()
if password != re_password:
print("兩次密碼不一致")
continue
send_dic = {"type": "register", "name": name, "password": common.get_md5(password), "user_type": "admin"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic["msg"])
def login(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
send_dic = {"type": "login", "name": name, "password": common.get_md5(password), "user_type": "admin"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print(back_dic["msg"])
user_info["session"] = back_dic["session"]
break
else:
print(back_dic["msg"])
def update_movie(client):
"""
思路:
1、是不是要先獲取有哪些可以上傳的視頻
2、選擇好要上傳的視頻后是不是還要判斷服務器存不存在,存在了就不需要上傳了
3、那就需要校驗視頻文件,可自定義校驗規則
4、循環上傳
:param client:
:return:
"""
while True:
movie_list = common.get_movie()
if not movie_list:
print("暫無影片上傳")
return
for i, m in enumerate(movie_list, start=1):
print("%s:%s" % (i, m))
choice = input("please input your choice>>: ").strip()
if choice == 'q': break
if choice.isdigit():
choice = int(choice)
if choice in range(1, len(movie_list) + 1):
file_path = os.path.join(setting.UPDATE_MOVIE, movie_list[choice - 1])
file_md5 = common.get_file_md5(file_path)
send_dic = {"type": "check_movie", "session": user_info["session"], "file_md5": file_md5}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
# 如果可以上傳,那標識上傳免費還是收費
is_free = input("上傳的影片是否免費(y/n)>>:").strip()
is_free = 1 if is_free == 'y' else 0
file_name = movie_list[choice - 1]
send_dic = {"type": "update_movie", "session": user_info["session"], "is_free": is_free,
"file_name": file_name, "file_md5": file_md5, "file_size": os.path.getsize(file_path)}
back_dic = common.send_back(send_dic, client, file_path)
if back_dic["flag"]:
print(back_dic["msg"])
return
else:
print(back_dic["msg"])
else:
print(back_dic["msg"])
else:
print("choice not in range")
else:
print("input choice must be a number !")
def delete_movie(client):
"""
思路:
1、先從服務器獲取所有視頻
2、要刪除的發給服務器
:param client:
:return:
"""
send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "all"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
"""
服務器的get_movie_list會返回一個電影列表,列表里面為[電影名,收費或免費,電影id]
"""
movie_list = back_dic["movie_list"]
for i, m in enumerate(movie_list, start=1):
print("%s:%s-%s" % (i, m[0], m[1]))
choice = input("input your delete movie>>:").strip()
if choice.isdigit():
choice = int(choice)
if choice in range(1, len(movie_list) + 1):
send_dic = {"type": "delete_movie", "session": user_info["session"],
"movie_id": movie_list[choice - 1][2]}
back_dic = common.send_back(send_dic, client)
if back_dic['flag']:
print(back_dic['msg'])
return
else:
print(back_dic['msg'])
else:
print('choice noe in range')
else:
print(back_dic['msg'])
def release_notice(client):
while True:
title = input("please input title>>:").strip()
content = input("please input content>>:").strip()
send_dic = {"type": "release_notice", "session": user_info["session"], "title": title, "content": content}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic['msg'])
break
func_dic = {
"1": register,
"2": login,
"3": update_movie,
"4": delete_movie,
"5": release_notice
}
def admin_view():
client = tcpclient.get_client()
while True:
print("""
1、注冊
2、登錄
3、上傳電影
4、刪除電影
5、發布公告
""")
choice = input("please choice your number>>:").strip()
if choice == 'q': break
if choice not in func_dic:
print("choice err not in range")
continue
func_dic.get(choice)(client)
8.2.3 user
import os
import time
from Tcpclient import tcpclient
from conf import setting
from lib import common
user_info = {
"session": None,
"is_vip": None
}
def register(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
re_password = input("please agan input your password>>:").strip()
if password != re_password:
print("兩次密碼不一致")
continue
send_dic = {"type": "register", "name": name, "password": common.get_md5(password), "user_type": "user"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic["msg"])
def login(client):
while True:
name = input("please input your name>>:").strip()
password = input("please input your password>>:").strip()
send_dic = {"type": "login", "name": name, "password": common.get_md5(password), "user_type": "user"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print(back_dic["msg"])
user_info["session"] = back_dic["session"]
user_info["is_vip"] = back_dic["is_vip"]
break
else:
print(back_dic["msg"])
def buy_vip(client):
while True:
buy_vip = input("是否購買會員(y/n)>>:").strip()
if buy_vip == 'q': break
if buy_vip not in ['y', 'n']:
print("輸入有誤")
continue
if buy_vip == 'y':
send_dic = {"type": "buy_vip", "session": user_info["session"]}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print(back_dic["msg"])
break
else:
print(back_dic["msg"])
else:
print("歡迎下次購買")
break
def check_movie(client):
send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "all"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
movie_list = back_dic["movie_list"]
for i, m in enumerate(movie_list, start=1):
print("%s:%s-%s" % (i, m[0], m[1]))
else:
print(back_dic["msg"])
def download_free_movie(client):
send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "free"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
movie_list = back_dic["movie_list"]
for i, m in enumerate(movie_list, start=1):
print("%s:%s-%s" % (i, m[0], m[1]))
while True:
choice = input("請選擇要下載的電影編號>>:").strip()
if choice == 'q': break
if choice.isdigit():
choice = int(choice)
if choice in range(1, len(movie_list) + 1):
send_dic = {"type": "download_movie", "session": user_info["session"],
"movie_id": movie_list[choice - 1][2], "movie_type": "free"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print("請等待》》》")
time.sleep(back_dic["wait_time"])
file_path = os.path.join(setting.DOWNLOAD_MOVIE_DIR, back_dic["file_name"])
recv_size = 0
with open(file_path, 'wb') as f:
while recv_size < back_dic["file_size"]:
data = client.recv(1024)
f.write(data)
recv_size += len(data)
print("下載成功")
return
else:
print(back_dic["msg"])
else:
print("choice not in range")
else:
print("choice must be number")
else:
print(back_dic["msg"])
def download_charge_movie(client):
if user_info["is_vip"]:
charge = input("請支付10元(y/n)>>:").strip()
else:
charge = input('請支付20元(y/n)>>:').strip()
if charge != "y":
print("慢走 不送")
return
send_dic = {"type": "get_movie_list", "session": user_info["session"], "movie_type": "charge"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
movie_list = back_dic["movie_list"]
for i, m in enumerate(movie_list, start=1):
print("%s:%s-%s" % (i, m[0], m[1]))
while True:
choice = input("請選擇要下載的電影編號>>:").strip()
if choice == 'q': break
if choice.isdigit():
choice = int(choice)
if choice in range(1, len(movie_list) + 1):
send_dic = {"type": "download_movie", "session": user_info["session"],
"movie_id": movie_list[choice - 1][2], "movie_type": "free"}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
print("請等待》》》")
time.sleep(back_dic["wait_time"])
file_path = os.path.join(setting.DOWNLOAD_MOVIE_DIR, back_dic["file_name"])
recv_size = 0
with open(file_path, 'wb') as f:
while recv_size < back_dic["file_size"]:
data = client.recv(1024)
f.write(data)
recv_size += len(data)
print("下載成功")
return
else:
print(back_dic["msg"])
else:
print("choice not in range")
else:
print("choice must be number")
else:
print(back_dic["msg"])
def download_movie_record(client):
"""
思路:當前登錄的用戶需要查看自己的觀影記錄,需要得到電影名
:param client:
:return:
"""
send_dic = {"type": "download_movie_record", "session": user_info["session"]}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
back_record_list = back_dic['back_record_list']
for m in back_record_list:
print(m)
else:
print(back_dic['msg'])
def check_notice(client):
"""
查看公告思路:
:param client:
:return:
"""
send_dic = {"type": "check_notice", "session": user_info["session"]}
back_dic = common.send_back(send_dic, client)
if back_dic["flag"]:
back_record_list = back_dic['back_notice_list']
for m in back_record_list:
print(m)
else:
print(back_dic['msg'])
func_dic = {
"1": register,
"2": login,
"3": buy_vip,
"4": check_movie,
"5": download_free_movie,
"6": download_charge_movie,
"7": download_movie_record,
"8": check_notice
}
def user_view():
client = tcpclient.get_client()
while True:
print("""
1、注冊
2、登錄
3、購買會員
4、查看所有電影
5、下載免費電影
6、下載收費電影
7、查看觀影記錄
8、查看公告
""")
choice = input("please choice your number>>:").strip()
if choice == 'q': break
if choice not in func_dic:
print("choice err not in range")
continue
func_dic.get(choice)(client)
8.3 lib
8.3.1 common
import hashlib
import json
import os
import struct
from conf import setting
def send_back(send_dic, client, file=None):
json_bytes = json.dumps(send_dic).encode("utf-8")
client.send(struct.pack('i', len(json_bytes)))
client.send(json_bytes)
if file:
with open(file, 'rb') as f:
for line in f:
client.send(line)
recv_len = struct.unpack('i', client.recv(4))[0]
recv_dic = json.loads(client.recv(recv_len).decode("utf-8"))
return recv_dic
def get_md5(password):
md = hashlib.md5()
md.update(password.encode("utf-8"))
return md.hexdigest()
def get_movie():
movie_list = os.listdir(setting.UPDATE_MOVIE)
return movie_list
def get_file_md5(path):
md = hashlib.md5()
file_size = os.path.getsize(path)
file_list = [0, file_size // 3, (file_size // 3) * 2, file_size - 10]
with open(path, "rb") as f:
for line in file_list:
f.seek(line)
md.update(f.read(10))
return md.hexdigest()
8.4 Tcpclient
8.4.1 tcpclient
import socket
def get_client():
client = socket.socket()
client.connect(("127.0.0.1", 1688))
return client
8.5 start
8.5.1 start
import os, sys
from core import src
sys.path.append(os.path.dirname(__file__))
if __name__ == '__main__':
src.run()
九、服務端框架
9.1 conf
9.1.1 setting
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
MOVIE_DIR = os.path.join(BASE_DIR, 'movie_dir')
9.2 db
9.2.1 models
from orm_pool.orm import Models, StringField, IntegerField
class User(Models):
table_name = 'user'
id = IntegerField("id", primary_key=True)
name = StringField("name")
password = StringField("password")
is_locked = IntegerField("is_locked", default=0)
is_vip = IntegerField("is_vip", default=0)
user_type = StringField("user_type")
register_time = StringField("register_time")
class Movie(Models):
table_name = "movie"
id = IntegerField("id", primary_key=True)
name = StringField("name", column_type="varchar(64)")
path = StringField("path")
is_free = IntegerField("is_free")
is_delete = IntegerField("is_delete", default=0)
create_time = StringField("create_time")
user_id = IntegerField("user_id")
file_md5 = StringField("file_md5")
class Notice(Models):
table_name = "notice"
id = IntegerField("id", primary_key=True)
title = StringField("title")
content = StringField("content")
user_id = IntegerField("user_id")
create_time = StringField("create_time")
class DownloadRecord(Models):
table_name = "download_record"
id = IntegerField("id", primary_key=True)
user_id = IntegerField("user_id")
movie_id = IntegerField("movie_id")
create_time = StringField("create_time")
9.3 interface
9.3.1 admin_interface
import os
from db import models
from lib import common
from conf import setting
@common.login_auth
def check_movie(recv_dic, conn):
movie_data = models.Movie.select(file_md5=recv_dic["file_md5"])
if movie_data:
back_dic = {"flag": False, "msg": "該電影已存在"}
else:
back_dic = {"flag": True, "msg": "可以上傳"}
common.send_back(back_dic, conn)
@common.login_auth
def update_movie(recv_dic, conn):
file_name = common.get_session(recv_dic["file_name"]) + recv_dic["file_name"]
file_path = os.path.join(setting.MOVIE_DIR, file_name)
print(recv_dic)
recv_size = 0
with open(file_path, 'wb') as f:
while recv_size < recv_dic["file_size"]:
data = conn.recv(1024)
f.write(data)
recv_size += len(data)
movie_obj = models.Movie(name=file_name, path=file_path, is_free=recv_dic.get("is_free"), is_delete=0,
create_time=common.get_time(), user_id=recv_dic.get("user_id"),
file_md5=recv_dic.get("file_md5"))
movie_obj.save()
back_dic = {"flag": True, "msg": "上傳成功"}
common.send_back(back_dic, conn)
@common.login_auth
def delete_movie(recv_dic, conn):
movie_obj = models.Movie.select(id=recv_dic.get('movie_id'))[0]
movie_obj.is_delete = 1
movie_obj.update()
back_dic = {"flag": True, "msg": "刪除成功"}
common.send_back(back_dic, conn)
@common.login_auth
def release_notice(recv_dic, conn):
title = recv_dic["title"]
content = recv_dic["content"]
user_id = recv_dic["user_id"]
create_time = common.get_time()
notice_obj = models.Notice(title=title, content=content, user_id=user_id, create_time=create_time)
notice_obj.save()
back_dic = {"flag": True, "msg": "發布成功"}
common.send_back(back_dic, conn)
9.3.2 common_interface
from db import models
from lib import common
from Tcpserver import user_data
def register(recv_dic, conn):
user_list = models.User.select(name=recv_dic["name"])
if user_list:
back_dic = {"flag": False, "msg": "用戶已存在"}
common.send_back(back_dic, conn)
return
user_obj = models.User(name=recv_dic["name"], password=recv_dic["password"], is_locked=0, is_vip=0,
user_type=recv_dic["user_type"], register_time=common.get_time())
user_obj.save()
back_dic = {"flag": True, "msg": "注冊成功"}
common.send_back(back_dic, conn)
def login(recv_dic, conn):
user_list = models.User.select(name=recv_dic["name"])
if user_list:
user_obj = user_list[0]
if user_obj.user_type == recv_dic["user_type"]:
if user_obj.password == recv_dic["password"]:
back_dic = {"flag": True, "msg": "登陸成功", "is_vip": user_obj.is_vip}
# 獲取每個用戶的唯一隨機字符串,用於標識每個用戶
session = common.get_session(user_obj.name)
back_dic["session"] = session
# 服務端要記錄正在登錄的客戶端,將數據user_data文件live_user字典中,放在為了更好的標識
# 每一個客戶,字典的key為recv_dic["addr"] -----他是一個元組包含ip和端口,值的話是一個列表
# 保存每一個session和用戶id
# 因為時公共數據,且並發會造成數據錯亂,咱們給他來個鎖
user_data.mutex.acquire()
user_data.live_user[recv_dic["addr"]] = [session, user_obj.id]
user_data.mutex.release()
else:
back_dic = {"flag": False, "msg": "密碼不正確"}
else:
back_dic = {"flag": False, "msg": "用戶類型不對"}
else:
back_dic = {"flag": False, "msg": "用戶不存在"}
common.send_back(back_dic, conn)
@common.login_auth
def get_movie_list(recv_dic, conn):
"""
要給調用者返回相應的電影列表:all、free、charge
:param recv_dic:
:param conn:
:return:
"""
movie_list = models.Movie.select()
if movie_list:
back_movie_list = []
for movie_obj in movie_list:
if not movie_obj.is_delete:
if recv_dic["movie_type"] == "all":
back_movie_list.append([movie_obj.name, '免費' if movie_obj.is_free else '收費', movie_obj.id])
elif recv_dic["movie_type"] == "free":
if movie_obj.is_free:
back_movie_list.append([movie_obj.name, '免費', movie_obj.id])
else:
if not movie_obj.is_free:
back_movie_list.append([movie_obj.name, '收費', movie_obj.id])
if back_movie_list:
back_dic = {"flag": True, "movie_list": back_movie_list}
else:
back_dic = {"flag": False, "msg": "暫無影片"}
else:
back_dic = {"flag": False, "msg": "暫無影片"}
common.send_back(back_dic, conn)
9.3.3 user_interface
import os
from conf import setting
from db import models
from lib import common
@common.login_auth
def buy_vip(recv_dic, conn):
user_obj = models.User.select(id=recv_dic['user_id'])[0]
if user_obj.is_vip:
back_dic = {"flag": False, "msg": "您已經是會員啦"}
else:
user_obj.is_vip = 1
user_obj.save()
back_dic = {"flag": True, "msg": "購買成功"}
common.send_back(back_dic, conn)
@common.login_auth
def download_movie(recv_dic, conn):
"""
下載電影功能:普通用戶下載需要等待30秒 VIP用下載不需要等待
:param recv_dic:
:param conn:
:return:
"""
movie_list = models.Movie.select(id=recv_dic["movie_id"])
if movie_list:
movie_obj = movie_list[0]
file_path = movie_obj.path
user_obj = models.User.select(id=recv_dic["user_id"])[0]
wait_time = 0
if recv_dic["movie_type"] == "free":
if user_obj.is_vip:
wait_time = 0
else:
wait_time = 30
back_dic = {"flag": True, "file_name": movie_obj.name, "file_size": os.path.getsize(file_path),
"wait_time": wait_time}
download_record = models.DownloadRecord(user_id=user_obj.id, movie_id=movie_obj.id,
create_time=common.get_time())
download_record.save()
common.send_back(back_dic, conn)
with open(file_path, 'rb') as f:
for line in f:
conn.send(line)
else:
back_dic = {"flag": False, "msg": "暫無影片"}
common.send_back(back_dic, conn)
@common.login_auth
def download_movie_record(recv_dic, conn):
record_list = models.DownloadRecord.select(user_id=recv_dic["user_id"])
back_record_list = []
if record_list:
for m in record_list:
movie_obj = models.Movie.select(id=m.movie_id)[0]
back_record_list.append(movie_obj.name)
back_dic = {"flag": True, "back_record_list": back_record_list}
else:
back_dic = {"flag": False, "msg": "暫無下載記錄"}
common.send_back(back_dic, conn)
@common.login_auth
def check_notice(recv_dic, conn):
notice_list = models.Notice.select()
back_notice_list = []
if notice_list:
for notice_obj in notice_list:
back_notice_list.append([notice_obj.title, notice_obj.content])
back_dic = {"flag": True, "back_notice_list": back_notice_list}
else:
back_dic = {"flag": False, "msg": "暫無下載記錄"}
common.send_back(back_dic, conn)
9.4 lib
9.4.1 common
import hashlib
import json
import struct
import time
from functools import wraps
from Tcpserver import user_data
def send_back(back_dic, conn):
json_bytes = json.dumps(back_dic).encode("utf-8")
conn.send(struct.pack('i', len(json_bytes)))
conn.send(json_bytes)
def get_time():
return time.strftime("%Y-%m-%d %X")
def get_session(name):
# 為了保證每個用戶的隨機字符串是唯一的,不僅要對每一個用戶名加密還要加上cpu執行時機----加鹽
md = hashlib.md5()
md.update(str(time.clock()).encode("utf-8"))
md.update(name.encode("utf-8"))
return md.hexdigest()
def login_auth(func):
@wraps(func)
def inner(*args, **kwargs):
# args=(recv_dic,conn)
# 登錄了以后服務端user_data文件中live_user就有存在用戶登陸的數據,如果沒有登陸就沒有數據,可以為此
# 來作為判斷是否登錄的依據
for values in user_data.live_user.values():
# values = [session,user_id]
if args[0]["session"] == values[0]:
args[0]["user_id"] = values[1]
break
if args[0].get("user_id"):
func(*args, **kwargs)
else:
back_dic = {"flag": False, "msg": "請先登錄"}
send_back(back_dic, args[1])
return inner
9.5 orm_pool
9.5.1 db_pool
from DBUtils.PooledDB import PooledDB
import pymysql
POOL = PooledDB(
creator=pymysql, # 使用鏈接數據庫的模塊
maxconnections=6, # 連接池允許的最大連接數,0和None表示不限制連接數
mincached=2, # 初始化時,鏈接池中至少創建的空閑的鏈接,0表示不創建
maxcached=5, # 鏈接池中最多閑置的鏈接,0和None不限制
maxshared=3,
# 鏈接池中最多共享的鏈接數量,0和None表示全部共享。PS: 無用,因為pymysql和MySQLdb等模塊的 threadsafety都為1,所有值無論設置為多少,_maxcached永遠為0,所以永遠是所有鏈接都共享。
blocking=True, # 連接池中如果沒有可用連接后,是否阻塞等待。True,等待;False,不等待然后報錯
maxusage=None, # 一個鏈接最多被重復使用的次數,None表示無限制
setsession=
[], # 開始會話前執行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服務端,檢查是否服務可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='youku',
charset='utf8',
autocommit='True')
9.5.2 mysql_singleton
import pymysql
from orm_pool import db_pool
class Mysql(object):
def __init__(self):
self.conn = db_pool.POOL.connection()
self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
def close(self):
self.cursor.close()
self.conn.close()
def select(self, sql, args=None):
self.cursor.execute(sql, args)
res = self.cursor.fetchall() # 列表套字典
return res
def execute(self, sql, args):
try:
self.cursor.execute(sql, args)
except BaseException as e:
print(e)
9.5.3 orm
from orm_pool.mysql_singleton import Mysql
# 定義字段類
class Field(object):
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
# 定義具體的字段
class StringField(Field):
def __init__(self,
name,
column_type='varchar(255)',
primary_key=False,
default=None):
super().__init__(name, column_type, primary_key, default)
class IntegerField(Field):
def __init__(self,
name,
column_type='int',
primary_key=False,
default=None):
super().__init__(name, column_type, primary_key, default)
class ModelMetaClass(type):
def __new__(cls, class_name, class_bases, class_attrs):
# 我僅僅只想攔截模型表的類的創建過程
if class_name == 'Models':
return type.__new__(cls, class_name, class_bases, class_attrs)
# 給類放表名,主鍵字段,所有字段
table_name = class_attrs.get('table_name', class_name)
# 定義一個存儲主鍵的變量
primary_key = None
# 定義一個字典用來存儲用戶自定義的表示表的所有字段信息
mappings = {}
# for循環當前類的名稱空間
for k, v in class_attrs.items():
if isinstance(v, Field):
mappings[k] = v
if v.primary_key:
if primary_key:
raise TypeError("主鍵只能有一個")
primary_key = v.name
# 將重復的鍵值對刪除
for k in mappings.keys():
class_attrs.pop(k)
if not primary_key:
raise TypeError('必須要有一個主鍵')
# 將處理好的數據放入class_attrs中
class_attrs['table_name'] = table_name
class_attrs['primary_key'] = primary_key
class_attrs['mappings'] = mappings
return type.__new__(cls, class_name, class_bases, class_attrs)
class Models(dict, metaclass=ModelMetaClass):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __getattr__(self, item):
return self.get(item, '沒有該鍵值對')
def __setattr__(self, key, value):
self[key] = value
# 查詢方法
@classmethod
def select(cls, **kwargs):
ms = Mysql()
# select * from userinfo
if not kwargs:
sql = 'select * from %s' % cls.table_name
res = ms.select(sql)
else:
# select * from userinfo where id = 1
k = list(kwargs.keys())[0]
v = kwargs.get(k)
sql = 'select * from %s where %s=?' % (cls.table_name, k)
# select * from userinfo where id = ?
sql = sql.replace('?',
'%s') # select * from userinfo where id = %s
res = ms.select(sql, v)
if res:
return [cls(**r) for r in res] # 將數據庫的一條數據映射成類的對象
# 新增方法
def save(self):
ms = Mysql()
# insert into userinfo(name,password) values('jason','123')
# insert into %s(%s) values(?)
fields = [] # [name,password]
values = []
args = []
for k, v in self.mappings.items():
if not v.primary_key: # 將id字段去除 因為新增一條數據 id是自動遞增的不需要你傳
fields.append(v.name)
args.append('?')
values.append(getattr(self, v.name))
# insert into userinfo(name,password) values(?,?)
sql = "insert into %s(%s) values(%s)" % (
self.table_name, ','.join(fields), ','.join(args))
# insert into userinfo(name,password) values(?,?)
sql = sql.replace('?', '%s')
ms.execute(sql, values)
# 修改方法:基於已經存在了的數據進行一個修改操作
def update(self):
ms = Mysql()
# update userinfo set name='jason',password='123' where id = 1
fields = [] # [name,password]
values = []
pr = None
for k, v in self.mappings.items():
if v.primary_key:
pr = getattr(self, v.name, v.default)
else:
fields.append(v.name + '=?')
values.append(getattr(self, v.name, v.default))
sql = 'update %s set %s where %s = %s' % (
self.table_name, ','.join(fields), self.primary_key, pr)
# update userinfo set name='?',password='?' where id = 1
sql = sql.replace('?', '%s')
ms.execute(sql, values)
# if __name__ == '__main__':
# class Teacher(Models):
# table_name = 'teacher'
# tid = IntegerField(name='tid',primary_key=True)
# tname = StringField(name='tname')
# obj = Teacher(tname='jason老師')
# obj.save()
# res = Teacher.select()
# for r in res:
# print(r.tname)
# print(res)
# res = Teacher.select(tid=1)
# teacher_obj = res[0]
# teacher_obj.tname = 'jason老師'
# teacher_obj.update()
# res1 = Teacher.select()
# print(res1)
# class User(Models):
# table_name = 'User'
# id = IntegerField(name='id', primary_key=True)
# name = StringField(name='name')
# password = StringField(name='password')
# print(User.primary_key)
# print(User.mappings)
# obj = User(name='jason')
# print(obj.table_name)
# print(obj.primary_key)
# print(obj.mappings)
9.6 Tcpserver
9.6.1 tcpserver
import json
import socket
import struct
import traceback
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from Tcpserver import user_data
from interface import common_interface, admin_interface, user_interface
from lib import common
pool = ThreadPoolExecutor(20)
#全局產生鎖,為了避免循環導入問題,將產生的鎖放到user_data中
mutex = Lock()
user_data.mutex = mutex
func_dic = {
"register": common_interface.register,
"login": common_interface.login,
"check_movie": admin_interface.check_movie,
"update_movie": admin_interface.update_movie,
"get_movie_list": common_interface.get_movie_list,
"delete_movie": admin_interface.delete_movie,
"release_notice": admin_interface.release_notice,
"buy_vip": user_interface.buy_vip,
"download_movie": user_interface.download_movie,
"download_movie_record": user_interface.download_movie_record,
"check_notice": user_interface.check_notice
}
def get_server():
server = socket.socket()
server.bind(("127.0.0.1", 1688))
server.listen(5)
while True:
conn, addr = server.accept()
pool.submit(working, conn, addr)
def working(conn, addr):
while True:
try:
recv_header = conn.recv(4)
recv_bytes = conn.recv(struct.unpack('i', recv_header)[0])
recv_dic = json.loads(recv_bytes.decode("utf-8"))
recv_dic["addr"] = str(addr)
dispatch(recv_dic, conn)
except Exception as e:
traceback.print_exc()
conn.close()
# 當用戶斷開以后,服務器就無需保存表示他的數據
user_data.mutex.acquire()
user_data.live_user.pop(str(addr))
user_data.mutex.release()
break
def dispatch(recv_dic, conn):
if recv_dic.get("type") in func_dic:
func_dic.get(recv_dic["type"])(recv_dic, conn)
else:
back_dic = {"flag": False, "msg": "類型不合法"}
common.send_back(back_dic, conn)
9.6.2 user_data
live_user={}
mutex=None
9.7 start
9.7.1 start
import os, sys
from Tcpserver import tcpserver
sys.path.append(os.path.dirname(__file__))
if __name__ == '__main__':
tcpserver.get_server()