python發送微信


申請企業微信

使用python發送信息到企業微信,同時支持python2與python3環境,需要先申請一個企業微信,然后創建應用,獲取以下三個信息

企業IP、Agentid、Secret

網信為創建的應用名稱

 

 腳本描述

 將以上三個信息替換到腳本中,主要是

class WeiXin(object):部分,其他的輔助性工具類,收集的一些常用腳本可不用關注
#!/usr/bin/env python 
#coding=utf-8
'''
Created on 2018年2月8日

@author: root
'''


from datetime import datetime
import sys, os, re, json,socket,time
from subprocess import Popen, PIPE
from sys import version_info
if version_info.major == 3 and version_info.minor >=3:
    import urllib.request as urllib2
    pyversion = 3
elif version_info.major == 3:
    pyversion = 3
else:
    import urllib2
    pyversion = 2


try:
    if version_info.major and version_info.major == 3:
        pyversion=3
    elif version_info.major and version_info.major == 2:
        pyversion=2
    else:
        pyversion=2
except Exception as e:
    pyversion = 2

localpath = os.path.split(os.path.realpath(__file__))[0]


class OSCmd():
    '''
    OS Command:直接可調用執行命令的方法,不包括業務邏輯
    本腳本為分好層次的項目中抽出出來的方法,歸為3個類,一個是命令類OSCmd,一個是系統檢查類;
    為保持代碼統計,命令類OSCmd是項目是調試好的代碼復制過來的,不在此腳本中修改,每次都去項目中取相應的方法
    系統檢查邏輯類可以修改
    '''

    def __init__(self):
        '''
        Constructor
        '''

    def exes(self, cmd_shell):
        '''
        call shell command
        '''
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        if pyversion == 3:
            s = s.encode(encoding="utf-8")
        return (s.communicate()[0]).strip('\n')

    def getexesfstval(self, cmd_shell):
        '''
        call shell command
        比如在通過ps -ef過濾進程號的時候,在eclipse中執行可以返回正確的結果,然后在shell在測試腳本時卻多返回一個數字(比如13742),這里取第一個數字,舍棄多返回的數字
        '''
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        res = (s.communicate()[0]).strip('\n')
        ress = res.split('\n')
        return ress[0]

    def exef(self, filename, args):
        '''
        filename : the file is needed to exec as the way like "./filename args"
        args: list []
        for exp: oscmd.exef('/scripts/test/t2.py', ['a','b'])
        '''
        args.insert(0, '')
        if os.path.exists(filename):
            os.execv(filename, args)
        else:
            print('The {0} is not exist'.format(filename))

    def getLineFromFile(self, targetFile, *param):
        '''
        文件中,返回某行記錄,適合小文件
        f:返回首行
        l:返回末行
        n:返回第n行,n為正整數
        默認返回最后一行數據
        '''
        global f
        try:
            f = open(targetFile);
            pnum = len(param);
            with open(targetFile, 'r') as f:  # 打開文件,適合小文件
                lines = f.readlines()  # 讀取所有行
                first_line = lines[0]  # 取第一行
                last_line = lines[-1]  # 取最后一行

            if pnum > 0:
                if type(param[0]) == type('a') and param[0].lower() == 'f':
                    return first_line
                elif type(param[0]) == type('a') and param[0].lower() == 'l':
                    return last_line
                else:
                    return lines[int(param[0]) - 1]
            return last_line
        finally:
            f.close();

    

    def timeminustoS(self, t1, t2):
        t1 = time.localtime(t1)
        t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1)
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")

        # t2=time.localtime(t2)
        t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2)
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return self.total_seconds(t2 - t1)

    def total_seconds(self, time_delta):
        '''
        python 2.6 has not total_seconds method
        '''
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6

    def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain):
        '''
        刪除指定目錄下指定分鍾之前的文件,不刪除目錄,也不遞歸目錄

        刪除/backup/rman/backdb目錄下超過30個小時的文件
        rmfileFrmNow('/backup/rman/backdb', 30*60)

        刪除/backup/rman/backdb目錄下超過30個小時,包含_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 30*60,'_MEMDB_20')

        刪除/backup/rman/backdb目錄下超過30個小時,同時包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 30*60,'back_full','_MEMDB_20')
        '''
        clen = len(contain)
        defilist = []
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                if clen > 0:
                    for c in contain:
                        if c in str(fil):
                            defilist.append(fil)
        #排序
        defilist = self.get_filist_bytime(defilist)
        lsz = len(defilist)
        if  lsz > p_savenum:
            defilist = defilist[0,lsz - p_savenum]

        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isBeforeMins(fil, mins):
                            os.remove(fil)

    def isBeforeMins(self, fil, mins):
        '''
        判斷一個文件的最后修改時間距離當前時間,是否超過指定的分鍾數
        '''
        if os.path.isfile(fil):
            mtfile = os.path.getmtime(fil)
            tnow = time.localtime()
            sec = self.timeminustoS(mtfile, tnow)
            mms = round(sec / 60)
            mins = eval(mins)
            if mms - mins > 0:
                return True
            return False

    def isMorthanSize(self, fil, siz):
        '''
        判斷一個文件是否超過指定的大小,單位為M
        '''
        if os.path.isfile(fil):
            filsiz = os.path.getsize(fil)
            fsiz = eval(siz)*1024*1024
            if filsiz - fsiz > 0:
                return True
            return False

    def rmfilMorthanSize(self, targetDir, siz, *contain):
        '''
        刪除指定目錄下超過指定大小的文件,不刪除目錄,也不遞歸目錄

        刪除/backup/rman/backdb目錄下超過2G大小的文件
        rmfileFrmNow('/backup/rman/backdb', 2*1024)

        刪除/backup/rman/backdb目錄下超過10G大小,包含_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 10*1024,'_MEMDB_20')

        刪除/backup/rman/backdb目錄下超過3G大小,同時包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 3*1024,'back_full','_MEMDB_20')
        '''
        clen = len(contain)
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isMorthanSize(fil, siz):
                            os.remove(fil)


    def mkdir(self, dr, *mod):
        # import stat
        s1 = str(dr)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ['HOME']
            s1 = '%s%s' % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = 'mkdir -p %s' % (s1)
            # os.mkdir(dir)  這個命令不識別linux 用戶home目錄“~”符號
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = 'chmod -R 755 %s' % (s1)
            if p_num == 1:
                chmod = 'chmod -R %s %s' % (mod[0], s1)
                self.exes(chmod)
                # os.chmod(dir, mod)
            else:
                # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH)   該行會拋出異常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found
                self.exes(chmod)
            return s1

    def mknod(self, filename, *mod):
        s1 = str(filename)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ['HOME']
            s1 = '%s%s' % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = 'touch %s' % (s1)
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = 'chmod -R 644 %s' % (s1)
            if p_num == 1:
                chmod = 'chmod -R %s %s' % (mod[0], s1)
                self.exes(chmod)
            else:
                self.exes(chmod)
        return s1

    def get_filist_bytime(self,file_path):
        dir_list = os.listdir(file_path)
        if not dir_list:
            return
        else:
            # 注意,這里使用lambda表達式,將文件按照最后修改時間順序升序排列
            # os.path.getmtime() 函數是獲取文件最后修改時間
            # os.path.getctime() 函數是獲取文件最后創建時間
            dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x)))
            return dir_list

    def shichaByMin(self, t1, t2):
        '''
        計算以下三種類型之間的時間差
    time.time()-浮點型,time.localtime()-struct_time型、datetime.now()-datetime型
        '''
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return round(self.total_seconds(t2 - t1) / 60)

    def total_seconds(self, time_delta):
        '''
        python 2.6 has not total_seconds method
        '''
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6


class Properties:
    # 腳本默認配置路徑
    oscheck_properties = '/tmp/.python-eggs/.oscheck.properties'
    file_name = ''
    oscmd = OSCmd()

    def __init__(self, file_name='.oscheck.properties'):
        '''
        將配置文件轉化為列表,以列表的讀取方式進行值的替換
        '''
        dr = '/tmp/.python-eggs/'
        if not os.path.exists(dr):  # 當目錄以~開頭時該行永為True,但下面的語句會自判斷
            self.oscmd.mkdir(dr,'777')

        if not os.path.exists(file_name):  # 當目錄以~開頭時該行永為True,但下面的語句會自判斷
            file_name = self.oscmd.mknod(file_name,'666')
            # os.mknod(file_name)   ~

        self.file_name = file_name
        self.properties = {}
        try:
            fopen = open(self.file_name, 'r')
            for line in fopen:
                line = line.strip()
                if line.find('=') > 0 and not line.startswith('#'):
                    strs = line.split('=')
                    self.properties[strs[0].strip()] = strs[1].strip()
        except Exception as e:
            raise e
        else:
            fopen.close()

    def has_key(self, key):
        return key in self.properties

    def keys(self, key):
        return self.properties.keys();

    def get(self, key, default_value=''):
        if key in self.properties:
            return self.properties[key]
        return default_value

    def put(self, key, value):
        self.properties[key] = value
        self.replace_property(self.file_name, key + '=.*', key + '=' + str(value), True)

    def putjson(self, key, values):
        '''
        以json的形式存儲prop中的value,可以處理一些特殊字符,比如=
        '''
        pj = {key: values}
        value = json.dumps(pj)
        self.put(key, value)

    def getjsonvalue(self, key, default_value=''):
        '''
        以json的形式存儲prop中的value,可以處理一些特殊字符,比如=
        '''
        if key in self.properties:
            value = self.get(key)
            valuedict = json.loads(value)
            return valuedict[key]
        return default_value

    def toasc(self, ss):
        asclst = []
        for em in bytearray(ss):
            asclst.append(em)
        return asclst

    def asctostr(self, asclst):
        ss = ''
        for em in asclst:
            ss += str(chr(em))
        return ss

    def putpwd(self, key, value):
        '''
        字符串以ASCII碼存儲
        '''
        asclst = self.toasc(value)
        self.putjson(key, asclst)

    def getpwd(self, key):
        asclst = self.getjsonvalue(key)
        pwd = self.asctostr(asclst)
        return pwd

    def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True):
        '''
        新寫入數據后,替換文件以永久保存
        :param file_name:
        :param from_regex:
        :param to_str:
        :param append_on_not_exists:
        :return:
        '''
        import tempfile
        tmpfile = tempfile.TemporaryFile()

        if os.path.exists(file_name):
            r_open = open(file_name, 'r')
            pattern = re.compile(r'' + from_regex)
            found = None
            for line in r_open:
                if pattern.search(line) and not line.strip().startswith('#'):
                    found = True
                    line = re.sub(from_regex, to_str, line)
                if pyversion == 3:
                    line = line.encode(encoding="utf-8")
                tmpfile.write(line)
            if not found and append_on_not_exists:
                to_str = '\n' + to_str
                if pyversion == 3:
                    to_str = to_str.encode(encoding="utf-8")
                tmpfile.write(to_str)
            r_open.close()
            tmpfile.seek(0)

            content = tmpfile.read()

            if os.path.exists(file_name):
                os.remove(file_name)

            w_open = open(file_name, 'w')
            if pyversion == 3:
                content = content.decode('utf-8')
            w_open.write(content)
            w_open.close()
            tmpfile.close()
        else:
            print("file %s not found" % (file_name))


def parsePro(file_name='/tmp/.python-eggs/.oscheck.properties'):
    return Properties(file_name)


class Log:
    def __init__(self):
        '''
        Constructor
        '''

    logfile = os.path.join(localpath, 'scheck.log')

    def setlog(self, logfile):
        self.logfile = logfile

    def log(self, strs, sp='a'):
        open(self.logfile, sp).write('%s\n' % (strs))

    def inserttime(self, sp='a'):
        now = datetime.now()
        strs = now.strftime('%Y-%m-%d %H:%M:%S')
        oscmd = OSCmd()
        oscmd.mknod(self.logfile, '666')
        open(self.logfile, sp).write('%s\n' % (strs))

    def frmMsg(self,content):
        '''
        格式化信息輸出
        :param content:
        :return:
        '''
        curtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        hname = socket.gethostname()
        IPADDR = socket.gethostbyname(hname)
        cnt = "內容:{0} <br/>時間:{1} <br/>信息來自 {2} {3}".format(content,curtime,hname,IPADDR)
        return cnt


class WeiXin(object):
    '''
    發送微信
    '''
    token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    cropmsg ={
        'corpid' : 'wwe***2ed*********',
        'corpsecret' : 'Mgyi*****ahx3O-******HkLfg' 
        }
    sendmsg = {}
    access_token_key="weixin_access_token"
    time_token_key="weixin_tokenkey_gettime"
    oscmd = OSCmd()
    prop = parsePro()
    log = Log()

    def __init__(self):
        '''
        Constructor
        '''

    def formatContent(self,content):
        cnt=self.log.frmMsg(content)
        return cnt
        
    def setMsg(self,param):
        arg_num=len(param)
        content = param[0]
        content = self.formatContent(content)
        if pyversion == 2:
            content = content.decode('utf-8')


        if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            
        self.sendmsg = {
              "touser":touser,
              "agentid":agentid,
              "msgtype": "text",
              "text":{
                      "content":content
                      }
              }

    def updateToken(self):
        access_token_response = self.geturl(self.token_url, self.cropmsg)
        access_token = (json.loads(access_token_response)['access_token']).encode('utf-8')
        self.prop.put(self.access_token_key, access_token)
        self.prop.put(self.time_token_key, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


    def get_access_token(self):
        '''
        獲取訪問憑證,經過編碼的一串字符串數據
        每兩個小時取一次即可
        '''
        if not self.prop.has_key(self.time_token_key):
            self.updateToken()
        else:
            curtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            oldtime = self.prop.get(self.time_token_key)
            shicha = self.oscmd.shichaByMin(oldtime,curtime)
            if shicha > 110:
                self.updateToken()

        return self.prop.get(self.access_token_key)
    

    def encodeurl(self,dic):
        '''
        將字典轉換為url參數傳值方式,key1=value1&key2=value2
        '''
        data = ''
        for k,v in dic.items():
            data += '%s=%s%s' % (k,v,'&')
        return data
    
    
    def geturl(self,url,data):
        '''
        data為字典類型的參數,
        返回類型<type 'unicode'>,json
        '''
        data = self.encodeurl(data)
        response = urllib2.urlopen('%s?%s' % (url,data))
        return response.read().decode('utf-8')
                
    def posturl(self,url,data,isjson = True):
        '''
        發送json數據
        返回類型<type 'unicode'>,json
        '''
        if isjson:
            data = json.dumps(data)    #dict
            if pyversion == 3 :
                data = data.encode(encoding="utf-8")
            response = urllib2.urlopen(url,data)
            return response.read().decode('utf-8')
        
        
    def sampleSend(self,content):
        self.setMsg(content)
        # 獲取企業訪問憑證
        access_token = self.get_access_token()
        sendmsg_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' % access_token 
        print (self.posturl(sendmsg_url,self.sendmsg))
        
    def showExample(self):
        if len(sys.argv) == 4:
            touser,notuse,content = sys.argv[1:] 
        else:
            print ('error segments, now exit')
            sys.exit()
        
    def send(self,param):
        ll = Log()
        arg_num=len(param)
        if arg_num >=1:
            self.sampleSend(param)
            #ll.log('%s'%(param))
            
        return ''
    
    def showUsing(self):
        print ('There must be more than 1 params. For example:')
        print ('python sendweixin.py  "微信發送信息調試 "  ')
        print ('python sendweixin.py  "微信發送信息調試 " "微信用戶ID" ')
        print ('注意事項,該腳本尚存在一個問題,首次發送微信時會報access_token missing,第二次無此問題')
        print ('python sendweixin.py  "微信發送信息調試 " "微信用戶ID" "企業微信號"')
        print ('error segments, now exit')
        

if __name__ == '__main__':

    inp = sys.argv
    arg_num = len(inp)
    wxmsg = WeiXin()

    if arg_num > 1 :
        res = wxmsg.send(sys.argv[1:])
    else:
        wxmsg.showUsing()
        sys.exit()

 

重點為以下兩段代碼,企業與應用的標識

  cropmsg ={
        'corpid' : 'wwe***2ed*********',
        'corpsecret' : 'Mgyi*****ahx3O-******HkLfg' 
        }

如果輸入一個參數,則默認發送給企業微信中可以訪問該應用的所有用戶;第二個參數指定具體的微信號

  if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            

示例

$ python sendwx.py "陽光、沙灘、海浪、老船長……"
{"errcode":0,"errmsg":"ok","invaliduser":""}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM