Django+Celery框架自動化定時任務開發


  本章介紹使用DjCeleryDjango+Celery框架開發定時任務功能,在Autotestplat平台上實現單一接口自動化測試腳本、業務場景接口自動化測試腳本、App自動化測試腳本、Web自動化測試腳本等任務的定時執行、調度、管理等,從而取代Jenkins上的定時執行腳本和發送郵件等功能。

自動化測試邏輯流程圖11.1所示。       

640?wx_fmt=png

    ▲圖11.1

11.1  環境搭建

1.安裝

步驟1  安裝Celerypyramid_celery-3.0.0

    配置https://pypi.python.org/pypi/pyramid_celery/

步驟2  安裝django-clerydjango-celery-3.2.2

    配置https://pypi.python.org/pypi/django- celery

         INSTALLED_APPS= []

   加入2

             'djcelery',

               運行 Python manage.py migrate

步驟 3  安裝celery-with-redis-3.0

    地址為https://pypi.python.org/pypi/celery-with-redis/

步驟 4  安裝django-clery-beatdjango-celery-beat-1.1.0

          https://pypi.python.org/pypi/ django_celery_beat

步驟5 下載Redis-x64-3.2.100

 Redis-x64-3.2.100.zip         https://github.com/MicrosoftArchive/redis/releases

2.配置

步驟1  Settings.py中增加如下內容。

加入1

import djcelery 

djcelery.setup_loader()    #加載djcelery 

加入2:

#數據庫調度

CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'

加入3:

BROKER_URL = 'redis://127.0.0.1:6379/0'

BROKER_TRANSPORT = 'redis'

步驟2  在應用Apitest目錄下新建celery.py文件1,加入如下內容。

from __future__ import absolute_import

import os

import django

from celery import Celery

from django.conf import settings

 

os.environ.setdefault('DJANGO_SETTINGS_MODULE','autotest.settings')

django.setup()

 

app = Celery('autotest')

 

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)

步驟3  新建tasks.py文件,加入如下內容。

# -*-coding:utf-8 -*-

importrequests, time, sys, re

importurllib, zlib#,

importpymysql

importunittest

from traceimport CoverageResults

importjson

fromidlelib.rpc import response_queue

fromapitest.celery import app

from timeimport sleep

 

@app.task

def hello_world():

    print('已運行')

步驟4  啟動服務python manage.py runserver

步驟5  解壓縮后,運行CMD,切換到相應目錄,輸入啟動Redis指令redis-server redis. windows.conf,成功后出現如圖11.2所示界面。

640?wx_fmt=png

▲圖11.2

步驟6  啟動指令python manage.py celery worker -l info

步驟7  啟動指令python manage.py celery beat

11.2  前端功能實現

1.功能描述

完成實現單一接口測試用例、業務場景接口API測試用例、AppUI測試用例、WebUI測試用例的自動化定時任務。

2.程序清單

autotest\apitest\templates目錄下新建periodic_task.html文件,加入如下內容。

<html>

<head>

{% load bootstrap4 %}

{% bootstrap_css %}

{% bootstrap_javascript %}

<title>產品自動化測試平台</title>

<link rel="stylesheet"type="text/css" href="/static/admin/css/forms.css" />

<script type="text/javascript"src="/admin/jsi18n/"></script>

<script type="text/javascript"src="/static/admin/js/vendor/jquery/jquery.js"></script>

<script type="text/javascript"src="/static/admin/js/jquery.init.js"></script>

<script type="text/javascript"src="/static/admin/js/core.js"></script>

<script type="text/javascript"src="/static/admin/js/admin/RelatedObjectLookups.js"></script>

<script type="text/javascript"src="/static/admin/js/actions.js"></script>

<script type="text/javascript"src="/static/admin/js/urlify.js"></script>

<script type="text/javascript"src="/static/admin/js/prepopulate.js"></script>

<script type="text/javascript"src="/static/admin/js/vendor/xregexp/xregexp.js"></script>

<meta name="viewport"content="user-scalable=no, width=device-width, initial-scale=1.0,maximum-scale=1.0">

   

<link rel="stylesheet"type="text/css" href="/static/admin/css/responsive.css"/>

 

<meta name="robots"content="NONE,NOARCHIVE" />

 

</head>

<body role="document">

<!-- 導航欄-->

<nav class="navbar navbar-expand-smbg-dark navbar-dark fixed-top">

<div>

<ahref="#">&nbsp;</a>

<ul>

</ul>

<ul>

<li><astyle='color:white' href="#"></a></li>

<li><astyle='color:white' href="/logout/"></a></li>

</ul>

</div>

</nav>

<!-- 搜索欄-->

<div>

<formmethod="get" action="/tasksearch/">

 

{% csrf_token %}    

<input type="search"name="task" placeholder="名稱" required>

 

<button type="submit">搜索</button>

  

<!-- 增加定時任務-->

<div >

 

<select name="PeriodicTask"id="PeriodicTask">

<option value="" selected>----定時任務----</option>

</select>

<a id="change_id_PeriodicTask"data-href-template="/admin/djcelery/periodictask/__fk__/change/?_to_field=id&amp;_popup=1"title="更改選中的定時任務">

<imgsrc="/static/admin/img/icon-changelink.svg" alt="修改"/>

</a>

<a style='color:light blue' id="add_id_PeriodicTask" href="/admin/djcelery/periodictask/add/?_to_field=id&amp;_popup=1"title="增加另一個測試用例">

<imgsrc="/static/admin/img/icon-addlink.svg" alt="增加"/>增加

</a>

</form>

</div>

 

<!-- 任務計划列表-->

<div>

<div>

<table class="table table-striped">

<thead>

 

<tr>

<th>ID</th><th>任務名稱</th><th>任務模塊</th><th>時間計划</th><th>修改時間</th><th>開啟</th><th>立即</th><th>編輯</th><th>刪除</th>

</tr>

</thead>

<tbody>

{% for task in tasks %}{% for periodic inperiodics %}

<tr>

 

{% if task.interval_id != null andtask.interval_id == periodic.id %}

<td>{{ task.id }}</td>

<td>{{ task.name }}</td>

<td>{{ task.task }}</td>

<td><a style='color:green'>{{ periodic.period }} {{ periodic.every}}</a></td>

<td>{{ task.date_changed }}</td>

<td>{{ task.enabled }}</td>

<td>{% if task.id == 1 %}

<a href="../task_apis"target="mainFrame">運行</a>

{% elif task.id == 2 %}

<a href="../task_apitest"target="mainFrame">運行</a>

{% else %}

{% endif %}

</td>

<td><a style='color:light blue'class="related-widget-wrapper-link add-related"id="add_id_Apitest" href="../admin/djcelery/periodictask/{{task.id }}/change/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-changelink.svg"/></a></td>

<td><a style='color:light blue'class="related-widget-wrapper-link add-related" id="add_id_Apitest"href="../admin/djcelery/periodictask/{{ task.id}}/delete/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-deletelink.svg"/></a></td>

 

{% else %}

 

{% endif %}

 

{% for crontab in crontabs %}

{% if task.crontab_id != null and task.crontab_id ==crontab.id and task.interval_id == 1 %}

<td>{{ task.id }}</td>

<td>{{ task.name }}</td>

<td>{{ task.task }}</td>

<td><a style='color:green'>{{crontab.month_of_year }}{{crontab.day_of_month }}{{crontab.day_of_week }}{{crontab.hour }}{{ crontab.minute}}</a></td>

<td>{{ task.date_changed }}</td>

<td>{{ task.enabled }}</td>

<td><a href="../task_apis"target="mainFrame">運行</a></td>

<td><a style='color:light blue'class="related-widget-wrapper-link add-related"id="add_id_Apitest" href="../admin/djcelery/periodictask/{{task.id }}/change/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-changelink.svg"/></a></td>

<td><a style='color:light blue'class="related-widget-wrapper-link add-related"id="add_id_Apitest" href="../admin/djcelery/periodictask/{{task.id }}/delete/?_to_field=id&_popup=1"><imgsrc="/static/admin/img/icon-deletelink.svg"/></a></td>

{% else %}

{% endif %}

{% endfor %}{% endfor %}{% endfor %}

 

</tbody>

</table>

</div>

</div>

 

<span >   {# 把翻頁功能固定顯示在右下角#}

 

<div >

<tr><th>總數</th><td>{{ taskcounts }}</td></tr> {# 前端讀取定義的變量#}

</div>

 

<div>

    <ulclass="pagination" id="pager">

          {#上一頁鏈接開始#}

        {%if tasks.has_previous %}

           {#  如果有上一頁,則正常顯示上一頁鏈接#}

           <li><ahref="/periodic_task/?page={{ tasks.previous_page_number }}">上一頁</a></li>    {#  上一頁標簽 #}

        {%else %}

           <li class="previous disabled"><ahref="#">上一頁</a></li>{# 如果當前不存在上一頁,則上一頁的鏈接不可單擊#}

        {%endif %}

        {# 上一頁鏈接開始#}

       

        {%for num in tasks.paginator.page_range %}

        

           {% if num == currentPage %}

                <li><a href="/periodic_task/?page={{ num }}">{{ num}}</a></li> {#顯示當前頁數鏈接#}

            {% else %}

                <liclass="item"><a href="/periodic_task/?page={{ num}}">{{ num }}</a></li>

            {% endif %}

        {% endfor %}

       

        {# 下一頁鏈接開始#}

        {% if tasks.has_next %} {#  如果有下一頁,則正常顯示下一頁鏈接#}

            <liclass="next"><a href="/periodic_task/?page={{tasks.next_page_number }}">下一頁</a></li>

        {% else %}

            <li><a href="#">下一頁</a></li>

        {% endif %}

        {# 下一頁鏈接結束#}

    </ul>

</div>

 

</body>

</html>

功能描述:實現自動化測試任務調度執行,包括單一接口、掃描、流程接口、業務場景、Web搜索、自動化平台測試開發、App登錄,CSDN定時任務注冊,定時任務執行等功能。

程序清單:在apitest/views.py中加入如下內容。

from .tasks importhello_world

from .tasks importtest_readSQLcase

from djcelery.modelsimport PeriodicTask,CrontabSchedule,IntervalSchedule

 

 

# 任務計划

@login_required

defperiodic_task(request):

    username = request.session.get('user', '')

    task_list = PeriodicTask.objects.all()

    task_count =PeriodicTask.objects.all().count()  #統計數

    periodic_list =IntervalSchedule.objects.all()  # 周期任務(如每隔1小時執行1次)

    crontab_list =CrontabSchedule.objects.all()    # 定時任務(如某年某月某日的某時,每# 天的某時)

    paginator = Paginator(task_list, 5)  #生成paginator對象,設置每頁顯示5條記錄

    page = request.GET.get('page',1)  #獲取當前的頁碼數,默認為第1

    currentPage=int(page)  #把獲取的當前頁碼數轉換成整數類型

    try:

        task_list = paginator.page(page)#獲取當前頁碼數的記錄列表

    except PageNotAnInteger:

        task_list = paginator.page(1)#如果輸入的頁數不是整數,則顯示第1頁內容

    except EmptyPage:

        task_list =paginator.page(paginator.num_pages)#如果輸入的頁數不在系統的頁數中,# 則顯示最后一頁內容

    return render(request,"periodic_task.html", {"user": username,"tasks":task_list,"taskcounts": task_count, "periodics":periodic_list,"crontabs": crontab_list })

 

# 搜索功能

@login_required

deftasksearch(request):

    username = request.session.get('user', '')# 讀取瀏覽器登錄Session

    search_name =request.GET.get("task", "")

    task_list = PeriodicTask.objects.filter(task__icontains=search_name)

    periodic_list =IntervalSchedule.objects.all()  # 周期任務(如每隔1小時執行1次)

    crontab_list =CrontabSchedule.objects.all()    # 定時任務(如某年某月某日的某時,每# 天的某時)

    return render(request,'periodic_task.html',{"user": username,"tasks":task_list,"periodics":periodic_list,"crontabs": crontab_list })

 

autotest/urls.py中加入:

path('periodic_task/',views.periodic_task),

    path('tasksearch/', views.tasksearch),

 

apitest/left.html中加入:

<tr> <td>

                    <li>

                        <a  href="../periodic_task"target="mainFrame">

                            <iclass="glyphicon glyphicon-fire"></i>

                           任務計划       

                        </a>

                    </li>

</tr> </td>

            <tr><td>&nbsp;</td></tr>

查看前端頁面效果,如圖11.3所示。

640?wx_fmt=png

▲圖11.3

本篇選自:《自動化平台測試開發-Python測試開發實戰》

- To Be Continued -


640?wx_fmt=png

👇戳【閱讀原文】加入星球,即刻同行!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM