Veo 3 模型架构
第二阶段是 Veo 3 模型架构的编码实现。此阶段是整个项目的核心与难点所在,涉及 Veo 3 模型具体架构的设计与代码实现。
Veo 3 支持通过以下两种类型的输入生成高质量视频:
文本提示 (Text prompts):对期望视频内容的自然语言描述,例如“向日葵绽放的延时摄影 (A timelapse of a sunflower blooming)。”
图像提示 + 文本提示 (Image prompts + Text prompts):图像与文本描述的组合,例如,提供一张向日葵图片,并配合文本“向日葵绽放的延时摄影 (A timelapse of a sunflower blooming)”。
其核心流程可图示如下:
流程概述:
视频与音频分别通过视频编码器 (Video Encoder)与音频编码器 (Audio Encoder)进行编码。
图像与文本则利用 Google 提供的 UL2 编码器进行编码。
编码后的视频与音频(通常会加入一定的噪声),连同嵌入式图像与文本信息,共同输入到一个基于 Transformer 架构的联合去噪器 (Joint Denoiser)。
该联合去噪器处理这些多模态输入,并生成视频与音频的联合表示。
接下来将逐步实现这些组件。首先,导入此阶段所需的 Python 库。
import imageio
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import Dataset, DataLoader
import torchaudio
from torchvision import transforms
from diffusers import DDPMScheduler
from transformers import CLIPProcessor, CLIPModel
from einops import rearrange
import numpy as np
import skimage.transform
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
数据重组
在数据预处理阶段,最终的数据集仅包含视频文件。然而,Veo 3 模型支持图像与文本结合的输入方式,并且音频数据需要单独处理。因此,首要任务是对数据集进行重组,为每种数据类型建立清晰的结构。
目标数据结构如下: 📁 dataset/ ├── 📁 t2v_pairs/ # dir containing text to videos pairs training data # 包含文本到视频对训练数据的目录 │ ├── 📄 video_1.mp4 # video file # 视频文件 │ ├── 📄 video_1.wav # audio file (extracted from video_1.mp4) # 音频文件 (从 video_1.mp4 提取) │ ├── 📄 video_1.txt # text prompt or annotation for video_1.mp4 # video_1.mp4 的文本提示或标注 │ └── ... └── 📁 i2v_pairs/ # dir containing image to videos pairs training data # 包含图像到视频对训练数据的目录 ├── 📄 image_1.jpg # input image # 输入图像 ├── 📄 video_1.mp4 # generated video file # 生成的视频文件 ├── 📄 video_1.wav # audio file (extracted from video_1.mp4) # 音频文件 (从 video_1.mp4 提取) └── ...
此步骤主要涉及常规的 Python 文件操作与数据组织,其具体代码实现已整合至 transform_data.py 文件中,可用于将前述处理得到的修剪后视频数据转换为上述 dataset/ 目录结构。此处不再赘述其实现细节。
dataset/ 目录准备就绪后,需进一步依据音频、图像和视频文件的路径,创建结构化的数据集变量。这将便于在模型训练和推理阶段高效地访问数据。
首先,定义包含成对数据集的目录路径: # Paths # 路径定义 i2v_dir = 'dataset/i2v_pairs' t2v_dir = 'dataset/t2v_pairs'
i2v_files = os.listdir(i2v_dir)
t2v_files = os.listdir(t2v_dir)
构建文本到视频 (Text-to-Video, T2V) 的数据对信息。
t2v_info = []
t2v_videos = [f for f in t2v_files if f.endswith('.mp4')]
for idx, video_file in enumerate(t2v_videos, start=1):
base_name = os.path.splitext(video_file)[0]
audio_file = f"{base_name}.wav"
annotation = next((a for a in video_annotations if a['video_path'].split('\\')[-1] == video_file), None)
caption = annotation['summary'] if annotation else ''
t2v_info.append({
'id': f"t2v_{idx:03d}",
'video_path': video_file,
'audio_path': audio_file,
'initial_caption': caption
})
类似地,构建图像加文本到视频 (Image+Text-to-Video, I2V) 的数据对信息。
i2v_info = []
i2v_images = [f for f in i2v_files if f.endswith('.jpg')]
for idx, image_file in enumerate(i2v_images, start=1):
base_name = os.path.splitext(image_file)[0]
video_file = f"{base_name}.mp4"
audio_file = f"{base_name}.wav"
annotation = next((a for a in video_annotations if a['video_path'].split('\\')[-1] == video_file), None)
prompt = annotation['summary'] if annotation else ''
i2v_info.append({
'id': f"i2v_{idx:03d}",
'image_path': image_file,
'target_video_path': video_file,
'target_audio_path': audio_file,
'prompt': prompt
})
检查生成的数据对数量。
len(i2v_info), len(t2v_info)
(5, 17)
为进行模型训练,需将训练数据(包括视频、音频、图像)转换为张量 (tensors)。张量是可供模型处理并在训练过程中优化的多维数组。本实现将使用 torch 库完成数据到张量的转换。
定义两个核心的预处理函数:一个用于视频数据,另一个用于音频数据。同时,声明一些后续流程中将使用的常量与参数。
BASE_VIDEO_FRAMES = 16
def preprocess_video(path, target_height, target_width):
reader = imageio.get_reader(path, 'ffmpeg')
frames = []
for i, f in enumerate(reader):
if i >= BASE_VIDEO_FRAMES:
break
resized = skimage.transform.resize(f, (target_height, target_width), anti_aliasing=True)
tensor = torch.from_numpy(resized).permute(2, 0, 1).float()
frames.append(tensor)
if len(frames) < BASE_VIDEO_FRAMES and frames:
pad_frame = torch.zeros_like(frames[0])
frames.extend([pad_frame] * (BASE_VIDEO_FRAMES - len(frames)))
return torch.stack(frames, dim=1) if frames else None
视频预处理函数定义完毕,其中包含一个关键参数 BASE_VIDEO_FRAMES。接下来定义音频文件的预处理函数,并设定音频相关的特定参数。
AUDIO_SAMPLE_RATE = 16000
AUDIO_DURATION_SEC = 2
AUDIO_SAMPLES = AUDIO_SAMPLE_RATE * AUDIO_DURATION_SEC
def preprocess_audio(path):
w, sr = torchaudio.load(path)
if w.shape[0] > 1:
w = w[:1, :]
if sr != AUDIO_SAMPLE_RATE:
w = torchaudio.transforms.Resample(sr, AUDIO_SAMPLE_RATE)(w)
if w.shape[1] < AUDIO_SAMPLES:
w = F.pad(w, (0, AUDIO_SAMPLES - w.shape[1]))
else:
w = w[:, :AUDIO_SAMPLES]
max_val = torch.max(torch.abs(w))
if max_val > 0:
w = w / max_val
return w
训练过程关注两种视频张量:基础视频张量 (base video tensor) 与上采样视频张量 (upsampled video tensor)。基础视频张量由原始视频帧构成,而上采样视频张量则通过重复帧以匹配目标帧数来创建。
为统一处理图像、视频和音频数据,需在现有预处理函数的基础上构建两个新的数据加载函数。
UPSAMPLED_VIDEO_HEIGHT = 64
UPSAMPLED_VIDEO_WIDTH = 64
BASE_VIDEO_HEIGHT = 32
BASE_VIDEO_WIDTH = 32
T2V_DATA_DIR = 'dataset/t2v_pairs'
I2V_DATA_DIR = 'dataset/i2v_pairs'
参数定义完成后,编写 T2V 和 I2V 数据集的加载逻辑。
def load_i2v_item(info):
img_pil = Image.open(
os.path.join(I2V_DATA_DIR, info['image_path'])
).convert('RGB')
img_base = transforms.ToTensor()(
img_pil.resize((BASE_VIDEO_WIDTH, BASE_VIDEO_HEIGHT))
)
img_upsampled = transforms.ToTensor()(
img_pil.resize((UPSAMPLED_VIDEO_WIDTH, UPSAMPLED_VIDEO_HEIGHT))
)
vid_base = preprocess_video(
os.path.join(I2V_DATA_DIR, info['target_video_path']),
BASE_VIDEO_HEIGHT, BASE_VIDEO_WIDTH
)
vid_upsampled = preprocess_video(
os.path.join(I2V_DATA_DIR, info['target_video_path']),
UPSAMPLED_VIDEO_HEIGHT, UPSAMPLED_VIDEO_WIDTH
)
aud = preprocess_audio(
os.path.join(I2V_DATA_DIR, info['target_audio_path'])
)
return {
'modality': 'i2v',
'input_image_base': img_base,
'input_image_upsampled': img_upsampled,
'target_video_base': vid_base,
'target_video_upsampled': vid_upsampled,
'target_audio': aud,
'caption': info['prompt'],
'id': info['id']
}
以及 T2V 数据项的加载逻辑:
def load_t2v_item(info):
base_vid = preprocess_video(
os.path.join(T2V_DATA_DIR, info['video_path']),
BASE_VIDEO_HEIGHT, BASE_VIDEO_WIDTH
)
upsampled_vid = preprocess_video(
os.path.join(T2V_DATA_DIR, info['video_path']),
UPSAMPLED_VIDEO_HEIGHT, UPSAMPLED_VIDEO_WIDTH
)
audio = preprocess_audio(
os.path.join(T2V_DATA_DIR, info['audio_path'])
)
if base_vid is not None and upsampled_vid is not None and audio is not None:
return {
'modality': 't2v',
'video_base': base_vid,
'video_upsampled': upsampled_vid,
'audio': audio,
'caption'
: info['initial_caption'],
'id': info['id']
}
return None
至此,数据重组的最后一步——将数据转换为适合训练的张量格式——已准备就绪。
t2v_data = [
d for d in [
load_t2v_item(i) for i in tqdm(t2v_info, desc="Loading T2V data")
] if d
]
i2v_data = [
d for d in [
load_i2v_item(i) for i in tqdm(i2v_info, desc="Loading I2V data")
] if d
]
将更新后的张量数据合并为一个统一的数据变量。
raw_data = t2v_data + i2v_data
原始数据已成功转换为适合训练的格式。在进入视频或音频编码器实现之前,需创建一个 MultiModalDataset 类,用于统一管理数据加载与批处理。该类将以结构化方式封装所有转换后的数据,便于访问不同模态(视频、音频、图像、文本)及其对应的字幕信息。
创建多模态数据集 (MultiModalDataset)
采用面向对象编程(OOP)是组织训练数据的恰当方式。此处将创建一个 MultiModalDataset 类,负责 T2V 和 I2V 数据集的数据加载与处理。
BATCH_SIZE = 1
class MultiModalDataset(Dataset):
def __init__(self, data_list):
self.data = data_list
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
item = self.data[idx]
caption_string = item['caption']
if item['modality'] == 't2v':
return {
'modality': 't2v',
'video_base': item['video_base'],
'video_upsampled': item['video_upsampled'],
'audio': item['audio'],
'raw_caption': caption_string,
'input_image_base': torch.zeros_like(item['video_base'][:, 0]),
}
else:
return {
'modality': 'i2v',
'video_base': item['target_video_base'],
'video_upsampled': item['target_video_upsampled'],
'audio': item['target_audio'],
'raw_caption': caption_string,
'input_image_base': item['input_image_base'],
}
通过在原始数据之上实例化 MultiModalDataset 类,即可构建数据集对象。该类继承自 torch.utils.data.Dataset,后者是 PyTorch 提供的标准接口,用于以批处理方式加载和处理数据。
train_dataset = MultiModalDataset(raw_data)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
至此,数据已准备就绪。接下来,我们将着手实现 Veo 3 模型的具体架构,包括视频编码器、音频编码器、条件编码器、联合去噪器等多个核心组件。
视频变分自编码器 (Video VAE)
原始数据经过预处理并重组为 PyTorch Dataset 后,为训练视频生成模型奠定了基础。现在,我们开始构建模型的第一个组件:视频变分自编码器 (Video Variational Autoencoder, Video VAE)。
变分自编码器(VAE)是一种广泛应用于无监督学习,特别是生成模型领域的神经网络架构。它主要由两部分构成:
编码器 (Encoder):将输入数据(如图像或视频)映射到一个低维的潜在空间 (Latent Space) 表示。
解码器 (Decoder):将潜在空间表示映射回原始数据空间,从而实现对输入数据的重建。
其基本架构如下图所示:
其工作流程如下:
编码器接收输入张量,并将其编码为一个潜在向量 (Latent Vector)。该向量捕获了输入数据的关键特征(例如,图像或视频帧的形状、颜色、纹理等)。
编码器通常由一系列卷积层组成,这些卷积层在提取特征的同时,逐步对输入张量进行下采样。
解码器接收此潜在向量,并重建原始输入数据。它通常采用转置卷积层 (Transposed Convolutional Layers) 将潜在表示上采样回原始维度。
以下是 VideoVAE 的代码实现
VIDEO_LATENT_CHANNELS = 4
class VideoVAE(nn.Module):
def __init__(self):
super().__init__()
self.e = nn.Sequential(
nn.Conv3d(3, 32, kernel_size=3, stride=(2, 2, 2), padding=1), nn.SiLU(),
nn.Conv3d(32, 64, kernel_size=3, stride=(1, 2, 2), padding=1), nn.SiLU(),
nn.Conv3d(64, 128, kernel_size=3, stride=(1, 2, 2), padding=1), nn.SiLU(),
nn.Conv3d(128, VIDEO_LATENT_CHANNELS, kernel_size=3, stride=1, padding=1)
)
self.d = nn.Sequential(
nn.ConvTranspose3d(VIDEO_LATENT_CHANNELS, 128, kernel_size=3, stride=1, padding=1), nn.SiLU(),
nn.ConvTranspose3d(128, 64, kernel_size=3, stride=(1, 2, 2), padding=1, output_padding=(0, 1, 1)), nn.SiLU(),
nn.ConvTranspose3d(64, 32, kernel_size=3, stride=(1, 2, 2), padding=1, output_padding=(0, 1, 1
)), nn.SiLU(),
nn.ConvTranspose3d(32, 3, kernel_size=3, stride=(2, 2, 2), padding=1, output_padding=1), nn.Sigmoid()
)
def encode(self, x):
return self.e(x)
def decode(self, x):
return self.d(x)
所设计的 VideoVAE 包含一个编码器(encoder)和一个解码器(decoder)。编码器负责将输入视频压缩为低维的潜在表示(latent representation),而解码器则从该潜在表示重建原始视频。编码器采用 3D 卷积处理时间维度(帧)以及空间维度(高和宽),解码器则利用转置卷积将潜在表示上采样回原始视频的形状。
Veo 3 同样具备音频生成能力,因此,我们接下来为音频数据构建一个类似的 VAE。
音频变分自编码器 (Audio VAE)
音频数据的处理方式与 VideoVAE 类似,但由于音频是一维时序数据,因此采用 1D 卷积。编码器接收音频输入并将其压缩为潜在表示,解码器则从该潜在表示重建音频。
定义 AudioVAE 类,其功能与 VideoVAE 类似,但专用于处理音频数据。
AUDIO_LATENT_CHANNELS = 16
class AudioVAE(nn.Module):
def __init__(self):
super().__init__()
self.e = nn.Sequential(
nn.Conv1d(1, 16, kernel_size=32, stride=8, padding=12), nn.SiLU(),
nn.Conv1d(16, AUDIO_LATENT_CHANNELS, kernel_size=32, stride=4, padding=14)
)
self.d = nn.Sequential(
nn.ConvTranspose1d(AUDIO_LATENT_CHANNELS, 16, kernel_size=32, stride=4, padding=14), nn.SiLU(),
nn.ConvTranspose1d(16, 1, kernel_size=32, stride=8, padding=12),
nn.Tanh()
)
def encode(self, x):
return self.e(x)
def decode(self, x):
return self.d(x)
AudioVAE 类专为处理音频数据而设计,特别是针对采样率为 16kHz、时长为 2 秒的音频文件。该模型将音频编码为潜在表示,并能将其解码回原始音频格式。
至此,Veo 3 的 VideoVAE 和 AudioVAE 两个核心组件已实现。接下来,需要实现负责处理文本和图像输入的编码器。
条件编码器 Conditional Encoder (采用 CLIP替代 UL2)
Google Veo 3 采用 UL2 模型进行条件编码。考虑到 UL2 模型规模较大,不易在本地设备运行,本实现将采用 OpenAI 的 CLIP 模型作为替代。CLIP 模型相对轻量,且同样具备强大的文本与图像编码能力。
训练过程中的文本与图像数据将输入条件编码器,该编码器负责将文本和图像编码为统一的联合表示 (Joint Representation)。此联合表示随后将作为条件,引导视频生成模型的行为。
创建一个条件编码器类,能够接收字幕和图像作为输入,并输出编码后的文本与图像特征。
TEXT_MAX_LENGTH = 77
class ConditioningEncoder(nn.Module):
def __init__(self, model_name="openai/clip-vit-large-patch14"):
super().__init__()
self.model = CLIPModel.from_pretrained(model_name)
self.processor = CLIPProcessor.from_pretrained(model_name)
def get_text_embeds(self, text):
inputs = self.processor(
text=text,
return_tensors="pt",
padding=True,
truncation=True,
max_length=TEXT_MAX_LENGTH
).to(DEVICE)
return self.model.get_text_features(**inputs)
def get_image_embeds(self, image):
if isinstance(image, torch.Tensor):
inputs = self.processor(
images=image,
return_tensors="pt",
do_rescale=False
).to(DEVICE)
else:
inputs = self.processor(
images=image,
return_tensors="pt"
).to(DEVICE)
return self.model.get_image_features(**inputs)
处理不同类型数据(视频、音频、文本、图像)的编码器类均已定义完毕。根据 Google 的技术报告,Veo 3 架构中采用了一个基于 Transformer 的去噪组件。接下来,我们将构建一个基于 Transformer 的模型,负责对视频、音频和文本嵌入进行去噪处理。该模型将接收来自 VideoVAE、AudioVAE 和 ConditioningEncoder 的潜在表示,并通过一系列 Transformer 层进行处理。
Transformer 模块 (Transformer Block)
Transformer 是一种基于自注意力机制的神经网络架构,最初为处理序列数据(如文本)而设计,现已广泛应用于图像、视频等多种模态。它通过自注意力机制权衡输入序列中不同部分的重要性,从而有效捕捉数据内部的长程依赖关系。
Transformer 的具体实现可以非常复杂,也可以相对简化,具体取决于应用场景和可用训练数据量。考虑到本项目为学习性质,我们将为多模态模型实现一个简化版的 Transformer 架构。
def modulate(x, shift, scale):
return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)
class TransformerBlock(nn.Module):
def __init__(self, dim, heads):
super().__init__()
self.norm1 = nn.LayerNorm(dim)
self.attn = nn.MultiheadAttention(dim, heads, batch_first=True)
self.norm2 = nn.LayerNorm(dim)
self.mlp = nn.Sequential(
nn.Linear(dim, dim * 4),
nn.GELU(),
nn.Linear(dim * 4, dim)
)
self.adaLN_modulation = nn.Sequential(
nn.SiLU(),
nn.Linear(dim, 6 * dim, bias=True)
)
def forward(self, x, c):
shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = self.adaLN_modulation(c).chunk(6, dim=1)
x = x + gate_msa.unsqueeze(1) * self.attn(
modulate(self.norm1(x), shift_msa, scale_msa), x, x
)[0]
x = x + gate_mlp.unsqueeze(1) * self.mlp(
modulate(self.norm2(x), shift_mlp, scale_mlp)
)
return x
class FinalLayer(nn.Module):
def __init__(self, hidden_size, patch_size, out_channels):
super().__init__()
self.norm_final = nn.LayerNorm(hidden_size, elementwise_affine=False, eps=1e-6)
self.linear = nn.Linear(hidden_size, patch_size, bias=True)
self.adaLN_modulation = nn.Sequential(
nn.SiLU(),
nn.Linear(hidden_size, 2 * hidden_size, bias=True)
)
def forward(
self, x, c):
shift, scale = self.adaLN_modulation(c).chunk(2, dim=1)
x = modulate(self.norm_final(x), shift, scale)
x = self.linear(x)
return x
本实现的 Transformer 模型包含以下核心组件:
ConditioningEncoder (条件编码器):已在前述章节定义,负责将文本和图像输入编码为嵌入向量。
TransformerBlock (Transformer 模块):实现多头自注意力机制和带有自适应层归一化(Adaptive LayerNorm)调制的前馈网络。
FinalLayer (最终层):对 Transformer 模块的输出进行归一化处理,并通过线性投影将其映射到期望的输出维度(例如,补丁大小),同样应用调制。
这是一个相对简化的 Transformer 实现,但已涵盖了处理文本和图像输入所需的基本要素。
时间步嵌入生成 (Timestep Embedding Generation)
在扩散模型中,时间步嵌入 (Timestep Embedding) 至关重要,它负责对扩散过程中的各个时间步进行编码,使模型能够感知和利用去噪过程的进展信息。
该模块通常基于时间步索引生成正弦位置编码(Sinusoidal Positional Encoding),然后通过一个小型多层感知机(MLP)将其投影到一个更高维度的空间,以便有效地作为模型的条件输入。
class TimestepEmbedding(nn.Module):
def __init__(self, dim):
super().__init__()
self.dim = dim
self.mlp = nn.Sequential(
nn.Linear(dim, dim * 4),
nn.SiLU(),
nn.Linear(dim * 4, dim * 4)
)
def forward(self, t):
half_dim = self.dim // 2
emb = torch.exp(
torch.arange(half_dim, device=t.device) * -(np.log(10000.0) / (half_dim - 1))
)
emb = t.float()[:, None] * emb[None, :]
emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=1)
return self.mlp(emb)
TimestepEmbedding 类实现了计算时间步正弦嵌入的功能。其 forward 方法负责生成嵌入向量,并通过一个 MLP 将其投影到更高维度。
联合去噪模型 (Joint Denoising Model, JDM)
接下来,将实现 JointDenoisingTransformer 类,它将集成前述定义的所有组件。
JointDenoisingTransformer 的核心功能是接收带噪声的视频和音频输入、当前的时间步信息以及上下文(文本或图像嵌入),并输出去噪后的视频和音频。
JointDenoisingTransformer 的实现将分解为以下几个关键组件:
InputProjector:负责将带噪声的输入(视频和音频)投影为补丁 (Patches) 并进行嵌入,即将其转换为适合后续处理的格式。
TokenBuilder:基于投影得到的补丁构建词元 (Tokens) 序列,其中包含代表视频和音频补丁的词元,以及特殊的模态指示词元。
PatchTransformer:将 Transformer 架构应用于这些词元序列,通过自注意力机制捕捉序列内部的关系与依赖。
OutputProjector:将经过 Transformer 处理的词元序列投影回视频和音频的输出空间,即将处理后的词元转换回期望的去噪后视频和音频格式。
JointDenoisingTransformer:作为主类,集成上述所有组件并定义完整的前向传播逻辑,协调整个去噪过程。
首先定义 InputProjector 类,负责处理带噪声视频和音频输入的补丁化与嵌入,以及时间步和上下文信息的嵌入。
TEXT_EMBED_DIM = 768
VIDEO_PATCH_SIZE_F = 2
VIDEO_PATCH_SIZE_H = 2
VIDEO_PATCH_SIZE_W = 2
AUDIO_PATCH_SIZE = 16
class InputProjector(nn.Module):
def __init__(self, is_upsampler, embed_dim):
super().__init__()
self.is_upsampler = is_upsampler
self.embed_dim = embed_dim
self.t_embedder = nn.Sequential(TimestepEmbedding(embed_dim), nn.Linear(embed_dim * 4, embed_dim))
self.c_embedder = nn.Linear(TEXT_EMBED_DIM, embed_dim)
self.video_patch_size = VIDEO_PATCH_SIZE_F * VIDEO_PATCH_SIZE_H * VIDEO_PATCH_SIZE_W * VIDEO_LATENT_CHANNELS
self.video_patcher = nn.Conv3d(
VIDEO_LATENT_CHANNELS, embed_dim,
kernel_size=(VIDEO_PATCH_SIZE_F, VIDEO_PATCH_SIZE_H, VIDEO_PATCH_SIZE_W),
stride=(VIDEO_PATCH_SIZE_F, VIDEO_PATCH_SIZE_H, VIDEO_PATCH_SIZE_W)
)
self.audio_patch_size = AUDIO_PATCH_SIZE * AUDIO_LATENT_CHANNELS
self.audio_patcher = nn.Conv1d(
AUDIO_LATENT_CHANNELS, embed_dim,
kernel_size=AUDIO_PATCH_SIZE, stride=AUDIO_PATCH_SIZE
)
if self.is_upsampler:
self.low_res_patcher = nn.Conv3d(
VIDEO_LATENT_CHANNELS, embed_dim,
kernel_size=(VIDEO_PATCH_SIZE_F, VIDEO_PATCH_SIZE_H, VIDEO_PATCH_SIZE_W),
stride=(VIDEO_PATCH_SIZE_F, VIDEO_PATCH_SIZE_H, VIDEO_PATCH_SIZE_W)
)
def forward(self, noisy_video, noisy_audio, t, context, low_res_video=None):
video_patches = rearrange(self.video_patcher(noisy_video), 'b c f h w -> b (f h w) c')
audio_patches = rearrange(self.audio_patcher(noisy_audio), 'b c l -> b l c')
t_emb = self.t_embedder(t)
c_emb = self.c_embedder(context)
cond_emb = t_emb + c_emb
low_res_patches = None
if self.is_upsampler and low_res_video is not None:
low_res_patches = rearrange(self.low_res_patcher(low_res_video), 'b c f h w -> b (f h w) c')
return video_patches, audio_patches, cond_emb, low_res_patches
InputProjector 类实现了以下功能:
接下来定义 TokenBuilder 类,负责从投影的补丁创建词元序列。该类将处理视频和音频的特殊模态词元的创建,并在上采样模式下可选地为低分辨率视频创建特殊词元。
class TokenBuilder(nn.Module):
def __init__(self, is_upsampler, embed_dim):
super().__init__()
self.is_upsampler = is_upsampler
self.embed_dim = embed_dim
self.video_token = nn.Parameter(torch.randn(1, 1, embed_dim))
self.audio_token = nn.Parameter(torch.randn(1, 1, embed_dim))
if self.is_upsampler:
self.low_res_token = nn.Parameter(torch.randn(1, 1, embed_dim))
def forward(self, B, video_patches, audio_patches, low_res_patches=None):
tokens = [
self.video_token.repeat(B, 1, 1),
video_patches,
self.audio_token.repeat(B, 1, 1),
audio_patches
]
if self.is_upsampler and low_res_patches is not None:
tokens = [
self.low_res_token.repeat(B, 1, 1),
low_res_patches
] + tokens
return torch.cat(tokens, dim=1)
TokenBuilder 类实现了以下功能:
现在定义 PatchTransformer 类,它将 Transformer 架构应用于 TokenBuilder 创建的词元序列。该类负责处理位置嵌入以及对词元序列进行 Transformer 模块的堆叠处理。
VIDEO_LATENT_FRAMES = 8
BASE_VIDEO_LATENT_H = 4
BASE_VIDEO_LATENT_W = 4
UPSAMPLED_VIDEO_LATENT_H = 8
UPSAMPLED_VIDEO_LATENT_W = 8
AUDIO_LATENT_SAMPLES = AUDIO_SAMPLES // 32
DIT_EMBED_DIM = 256
DIT_DEPTH = 4
DIT_HEADS = 4
class PatchTransformer(nn.Module):
def __init__(self, is_upsampler, embed_dim):
super().__init__()
self.is_upsampler = is_upsampler
self.embed_dim = embed_dim
video_h = UPSAMPLED_VIDEO_LATENT_H if is_upsampler else BASE_VIDEO_LATENT_H
video_w = UPSAMPLED_VIDEO_LATENT_W if is_upsampler else BASE_VIDEO_LATENT_W
video_patches = (
(video_h // VIDEO_PATCH_SIZE_H) *
(video_w // VIDEO_PATCH_SIZE_W) *
(VIDEO_LATENT_FRAMES // VIDEO_PATCH_SIZE_F)
)
audio_patches = AUDIO_LATENT_SAMPLES // AUDIO_PATCH_SIZE
num_patches = video_patches + audio_patches + 2
if is_upsampler:
low_res_video_patches = (
(BASE_VIDEO_LATENT_H // VIDEO_PATCH_SIZE_H) *
(BASE_VIDEO_LATENT_W // VIDEO_PATCH_SIZE_W) *
(VIDEO_LATENT_FRAMES // VIDEO_PATCH_SIZE_F)
)
num_patches += low_res_video_patches + 1
self.pos_embed = nn.Parameter(torch.randn(1, int(num_patches), embed_dim))
self.transformer_blocks = nn.ModuleList([
TransformerBlock(embed_dim, DIT_HEADS)
for _ in range(DIT_DEPTH)
])
def forward(self, x, cond_emb):
if x.shape[1] > self.pos_embed.shape[1]:
extra_pos = torch.randn(1, x.shape[1] - self.pos_embed.shape[1], self.embed_dim, device=x.device)
pos_embed = torch.cat([self.pos_embed, extra_pos], dim=1)
else:
pos_embed = self.pos_embed[:, :x.shape[1]]
x += pos_embed
for block in self.transformer_blocks:
x = block(x, cond_emb)
return x
PatchTransformer 类实现了以下功能:
第四个组件是 OutputProjector 类,它负责将经过 Transformer 处理的词元序列投影回期望的去噪后视频和音频输出。该类将处理最终的层归一化和线性投影操作。
class OutputProjector(nn.Module):
def __init__(self, is_upsampler, embed_dim, video_patch_size, audio_patch_size):
super().__init__()
self.is_upsampler = is_upsampler
self.embed_dim = embed_dim
self.final_video = FinalLayer(embed_dim, video_patch_size, VIDEO_LATENT_CHANNELS)
self.final_audio = FinalLayer(embed_dim, audio_patch_size, AUDIO_LATENT_CHANNELS)
def forward(self, x, cond_emb, video_patches_shape, audio_patches_shape, noisy_audio_shape, low_res_patches_shape=None):
start_idx = 1
if self.is_upsampler and low_res_patches_shape is not None:
start_idx += 1 + low_res_patches_shape[1]
vid_out_patches = x[:, start_idx : start_idx + video_patches_shape[1]]
vid_pred = self.final_video(vid_out_patches, cond_emb)
vid_pred = rearrange(
vid_pred,
'b (f h w) (p1 p2 p3 c) -> b c (f p1) (h p2) (w p3)',
p1=VIDEO_PATCH_SIZE_F,
p2=VIDEO_PATCH_SIZE_H,
p3=VIDEO_PATCH_SIZE_W,
h=(UPSAMPLED_VIDEO_LATENT_H if self.is_upsampler else BASE_VIDEO_LATENT_H) // VIDEO_PATCH_SIZE_H,
w=(UPSAMPLED_VIDEO_LATENT_W if self.is_upsampler else BASE_VIDEO_LATENT_W) // VIDEO_PATCH_SIZE_W,
f=VIDEO_LATENT_FRAMES // VIDEO_PATCH_SIZE_F
)
aud_out_patches = x[:, start_idx + video_patches_shape[1] + 1 : start_idx + video_patches_shape[1] + 1 + audio_patches_shape[1]]
aud_pred = self.final_audio(aud_out_patches, cond_emb)
aud_pred = rearrange(
aud_pred,
'b l (p c) -> b c (l p)',
p=AUDIO_PATCH_SIZE,
c=AUDIO_LATENT_CHANNELS
)
if aud_pred.shape[2] != noisy_audio_shape[2]:
aud_pred = F.interpolate(aud_pred, size=noisy_audio_shape[2], mode='linear', align_corners=False)
return vid_pred, aud_pred
OutputProjector 类实现了以下功能:
最后,将上述所有组件整合到主类 JointDenoisingTransformer 中,该类将输入投影、词元构建、补丁转换和输出投影等步骤串联起来,形成完整的前向传播逻辑。
class JointDenoisingTransformer(nn.Module):
def __init__(self, is_upsampler=False):
super().__init__()
self.is_upsampler = is_upsampler
self.embed_dim = DIT_EMBED_DIM
self.input_proj = InputProjector(is_upsampler, self.embed_dim)
self.token_builder = TokenBuilder(is_upsampler, self.embed_dim)
self.patch_transformer = PatchTransformer(is_upsampler, self.embed_dim)
self.output_proj = OutputProjector(
is_upsampler, self.embed_dim,
self.input_proj.video_patch_size,
self.input_proj.audio_patch_size
)
def forward(self, noisy_video, noisy_audio, t, context, low_res_video=None):
B = noisy_video.shape[0]
video_patches, audio_patches, cond_emb, low_res_patches = self.input_proj(
noisy_video, noisy_audio, t, context, low_res_video
)
x = self.token_builder(B, video_patches, audio_patches, low_res_patches)
x = self.patch_transformer(x, cond_emb)
vid_pred, aud_pred = self.output_proj(
x, cond_emb,
video_patches.shape, audio_patches.shape, noisy_audio.shape,
low_res_patches.shape if low_res_patches is not None else None
)
return vid_pred, aud_pred
JointDenoisingTransformer 类已定义完毕。它是一个用于对视频和音频数据进行去噪的神经网络模型,集成了输入投影、词元构建、补丁转换和输出投影等多个组件。该模型能够处理基础分辨率和上采样分辨率的任务,通过处理带噪声的视频和音频输入来生成去噪后的输出。
VideoVAE、AudioVAE、ConditioningEncoder 和 JointDenoisingTransformer 等核心类均已定义。接下来,将进入模型训练阶段,实例化这些组件,并利用先前准备的数据集来优化模型参数。
Veo 3 模型训练
第三阶段是基于前述架构进行模型训练。在详细介绍训练循环逻辑之前,首先实例化模型的各个组件,并为训练过程做好准备。
DENOISER_TIMESTEPS = 1000
base_denoiser = JointDenoisingTransformer(is_upsampler=False).to(DEVICE)
upsampler_denoiser = JointDenoisingTransformer(is_upsampler=True).to(DEVICE)
noise_scheduler = DDPMScheduler(num_train_timesteps=DENOISER_TIMESTEPS, beta_schedule="linear")
上述代码实例化了用于基础分辨率视频和音频去噪的 base_denoiser,以及用于上采样分辨率去噪的 upsampler_denoiser。noise_scheduler 负责管理去噪过程中的噪声水平。
NUM_EPOCHS = 10
LEARNING_RATE = 1e-4
video_vae = VideoVAE().to(DEVICE)
audio_vae = AudioVAE().to(DEVICE)
conditioning_encoder = ConditioningEncoder().to(DEVICE)
all_params = (
list(video_vae.parameters()) +
list(audio_vae.parameters()) +
list(base_denoiser.parameters()) +
list(upsampler_denoiser.parameters()) +
list(conditioning_encoder.parameters())
)
optimizer = optim.AdamW(all_params, lr=LEARNING_RATE)
loss_fn = nn.MSELoss()
lr_scheduler = CosineAnnealingLR(
optimizer,
T_max=len(train_dataloader) * NUM_EPOCHS
)
实例化 VideoVAE、AudioVAE 和 ConditioningEncoder 模型,并配置了用于训练的优化器和学习率调度器。优化器选用 AdamW,学习率调度策略为 CosineAnnealingLR。损失函数采用均方误差 (MSE) 来训练去噪器。
训练循环的核心目标是训练两个去噪器(基础分辨率去噪器和上采样分辨率去噪器)来准确预测添加到潜在表示中的噪声,并通过最小化均方误差(MSE)损失来实现。在前向传播过程中:
首先,对低分辨率的 video_base 潜在表示添加噪声,并训练 base_denoiser 来预测此噪声,同时处理音频部分的去噪。
接着,对高分辨率的 video_upsampled 潜在表示添加噪声,并训练 upsampler_denoiser 来预测此噪声。在此阶段,带噪声的低分辨率视频潜在表示将作为额外的条件信号输入,以辅助模型在高分辨率重建中添加更多细节。
在反向传播过程中,计算来自两个去噪模型的组合损失,执行反向传播,并更新所有相关模型的参数。整个训练过程使用单一优化器,训练循环中包含针对基础阶段和上采样阶段的独立损失计算。
接下来定义模型的训练循环。该循环将迭代数据集,处理每个批次的数据,并根据计算得到的损失更新模型参数。根据每个批次的模态信息,模型将对视频和音频数据进行差异化处理,并应用相应的去噪模型(基础模型或上采样模型)。
GRADIENT_CLIP_NORM = 1.0
loss_history = []
for epoch in range(NUM_EPOCHS):
total_loss = 0.0
for batch_idx, batch in enumerate(train_dataloader):
optimizer.zero_grad()
modality = batch['modality']
video_base, video_upsampled, audio, img_base = (b.to(DEVICE) for b in (batch['video_base'], batch['video_upsampled'], batch['audio'], batch['input_image_base']))
raw_caption = batch['raw_caption']
timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (BATCH_SIZE,), device=DEVICE).long()
with torch.no_grad():
video_latents_base = video_vae.encode(video_base)
video_latents_upsampled = video_vae.encode(video_upsampled)
audio_latents = audio_vae.encode(audio)
text_embeds = conditioning_encoder.get_text_embeds(raw_caption)
context = text_embeds
if modality[0] == 'i2v':
with torch.no_grad():
img_embeds = conditioning_encoder.get_image_embeds(img_base)
context += img_embeds
noise_vid_base = torch.randn_like(video_latents_base)
noisy_video_base = noise_scheduler.add_noise(video_latents_base, noise_vid_base, timesteps)
noise_aud = torch.randn_like(audio_latents)
noisy_audio = noise_scheduler.add_noise(audio_latents, noise_aud, timesteps)
pred_noise_vid_base, pred_noise_aud = base_denoiser(noisy_video_base, noisy_audio, timesteps, context)
loss_base_vid = loss_fn(pred_noise_vid_base, noise_vid_base)
loss_aud = loss_fn(pred_noise_aud, noise_aud)
noise_vid_up = torch.randn_like(video_latents_upsampled)
noisy_video_upsampled = noise_scheduler.add_noise(video_latents_upsampled, noise_vid_up, timesteps)
pred_noise_vid_up, _ = upsampler_denoiser(noisy_video_upsampled, torch.zeros_like(noisy_audio), timesteps, context, low_res_video=noisy_video_base)
loss_upsample_vid = loss_fn(pred_noise_vid_up, noise_vid_up)
loss = loss_base_vid + loss_aud + loss_upsample_vid
loss.backward()
torch.nn.utils.clip_grad_norm_(all_params, GRADIENT_CLIP_NORM)
optimizer.step()
lr_scheduler.step()
total_loss += loss.item()
loss_history.append(total_loss / len(train_dataloader))
print(f"Epoch {epoch+1} Avg Loss: {total_loss / len(train_dataloader):.4f}")
print("--- Training Finished ---")
Epoch 2 Avg Loss: 3.7529
Epoch 3 Avg Loss: 3.6651
Epoch 4 Avg Loss: 3.5488
Epoch 5 Avg Loss: 3.5301
Epoch 6 Avg Loss: 3.5549
Epoch 7 Avg Loss: 3.4246
Epoch 8 Avg Loss: 3.4193
Epoch 9 Avg Loss: 3.4146
Epoch 10 Avg Loss: 3.3345
--- Training Finished ---
训练循环执行完毕后,可以观察到每个周期的组合损失。为了更直观地评估模型训练过程,我们将训练损失随周期的变化进行可视化。
plt.figure(figsize=(16, 3))
plt.plot(loss_history, label='Training Loss', marker='o')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.show()
从图中可见,损失值呈下降趋势,但下降幅度未达理想状态。这表明模型需要更多轮次的训练,并可能需要进一步调整超参数。由于计算资源限制,本演示在此暂停训练,重点在于展示代码结构与执行流程。
模型训练完成后(尽管本例中训练不充分),即可利用其从文本提示或图像生成视频。这是接下来的步骤。
基于级联逆向扩散的 Veo 3 推理
模型已完成数个周期的训练。尽管我们已实现了模型的各个组件,但当前模型的复杂度和训练程度尚不足以支撑一次完整的、高质量的训练运行。
尽管如此,我们仍可利用已训练(或部分训练)的模型执行推理过程。需要注意的是,由于模型未在足量的大规模数据上进行充分训练,预期输出的视频和音频将包含显著噪声。
推理过程并非简单的前向传播,它涉及额外的输入准备和输出处理步骤。
模型的推理过程采用级联逆向扩散(cascaded reverse diffusion)策略,分为两个主要步骤:
首先,生成低分辨率的基础潜在表示(视频和音频)。
然后,利用这些生成的基础潜在表示作为条件,输入到上采样器模型中,将视频细化至更高分辨率。
此逻辑对于文本到视频(T2V)和图像到视频(I2V)的推理均适用。若提供图像作为输入,其嵌入将被添加到基础模型和上采样器模型的上下文中。
最终,经过处理的“纯净”高分辨率视频潜在表示和基础音频潜在表示将分别通过其对应的 VAE 解码器,生成最终的视频和音频输出。
以下 generate_content 函数实现了上述两阶段逆向扩散的推理逻辑。
INFERENCE_STEPS = 50
def generate_content(prompt, input_image_path=None, steps=INFERENCE_STEPS):
for m in [video_vae, audio_vae, base_denoiser, upsampler_denoiser, conditioning_encoder]:
m.eval()
with torch.no_grad():
text_embeds = conditioning_encoder.get_text_embeds(prompt)
context = text_embeds
if input_image_path:
img_pil = Image.open(input_image_path).convert("RGB").resize((BASE_VIDEO_WIDTH, BASE_VIDEO_HEIGHT))
img_tensor = transforms.ToTensor()(img_pil).unsqueeze(0).to(DEVICE)
context += conditioning_encoder.get_image_embeds(img_tensor)
vid_latents_base = torch.randn(1, VIDEO_LATENT_CHANNELS, VIDEO_LATENT_FRAMES, BASE_VIDEO_LATENT_H, BASE_VIDEO_LATENT_W, device=DEVICE)
aud_latents = torch.randn(1, AUDIO_LATENT_CHANNELS, AUDIO_LATENT_SAMPLES, device=DEVICE)
noise_scheduler.set_timesteps(steps)
for t in tqdm(noise_scheduler.timesteps, desc="Denoising (Base)"):
ts = t.unsqueeze(0).to(DEVICE)
pred_vid_noise, pred_aud_noise = base_denoiser(vid_latents_base, aud_latents, ts, context)
vid_latents_base = noise_scheduler.step(pred_vid_noise, t, vid_latents_base).prev_sample
aud_latents = noise_scheduler.step(pred_aud_noise, t, aud_latents).prev_sample
vid_latents_upsampled = torch.randn(1, VIDEO_LATENT_CHANNELS, VIDEO_LATENT_FRAMES, UPSAMPLED_VIDEO_LATENT_H, UPSAMPLED_VIDEO_LATENT_W, device=DEVICE)
for t in tqdm(noise_scheduler.timesteps, desc="Denoising (Upsampler)"):
ts = t.unsqueeze(0).to(DEVICE)
pred_vid_noise_up, _ = upsampler_denoiser(
vid_latents_upsampled, torch.zeros_like(aud_latents), ts, context, low_res_video=vid_latents_base
)
vid_latents_upsampled = noise_scheduler.step(pred_vid_noise_up, t, vid_latents_upsampled).prev_sample
final_video = video_vae.decode(vid_latents_upsampled)
final_audio = audio_vae.decode(aud_latents)
return final_video, final_audio
generate_content 函数接收文本提示和可选的输入图像路径,基于提供的提示生成视频和音频内容。它利用训练好的模型,逐步对潜在表示进行去噪,首先处理基础分辨率,然后处理上采样分辨率。
现在,可以利用图像或文本提示进行视频生成。以下为文本到视频生成的测试示例。
t2v_prompt = "A mystical forest with glowing trees and soft ethereal music."
gen_video_t2v, gen_audio_t2v = generate_content(t2v_prompt)
为方便观察生成的视频和音频,定义一个简单的可视化函数。
def display_media(video, audio, title_prefix=""):
fig, axes = plt.subplots(1, 2, figsize=(15, 3))
if video is not None:
frame = (video[0,:, BASE_VIDEO_FRAMES//2].permute(1,2,0).cpu().numpy()*255).astype(np.uint8)
axes[0].imshow(frame)
axes[0].set_title(f"{title_prefix} Video Frame (Upsampled)")
axes[0
].axis('off')
if audio is not None:
axes[1].plot(audio[0,0].cpu())
axes[1].set_title(f"{title_prefix} Audio")
axes[1].set_xlabel('Sample')
axes[1].set_ylabel('Amplitude')
plt.tight_layout()
plt.show()
print("--- Displaying Generated Outputs ---")
display_media(gen_video_t2v, gen_audio_t2v, "T2V Output")
从输出结果可以看出,生成的视频帧包含显著噪声,音频也表现为噪声。这符合预期,因为模型仅在小规模数据集上进行了有限周期的训练。尽管如此,该实验验证了模型能够根据文本提示生成视频和音频,这已达到本项目的主要演示目标。
一个重要的问题随之而来:这个初步的 Veo 3 模型已构建完成,那么 Google 是如何评估其模型性能的呢?这是项目接下来的探讨内容。
评估策略
Google 并未公开 Veo 3 评估所用的全部确切指标,但在其模型介绍中提及 Veo v3 是在 Meta 发布的 MovieGenBench(包含视频和视频+音频两种任务)基准数据集上进行评估的。这些数据集分别包含 1,003 个视频生成提示和 527 个视频+音频生成提示,并附带了由其他主流模型生成的对应视频,例如 Meta 的 MovieGen(视频及视频+音频)、Kling 2.0(仅视频)、Minimax(仅视频)以及 Sora Turbo(仅视频)。
除了使用特定的基准数据集进行评估外,从统计学角度衡量生成视频质量的方法也多种多样。其中,两个常用的客观评价指标是:
CLIP Score (CLIP 分数): 此指标利用 CLIP 模型来评估生成的视频内容与输入文本提示之间的语义一致性。它通过计算文本嵌入与视频帧嵌入之间的余弦相似度来实现。
LPIPS (Learned Perceptual Image Patch Similarity, 学习感知图像块相似度): 此指标衡量生成的视频帧与参考图像之间的感知相似度,常用于评估生成图像或视频的视觉质量。
这些指标是评估文本到视频生成模型(尤其是此类任务)质量的常用手段。
以下函数用于计算 CLIP 分数:
def calculate_clip_score(gen_video, prompt):
middle_frame = gen_video[0, :, BASE_VIDEO_FRAMES // 2].cpu()
frame_pil = transforms.ToPILImage()(middle_frame)
text_embed = conditioning_encoder.get_text_embeds(prompt)
image_embed = conditioning_encoder.get_image_embeds(frame_pil)
text_embed = text_embed / text_embed.norm(dim=-1, keepdim=True)
image_embed = image_embed / image_embed.norm(dim=-1, keepdim=True)
clip_score = (text_embed @ image_embed.T).item()
return clip_score
calculate_clip_score 函数接收生成的视频和文本提示,从视频中提取中间帧,计算该帧与提示文本的 CLIP 嵌入,并返回它们之间的余弦相似度作为 CLIP 分数。
类似地,LPIPS 指标用于评估生成视频帧与参考图像之间的感知相似性。通常选用基于 VGG 网络的 LPIPS 模型(net='vgg'),因其在捕捉感知差异方面表现优异。
lpips_model = lpips.LPIPS(net='vgg').to(DEVICE)
def calculate_lpips(gen_video, img_path):
input_img = transforms.ToTensor()(
Image.open(img_path).convert('RGB').resize(
(UPSAMPLED_VIDEO_WIDTH, UPSAMPLED_VIDEO_HEIGHT)
)
).unsqueeze(0) * 2 - 1
gen_frame = gen_video[0, :, 0].cpu().unsqueeze(0) * 2 - 1
return lpips_model(input_img.to(DEVICE), gen_frame.to(DEVICE)).item()
对前述生成的文本到视频结果计算 CLIP 分数:
clip_score = calculate_clip_score(gen_video_t2v, t2v_prompt)
print(f"T2V CLIP Score: {clip_score:.4f}")
T2V CLIP Score: 0.1389
计算得到的 CLIP 分数接近 0,表明生成质量不佳,这与训练不充分的预期相符。尽管如此,此示例演示了如何对 Veo 3 或类似视频扩散模型等文本到视频生成系统进行定性评估。类似地,也可以计算图像到视频任务的 LPIPS 分数或其他相关指标。
总结
本文详细介绍了一个简化版 Veo 3 文本到视频生成模型的构建过程。首先进行了数据预处理,涵盖了去重、不安全内容过滤、质量合规性检查以及数据标注等环节。
随后设计并实现了模型的整体架构,包括 VideoVAE、AudioVAE、ConditioningEncoder 和 JointDenoisingTransformer 等核心组件。最后,构建了训练循环,并执行了推理过程,以根据提示或图像生成视频。
本项目为理解文本到视频生成模型的工作原理以及如何使用 PyTorch 从零开始构建此类模型提供了一个基础框架。尽管本实现与 Google Veo 3 的复杂度和性能尚有差距,但它清晰地展示了此类先进模型所涉及的核心概念、关键组件及技术流程,为进一步探索和研究奠定了基础。
想要了解更多资讯,请扫描下方二维码,关注机器学习研究会
转自:数据派THU