python項目——新聞管理系統


 

 

 

 

 

 

DAO(Data Access Object) 數據訪問對象是一個面向對象的數據庫接口
控制台的輸入輸出都是再app.py里面完成的

 

 

 

 

 

 mysql_db.py

import mysql.connector.pooling

__config = {
    "host":"192.168.111.153",
    "port":3306,
    "user":"quan",
    "password":"2004",
    "database":"vega"
}

try:
    pool = mysql.connector.pooling.MySQLConnectionPool(
        **__config,
        pool_size=10
    )
except Exception as e:
    print(e)

 

 news_dao.py

from db.mysql_db import pool


class NewDao:
    #查詢待審批新聞列表
    def search_unreview_list(self,page):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT n.id,n.title,n.state,t.type,u.username " \
                  "FROM t_news AS n JOIN t_type AS t ON n.type_id = t.id " \
                  "JOIN t_user AS u ON n.editor_id = u.id " \
                  "WHERE n.state = %s " \
                  "ORDER BY n.create_time DESC " \
                  "LIMIT %s,%s"
            cursor.execute(sql, ("待審批",(page-1)*10,10))
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #查詢待審批新聞的總頁數
    def search_unreview_count_page(self):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT CEIL(COUNT(*)/10) FROM t_news WHERE state = %s"
            cursor.execute(sql, ["待審批"])
            count_page = cursor.fetchone()[0]
            return count_page
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #審批新聞
    def update_unreview_news(self,id):
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "UPDATE t_news SET state = %s WHERE id = %s"
            cursor.execute(sql, ("已審批",id))
            con.commit()
        except Exception as e:
            if "con" == dir():
                con.rollback()
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #查詢新聞列表
    def search_list(self,page):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT n.id,n.title,n.state,t.type,u.username " \
                  "FROM t_news AS n JOIN t_type AS t ON n.type_id = t.id " \
                  "JOIN t_user AS u ON n.editor_id = u.id " \
                  "ORDER BY n.create_time DESC " \
                  "LIMIT %s,%s"
            cursor.execute(sql, ((page - 1) * 10, 10))
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #查詢新聞總頁數
    def search_count_page(self):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT CEIL(COUNT(*)/10) FROM t_news "
            cursor.execute(sql)
            count_page = cursor.fetchone()[0]
            return count_page
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #刪除新聞
    def delete_by_id(self,id):
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "DELETE FROM t_news WHERE id = %s"
            cursor.execute(sql, [id])
            con.commit()
        except Exception as e:
            if "con" == dir():
                con.rollback()
            print(e)
        finally:
            if "con" in dir():
                con.close()

role_dao.py

from db.mysql_db import pool



class RoleDao:
    # 查詢角色列表
    def search_list(self):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT id,role  FROM t_role "
            cursor.execute(sql)
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

user_dao.py

from db.mysql_db import pool


# con = pool.get_connection()
# cursor = con.cursor()
# sql = "SELECT COUNT(*) FROM t_user WHERE username = %s AND " \
#   "AES_DECRYPT(UNHEX(password),'HelloWorld') = %s"
# cursor.execute(sql,("admin","123456"))
# count = cursor.fetchone()[0]
# print(count)
# con = pool.get_connection()
# cursor = con.cursor()
# sql = "SELECT r.role FROM t_user AS u JOIN t_role AS r ON " \
#       "u.role_id = r.id WHERE u.username = %s"
# cursor.execute(sql, ["admin"])
# role = cursor.fetchone()[0]
# print(role)
class UserDao:
    #驗證用戶登陸信息
    def login(self,username,password):
        global con
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT COUNT(*) FROM t_user WHERE username = %s AND " \
                  "AES_DECRYPT(UNHEX(password),'HelloWorld') = %s"
            cursor.execute(sql,(username,password))
            count = cursor.fetchone()[0]
            return  True if count == 1 else False

        except Exception as e:
            print(e)
        finally:
            if "con" in dir():#因為這個項目使可以執行一次程序,多次登陸#需要給回連接給連接池,通過調用close()即可
                con.close()

    def serch_user_role(self,username):
        #查詢用戶角色
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT r.role FROM t_user AS u JOIN t_role AS r ON " \
                  "u.role_id = r.id WHERE u.username = %s"
            cursor.execute(sql, [username])
            role = cursor.fetchone()[0]
            return role
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #插入新用戶
    def insert_user(self,username,password,email,role_id):
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "INSERT INTO t_user(username,password,email,role_id)" \
                  "VALUES (%s,HEX(AES_ENCRYPT(%s,'HelloWorld')),%s,%s)"
            cursor.execute(sql, (username,password,email,role_id))
            con.commit()
        except Exception as e:
            if "con" == dir():
                con.rollback()
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #查詢用戶總頁數
    def search_count_page(self):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT CEIL(COUNT(*)/10) FROM t_user "
            cursor.execute(sql)
            count_page = cursor.fetchone()[0]
            return count_page
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #插敘用戶的分頁記錄
    def search_list(self,page):
        try:
            con = pool.get_connection()
            cursor = con.cursor()
            sql = "SELECT u.id,u.username,r.role,u.email FROM t_user AS u JOIN t_role AS r" \
                  " ON u.role_id = r.id " \
                  "ORDER BY u.id " \
                  "LIMIT %s,%s"
            cursor.execute(sql,((page - 1)*10,10))
            result = cursor.fetchall()
            return result
        except Exception as e:
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #修改用戶信息
    def update(self,id,username,password,email,role_id):
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "UPDATE t_user SET " \
                  "username = %s, " \
                  "password = HEX(AES_ENCRYPT(%s,'HelloWord')), " \
                  "email = %s, " \
                  "role_id = %s " \
                  "WHERE id = %s "
            cursor.execute(sql, (username, password, email, role_id,id))
            con.commit()
        except Exception as e:
            if "con" == dir():
                con.rollback()
            print(e)
        finally:
            if "con" in dir():
                con.close()

    #刪除用戶
    def delete_by_id(self,id):
        try:
            con = pool.get_connection()
            con.start_transaction()
            cursor = con.cursor()
            sql = "DELETE FROM t_user WHERE id = %s "
            cursor.execute(sql, [id])
            con.commit()
        except Exception as e:
            if "con" == dir():
                con.rollback()
            print(e)
        finally:
            if "con" in dir():
                con.close()

 

news_service.py

from db.news_dao import NewDao


class NewsService:
    __news_dao = NewDao()

    #查詢待審批新聞列表
    def search_unreview_list(self,page):
        result = self.__news_dao.search_unreview_list(page)
        return result

    # 查詢待審批新聞的總頁數
    def search_unreview_count_page(self):
        count_page = self.__news_dao.search_unreview_count_page()
        return count_page

    # 審批新聞
    def update_unreview_news(self, id):
        self.__news_dao.update_unreview_news(id)

    #查詢新聞列表
    def search_list(self,page):
        result = self.__news_dao.search_list(page)
        return result

    # 查詢新聞總數
    def search_count_page(self):
        count_page = self.__news_dao.search_count_page()
        return count_page

    # 刪除新聞
    def delete_by_id(self, id):
        self.__news_dao.delete_by_id(id)

 

role_service.py

from db.role_dao import RoleDao


class RoleService:
    __role_dao = RoleDao()

    # 查詢角色列表
    def search_list(self):
        result = self.__role_dao.search_list()
        return result

user_service.py

from db.user_dao import UserDao


class UserService:
    __user_dao = UserDao()
    #驗證用戶登陸
    def login(self,username,password):
        result = self.__user_dao.login(username,password)
        return result
    #查詢用戶角色
    def search_user_role(self,username):
        role = self.__user_dao.serch_user_role(username)
        return  role

    # 插入新用戶
    def insert_user(self, username, password, email, role_id):
        self.__user_dao.insert_user(username, password, email, role_id)

    #查詢用戶總頁數
    def search_count_page(self):
        count_page = self.__user_dao.search_count_page()
        return count_page

    #插敘用戶的分頁記錄
    def search_list(self,page):
        result = self.__user_dao.search_list(page)
        return result

    # 修改用戶信息
    def update(self, id, username, password, email, role_id):
        self.__user_dao.update(id, username, password, email, role_id)

    # 刪除用戶
    def delete_by_id(self, id):
        self.__user_dao.delete_by_id(id)

 

app.py

from colorama import Fore,Style
from getpass import getpass #獲取用輸入密碼
from service.user_service import UserService
from service.news_service import NewsService
from service.role_service import RoleService
import os  #為了清空控制台內容
import sys
import time

__user_service = UserService()
__news_service = NewsService()
__role_service = RoleService()

while True:
    os.system("cls")
    print(Fore.LIGHTBLUE_EX,"\n\t+++++++++++++++++++")
    print(Fore.LIGHTBLUE_EX, "\n\t歡迎使用新聞管理系統")
    print(Fore.LIGHTBLUE_EX, "\n\t+++++++++++++++++++")
    print(Fore.LIGHTGREEN_EX,"\n\t1)登陸系統")
    print(Fore.LIGHTGREEN_EX, "\n\t2)退出系統")
    print(Style.RESET_ALL)
    opt = input("\n\t輸入你的操作編號:")
    if opt == "1":
        username = input("\n\t輸入你的用戶名:")
        password = getpass("\n\t輸入你的密碼:")
        result = __user_service.login(username,password)
        #登陸成功
        if result == True:
            #查詢角色
            role = __user_service.search_user_role(username)
            while True:
                os.system("cls")
                if role == "新聞編輯":
                    print("test")
                elif role == "管理員":
                    print(Fore.LIGHTGREEN_EX,"\n\t1)新聞管理")
                    print(Fore.LIGHTGREEN_EX, "\n\t2)用戶管理")
                    print(Fore.LIGHTRED_EX, "\n\tback)退出登陸")
                    print(Fore.LIGHTRED_EX, "\n\texit)退出系統")
                    print(Style.RESET_ALL)
                    opt = input("\n\t輸入你的操作編號:")
                    if opt == "1":
                        while True:
                            os.system("cls")
                            print(Fore.LIGHTGREEN_EX, "\n\t1)審批新聞")
                            print(Fore.LIGHTGREEN_EX, "\n\t2)刪除新聞")
                            print(Fore.LIGHTRED_EX, "\n\tback)返回")
                            print(Style.RESET_ALL)
                            opt = input("\n\t輸入你的操作編號:")
                            if opt == "1":
                                page = 1
                                while True:
                                    os.system("cls")
                                    count_page = __news_service.search_unreview_count_page()
                                    result = __news_service.search_unreview_list(page)
                                    for index in range(len(result)):
                                        one = result[index]
                                        print(Fore.LIGHTBLUE_EX,"\n\t{0}\t{1}\t{2}\t{3}".format(index+1,one[1],one[2],one[3]))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTBLUE_EX,"\n\t{0}/{1}".format(page,count_page))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTRED_EX, "\n\tback)返回")
                                    print(Fore.LIGHTRED_EX, "\n\tprev)上一頁")
                                    print(Fore.LIGHTRED_EX, "\n\tnext)下一頁")
                                    print(Style.RESET_ALL)
                                    opt = input("\n\t輸入你的操作編號:")
                                    if opt == "back":
                                        break
                                    elif opt == "prev" and page > 1:
                                        page -=1
                                    elif opt == "next" and page < count_page:
                                        page +=1
                                    elif int(opt) >= 1 and int(opt) <= 10:
                                        news_id = result[int(opt) - 1][0]
                                        __news_service.update_unreview_news(news_id)
                            elif opt == "2":
                                page = 1
                                while True:
                                    os.system("cls")
                                    count_page = __news_service.search_count_page()
                                    result = __news_service.search_list(page)
                                    for index in range(len(result)):
                                        one = result[index]
                                        print(Fore.LIGHTBLUE_EX, "\n\t{0}\t{1}\t{2}\t{3}".format(index + 1, one[1], one[2], one[3]))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTBLUE_EX, "\n\t{0}/{1}".format(page, count_page))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTRED_EX, "\n\tback)返回")
                                    print(Fore.LIGHTRED_EX, "\n\tprev)上一頁")
                                    print(Fore.LIGHTRED_EX, "\n\tnext)下一頁")
                                    print(Style.RESET_ALL)
                                    opt = input("\n\t輸入你的操作編號:")
                                    if opt == "back":
                                        break
                                    elif opt == "prev" and page > 1:
                                        page -= 1
                                    elif opt == "next" and page < count_page:
                                        page += 1
                                    elif int(opt) >= 1 and int(opt) <= 10:
                                        news_id = result[int(opt) - 1][0]
                                        __news_service.delete_by_id(news_id)
                            elif opt == "back":

                                break
                    elif opt == "2":
                        while True:
                            os.system("cls")
                            print(Fore.LIGHTGREEN_EX, "\n\t1)添加用戶")
                            print(Fore.LIGHTGREEN_EX, "\n\t2)修改用戶")
                            print(Fore.LIGHTGREEN_EX, "\n\t3)刪除用戶")
                            print(Fore.LIGHTRED_EX, "\n\tback)返回")
                            print(Style.RESET_ALL)
                            opt = input("\n\t輸入你的操作編號:")
                            if opt == "back":
                                break
                            elif opt == "1":
                                os.system("cls")
                                username = input("\n\t你要添加的用戶名字")
                                password = getpass("\n\t輸入新用戶密碼")
                                repassword = getpass("\n\t再次輸入新用戶密碼")
                                if password != repassword:
                                    print(Fore.LIGHTRED_EX,"\n\t兩次密碼不一致(3秒自動返回)")
                                    time.sleep(3)
                                    continue
                                email = input("\n\t新用戶郵箱:")
                                result = __role_service.search_list()
                                for index in range(len(result)):
                                    one = result[index]
                                    print(Fore.LIGHTBLUE_EX,"\n\t{0}.{1}".format(index+1,one[1]))
                                print(Style.RESET_ALL)
                                opt = input("\n\t新用戶的角色編號:")
                                role_id = result[int(opt)-1][0]
                                __user_service.insert_user(username,password,email,role_id)
                                print("\n\t保存成功(3秒自動返回)")
                                time.sleep(3)

                            elif opt == "2":

                                page = 1
                                while True:
                                    os.system("cls")
                                    count_page = __user_service.search_count_page()
                                    result = __user_service.search_list(page)
                                    for index in range(len(result)):
                                        one = result[index]
                                        print(Fore.LIGHTBLUE_EX,
                                              "\n\t{0}\t{1}\t{2}".format(index + 1, one[1], one[2]))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTBLUE_EX, "\n\t{0}/{1}".format(page, count_page))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTRED_EX, "\n\tback)返回")
                                    print(Fore.LIGHTRED_EX, "\n\tprev)上一頁")
                                    print(Fore.LIGHTRED_EX, "\n\tnext)下一頁")
                                    print(Style.RESET_ALL)
                                    opt = input("\n\t輸入你的操作編號:")
                                    if opt == "back":
                                        break
                                    elif opt == "prev" and page > 1:
                                        page -= 1
                                    elif opt == "next" and page < count_page:
                                        page += 1
                                    elif int(opt) >= 1 and int(opt) <= 10:
                                        os.system("cls")
                                        user_id = result[int(opt)-1][0]
                                        username = input("\n\t你要修改成的用戶名字")
                                        password = getpass("\n\t輸入新密碼")
                                        repassword = getpass("\n\t再次輸入新密碼")
                                        if password != repassword:
                                            print(Fore.LIGHTRED_EX,"\n\t兩次密碼不一致(3秒自動返回)")
                                            time.sleep(3)
                                            print(Style.RESET_ALL)
                                            break
                                        email = input("\n\t新用戶郵箱:")
                                        result = __role_service.search_list()
                                        for index in range(len(result)):
                                            one = result[index]
                                            print(Fore.LIGHTBLUE_EX, "\n\t{0}.{1}".format(index + 1, one[1]))
                                        print(Style.RESET_ALL)
                                        opt = input("\n\t新用戶的角色編號:")
                                        role_id = result[int(opt) - 1][0]
                                        opt = input("\n\t是否保存(Y/N)")
                                        if opt.upper() == "Y":
                                            __user_service.update(user_id,username,password,email,role_id)
                                            print("\n\t保存成功(3秒自動返回)")
                                            time.sleep(3)
                            elif opt == "3":

                                page = 1
                                while True:
                                    os.system("cls")
                                    count_page = __user_service.search_count_page()
                                    result = __user_service.search_list(page)
                                    for index in range(len(result)):
                                        one = result[index]
                                        print(Fore.LIGHTBLUE_EX,
                                              "\n\t{0}\t{1}\t{2}".format(index + 1, one[1], one[2]))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTBLUE_EX, "\n\t{0}/{1}".format(page, count_page))
                                    print(Fore.LIGHTBLUE_EX, "\n\t-------------------")
                                    print(Fore.LIGHTRED_EX, "\n\tback)返回")
                                    print(Fore.LIGHTRED_EX, "\n\tprev)上一頁")
                                    print(Fore.LIGHTRED_EX, "\n\tnext)下一頁")
                                    print(Style.RESET_ALL)
                                    opt = input("\n\t輸入你的操作編號:")
                                    if opt == "back":
                                        break
                                    elif opt == "prev" and page > 1:
                                        page -= 1
                                    elif opt == "next" and page < count_page:
                                        page += 1
                                    elif int(opt) >= 1 and int(opt) <= 10:
                                        os.system("cls")
                                        user_id = result[int(opt) - 1][0]
                                        __user_service.delete_by_id(user_id)
                                        print("\n\t刪除成功(3秒自動返回)")
                                        time.sleep(3)









                    elif opt == "back":
                        break
                    elif opt == "exit":
                        sys.exit(0)
        else:
            print("\n\t登陸失敗,3秒自動返回")
            time.sleep(3)
    elif opt == "2":
        sys.exit(0)#0代表安全退出,等數據釋放等

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM