当前位置: 首页 > news >正文

python发送微信

申请企业微信

使用python发送信息到企业微信,同时支持python2与python3环境,需要先申请一个企业微信,然后创建应用,获取以下三个信息

企业IP、Agentid、Secret

网信为创建的应用名称

 

 脚本描述

 将以上三个信息替换到脚本中,主要是

class WeiXin(object):部分,其他的辅助性工具类,收集的一些常用脚本可不用关注
#!/usr/bin/env python 
#coding=utf-8
'''
Created on 2018年2月8日

@author: root
'''


from datetime import datetime
import sys, os, re, json,socket,time
from subprocess import Popen, PIPE
from sys import version_info
if version_info.major == 3 and version_info.minor >=3:
    import urllib.request as urllib2
    pyversion = 3
elif version_info.major == 3:
    pyversion = 3
else:
    import urllib2
    pyversion = 2


try:
    if version_info.major and version_info.major == 3:
        pyversion=3
    elif version_info.major and version_info.major == 2:
        pyversion=2
    else:
        pyversion=2
except Exception as e:
    pyversion = 2

localpath = os.path.split(os.path.realpath(__file__))[0]


class OSCmd():
    '''
    OS Command:直接可调用执行命令的方法,不包括业务逻辑
    本脚本为分好层次的项目中抽出出来的方法,归为3个类,一个是命令类OSCmd,一个是系统检查类;
    为保持代码统计,命令类OSCmd是项目是调试好的代码复制过来的,不在此脚本中修改,每次都去项目中取相应的方法
    系统检查逻辑类可以修改
    '''

    def __init__(self):
        '''
        Constructor
        '''

    def exes(self, cmd_shell):
        '''
        call shell command
        '''
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        if pyversion == 3:
            s = s.encode(encoding="utf-8")
        return (s.communicate()[0]).strip('\n')

    def getexesfstval(self, cmd_shell):
        '''
        call shell command
        比如在通过ps -ef过滤进程号的时候,在eclipse中执行可以返回正确的结果,然后在shell在测试脚本时却多返回一个数字(比如13742),这里取第一个数字,舍弃多返回的数字
        '''
        s = Popen(cmd_shell, shell=True, stdout=PIPE);
        res = (s.communicate()[0]).strip('\n')
        ress = res.split('\n')
        return ress[0]

    def exef(self, filename, args):
        '''
        filename : the file is needed to exec as the way like "./filename args"
        args: list []
        for exp: oscmd.exef('/scripts/test/t2.py', ['a','b'])
        '''
        args.insert(0, '')
        if os.path.exists(filename):
            os.execv(filename, args)
        else:
            print('The {0} is not exist'.format(filename))

    def getLineFromFile(self, targetFile, *param):
        '''
        文件中,返回某行记录,适合小文件
        f:返回首行
        l:返回末行
        n:返回第n行,n为正整数
        默认返回最后一行数据
        '''
        global f
        try:
            f = open(targetFile);
            pnum = len(param);
            with open(targetFile, 'r') as f:  # 打开文件,适合小文件
                lines = f.readlines()  # 读取所有行
                first_line = lines[0]  # 取第一行
                last_line = lines[-1]  # 取最后一行

            if pnum > 0:
                if type(param[0]) == type('a') and param[0].lower() == 'f':
                    return first_line
                elif type(param[0]) == type('a') and param[0].lower() == 'l':
                    return last_line
                else:
                    return lines[int(param[0]) - 1]
            return last_line
        finally:
            f.close();

    

    def timeminustoS(self, t1, t2):
        t1 = time.localtime(t1)
        t1 = time.strftime("%Y-%m-%d %H:%M:%S", t1)
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")

        # t2=time.localtime(t2)
        t2 = time.strftime("%Y-%m-%d %H:%M:%S", t2)
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return self.total_seconds(t2 - t1)

    def total_seconds(self, time_delta):
        '''
        python 2.6 has not total_seconds method
        '''
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6

    def rmfileFrmNow(self,p_savenum, targetDir, mins, *contain):
        '''
        删除指定目录下指定分钟之前的文件,不删除目录,也不递归目录

        删除/backup/rman/backdb目录下超过30个小时的文件
        rmfileFrmNow('/backup/rman/backdb', 30*60)

        删除/backup/rman/backdb目录下超过30个小时,包含_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 30*60,'_MEMDB_20')

        删除/backup/rman/backdb目录下超过30个小时,同时包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 30*60,'back_full','_MEMDB_20')
        '''
        clen = len(contain)
        defilist = []
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                if clen > 0:
                    for c in contain:
                        if c in str(fil):
                            defilist.append(fil)
        #排序
        defilist = self.get_filist_bytime(defilist)
        lsz = len(defilist)
        if  lsz > p_savenum:
            defilist = defilist[0,lsz - p_savenum]

        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isBeforeMins(fil, mins):
                            os.remove(fil)

    def isBeforeMins(self, fil, mins):
        '''
        判断一个文件的最后修改时间距离当前时间,是否超过指定的分钟数
        '''
        if os.path.isfile(fil):
            mtfile = os.path.getmtime(fil)
            tnow = time.localtime()
            sec = self.timeminustoS(mtfile, tnow)
            mms = round(sec / 60)
            mins = eval(mins)
            if mms - mins > 0:
                return True
            return False

    def isMorthanSize(self, fil, siz):
        '''
        判断一个文件是否超过指定的大小,单位为M
        '''
        if os.path.isfile(fil):
            filsiz = os.path.getsize(fil)
            fsiz = eval(siz)*1024*1024
            if filsiz - fsiz > 0:
                return True
            return False

    def rmfilMorthanSize(self, targetDir, siz, *contain):
        '''
        删除指定目录下超过指定大小的文件,不删除目录,也不递归目录

        删除/backup/rman/backdb目录下超过2G大小的文件
        rmfileFrmNow('/backup/rman/backdb', 2*1024)

        删除/backup/rman/backdb目录下超过10G大小,包含_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 10*1024,'_MEMDB_20')

        删除/backup/rman/backdb目录下超过3G大小,同时包含back_full_、_MEMDB_20字符的文件
        rmfileFrmNow('/backup/rman/backdb', 3*1024,'back_full','_MEMDB_20')
        '''
        clen = len(contain)
        if os.path.isdir(targetDir):
            for fil in os.listdir(targetDir):
                flag = True
                if clen > 0:
                    for c in contain:
                        if not c in str(fil):
                            flag = False
                if flag:
                    fil = os.path.join(targetDir, fil)
                    if os.path.isfile(fil):
                        if self.isMorthanSize(fil, siz):
                            os.remove(fil)


    def mkdir(self, dr, *mod):
        # import stat
        s1 = str(dr)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ['HOME']
            s1 = '%s%s' % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = 'mkdir -p %s' % (s1)
            # os.mkdir(dir)  这个命令不识别linux 用户home目录“~”符号
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = 'chmod -R 755 %s' % (s1)
            if p_num == 1:
                chmod = 'chmod -R %s %s' % (mod[0], s1)
                self.exes(chmod)
                # os.chmod(dir, mod)
            else:
                # os.chmod(dir, stat.S_IRWXU|stat.S_IRGRP|stat.S_IROTH)   该行会抛出异常 TypeError: coercing to Unicode: need string or buffer, builtin_function_or_method found
                self.exes(chmod)
            return s1

    def mknod(self, filename, *mod):
        s1 = str(filename)
        if s1.startswith("~/"):
            s1 = s1[1:]
            homedir = os.environ['HOME']
            s1 = '%s%s' % (homedir, s1)
        if not os.path.exists(s1):
            cmd_shell = 'touch %s' % (s1)
            self.exes(cmd_shell)
            p_num = len(mod)
            chmod = 'chmod -R 644 %s' % (s1)
            if p_num == 1:
                chmod = 'chmod -R %s %s' % (mod[0], s1)
                self.exes(chmod)
            else:
                self.exes(chmod)
        return s1

    def get_filist_bytime(self,file_path):
        dir_list = os.listdir(file_path)
        if not dir_list:
            return
        else:
            # 注意,这里使用lambda表达式,将文件按照最后修改时间顺序升序排列
            # os.path.getmtime() 函数是获取文件最后修改时间
            # os.path.getctime() 函数是获取文件最后创建时间
            dir_list = sorted(dir_list, key=lambda x: os.path.getmtime(os.path.join(file_path, x)))
            return dir_list

    def shichaByMin(self, t1, t2):
        '''
        计算以下三种类型之间的时间差
    time.time()-浮点型,time.localtime()-struct_time型、datetime.now()-datetime型
        '''
        t1 = datetime.strptime(t1, "%Y-%m-%d %H:%M:%S")
        t2 = datetime.strptime(t2, "%Y-%m-%d %H:%M:%S")
        return round(self.total_seconds(t2 - t1) / 60)

    def total_seconds(self, time_delta):
        '''
        python 2.6 has not total_seconds method
        '''
        return 1.0 * (time_delta.microseconds + (time_delta.seconds + time_delta.days * 24 * 3600) * 10 ** 6) / 10 ** 6


class Properties:
    # 脚本默认配置路径
    oscheck_properties = '/tmp/.python-eggs/.oscheck.properties'
    file_name = ''
    oscmd = OSCmd()

    def __init__(self, file_name='.oscheck.properties'):
        '''
        将配置文件转化为列表,以列表的读取方式进行值的替换
        '''
        dr = '/tmp/.python-eggs/'
        if not os.path.exists(dr):  # 当目录以~开头时该行永为True,但下面的语句会自判断
            self.oscmd.mkdir(dr,'777')

        if not os.path.exists(file_name):  # 当目录以~开头时该行永为True,但下面的语句会自判断
            file_name = self.oscmd.mknod(file_name,'666')
            # os.mknod(file_name)   ~

        self.file_name = file_name
        self.properties = {}
        try:
            fopen = open(self.file_name, 'r')
            for line in fopen:
                line = line.strip()
                if line.find('=') > 0 and not line.startswith('#'):
                    strs = line.split('=')
                    self.properties[strs[0].strip()] = strs[1].strip()
        except Exception as e:
            raise e
        else:
            fopen.close()

    def has_key(self, key):
        return key in self.properties

    def keys(self, key):
        return self.properties.keys();

    def get(self, key, default_value=''):
        if key in self.properties:
            return self.properties[key]
        return default_value

    def put(self, key, value):
        self.properties[key] = value
        self.replace_property(self.file_name, key + '=.*', key + '=' + str(value), True)

    def putjson(self, key, values):
        '''
        以json的形式存储prop中的value,可以处理一些特殊字符,比如=
        '''
        pj = {key: values}
        value = json.dumps(pj)
        self.put(key, value)

    def getjsonvalue(self, key, default_value=''):
        '''
        以json的形式存储prop中的value,可以处理一些特殊字符,比如=
        '''
        if key in self.properties:
            value = self.get(key)
            valuedict = json.loads(value)
            return valuedict[key]
        return default_value

    def toasc(self, ss):
        asclst = []
        for em in bytearray(ss):
            asclst.append(em)
        return asclst

    def asctostr(self, asclst):
        ss = ''
        for em in asclst:
            ss += str(chr(em))
        return ss

    def putpwd(self, key, value):
        '''
        字符串以ASCII码存储
        '''
        asclst = self.toasc(value)
        self.putjson(key, asclst)

    def getpwd(self, key):
        asclst = self.getjsonvalue(key)
        pwd = self.asctostr(asclst)
        return pwd

    def replace_property(self, file_name, from_regex, to_str, append_on_not_exists=True):
        '''
        新写入数据后,替换文件以永久保存
        :param file_name:
        :param from_regex:
        :param to_str:
        :param append_on_not_exists:
        :return:
        '''
        import tempfile
        tmpfile = tempfile.TemporaryFile()

        if os.path.exists(file_name):
            r_open = open(file_name, 'r')
            pattern = re.compile(r'' + from_regex)
            found = None
            for line in r_open:
                if pattern.search(line) and not line.strip().startswith('#'):
                    found = True
                    line = re.sub(from_regex, to_str, line)
                if pyversion == 3:
                    line = line.encode(encoding="utf-8")
                tmpfile.write(line)
            if not found and append_on_not_exists:
                to_str = '\n' + to_str
                if pyversion == 3:
                    to_str = to_str.encode(encoding="utf-8")
                tmpfile.write(to_str)
            r_open.close()
            tmpfile.seek(0)

            content = tmpfile.read()

            if os.path.exists(file_name):
                os.remove(file_name)

            w_open = open(file_name, 'w')
            if pyversion == 3:
                content = content.decode('utf-8')
            w_open.write(content)
            w_open.close()
            tmpfile.close()
        else:
            print("file %s not found" % (file_name))


def parsePro(file_name='/tmp/.python-eggs/.oscheck.properties'):
    return Properties(file_name)


class Log:
    def __init__(self):
        '''
        Constructor
        '''

    logfile = os.path.join(localpath, 'scheck.log')

    def setlog(self, logfile):
        self.logfile = logfile

    def log(self, strs, sp='a'):
        open(self.logfile, sp).write('%s\n' % (strs))

    def inserttime(self, sp='a'):
        now = datetime.now()
        strs = now.strftime('%Y-%m-%d %H:%M:%S')
        oscmd = OSCmd()
        oscmd.mknod(self.logfile, '666')
        open(self.logfile, sp).write('%s\n' % (strs))

    def frmMsg(self,content):
        '''
        格式化信息输出
        :param content:
        :return:
        '''
        curtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        hname = socket.gethostname()
        IPADDR = socket.gethostbyname(hname)
        cnt = "内容:{0} <br/>时间:{1} <br/>信息来自 {2} {3}".format(content,curtime,hname,IPADDR)
        return cnt


class WeiXin(object):
    '''
    发送微信
    '''
    token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    cropmsg ={
        'corpid' : 'wwe***2ed*********',
        'corpsecret' : 'Mgyi*****ahx3O-******HkLfg' 
        }
    sendmsg = {}
    access_token_key="weixin_access_token"
    time_token_key="weixin_tokenkey_gettime"
    oscmd = OSCmd()
    prop = parsePro()
    log = Log()

    def __init__(self):
        '''
        Constructor
        '''

    def formatContent(self,content):
        cnt=self.log.frmMsg(content)
        return cnt
        
    def setMsg(self,param):
        arg_num=len(param)
        content = param[0]
        content = self.formatContent(content)
        if pyversion == 2:
            content = content.decode('utf-8')


        if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            
        self.sendmsg = {
              "touser":touser,
              "agentid":agentid,
              "msgtype": "text",
              "text":{
                      "content":content
                      }
              }

    def updateToken(self):
        access_token_response = self.geturl(self.token_url, self.cropmsg)
        access_token = (json.loads(access_token_response)['access_token']).encode('utf-8')
        self.prop.put(self.access_token_key, access_token)
        self.prop.put(self.time_token_key, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


    def get_access_token(self):
        '''
        获取访问凭证,经过编码的一串字符串数据
        每两个小时取一次即可
        '''
        if not self.prop.has_key(self.time_token_key):
            self.updateToken()
        else:
            curtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            oldtime = self.prop.get(self.time_token_key)
            shicha = self.oscmd.shichaByMin(oldtime,curtime)
            if shicha > 110:
                self.updateToken()

        return self.prop.get(self.access_token_key)
    

    def encodeurl(self,dic):
        '''
        将字典转换为url参数传值方式,key1=value1&key2=value2
        '''
        data = ''
        for k,v in dic.items():
            data += '%s=%s%s' % (k,v,'&')
        return data
    
    
    def geturl(self,url,data):
        '''
        data为字典类型的参数,
        返回类型<type 'unicode'>,json
        '''
        data = self.encodeurl(data)
        response = urllib2.urlopen('%s?%s' % (url,data))
        return response.read().decode('utf-8')
                
    def posturl(self,url,data,isjson = True):
        '''
        发送json数据
        返回类型<type 'unicode'>,json
        '''
        if isjson:
            data = json.dumps(data)    #dict
            if pyversion == 3 :
                data = data.encode(encoding="utf-8")
            response = urllib2.urlopen(url,data)
            return response.read().decode('utf-8')
        
        
    def sampleSend(self,content):
        self.setMsg(content)
        # 获取企业访问凭证
        access_token = self.get_access_token()
        sendmsg_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' % access_token 
        print (self.posturl(sendmsg_url,self.sendmsg))
        
    def showExample(self):
        if len(sys.argv) == 4:
            touser,notuse,content = sys.argv[1:] 
        else:
            print ('error segments, now exit')
            sys.exit()
        
    def send(self,param):
        ll = Log()
        arg_num=len(param)
        if arg_num >=1:
            self.sampleSend(param)
            #ll.log('%s'%(param))
            
        return ''
    
    def showUsing(self):
        print ('There must be more than 1 params. For example:')
        print ('python sendweixin.py  "微信发送信息调试 "  ')
        print ('python sendweixin.py  "微信发送信息调试 " "微信用户ID" ')
        print ('注意事项,该脚本尚存在一个问题,首次发送微信时会报access_token missing,第二次无此问题')
        print ('python sendweixin.py  "微信发送信息调试 " "微信用户ID" "企业微信号"')
        print ('error segments, now exit')
        

if __name__ == '__main__':

    inp = sys.argv
    arg_num = len(inp)
    wxmsg = WeiXin()

    if arg_num > 1 :
        res = wxmsg.send(sys.argv[1:])
    else:
        wxmsg.showUsing()
        sys.exit()

 

重点为以下两段代码,企业与应用的标识

  cropmsg ={
        'corpid' : 'wwe***2ed*********',
        'corpsecret' : 'Mgyi*****ahx3O-******HkLfg' 
        }

如果输入一个参数,则默认发送给企业微信中可以访问该应用的所有用户;第二个参数指定具体的微信号

  if arg_num == 1 :
            touser="@all"
            agentid="1000009"
        elif arg_num == 2 :
            touser=param[1]
            agentid="1000009"
        elif arg_num == 3 :
            touser=param[1]
            agentid=param[2]
            

示例

$ python sendwx.py "阳光、沙滩、海浪、老船长……"
{"errcode":0,"errmsg":"ok","invaliduser":""}

 

转载于:https://www.cnblogs.com/perfei/p/10348930.html

相关文章:

  • 关于ios下字体描边的一个细节
  • tkinter学习系列(二)之窗口的设置
  • 多年父子成兄弟
  • p2519 [HAOI2011]problem a
  • SQL Server 变更数据捕获(CDC)监控表数据
  • java写文件实现换行
  • mount --bind使用方法
  • react 项目中 引入 bootstrap
  • “Usage of API documented as @since 1.8+”报错的解决办法
  • 【Spring系列】spring mvc整合任务调度
  • 【BZOJ2301】Problem B
  • linux 全部卸载python yum 重新安装
  • 【进阶4-4期】Lodash是如何实现深拷贝的
  • 提问的艺术
  • git学习(一) 如何将项目上传到github
  • 《用数据讲故事》作者Cole N. Knaflic:消除一切无效的图表
  • 【108天】Java——《Head First Java》笔记(第1-4章)
  • ECMAScript入门(七)--Module语法
  • es的写入过程
  • Javascript基础之Array数组API
  • Mysql优化
  • Redis在Web项目中的应用与实践
  • spring boot 整合mybatis 无法输出sql的问题
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • UEditor初始化失败(实例已存在,但视图未渲染出来,单页化)
  • underscore源码剖析之整体架构
  • win10下安装mysql5.7
  • 从地狱到天堂,Node 回调向 async/await 转变
  • ------- 计算机网络基础
  • 快速体验 Sentinel 集群限流功能,只需简单几步
  • 小程序01:wepy框架整合iview webapp UI
  • 小程序上传图片到七牛云(支持多张上传,预览,删除)
  • 智能网联汽车信息安全
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • ​软考-高级-系统架构设计师教程(清华第2版)【第9章 软件可靠性基础知识(P320~344)-思维导图】​
  • # Swust 12th acm 邀请赛# [ K ] 三角形判定 [题解]
  • #100天计划# 2013年9月29日
  • #pragma once与条件编译
  • $分析了六十多年间100万字的政府工作报告,我看到了这样的变迁
  • (LeetCode C++)盛最多水的容器
  • (第27天)Oracle 数据泵转换分区表
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (附源码)计算机毕业设计ssm-Java网名推荐系统
  • (附源码)计算机毕业设计SSM基于健身房管理系统
  • (黑客游戏)HackTheGame1.21 过关攻略
  • (汇总)os模块以及shutil模块对文件的操作
  • (论文阅读11/100)Fast R-CNN
  • (论文阅读30/100)Convolutional Pose Machines
  • (免费领源码)python#django#mysql公交线路查询系统85021- 计算机毕业设计项目选题推荐
  • (删)Java线程同步实现一:synchronzied和wait()/notify()
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • (一)pytest自动化测试框架之生成测试报告(mac系统)
  • (中等) HDU 4370 0 or 1,建模+Dijkstra。
  • .NET 命令行参数包含应用程序路径吗?
  • .project文件