Django 不通過外鍵實現多表關聯查詢


Django不通過外鍵實現多表關聯查詢

by:授客 QQ1033553122

 

測試環境

Win 10

 

Python 3.5.4

 

Django-2.0.13.tar.gz

 

 

需求

不通過外鍵,使用django orm語法實現多個表之間的關聯查詢,類似如下sql的查詢效果:

SELECT tb_project_version.*, tb_sprint.name, tb_project.name

FROM tb_project_version

JOIN tb_sprint ON tb_sprint.id=tb_project_version.sprint_id

JOIN tb_project ON tb_project.id=tb_project_version.project_id

 

數據表Model設計

 

class Sprint(models.Model):

    id = models.AutoField(primary_key=True, verbose_name='自增id')

    name = models.CharField(max_length=50, verbose_name='迭代名稱')

    ...略   

 

    class Meta:

        db_table = 'tb_sprint'

        verbose_name = '產品迭代表'

        verbose_name_plural = verbose_name

 

class Project(models.Model):

    id = models.AutoField(primary_key=True, verbose_name='自增id')

    name = models.CharField(max_length=50, verbose_name='項目名稱')

    ...略

 

    class Meta:

        db_table = 'tb_project'

        verbose_name = '項目表'

        verbose_name_plural = verbose_name

 

 

class ProjectVersion(models.Model):

    id = models.AutoField(primary_key=True, verbose_name='自增id')

    name = models.CharField(max_length=50, verbose_name='版本名稱')

    project_id = models.IntegerField(verbose_name='關聯的項目ID')

    sprint_id = models.IntegerField(verbose_name='關聯的迭代ID')

    ...略

   

    class Meta:

        db_table = 'tb_project_version'

        verbose_name = '項目版本表'

        verbose_name_plural = verbose_name

 

實現方法1-通過extra api函數實現

 

如下,帶背景色部分的內容為核心

 

serializers.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

from rest_framework import serializers

from backend.models import ProjectVersion

 

# ProjectVersion model 序列化器

class ProjectVersionSerializer(serializers.ModelSerializer):

    project = serializers.CharField(required=True)

    sprint = serializers.CharField(required=True)

 

    class Meta:

        model = ProjectVersion

        fields = '__all__'

        read_only_fields = ['project', 'sprint']

 

說明:如上,如果使用了django rest framework序列化,則需要為其序列化器添加model中不存在的字段,否則序列化后還是看不到對應的目標字段

 

project_version_views.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

__author__ = '授客'

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

 

from backend.models import ProjectVersion

from backend.serializers import ProjectVersionSerializer

 

 

 

class ProjectVersionListAPIView(APIView):

    '''

    項目視圖-版本管理

    '''

    # 查詢列表數據

    def get(self, request, format=None):

        result = {}

        try:

            params =  request.GET

            page_size = int(params.get('pageSize'))

            page_no = int(params.get('pageNo'))

            name = params.get('name')

            project_id = params.get('projectId')

            sort = params.get('sort')

            if sort:

                sort_list = sort.split(',')

            else:

                sort_list = ['-id']

 

            startIndex = (page_no - 1) * page_size

            endIndex = startIndex + page_size

            filters = {'is_delete':0}

            if name:

                filters['name__startswith'] = name

            if project_id:

                filters['project_id'] = project_id

            projectVersions = ProjectVersion.objects.filter(**filters).extra(

                select={'project': 'SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id',

                        'sprint':'SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id'},

            )

rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]

            rows = ProjectVersionSerializer(rows, many=True).data

            total = projectVersions.count()

 

            result['msg'] =  '獲取成功'

            result['success'] =  True

            result['data'] = {}

            result['data']['rows'] = rows

            result['data']['total'] = total

            return Response(result, status.HTTP_200_OK)

        except Exception as e:

            result['msg'] =  '%s' % e

            result['success'] =  False

            return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

說明:

projectVersions.order_by(*sort_list)[startIndex:endIndex]

 

等價於

 

SELECT (SELECT tb_project.name FROM tb_project WHERE tb_project.id = tb_project_version.project_id) AS `project`,

(SELECT tb_sprint.name FROM tb_sprint WHERE tb_sprint.id = tb_project_version.sprint_id) AS `sprint`,

`tb_project_version`.`id`,

`tb_project_version`.`name`,

`tb_project_version`.`project_id`,

`tb_project_version`.`sprint_id`,

...略

FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0

ORDER BY `tb_project`.`id` DESC LIMIT 10 # 假設startIndex=0, endIndex=10

 

projectVersions.count()

等價於

SELECT COUNT(*) AS `__count` FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0

 

 

 

上述查詢代碼的另一種實現

projectVersions =  Project.objects.filter(**filters).extra(

select={'project:'tb_project.name',

        'sprint':' tb_sprint.name',

tables=['tb_project', 'tb_sprint'],

where=['tb_project.id=tb_project_version.project_id', 'tb_sprint.id = tb_project_version.sprint_id']

)

rows = projectVersions.order_by(*sort_list)[startIndex:endIndex]

rows = ProjectVersionSerializer(rows, many=True).data

total = projectVersions.count()

 

 

projectVersions.order_by(*sort_list)[startIndex:endIndex]

 

等價於

 

SELECT (tb_project.name) AS `project`,

(tb_sprint.name) AS `sprint`,

`tb_project_version`.`id`,

`tb_project_version`.`name`,

`tb_project_version`.`project_id`,

`tb_project_version`.`sprint_id`,

...略

FROM `tb_project_version`

WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)

ORDER BY `tb_project`.`id` DESC LIMIT 10 # 假設startIndex=0, endIndex=10

 

 

projectVersions.count()

等價於

SELECT COUNT(*) AS `__count` FROM `tb_project_version` , `tb_project` , `tb_sprint` WHERE `tb_project_version`.`is_delete` = 0 AND (tb_project.id=tb_project_version.project_id) AND (tb_sprint.id = tb_project_version.sprint_id)

 

 

實現方法2-通過django rest framework實現

serializers.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

from rest_framework import serializers

from backend.models import ProjectVersion

from backend.models import Sprint

from backend.models import Project

 

 

# ProjectVersion model 序列化器

class ProjectVersionSerializer(serializers.ModelSerializer):

    project = serializers.SerializerMethodField()

    sprint = serializers.SerializerMethodField()

 

    def get_sprint(self, obj):

        """

        :param obj: 當前ProjectVersion的實例

        """

        current_project_version = obj

        sprint = Sprint.objects.filter(id=current_project_version.sprint_id).first()

        if sprint:

            return sprint.name

        else:

            return '--'

 

    def get_project(self, obj):

        """

        :param obj: 當前ProjectVersion的實例

        """

        current_project_version = obj

        project = Project.objects.filter(id=current_project_version.project_id).first()

        if project:

            return project.name

        else:

            return '--'

 

    class Meta:

        model = ProjectVersion

        fields = '__all__'

        read_only_fields = ['project', 'sprint']

 

project_version_views.py

#!/usr/bin/env python

# -*- coding:utf-8 -*-

 

__author__ = '授客'

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

 

from backend.models import ProjectVersion

from backend.serializers import ProjectVersionSerializer

 

 

 

class ProjectVersionListAPIView(APIView):

    '''

    項目視圖-版本管理

    '''

    # 查詢列表數據

    def get(self, request, format=None):

        result = {}

        try:

            params =  request.GET

            page_size = int(params.get('pageSize'))

            page_no = int(params.get('pageNo'))

            name = params.get('name')

            project_id = params.get('projectId')

            sort = params.get('sort')

            if sort:

                sort_list = sort.split(',')

            else:

                sort_list = ['-id']

 

            startIndex = (page_no - 1) * page_size

            endIndex = startIndex + page_size

            filters = {'is_delete':0}

            if name:

                filters['name__startswith'] = name

            if project_id:

                filters['project_id'] = project_id

            rows = ProjectVersion.objects.filter(**filters).order_by(*sort_list)[startIndex:endIndex]

            rows = ProjectVersionSerializer(rows, many=True).data

            total = ProjectVersion.objects.filter(**filters).count()

 

            result['msg'] =  '獲取成功'

            result['success'] =  True

            result['data'] = {}

            result['data']['rows'] = rows

            result['data']['total'] = total

            return Response(result, status.HTTP_200_OK)

        except Exception as e:

            result['msg'] =  '%s' % e

            result['success'] =  False

            return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

方法3-通過raw函數執行原生sql

以下是項目中的一個實例,和本文上述內容沒有任何關聯,關鍵部分背景已着色,筆者偷懶,不做過多解釋了,簡單說下下面這段代碼對用途:

 

主要是實現類似以下查詢,獲取指定分頁對數據以及滿足條件的記錄記錄總數。

 

SELECT tb_project.*, project_name_associated, project_id_associated, platform FROM tb_project

LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id

ORDER BY id DESC

LIMIT 0,10

 

 

from rest_framework.views import APIView

from rest_framework.response import Response

from rest_framework import status

from backend.models import Project

from backend.serializers import ProjectSerializer

 

import logging

 

 

logger = logging.getLogger('mylogger')

 

class ProjectListAPIView(APIView):

    '''

    項目視圖-項目管理-項目列表

    '''

 

    # 查詢列表數據

    def get(self, request, format=None):

        result = {}

        try:

            params =  request.GET

            page_size = int(params.get('pageSize'))

            page_no = int(params.get('pageNo'))

            name = params.get('name')

            project_status = params.get('status')

            sort = params.get('sort')

 

            order_by = 'id desc'

            if sort:

                order_by = sort

          

            startIndex = (page_no - 1) * page_size

 

            where = 'WHERE tb_project.is_delete=0 '

            filters = {'is_delete':0}

            if name:

                filters['name__startswith'] = name

                where += 'AND locate("%s", name) ' % name

 

            if project_status:

                where += "AND status='%s'" % project_status

 

            sql = 'SELECT tb_project.id, COUNT(1) AS count FROM tb_project LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id '

            query_rows = Project.objects.raw(sql)

            total = query_rows[0].__dict__.get('count') if query_rows else 0

 

            sql =  'SELECT tb_project.*,project_name_associated, project_id_associated, platform FROM tb_project LEFT JOIN tb_project_associated ON tb_project.id=tb_project_associated.project_id ' \

                    '%s ORDER BY %s ' \

                    'LIMIT %s,%s ' % (where,order_by, startIndex, page_size)

            query_rows = Project.objects.raw(sql)

            rows = []

            for item in query_rows:

                item.__dict__.pop('_state')

                item.__dict__['create_time'] = item.__dict__['create_time'].strftime('%Y-%m-%d %H:%M:%S')

                item.__dict__['update_time'] = item.__dict__['update_time'].strftime('%Y-%m-%d %H:%M:%S')

                item.__dict__['begin_time'] = item.__dict__['begin_time'].strftime('%Y-%m-%d')

                item.__dict__['end_time'] = item.__dict__['end_time'].strftime('%Y-%m-%d')

                rows.append(item.__dict__)

        

            result['msg'] =  '獲取成功'

            result['success'] =  True

            result['data'] = {}

            result['data']['rows'] = rows

            result['data']['total'] = total

            return Response(result, status.HTTP_200_OK)

        except Exception as e:

            result['msg'] =  '%s' % e

            result['success'] =  False

            return Response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)

 

 

參考鏈接

https://docs.djangoproject.com/en/1.11/ref/models/querysets/#django.db.models.query.QuerySet.extra

https://www.jianshu.com/p/973971880da7


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM