本次帶給大家的是WSGI-mini-web框架, 其中要下載一些網絡頁面, 大佬們不要見怪.
我所做的mini-web 支持路由, 正則表達式, 添加了log日志功能:解析了url編碼可以用
來理解WSGI協議, 一個簡單的mini-web框架帶給大家.
接下來就是服務器段的代碼, 注意大家要在python3下運行, 用到了TCP.
server:
import socket import re import multiprocessing import time # import dynamic.mini_frame import sys class WSGIServer(object): def __init__(self, port, app, static_path): # 1. 創建套接字 self.tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 2. 綁定 self.tcp_server_socket.bind(("", port)) # 3. 變為監聽套接字 self.tcp_server_socket.listen(128) self.application = app self.static_path = static_path def service_client(self, new_socket): """為這個客戶端返回數據""" # 1. 接收瀏覽器發送過來的請求 ,即http請求 # GET / HTTP/1.1 # ..... request = new_socket.recv(1024).decode("utf-8") # print(">>>"*50) # print(request) request_lines = request.splitlines() print("") print(">"*20) print(request_lines) # GET /index.html HTTP/1.1 # get post put del file_name = "" ret = re.match(r"[^/]+(/[^ ]*)", request_lines[0]) if ret: file_name = ret.group(1) # print("*"*50, file_name) if file_name == "/": file_name = "/index.html" # 2. 返回http格式的數據,給瀏覽器 # 2.1 如果請求的資源不是以.html結尾,那么就認為是靜態資源(css/js/png,jpg等) if not file_name.endswith(".html"): try: f = open(self.static_path + file_name, "rb") except: response = "HTTP/1.1 404 NOT FOUND\r\n" response += "\r\n" response += "------file not found-----" new_socket.send(response.encode("utf-8")) else: html_content = f.read() f.close() # 2.1 准備發送給瀏覽器的數據---header response = "HTTP/1.1 200 OK\r\n" response += "\r\n" # 2.2 准備發送給瀏覽器的數據---boy # response += "hahahhah" # 將response header發送給瀏覽器 new_socket.send(response.encode("utf-8")) # 將response ic.mini_frame.applicationbody發送給瀏覽器 new_socket.send(html_content) else: # 2.2 如果是以.py結尾,那么就認為是動態資源的請求 env = dict() # 這個字典中存放的是web服務器要傳遞給 web框架的數據信息 env['PATH_INFO'] = file_name # {"PATH_INFO": "/index.py"} # body = dynamic.mini_frame.application(env, self.set_response_header) body = self.application(env, self.set_response_header) header = "HTTP/1.1 %s\r\n" % self.status for temp in self.headers: header += "%s:%s\r\n" % (temp[0], temp[1]) header += "\r\n" response = header+body # 發送response給瀏覽器 new_socket.send(response.encode("utf-8")) # 關閉套接 new_socket.close() def set_response_header(self, status, headers): self.status = status self.headers = [("server", "mini_web v8.8")] self.headers += headers def run_forever(self): """用來完成整體的控制""" while True: # 4. 等待新客戶端的鏈接 new_socket, client_addr = self.tcp_server_socket.accept() # 5. 為這個客戶端服務 p = multiprocessing.Process(target=self.service_client, args=(new_socket,)) p.start() new_socket.close() # 關閉監聽套接字 self.tcp_server_socket.close() def main(): """控制整體,創建一個web 服務器對象,然后調用這個對象的run_forever方法運行""" if len(sys.argv) == 3: try: port = int(sys.argv[1]) # 7890 frame_app_name = sys.argv[2] # mini_frame:application except Exception as ret: print("端口輸入錯誤。。。。。") return else: print("請按照以下方式運行:") print("python3 xxxx.py 7890 mini_frame:application") return # mini_frame:application ret = re.match(r"([^:]+):(.*)", frame_app_name) if ret: frame_name = ret.group(1) # mini_frame app_name = ret.group(2) # application else: print("請按照以下方式運行:") print("python3 xxxx.py 7890 mini_frame:application") return with open("./web_server.conf") as f: conf_info = eval(f.read()) # 此時 conf_info是一個字典里面的數據為: # { # "static_path":"./static", # "dynamic_path":"./dynamic" # } sys.path.append(conf_info['dynamic_path']) # import frame_name --->找frame_name.py frame = __import__(frame_name) # 返回值標記這 導入的這個模板 app = getattr(frame, app_name) # 此時app就指向了 dynamic/mini_frame模塊中的application這個函數 # print(app) wsgi_server = WSGIServer(port, app, conf_info['static_path']) wsgi_server.run_forever() if __name__ == "__main__": main()
min-web代碼: 注意了我在自己電腦的文件夾下有靜態的網頁數據,所以要看運行結果的話就去down下靜態網頁, 僅供參考.
import re import urllib.parse import logging from pymysql import connect """ URL_FUNC_DICT = { "/index.html": index, "/center.html": center } """ URL_FUNC_DICT = dict() def route(url): def set_func(func): # URL_FUNC_DICT["/index.py"] = index URL_FUNC_DICT[url] = func def call_func(*args, **kwargs): return func(*args, **kwargs) return call_func return set_func @route(r"/index.html") def index(ret): with open("./templates/index.html") as f: content = f.read() # my_stock_info = "哈哈哈哈 這是你的本月名稱....." # content = re.sub(r"\{%content%\}", my_stock_info, content) # 創建Connection連接 conn = connect(host='localhost',port=3306,user='root',password='mysql',database='stock_db',charset='utf8') # 獲得Cursor對象 cs = conn.cursor() cs.execute("select * from info;") stock_infos = cs.fetchall() cs.close() conn.close() tr_template = """<tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td> <input type="button" value="添加" id="toAdd" name="toAdd" systemidvaule="%s"> </td> </tr> """ html = "" for line_info in stock_infos: html += tr_template % (line_info[0],line_info[1],line_info[2],line_info[3],line_info[4],line_info[5],line_info[6],line_info[7], line_info[1]) # content = re.sub(r"\{%content%\}", str(stock_infos), content) content = re.sub(r"\{%content%\}", html, content) return content @route(r"/center.html") def center(ret): with open("./templates/center.html") as f: content = f.read() # my_stock_info = "這里是從mysql查詢出來的數據。。。" # content = re.sub(r"\{%content%\}", my_stock_info, content) # 創建Connection連接 conn = connect(host='localhost',port=3306,user='root',password='mysql',database='stock_db',charset='utf8') # 獲得Cursor對象 cs = conn.cursor() cs.execute("select i.code,i.short,i.chg,i.turnover,i.price,i.highs,f.note_info from info as i inner join focus as f on i.id=f.info_id;") stock_infos = cs.fetchall() cs.close() conn.close() tr_template = """ <tr> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td>%s</td> <td> <a type="button" class="btn btn-default btn-xs" href="/update/%s.html"> <span class="glyphicon glyphicon-star" aria-hidden="true"></span> 修改 </a> </td> <td> <input type="button" value="刪除" id="toDel" name="toDel" systemidvaule="%s"> </td> </tr> """ html = "" for line_info in stock_infos: html += tr_template % (line_info[0],line_info[1],line_info[2],line_info[3],line_info[4],line_info[5],line_info[6], line_info[0], line_info[0]) # content = re.sub(r"\{%content%\}", str(stock_infos), content) content = re.sub(r"\{%content%\}", html, content) return content # 給路由添加正則表達式的原因:在實際開發時,url中往往會帶有很多參數,例如/add/000007.html中000007就是參數, # 如果沒有正則的話,那么就需要編寫N次@route來進行添加 url對應的函數 到字典中,此時字典中的鍵值對有N個,浪費空間 # 而采用了正則的話,那么只要編寫1次@route就可以完成多個 url例如/add/00007.html /add/000036.html等對應同一個函數,此時字典中的鍵值對個數會少很多 @route(r"/add/(\d+)\.html") def add_focus(ret): # 1. 獲取股票代碼 stock_code = ret.group(1) # 2. 判斷試下是否有這個股票代碼 conn = connect(host='localhost',port=3306,user='root',password='mysql',database='stock_db',charset='utf8') cs = conn.cursor() sql = """select * from info where code=%s;""" cs.execute(sql, (stock_code,)) # 如果要是沒有這個股票代碼,那么就認為是非法的請求 if not cs.fetchone(): cs.close() conn.close() return "沒有這支股票,大哥 ,我們是創業公司,請手下留情..." # 3. 判斷以下是否已經關注過 sql = """ select * from info as i inner join focus as f on i.id=f.info_id where i.code=%s;""" cs.execute(sql, (stock_code,)) # 如果查出來了,那么表示已經關注過 if cs.fetchone(): cs.close() conn.close() return "已經關注過了,請勿重復關注..." # 4. 添加關注 sql = """insert into focus (info_id) select id from info where code=%s;""" cs.execute(sql, (stock_code,)) conn.commit() cs.close() conn.close() return "關注成功...." @route(r"/del/(\d+)\.html") def del_focus(ret): # 1. 獲取股票代碼 stock_code = ret.group(1) # 2. 判斷試下是否有這個股票代碼 conn = connect(host='localhost',port=3306,user='root',password='mysql',database='stock_db',charset='utf8') cs = conn.cursor() sql = """select * from info where code=%s;""" cs.execute(sql, (stock_code,)) # 如果要是沒有這個股票代碼,那么就認為是非法的請求 if not cs.fetchone(): cs.close() conn.close() return "沒有這支股票,大哥 ,我們是創業公司,請手下留情..." # 3. 判斷以下是否已經關注過 sql = """ select * from info as i inner join focus as f on i.id=f.info_id where i.code=%s;""" cs.execute(sql, (stock_code,)) # 如果沒有關注過,那么表示非法的請求 if not cs.fetchone(): cs.close() conn.close() return "%s 之前未關注,請勿取消關注..." % stock_code # 4. 取消關注 # sql = """insert into focus (info_id) select id from info where code=%s;""" sql = """delete from focus where info_id = (select id from info where code=%s);""" cs.execute(sql, (stock_code,)) conn.commit() cs.close() conn.close() return "取消關注成功...." @route(r"/update/(\d+)\.html") def show_update_page(ret): """顯示修改的那個頁面""" # 1. 獲取股票代碼 stock_code = ret.group(1) # 2. 打開模板 with open("./templates/update.html") as f: content = f.read() # 3. 根據股票代碼查詢相關的備注信息 conn = connect(host='localhost',port=3306,user='root',password='mysql',database='stock_db',charset='utf8') cs = conn.cursor() sql = """select f.note_info from focus as f inner join info as i on i.id=f.info_id where i.code=%s;""" cs.execute(sql, (stock_code,)) stock_infos = cs.fetchone() note_info = stock_infos[0] # 獲取這個股票對應的備注信息 cs.close() conn.close() content = re.sub(r"\{%note_info%\}", note_info, content) content = re.sub(r"\{%code%\}", stock_code, content) return content @route(r"/update/(\d+)/(.*)\.html") def save_update_page(ret): """"保存修改的信息""" stock_code = ret.group(1) comment = ret.group(2) comment = urllib.parse.unquote(comment) conn = connect(host='localhost',port=3306,user='root',password='mysql',database='stock_db',charset='utf8') cs = conn.cursor() sql = """update focus set note_info=%s where info_id = (select id from info where code=%s);""" cs.execute(sql, (comment, stock_code)) conn.commit() cs.close() conn.close() return "修改成功..." def application(env, start_response): start_response('200 OK', [('Content-Type', 'text/html;charset=utf-8')]) file_name = env['PATH_INFO'] # file_name = "/index.py" """ if file_name == "/index.py": return index() elif file_name == "/center.py": return center() else: return 'Hello World! 我愛你中國....' """ logging.basicConfig(level=logging.INFO, filename='./log.txt', filemode='a', format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s') logging.info("訪問的是,%s" % file_name) try: # func = URL_FUNC_DICT[file_name] # return func() # return URL_FUNC_DICT[file_name]() for url, func in URL_FUNC_DICT.items(): # { # r"/index.html":index, # r"/center.html":center, # r"/add/\d+\.html":add_focus # } ret = re.match(url, file_name) if ret: return func(ret) else: logging.warning("沒有對應的函數....") return "請求的url(%s)沒有對應的函數...." % file_name except Exception as ret: return "產生了異常:%s" % str(ret)