利用python itchat給女朋友定時發信息


利用itchat給女朋友定時發信息

涉及到的技術有itchat,redis,mysql,最主要的還是mysql咯,當然咯,這么多東西,我就只介紹我代碼需要用到的,其他的,如果需要了解的話,就需要看參考資料了喲

實現的功能:
1.可以保存微信的消息,包括群聊和好友(文字/視頻/語音/圖片)
2.在群里@自己,可以調用圖靈機器人的API進行文字回復(類似於機器人)
3.調用定時任務,在指定時間發送消息至某人

需要了解的基礎:
1.python基礎
2.mysql基礎
3.redis基礎

 

實現效果如下:

只需要在數據庫中填寫相應的數據,包括,發送給誰(to_user),如果有多個,則用分號(;)分開,執行間隔分鍾數(exe_time),如,1440則代表一天,下一次執行時間戳(next_time),主要是抓這個時間來進行發送,抓取的鍵值(redis_keys)

發送的效果是這樣的:

 

 

項目基礎

itchat模塊

官方參考文檔:https://itchat.readthedocs.io/zh/latest/

安裝

pip install itchat / pip3 install itchat

最簡單的測試給好友發送消息

#產生二維碼
itchat.auto_login()
#定義用戶的昵稱
send_userid='用戶的昵稱'
#查找用戶的userid
itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
#利用send_msg發送消息
itchat.send_msg('這是一個測試',toUserName=itcaht_user_name)

測試小腳本

#!/usr/bin/env python3

import itchat

#產生二維碼
itchat.auto_login()
#定義用戶的昵稱
send_userid='親愛的'
#查找用戶的userid
itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']
#利用send_msg發送消息
itchat.send_msg('這是一個測試',toUserName=itcaht_user_name)

在執行之后,會彈出一個二維碼,然后會讓你登錄到網頁微信,緊接着會去搜索用戶信息,搜索到了之后會去取用戶的ID,這個ID是微信隨機給的,然后利用send_msg進行發送信息

 

mysql模塊

參考文檔:http://www.runoob.com/python3/python3-mysql.html

安裝

pip install pymysql / pip3 install pymysql

pymysql的基本使用

連接mysql

db = pymysql.connect(host='hostname',port=port,user='username',passwd='password',db='databasename',charset='charset')

填寫正確的信息后就成功的連接了mysql,注意,port這里,端口號不能加'',否則會連接不上mysql

 

簡單的操作mysql

insert
#連接DB
db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')
#編寫SQL
sql = "insert into test_message values (%d);" %(int(time.time()))
#獲取游標
cursor = db.cursor()
#數據庫重連
db.ping(reconnect=True)
#執行SQL
cursor.execute(sql)
#commit 
db.commit()

執行完畢后,如果正確的話,就可以看到表中有數據了,當然,數據庫和數據表得自己去建立才行,如果沒有數據,則需要看看程序的輸出結果了,還有一點,在做DML的時候,須要加上commit,這是事務的一個特性,否則數據會丟失的

 

select
#連接DB
db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8')
#編寫SQL
sql = "select * from test_message"
#獲取游標
cursor = db.cursor()
#數據庫重連
db.ping(reconnect=True)
#執行SQL
cursor.execute(sql)
#獲取全部結果
cursor.fetchall()
#返回單個數據
cursor.fetchone()

執行完畢后,如果正確的話,就可以看到結果了

 

redis模塊

 參考文檔:https://www.cnblogs.com/xiaoming279/p/6293583.html

安裝 

pip install redis / pip3 install redis  

連接Redis

r = redis.Redis('host',port,db_port,'password')
判斷鍵是否存在
r = redis.Redis('localhost',6379,0,'redis')
keys = '123'
if r.exists(keys):
    print ("存在")
else:
    print ("不存在")
隨機取出列表中的數據
r = redis.Redis('localhost',6379,0,'redis')
len_keys = r.llen(keys)
random_int = int(random.randint(0,len_keys-1))
print (r.lindex(keys,random_int).decode('utf-8'))

基本上在下面代碼中,幾乎會用到如上基礎

MySQL數據庫/表建立

數據庫建立 

CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */

數據表的建立

itchat_login_info

CREATE TABLE `itchat_login_info` (
`createdate` int(11) NOT NULL,
`myid` varchar(128) DEFAULT NULL,
`username` varchar(128) DEFAULT NULL,
`mysignature` varchar(128) DEFAULT NULL,
`mysex` varchar(4) DEFAULT NULL,
`myuseruin` varchar(128) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

itchat_message

CREATE TABLE `itchat_message` (
`createdate` int(11) NOT NULL,
`msgid` varchar(128) NOT NULL,
`nickname` varchar(128) DEFAULT NULL,
`username` varchar(128) DEFAULT NULL,
`actualnickname` varchar(128) DEFAULT NULL,
`actualusername` varchar(128) DEFAULT NULL,
`msgtext` varchar(5000) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

itchat_friend_message

CREATE TABLE `itchat_friend_message` (
`createdate` int(11) NOT NULL,
`msgid` varchar(128) NOT NULL,
`fromuser` varchar(128) NOT NULL,
`fromuserid` varchar(128) NOT NULL,
`fromsex` varchar(4) NOT NULL,
`touser` varchar(128) NOT NULL,
`touserid` varchar(128) NOT NULL,
`tousersex` varchar(4) NOT NULL,
`msgtext` varchar(5000) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

auto_timer 

CREATE TABLE `auto_timer` (
`timer_id` int(11) NOT NULL,
`createdate` int(11) NOT NULL,
`content` varchar(200) DEFAULT NULL,
`to_user` varchar(1000) NOT NULL,
`exe_time` int(11) NOT NULL,
`last_time` int(11) DEFAULT NULL,
`next_time` int(11) NOT NULL,
`redis_keys` varchar(100) NOT NULL,
`timer_count` bigint(20) NOT NULL,
PRIMARY KEY (`timer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

代碼

mian.py

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import itchat
from itchat.content import *
import collections_logs
import save_db
import liwang_redis_itchat
import time 
import threading
import os

#定義裝飾器,用於監聽圖片,視頻等消息,並且下載下來
@itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True)
def download_files(msg):
    logger.debug(msg['FromUserName'])
    logger.debug("發送的是語音/視頻等,需要保存至本地")
    msg.download(msg.fileName)
    file_value = '127.0.0.1/' + msg.FileName + '_' + msg['Type']
    logger.debug(file_value)

    #將消息寫入數據庫
    if '@@' in msg['FromUserName'] :
        logger.debug("判斷為群聊,將數據寫入MySQL數據庫itchat_message中")
        sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(file_value))
        save_db.exe_db(db,sql,logger)
    elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):
        logger.debug("判斷為群聊,且消息為自己發送的,將數據寫入MySQL數據庫itchat_message中")
        sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(file_value))
        save_db.exe_db(db,sql,logger)
    else :
    #將群聊設置為0,讓后面的@抓不到
        isgroup = 0
        #判斷是否是自己發送
        if msg['FromUserName'] == MyID :
            logger.debug("判斷為好友且為自己發送的,將數據寫入MySQL數據庫itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],file_value)
            save_db.exe_db(db,sql,logger)
        else:
            logger.debug("判斷為好友且為別人發送的,將數據寫入MySQL數據庫itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,file_value)
            save_db.exe_db(db,sql,logger)


#定義裝飾器,用於監聽好友和群聊微信群聊內容
@itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True)   
def simple_reply(msg):

    #定義群聊
    isgroup = 1

    #將消息寫入數據庫
    if '@@' in msg['FromUserName'] :
        logger.debug("判斷為群聊,將數據寫入MySQL數據庫itchat_message中")
        sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(msg['Text']))
        save_db.exe_db(db,sql,logger)
    elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']):
        logger.debug("判斷為群聊,且消息為自己發送的,將數據寫入MySQL數據庫itchat_message中")
        sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(msg['Text']))
        save_db.exe_db(db,sql,logger)
    else :
        #將群聊設置為0,讓后面的@抓不到
        isgroup = 0
        #判斷是否是自己發送
        if msg['FromUserName'] == MyID :
            logger.debug("判斷為好友且為自己發送的,將數據寫入MySQL數據庫itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],msg['Text'])
            save_db.exe_db(db,sql,logger)
        else:
            logger.debug("判斷為好友且為別人發送的,將數據寫入MySQL數據庫itchat_friend_message中")
            sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,msg['Text'])
            save_db.exe_db(db,sql,logger)

    #將所有的聊天信息都寫入到mongodb中
    #暫時不寫,mongodb還未掌握基本的使用

    #這里寫聊天機器人端口
    #判斷是否@了自己,且為群聊
    if isgroup == 1 :
        if msg['isAt'] :
            logger.debug("@了自己,移動到redis_itchat")
            from_message = msg['Text']
            send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger)
            itchat.send_msg(send_msg,toUserName=msg['ToUserName'])
            itchat.send_msg(send_msg,toUserName=msg['FromUserName'])
        else:
            logger.debug("沒有@自己")


    #空出兩個空LOG
    logger.debug("-------------------------------------------------------------------")
    logger.debug("-------------------------------------------------------------------")

def auto_reply():
    logger.debug("============================================================================================")
    
    #獲取全部ID sql 
    sql = "select timer_id from auto_timer;"
    #獲取結果
    userid_list = save_db.select_db(db,sql,logger)

    #遍歷auto_timer ID
    for userid in userid_list:
        #打印正在處理的信息
        sql = "select content from auto_timer where timer_id = '%s';" %(userid)
        id_content = save_db.select_db(db,sql,logger)[0]
        logger.debug("判斷ID:%s" %(userid))
        logger.debug("判斷內容:%s" %(id_content))

        #判斷時間是否符合
        #獲取當前的時間戳
        now_time = int(time.time())
        logger.debug("當前時間為:%s" %(now_time))
        sql = "select next_time from auto_timer where timer_id = '%s';" %(userid)
        next_time = save_db.select_db(db,sql,logger)[0]
        next_time = int(next_time[0])
        logger.debug("下次執行時間戳:%s" %(next_time))

        #判斷,如果當前時間大於或等於下一次執行時間,就執行
        if now_time >= next_time :
            logger.debug("處理內容:%s" %(id_content))

            #獲取發送人
            sql = "select to_user from auto_timer where timer_id = '%s';" %(userid)
            to_user = save_db.select_db(db,sql,logger)[0]
            to_user = to_user[0]
            logger.debug("查找用戶為:%s" %(to_user))

            #單用戶
            single_user = 1

            #判斷字符串是否有分割
            if ';' in to_user :
                allocation_user = to_user.split(';')
                logger.debug("需要發送多人,信息為:%s" %(allocation_user))
                single_user = 0
            else:
                allocation_user = to_user
                logger.debug("需要發送一人,信息為:%s" %(allocation_user))

            #判斷為單用戶,不需要做處理
            if single_user == 1 :
                to_user = to_user + ';'
                allocation_user = to_user.split(';')
                


            for send_userid in allocation_user :
                logger.debug("正在處理%s用戶" %(send_userid))
                #查找微信是否存在此用戶
                try:
                    itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName']

                    if itcaht_user_name == " ":
                        logger.debug("微信不存在此好友,請檢查")
                    else:
                        #獲取發送的內容:
                        sql = "select redis_keys from auto_timer where timer_id = '%s';" %(userid)
                        redis_keys = save_db.select_db(db,sql,logger)[0]
                        redis_keys = str(redis_keys[0])
                        return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys)

                        #獲取其他信息
                        sql = "select last_time from auto_timer where timer_id = '%s';" %(userid)
                        last_time = save_db.select_db(db,sql,logger)[0]
                        last_time = last_time[0]

                        sql = "select timer_count from auto_timer where timer_id = '%s';" %(userid)
                        timer_count = save_db.select_db(db,sql,logger)[0]
                        timer_count = int(timer_count[0])

                        sql = "select exe_time from auto_timer where timer_id = '%s';" %(userid)
                        exe_time = save_db.select_db(db,sql,logger)[0]
                        exe_time = int(exe_time[0]) * 60


                        logger.debug("將要發送的信息如下")
                        logger.debug("timer_ID:%s" %(userid))
                        logger.debug("發送簡介為:%s" %(id_content))
                        logger.debug("上一次執行的時間戳是:%s" %(last_time))
                        logger.debug("下一次執行的時間戳是:%s" %(next_time))
                        logger.debug("獲取Redis的值為:%s" %(redis_keys))
                        logger.debug("間隔描述為:%s" %(exe_time))
                        logger.debug("已經執行次數為:%s" %(timer_count))

                        try:
                            logger.debug("將要發送的消息: %s" %(return_values))
                            itchat.send_msg(return_values,toUserName=itcaht_user_name)
                            
                        except Exception as e:
                            logger.debug("信息發送失敗,詳細信息如下:")
                            logger.debug(e)
                except Exception as e:
                    logger.debug("未找到該用戶信息,詳細信息如下:")
                    logger.debug(e)

            try:
                # 更新信息
                last_time = now_time + exe_time
                timer_count = timer_count + 1


                userid = userid[0]

                sql = "update auto_timer set last_time = '%s' , next_time = '%s' , timer_count = '%s' where timer_id = '%s';" \
                %(now_time,last_time,timer_count,userid)

                save_db.exe_db(db,sql,logger)

                logger.debug("更新信息完畢")


            except Exception as e:
                logger.debug("消息已經發送,但是更新數據庫失敗,詳細信息如下:")
                logger.debug(e)
        else:
            logger.debug("判斷時間未到")

        #修整60秒
        logger.debug("============================================================================================")
        time.sleep(3)
    time.sleep(60)

if __name__ == "__main__" :
    
    #設置登陸的PK
    loginpk = 'login_' + str(time.time())
    
    #定義微信自動登錄
    itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk)
#    itchat.auto_login(statusStorageDir=loginpk)

    #獲取自己的ID
    global MyID
    MyID = itchat.get_friends(update=True)[0]["UserName"]

    #獲取用戶信息
    global myUserName
    global mysex
    myUserName = itchat.get_friends()[0]['NickName']
    myUserUin = itchat.get_friends()[0]['Uin']
    mysignature = itchat.get_friends()[0]['Signature']
    mysex = itchat.get_friends()[0]['Sex']

    #設置logger
    global logger
    logname = str(myUserName) + '_' + str(myUserUin)
    logger = collections_logs.build_logs(logname)
    
    #定義數據庫
    global db
    db = save_db.itchat_connect_mysql(logger)
    global redis_db
    redis_db = liwang_redis_itchat.connect_redis(logger)

    #每登陸一次,記錄一次login
    logger.debug("記錄登陸Log")
    sql = "insert into itchat_login_info values (%d,'%s','%s','%s','%s','%s');" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin)
    save_db.exe_db(db,sql,logger)

    #啟動線程
    threading._start_new_thread(itchat.run,())

    #開始循環
    while 1:
        itchat.configured_reply()
        auto_reply()

collections_logs.py

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import logging
import time

def build_logs(file_name):

    #獲取當前時間
    now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime())
    #設置log名稱
    logname = 'itchat' + file_name + now_time
    #定義logger
    logger = logging.getLogger()
    #設置級別為debug
    logger.setLevel(level = logging.DEBUG)
    #設置 logging文件名稱
    handler = logging.FileHandler(logname)
    #設置級別為debug
    handler.setLevel(logging.DEBUG)
    #設置log的格式
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    #將格式壓進logger
    handler.setFormatter(formatter)
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    #寫入logger
    logger.addHandler(handler)
    logger.addHandler(console)
    #將logger返回
    return logger

save_db.py

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import pymysql

def itchat_connect_mysql(logger):

    try:
        db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='itchat',charset='utf8')
        logger.debug('MySQL數據庫連接成功')
        logger.debug('數據庫ID:%s' %(db))
        return db
    except Exception as e:
        logger.debug('MySQL數據庫連接失敗')
        logger.debug(e)

def exe_db(db,sql,logger):
    try:
        cursor = db.cursor()
        logger.debug('獲取游標:%s' %(cursor))
        db.ping(reconnect=True)
        logger.debug("數據庫重連")
        cursor.execute(sql)
        logger.debug("執行SQL")
        logger.debug(sql)
        db.commit()
        logger.debug("數據庫commit")
    except Exception as e:
        logger.debug('SQL執行失敗,請至日志查看該SQL記錄')
        logger.debug(sql)
        logger.debug(e)

def select_db(db,sql,logger):
    try:
        cursor = db.cursor()
        logger.debug('獲取游標:%s' %(cursor))
        db.ping(reconnect=True)
        logger.debug("數據庫重連")
        cursor.execute(sql)
        logger.debug("執行SQL")
        logger.debug(sql)
        return cursor.fetchall()
    except Exception as e:
        logger.debug('SQL執行失敗,請至日志查看該SQL記錄')
        logger.debug(sql)
        logger.debug(e)

liwang_redis_itchat.py

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

import redis
import json
import requests
import random


def connect_redis(logger):
    try:
        r = redis.Redis('127.0.0.1',6379,0,'redis')
        logger.debug("Redis數據庫連接成功")
        return r
    except Exception as e:
        logger.debug("Redis數據庫連接失敗")
        logger.debug(e)

def return_rand_keys(redis_db,logger,keys):

    #判斷是否有這個key

    if redis_db.exists(keys):

        len_keys = redis_db.llen(keys)
        
        random_int = int(random.randint(0,len_keys-1))
        logger.debug("Redis獲取的隨機值為:%s" %(random_int))

        return_redis_values = redis_db.lindex(keys,random_int).decode('utf-8')
        logger.debug("Redis將要返回的值為:%s" %(return_redis_values))

        return return_redis_values

    else:
        logger.debug("沒有找到相關key")
        return 0

def auto_box(message,redis_db,logger):

    #設置分隔符
    if '\u2005' in message :
        myid='@小子\u2005'
    else:
        myid = '@小子 '

    #獲取分隔符之后的值
    if len(message.split(myid)) == 1 :
        return "@我了要說話喲"
    else:
        dealwith = message.split(myid)[1]

    if dealwith == " " :
        logger.debug("只是@了我,並沒有輸入內容")
    else:
        #圖靈機器人API接口
        url_api = "http://openapi.tuling123.com/openapi/api/v2"

        #將問題賦值給box_quest
        box_quest = dealwith

        #定義json post 
        request_json = json.dumps ({
            "perception":
            {
                "inputText":
                {
                    "text": box_quest
                },

                "selfInfo":
                {
                    "location":
                    {
                        "city": "成都",
                        "province": "成都",
                        "street": "天府三街"
                    }
                }
            },

            "userInfo": 
            {
                "apiKey": "APIkey",
                "userId": "tuling"
            }
        }
        )

        # 獲取POST之后的值
        r = requests.post(url_api, data=request_json)
        r_test = r.text
        r_json = json.loads(r_test)

        #定義返回值
        return_str = (r_json['results'][0]['values']['text'])

        logger.debug("圖靈機器人將要返回的值:")
        logger.debug(return_str)

        #需要自己建立自己的機器人倉庫
        logger.debug("將返回的結果存入Redis數據庫中")
        redis_db.rpush(box_quest,return_str)

        return return_str

實現的效果 

如下以log展現

接收消息並且保存至數據庫

 

圖靈機器人回復 

定時任務


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM