基于 codo-cron 的改造实践：把原来 apscheduler 任务传入的 exec_shell 改为 exec_task



  • 修改 job_from.py 函数

    def job_from(**jobargs):
        """Register (or replace) a cron-triggered job on the scheduler.

        :param jobargs: keyword arguments; must contain ``job_id`` (the
            scheduler job id), ``params`` (forwarded to
            ``cron_jobs.exec_task``) and ``cron`` (whitespace-separated
            fields in the order: second minute hour day month day_of_week)
        :return: the ``job_id`` that was registered
        :raises KeyError: if ``job_id``, ``params`` or ``cron`` is missing
        :raises IndexError: if ``cron`` has fewer than six fields
        """
        job_id = jobargs['job_id']
        func = cron_jobs.exec_task
        task_params = jobargs['params']
        # split() with no argument tolerates repeated, leading and trailing
        # whitespace; split(' ') would turn those into empty cron fields.
        fields = jobargs['cron'].split()
        cron_rel = dict(
            second=fields[0],
            minute=fields[1],
            hour=fields[2],
            day=fields[3],
            month=fields[4],
            day_of_week=fields[5],
        )
        scheduler.add_job(
            func=func,
            id=job_id,
            kwargs={'params': task_params, 'job_id': job_id},
            trigger='cron',
            replace_existing=True,  # re-registering an existing id overwrites it
            **cron_rel
        )
        return job_id
    
    

    cron_jobs.py 增加 exec_task

    在 cron_jobs 中修改 exec_cmd，并增加 exec_task(params, job_id)，
    这样就可以执行本地代码中注册的任意脚本任务。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import subprocess
    from .models import CronLog
    try:
        from ops.libs.opssdk.logs import Log
        logging = Log(log_flag='apsd_running')
    except:
        import logging
    
    
    def exec_shell(cmd):
        """Run ``cmd`` through the system shell and capture its output.

        stderr is redirected into stdout, so a single combined stream is
        returned.

        SECURITY: the command runs with ``shell=True``, so shell
        metacharacters in ``cmd`` are interpreted. Only pass trusted input.

        :param cmd: shell command line to execute
        :return: ``(returncode, output)``. When the exit code is 0, ``output``
            is the decoded text split on newlines (a list); otherwise it is a
            single string with all newlines removed.
        """
        proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # stderr is merged into stdout, so the second item is always None.
        raw_output, _ = proc.communicate()
        ret = proc.returncode
        print(raw_output)
        # Command output is not guaranteed to be valid UTF-8; substitute
        # undecodable bytes instead of raising UnicodeDecodeError.
        text = raw_output.decode('utf-8', errors='replace')
        if ret == 0:
            return ret, text.split('\n')
        return ret, text.replace('\n', '')
    
    def exec_cmd(cmd, job_id):
        """Run a shell command for a job and save the outcome as a ``CronLog`` row.

        :param cmd: shell command line, passed to ``exec_shell``
        :param job_id: id of the job, stored with the log row
        :return: the output returned by ``exec_shell`` when the command exits
            with code 0
        :raises SystemExit: with code 407 when the command exits non-zero
        """
        recode, stdout = exec_shell(cmd)
        print('cmd', recode, stdout)
        status = 'success' if recode == 0 else 'faild'
        CronLog(job_id=job_id, status=status, task_cmd=cmd, task_log=str(stdout)).save()
        if recode != 0:
            print('[Error] (%s) failed' % cmd)
            # Raise SystemExit directly: the ``exit`` builtin is injected by
            # the ``site`` module for interactive use, is not guaranteed to
            # exist, and closes sys.stdin before raising.
            raise SystemExit(407)
        print('[Success] (%s) success' % cmd)
        return stdout
    
    
    def exec_task(params, job_id):
        """Run a scheduled job: a shell command or a locally registered task.

        ``params`` selects what is executed:

        * ``str``  -- treated as a shell command and delegated to ``exec_cmd``.
        * ``list`` -- a two-item ``[func_path, args]`` pair.
        * ``dict`` -- ``{"func_path": ..., "args": [...]}``; ``args`` is
          optional and defaults to an empty list.
        * anything else -- the default path ``'ops.tasks'`` with a default
          argument is used.

        ``func_path`` is a dotted path: the last component is the callable and
        the prefix is the module to import. The callable is first invoked via
        its ``.delay(*args)`` attribute (presumably a Celery-style async task;
        confirm against the registered tasks). If that raises, the callable is
        invoked directly as a synchronous fallback. A ``CronLog`` row is saved
        for every attempt that reaches the execution step.

        :param params: task description, see above
        :param job_id: id of the scheduler job, stored with the log row
        :return: the result of ``exec_cmd`` for string params; an error
            message string when the task could not be run; otherwise ``None``
        :raises AttributeError: if a dict ``params`` has no ``"func_path"`` key
        :raises ValueError: if a list ``params`` does not hold exactly two items
        """
        import importlib

        func_path, args = 'ops.tasks', ('Default Task OutPut!', )

        if isinstance(params, str):
            return exec_cmd(params, job_id)

        if isinstance(params, list):
            func_path, args = tuple(params)
        elif isinstance(params, dict):
            try:
                func_path = params["func_path"]
            except KeyError:
                raise AttributeError("错误的命名传入")
            args = params.get('args', [])

        # Bind both values up front so the ``finally`` clause can always write
        # the log row. Previously ``logtxt`` was unbound on the success path.
        run_code = 1
        log_text = ''
        try:
            module_path, _, func_name = func_path.rpartition('.')
            # import_module returns the leaf module itself, so the
            # ``__import__(..., fromlist=True)`` trick is not needed.
            module = importlib.import_module(module_path)
            task = getattr(module, func_name)
            try:
                task.delay(*args)
                log_text = 'success'
            except Exception as e:
                # Async dispatch failed; fall back to a direct call.
                logging.warn(e)
                task(*args)
                log_text = str(e)
                logging.info('异步任务执行失败,改同步成功!')
            run_code = 0
        except Exception as e:
            log_text = str(e)
            return "执行错误!传入的参数不合法"
        finally:
            # Record the execution status whether the task ran or not.
            CronLog(job_id=job_id,
                    status='success' if run_code == 0 else 'faild',
                    task_cmd=str(params), task_log=str(log_text)).save()
    
    
    # Running this module directly does nothing: the guard body is a no-op.
    if __name__ == '__main__':
        pass
    
    

    测试示例

    from requests.api import request
    
    
    def test_add_job():
        """POST a sample cron job to the job endpoint and print the reply."""
        payload = dict(
            cron="*/12 * * * * *",
            job_id="10732",
            params={"func_path": "ops.apscheduler.registerd_tasks.print5", "args": ['dayingffff', ]},
        )
        resp = request(
            method="post",
            url="http://192.168.2.41:18033/v1/cron/job/",
            json=payload,
            headers={'Content-Type': 'application/json'},
            verify=False,
        )

        # Show the HTTP status followed by the decoded response body.
        body = resp.content.decode('utf-8')
        print(resp.status_code)
        print(body)
    
    
    # http patch http://192.168.2.228:18033/v1/cron/job/ job_id=10732
    def test_list_jobs():
        """GET the job endpoint and print the status code and response body."""
        resp = request(
            method="get",
            url="http://localhost:4033/v1/cron/job/",
            headers={'Content-Type': 'application/json'},
            verify=False,
        )

        # Show the HTTP status followed by the decoded response body.
        body = resp.content.decode('utf-8')
        print(resp.status_code)
        print(body)
    
    
    # When run as a script, only the add-job request is sent.
    if __name__ == '__main__':
        test_add_job()
    
    
    


  • 很棒!可以把固定的逻辑写进去


登录后回复