利用itchat給女朋友定時發信息
涉及到的技術有itchat,redis,mysql,最主要的還是mysql咯,當然咯,這么多東西,我就只介紹我代碼需要用到的,其他的,如果需要了解的話,就需要看參考資料了喲
實現的功能:
1.可以保存微信的消息,包括群聊和好友(文字/視頻/語音/圖片)
2.在群里@自己,可以調用圖靈機器人的API進行文字回復(類似於機器人)
3.調用定時任務,在指定時間發送消息至某人
需要了解的基礎:
1.python基礎
2.mysql基礎
3.redis基礎
實現效果如下:

只需要在數據庫中填寫相應的數據,包括,發送給誰(to_user),如果有多個,則用分號(;)分開,執行間隔分鍾數(exe_time),如,1440則代表一天,下一次執行時間戳(next_time),主要是抓這個時間來進行發送,抓取的鍵值(redis_keys)
發送的效果是這樣的:

項目基礎
itchat模塊
官方參考文檔:https://itchat.readthedocs.io/zh/latest/
安裝
pip install itchat / pip3 install itchat
最簡單的測試給好友發送消息
#產生二維碼 itchat.auto_login() #定義用戶的昵稱 send_userid='用戶的昵稱' #查找用戶的userid itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName'] #利用send_msg發送消息 itchat.send_msg('這是一個測試',toUserName=itcaht_user_name)
測試小腳本
#!/usr/bin/env python3 import itchat #產生二維碼 itchat.auto_login() #定義用戶的昵稱 send_userid='親愛的' #查找用戶的userid itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName'] #利用send_msg發送消息 itchat.send_msg('這是一個測試',toUserName=itcaht_user_name)
在執行之后,會彈出一個二維碼,然后會讓你登錄到網頁微信,緊接着會去搜索用戶信息,搜索到了之后會去取用戶的ID,這個ID是微信隨機給的,然后利用send_msg進行發送信息
mysql模塊
參考文檔:http://www.runoob.com/python3/python3-mysql.html
安裝
pip install pymysql / pip3 install pymysql
pymysql的基本使用
連接mysql
db = pymysql.connect(host='hostname',port=port,user='username',passwd='password',db='databasename',charset='charset')
填寫正確的信息后就成功的連接了mysql,注意,port這里,端口號不能加'',否則會連接不上mysql
簡單的操作mysql
insert
#連接DB db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8') #編寫SQL sql = "insert into test_message values (%d);" %(int(time.time())) #獲取游標 cursor = db.cursor() #數據庫重連 db.ping(reconnect=True) #執行SQL cursor.execute(sql) #commit db.commit()
執行完畢后,如果正確的話,就可以看到表中有數據了,當然,數據庫和數據表得自己去建立才行,如果沒有數據,則需要看看程序的輸出結果了,還有一點,在做DML的時候,須要加上commit,這是事務的一個特性,否則數據會丟失的
select
#連接DB db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='test_1',charset='utf8') #編寫SQL sql = "select * from test_message" #獲取游標 cursor = db.cursor() #數據庫重連 db.ping(reconnect=True) #執行SQL cursor.execute(sql) #獲取全部結果 cursor.fetchall() #返回單個數據 cursor.fetchone()
執行完畢后,如果正確的話,就可以看到結果了
redis模塊
參考文檔:https://www.cnblogs.com/xiaoming279/p/6293583.html
安裝
pip install redis / pip3 install redis
連接Redis
r = redis.Redis('host',port,db_port,'password')
判斷鍵是否存在
r = redis.Redis('localhost',6379,0,'redis') keys = '123' if r.exists(keys): print ("存在") else: print ("不存在")
隨機取出列表中的數據
r = redis.Redis('localhost',6379,0,'redis') len_keys = r.llen(keys) random_int = int(random.randint(0,len_keys-1)) print (r.lindex(keys,random_int).decode('utf-8'))
基本上在下面代碼中,幾乎會用到如上基礎
MySQL數據庫/表建立
數據庫建立
CREATE DATABASE `itchat` /*!40100 DEFAULT CHARACTER SET utf8mb4 */
數據表的建立
itchat_login_info
CREATE TABLE `itchat_login_info` ( `createdate` int(11) NOT NULL, `myid` varchar(128) DEFAULT NULL, `username` varchar(128) DEFAULT NULL, `mysignature` varchar(128) DEFAULT NULL, `mysex` varchar(4) DEFAULT NULL, `myuseruin` varchar(128) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
itchat_message
CREATE TABLE `itchat_message` ( `createdate` int(11) NOT NULL, `msgid` varchar(128) NOT NULL, `nickname` varchar(128) DEFAULT NULL, `username` varchar(128) DEFAULT NULL, `actualnickname` varchar(128) DEFAULT NULL, `actualusername` varchar(128) DEFAULT NULL, `msgtext` varchar(5000) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
itchat_friend_message
CREATE TABLE `itchat_friend_message` ( `createdate` int(11) NOT NULL, `msgid` varchar(128) NOT NULL, `fromuser` varchar(128) NOT NULL, `fromuserid` varchar(128) NOT NULL, `fromsex` varchar(4) NOT NULL, `touser` varchar(128) NOT NULL, `touserid` varchar(128) NOT NULL, `tousersex` varchar(4) NOT NULL, `msgtext` varchar(5000) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
auto_timer
CREATE TABLE `auto_timer` ( `timer_id` int(11) NOT NULL, `createdate` int(11) NOT NULL, `content` varchar(200) DEFAULT NULL, `to_user` varchar(1000) NOT NULL, `exe_time` int(11) NOT NULL, `last_time` int(11) DEFAULT NULL, `next_time` int(11) NOT NULL, `redis_keys` varchar(100) NOT NULL, `timer_count` bigint(20) NOT NULL, PRIMARY KEY (`timer_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
代碼
mian.py
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import itchat from itchat.content import * import collections_logs import save_db import liwang_redis_itchat import time import threading import os #定義裝飾器,用於監聽圖片,視頻等消息,並且下載下來 @itchat.msg_register([PICTURE, RECORDING, ATTACHMENT, VIDEO],isFriendChat=True, isGroupChat=True) def download_files(msg): logger.debug(msg['FromUserName']) logger.debug("發送的是語音/視頻等,需要保存至本地") msg.download(msg.fileName) file_value = '127.0.0.1/' + msg.FileName + '_' + msg['Type'] logger.debug(file_value) #將消息寫入數據庫 if '@@' in msg['FromUserName'] : logger.debug("判斷為群聊,將數據寫入MySQL數據庫itchat_message中") sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(file_value)) save_db.exe_db(db,sql,logger) elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']): logger.debug("判斷為群聊,且消息為自己發送的,將數據寫入MySQL數據庫itchat_message中") sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(file_value)) save_db.exe_db(db,sql,logger) else : #將群聊設置為0,讓后面的@抓不到 isgroup = 0 #判斷是否是自己發送 if msg['FromUserName'] == MyID : logger.debug("判斷為好友且為自己發送的,將數據寫入MySQL數據庫itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],file_value) save_db.exe_db(db,sql,logger) else: logger.debug("判斷為好友且為別人發送的,將數據寫入MySQL數據庫itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,file_value) save_db.exe_db(db,sql,logger) #定義裝飾器,用於監聽好友和群聊微信群聊內容 @itchat.msg_register(TEXT, isFriendChat=True, isGroupChat=True) def simple_reply(msg): #定義群聊 isgroup = 1 #將消息寫入數據庫 if '@@' in msg['FromUserName'] : logger.debug("判斷為群聊,將數據寫入MySQL數據庫itchat_message中") sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['ActualNickName'],msg['ActualUserName'],(msg['Text'])) save_db.exe_db(db,sql,logger) elif (msg['FromUserName'] == MyID and '@@' in msg['ToUserName']): logger.debug("判斷為群聊,且消息為自己發送的,將數據寫入MySQL數據庫itchat_message中") sql = "insert into itchat_message values (%d,'%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],myUserName,msg['ActualUserName'],(msg['Text'])) save_db.exe_db(db,sql,logger) else : #將群聊設置為0,讓后面的@抓不到 isgroup = 0 #判斷是否是自己發送 if msg['FromUserName'] == MyID : logger.debug("判斷為好友且為自己發送的,將數據寫入MySQL數據庫itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],myUserName,MyID,mysex,msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],msg['Text']) save_db.exe_db(db,sql,logger) else: logger.debug("判斷為好友且為別人發送的,將數據寫入MySQL數據庫itchat_friend_message中") sql = "insert into itchat_friend_message values (%d,'%s','%s','%s','%s','%s','%s','%s','%s');" %(int(time.time()),msg['MsgId'],msg['User']['NickName'],msg['User']['UserName'],msg['User']['Sex'],myUserName,MyID,mysex,msg['Text']) save_db.exe_db(db,sql,logger) #將所有的聊天信息都寫入到mongodb中 #暫時不寫,mongodb還未掌握基本的使用 #這里寫聊天機器人端口 #判斷是否@了自己,且為群聊 if isgroup == 1 : if msg['isAt'] : logger.debug("@了自己,移動到redis_itchat") from_message = msg['Text'] send_msg = liwang_redis_itchat.auto_box(from_message,redis_db,logger) itchat.send_msg(send_msg,toUserName=msg['ToUserName']) itchat.send_msg(send_msg,toUserName=msg['FromUserName']) else: logger.debug("沒有@自己") #空出兩個空LOG logger.debug("-------------------------------------------------------------------") logger.debug("-------------------------------------------------------------------") def auto_reply(): logger.debug("============================================================================================") #獲取全部ID sql sql = "select timer_id from auto_timer;" #獲取結果 userid_list = save_db.select_db(db,sql,logger) #遍歷auto_timer ID for userid in userid_list: #打印正在處理的信息 sql = "select content from auto_timer where timer_id = '%s';" %(userid) id_content = save_db.select_db(db,sql,logger)[0] logger.debug("判斷ID:%s" %(userid)) logger.debug("判斷內容:%s" %(id_content)) #判斷時間是否符合 #獲取當前的時間戳 now_time = int(time.time()) logger.debug("當前時間為:%s" %(now_time)) sql = "select next_time from auto_timer where timer_id = '%s';" %(userid) next_time = save_db.select_db(db,sql,logger)[0] next_time = int(next_time[0]) logger.debug("下次執行時間戳:%s" %(next_time)) #判斷,如果當前時間大於或等於下一次執行時間,就執行 if now_time >= next_time : logger.debug("處理內容:%s" %(id_content)) #獲取發送人 sql = "select to_user from auto_timer where timer_id = '%s';" %(userid) to_user = save_db.select_db(db,sql,logger)[0] to_user = to_user[0] logger.debug("查找用戶為:%s" %(to_user)) #單用戶 single_user = 1 #判斷字符串是否有分割 if ';' in to_user : allocation_user = to_user.split(';') logger.debug("需要發送多人,信息為:%s" %(allocation_user)) single_user = 0 else: allocation_user = to_user logger.debug("需要發送一人,信息為:%s" %(allocation_user)) #判斷為單用戶,不需要做處理 if single_user == 1 : to_user = to_user + ';' allocation_user = to_user.split(';') for send_userid in allocation_user : logger.debug("正在處理%s用戶" %(send_userid)) #查找微信是否存在此用戶 try: itcaht_user_name = itchat.search_friends(name=send_userid)[0]['UserName'] if itcaht_user_name == " ": logger.debug("微信不存在此好友,請檢查") else: #獲取發送的內容: sql = "select redis_keys from auto_timer where timer_id = '%s';" %(userid) redis_keys = save_db.select_db(db,sql,logger)[0] redis_keys = str(redis_keys[0]) return_values = liwang_redis_itchat.return_rand_keys(redis_db,logger,redis_keys) #獲取其他信息 sql = "select last_time from auto_timer where timer_id = '%s';" %(userid) last_time = save_db.select_db(db,sql,logger)[0] last_time = last_time[0] sql = "select timer_count from auto_timer where timer_id = '%s';" %(userid) timer_count = save_db.select_db(db,sql,logger)[0] timer_count = int(timer_count[0]) sql = "select exe_time from auto_timer where timer_id = '%s';" %(userid) exe_time = save_db.select_db(db,sql,logger)[0] exe_time = int(exe_time[0]) * 60 logger.debug("將要發送的信息如下") logger.debug("timer_ID:%s" %(userid)) logger.debug("發送簡介為:%s" %(id_content)) logger.debug("上一次執行的時間戳是:%s" %(last_time)) logger.debug("下一次執行的時間戳是:%s" %(next_time)) logger.debug("獲取Redis的值為:%s" %(redis_keys)) logger.debug("間隔描述為:%s" %(exe_time)) logger.debug("已經執行次數為:%s" %(timer_count)) try: logger.debug("將要發送的消息: %s" %(return_values)) itchat.send_msg(return_values,toUserName=itcaht_user_name) except Exception as e: logger.debug("信息發送失敗,詳細信息如下:") logger.debug(e) except Exception as e: logger.debug("未找到該用戶信息,詳細信息如下:") logger.debug(e) try: # 更新信息 last_time = now_time + exe_time timer_count = timer_count + 1 userid = userid[0] sql = "update auto_timer set last_time = '%s' , next_time = '%s' , timer_count = '%s' where timer_id = '%s';" \ %(now_time,last_time,timer_count,userid) save_db.exe_db(db,sql,logger) logger.debug("更新信息完畢") except Exception as e: logger.debug("消息已經發送,但是更新數據庫失敗,詳細信息如下:") logger.debug(e) else: logger.debug("判斷時間未到") #修整60秒 logger.debug("============================================================================================") time.sleep(3) time.sleep(60) if __name__ == "__main__" : #設置登陸的PK loginpk = 'login_' + str(time.time()) #定義微信自動登錄 itchat.auto_login(enableCmdQR=2,statusStorageDir=loginpk) # itchat.auto_login(statusStorageDir=loginpk) #獲取自己的ID global MyID MyID = itchat.get_friends(update=True)[0]["UserName"] #獲取用戶信息 global myUserName global mysex myUserName = itchat.get_friends()[0]['NickName'] myUserUin = itchat.get_friends()[0]['Uin'] mysignature = itchat.get_friends()[0]['Signature'] mysex = itchat.get_friends()[0]['Sex'] #設置logger global logger logname = str(myUserName) + '_' + str(myUserUin) logger = collections_logs.build_logs(logname) #定義數據庫 global db db = save_db.itchat_connect_mysql(logger) global redis_db redis_db = liwang_redis_itchat.connect_redis(logger) #每登陸一次,記錄一次login logger.debug("記錄登陸Log") sql = "insert into itchat_login_info values (%d,'%s','%s','%s','%s','%s');" %(int(time.time()),MyID,myUserName,mysignature,mysex,myUserUin) save_db.exe_db(db,sql,logger) #啟動線程 threading._start_new_thread(itchat.run,()) #開始循環 while 1: itchat.configured_reply() auto_reply()
collections_logs.py
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import logging import time def build_logs(file_name): #獲取當前時間 now_time = time.strftime("%Y_%m_%d_%H_%M_%S",time.localtime()) #設置log名稱 logname = 'itchat' + file_name + now_time #定義logger logger = logging.getLogger() #設置級別為debug logger.setLevel(level = logging.DEBUG) #設置 logging文件名稱 handler = logging.FileHandler(logname) #設置級別為debug handler.setLevel(logging.DEBUG) #設置log的格式 formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') #將格式壓進logger handler.setFormatter(formatter) console = logging.StreamHandler() console.setLevel(logging.DEBUG) #寫入logger logger.addHandler(handler) logger.addHandler(console) #將logger返回 return logger
save_db.py
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import pymysql def itchat_connect_mysql(logger): try: db = pymysql.connect(host='127.0.0.1',port=3306,user='root',passwd='root',db='itchat',charset='utf8') logger.debug('MySQL數據庫連接成功') logger.debug('數據庫ID:%s' %(db)) return db except Exception as e: logger.debug('MySQL數據庫連接失敗') logger.debug(e) def exe_db(db,sql,logger): try: cursor = db.cursor() logger.debug('獲取游標:%s' %(cursor)) db.ping(reconnect=True) logger.debug("數據庫重連") cursor.execute(sql) logger.debug("執行SQL") logger.debug(sql) db.commit() logger.debug("數據庫commit") except Exception as e: logger.debug('SQL執行失敗,請至日志查看該SQL記錄') logger.debug(sql) logger.debug(e) def select_db(db,sql,logger): try: cursor = db.cursor() logger.debug('獲取游標:%s' %(cursor)) db.ping(reconnect=True) logger.debug("數據庫重連") cursor.execute(sql) logger.debug("執行SQL") logger.debug(sql) return cursor.fetchall() except Exception as e: logger.debug('SQL執行失敗,請至日志查看該SQL記錄') logger.debug(sql) logger.debug(e)
liwang_redis_itchat.py
#!/usr/bin/env python3 # -*- coding: UTF-8 -*- import redis import json import requests import random def connect_redis(logger): try: r = redis.Redis('127.0.0.1',6379,0,'redis') logger.debug("Redis數據庫連接成功") return r except Exception as e: logger.debug("Redis數據庫連接失敗") logger.debug(e) def return_rand_keys(redis_db,logger,keys): #判斷是否有這個key if redis_db.exists(keys): len_keys = redis_db.llen(keys) random_int = int(random.randint(0,len_keys-1)) logger.debug("Redis獲取的隨機值為:%s" %(random_int)) return_redis_values = redis_db.lindex(keys,random_int).decode('utf-8') logger.debug("Redis將要返回的值為:%s" %(return_redis_values)) return return_redis_values else: logger.debug("沒有找到相關key") return 0 def auto_box(message,redis_db,logger): #設置分隔符 if '\u2005' in message : myid='@小子\u2005' else: myid = '@小子 ' #獲取分隔符之后的值 if len(message.split(myid)) == 1 : return "@我了要說話喲" else: dealwith = message.split(myid)[1] if dealwith == " " : logger.debug("只是@了我,並沒有輸入內容") else: #圖靈機器人API接口 url_api = "http://openapi.tuling123.com/openapi/api/v2" #將問題賦值給box_quest box_quest = dealwith #定義json post request_json = json.dumps ({ "perception": { "inputText": { "text": box_quest }, "selfInfo": { "location": { "city": "成都", "province": "成都", "street": "天府三街" } } }, "userInfo": { "apiKey": "APIkey", "userId": "tuling" } } ) # 獲取POST之后的值 r = requests.post(url_api, data=request_json) r_test = r.text r_json = json.loads(r_test) #定義返回值 return_str = (r_json['results'][0]['values']['text']) logger.debug("圖靈機器人將要返回的值:") logger.debug(return_str) #需要自己建立自己的機器人倉庫 logger.debug("將返回的結果存入Redis數據庫中") redis_db.rpush(box_quest,return_str) return return_str
實現的效果
如下以log展現
接收消息並且保存至數據庫
圖靈機器人回復
定時任務


