魔搭推出Twinkle: 训练即服务, 让模型训练回归算法语义
随着大模型的持续演进,研发重心正从“预训练”显著转向“后训练”。要充分挖掘模型在各类场景下的应用潜力,针对性的训练微调至关重要。不可否认,以强化学习(RL)为代表的后训练范式,是模型生命周期中复杂度最高的环节之一:其实现方式高度定制化,难以通用;组件耦合度高,导致源码层面的理解门槛极高;此外,多模型协作的架构也极大地增加了代码编写的难度。
除了 OpenAI 提供的“数据进,模型出”的黑盒训练模式外,业界开源训练框架大致可分为两类:
- 通用型训练框架:以 LLaMA-Factory 和 ms-swift 为代表。这类框架基于 Transformers 和 TRL 的 Trainer 开发,深度适配 safetensors 模型生态,用户通常通过命令行配置来快速启动训练。
- 定制化训练框架:侧重于高度灵活的自定义流程(尤其在 RL 场景下)。这类框架允许用户自主定义训练逻辑,并通常借助 Ray 等分布式引擎来完成复杂的多模型协作与调度。

这两种技术路线各有千秋,但对于具备算法背景、理解模型训练原理的开发者而言,什么样的训练范式,才能同时兼顾“易用性”与“灵活性”?作为 ModelScope 开源社区及 ms-swift 项目的开发团队,我们在实践模型训练及与社区交流的过程中体会到:基于算法语义的训练 API 抽象,能更系统化提升模型训练的体验。它不仅能全局性地优化训练流程,还能通过干净的接口设计,来解耦复杂逻辑,为 Serverless 训练服务提供底层支撑,驱动 Training-as-a-Service(TaaS)的落地。
在我们思考与探索这个方向的过程中,Thinking Machine Lab 于去年年底推出了 Tinker API,在这一方向上先行一步。它通过提供forward_backward、optim_step和sample等基础接口,赋予了算法层面更强的定制化能力。然而,出了client端的API,Tinker 的底层训练服务实现并未对外开放;且目前Tinker的接口,仍难以完全满足在我们的构想中,对训练更细粒度的控制需求,例如实现类似 PyTorch 那样高度灵活的复杂训练循环编排等等。
秉承对训练框架的理解与发展愿景,我们正式推出了 Twinkle 项目。这是一个全新的、基于 Client-Server 架构的模块化训练框架,支持从本地到Serverless的服务化训练。通过与招商银行信息技术部深度技术合作,Twinkle 从创立伊始就同时兼顾易用性与实际生产可用性。同时,Twinkle 的开源开放是全方面的,覆盖了Client 和 Server 的全部实现,力求与社区一起打造一个真正开放、可定制的训练生态。
Twinkle 具备以下特点:
Client-Server架构解耦:标准化接口,在兼容Tinker API的同时,支持对训练更灵活控制。
1. 多种运行模式:支持本地一体化训练部署、远端集群部署、以及直接使用公用训练服务。
2. 灵活的后端支持:提供包括Transformers / Megatron在内的多样化训练后端。
3. 多租户训练服务:支持在共享的基础模型部署上,同时训练来自多个用户的LoRA。
4. Python源代码启动:支持100行以内运行/定制RL训练,浅封装的组件实现,简洁明了。
Twinkle 的全部代码,均在我们的github项目上开放:https://github.com/modelscope/twinkle。同时我们也在ModelScope上提供了Serverless的训练服务,当前该服务支持通过Tinker API来访问,后续也会开放通过更丰富的 Twinkle 原生API的访问。
设计与实现
结合我们之前开发 ms-swift 等框架的经验,我们发现主流训练范式对于具备算法背景的开发者而言,仍存在诸多缺憾。比如,完全自定制训练的门槛极高,开发者除了理解训练算法外,还需兼顾 GPU 调度、资源分配及故障恢复等底层工程细节。而以 ms-swift 为代表的通用框架路径,为了降低使用门槛,往往采取了较深的封装,这使得要在训练算法层面进行调整或定制,就会涉及对源码的复杂修改。此外,现有框架通常将算力与算法做了深度的耦合,导致算法开发者,也必须同时承担算力管理的职责。在这种架构上,要将训练框架进行“服务化”,需要面临较大的适配挑战。
回顾我们在开发ms-swift的过程中,通常追求的是把模型的训练(乃至使用)门槛降到最低。但是我们也一直在思考这么一个问题:对于具备一定算法背景,理解模型训练原理的算法开发者,到底需要一个什么样的训练框架?我们尝试细化这个问题:
- 是不是离开易用化的命令行深度封装,就一定要回退到写底层的训练代码?
- 有没有可能通过API把训练算法语义进行抽象,提供一个更加灵活,更方便定制的训练框架?
- 算法语义和工程实现的边界在哪里?能否让开发者关注于算法的逻辑,而将语义的准确实现的,包括托管 RL 所需复杂训练流程(Rollout -> Reward -> Update),交给框架?
同时,受 Tinker 服务化能力的启发,我们进一步探索了训练架构的边界:如何在同一套基础模型部署之上,实现多租户并发 LoRA 训练?这种模式不仅能极大地提升算力效率,更为实现真正的 Serverless 化“训练即服务(TaaS)”提供了可行路径。
正是这些考虑,指导了Twinkle项目的设计与实现。
系统架构
Twinkle 最重要也是最核心的设计理念就是“组件化”。针对大模型场景,把每个需要使用的,可以独立构建的部分都收敛为标准化的模块。例如:
- Dataset:负责数据的拉起和预处理
- DataLoader:负责数据的分配
- Model:负责模型构型和训练
- Sampler:负责采样
将整体项目串联在一起的,是Twinkle 的infra组件。该组件负责将所有模块进行装饰,使它们可以平滑工作在单卡、torchrun多机多卡、Ray或者client模式下。而构筑在其上的server组件,则使得MultiLoraModel能运行在基于FastAPI/Ray-serve技术架构上的服务中,实现多租户共用单基模训练/采样。

当前Twinkle 的训练复用目前支持的训练种类有:
- PT/SFT:基于LoRA的预训练和微调
- RL:基于自定义Advantage、Reward和Loss等实现的RL
- 多种模式混合训练:用户A进行SFT,用户B进行RL,互不干扰
此外得益于组件化的设计,Twinkle 能在多租条件下,持续扩展更多的可训练类型,包括TaaS化独占式全参训练、结合各类Agent框架的多轮仿真RL训练以及扩散模型训练等等。
Twinkle 采用了解耦的Client-Server架构,旨在提供最大的灵活性。客户端提供了两条不同的集成路径:
- Twinkle原生模式:提供与服务端接口完全一致的标准 API,实现无缝的端到端集成。
- Tinker 兼容模式:全面支持原生 Tinker API,开发者可以通过修改 Tinker 的基础 URL(Base URL)来指向 Twinkle 的训练服务,实现使用 Tinker 客户端来使用 Twinkle 服务。
接口的设计
在保留了Tinker API的兼容,提供包括forward_backward(前向/后向传播)、optim_step(优化器更新)和sample(采样)等基础训练接口的同时,Twinkle 为了实现对训练更加准确的控制和定制化,额外提供了更丰富的训练API接口,来支持细粒度训练控制、动态组件配置以及远程数据处理等能力。
- 细粒度训练控制
Twinkle 将训练步骤解耦,允许用户在 Client 端像写本地 PyTorch 代码一样编排复杂的训练循环(如梯度累积、自定义反向传播时机)。这包括/calculate_loss(计算loss而不进行反向传播)、/zero_grad(清空梯度)、/forward_only(仅执行前向计算)等等额外接口。 - 动态组件配置
在运行过程中,Twinkle 支持动态对模型的核心组件进行更新,而无需重启服务或重新加载模型权重。这其中包括动态更新loss函数、动态更新优化器/LR调度器/预处理器、动态添加Adapter以及动态更换对话模版等操作。 - 远程数据处理
Twinkle 架构支持存算分离的数据流,通过引入ProcessorManagement服务,来支持将数据逻辑卸载到服务端。例如,当你的数据量过大,driver进程无法运行时,可以将dataset和dataloader单独运行在worker中。
更多的接口设计以及API说明,可以参考相关文档。
多租户实现
Twinkle 支持在共享基础模型上进行并发多租户训练。凭借“LoRA 池 + 租户应用”的架构,Twinkle 能够支持N 个租户在完全隔离的环境下并行训练。这种设计提供了前所未有的灵活性:从模型视角来看,每个租户的会话都是独立的,支持异构配置——包括独特的训练数据填充策略、优化器以及损失函数,且所有这些都在同一基础模型上并发进行训练。

应用示例:
- 租户 A:在本地加载私有数据集,设置 LoRA rank=8,利用基础模型进行 SFT(有监督微调)。
- 租户 B:远程从 Hub 远程加载开源数据集,设置 LoRA rank=32,利用基础模型进行 PT(预训练)。
- 租户 C:利用基础模型进行 GRPO 损失计算,并调用 Sampler 进行采样。
- 租户 D:利用基础模型进行 logps(对数概率)推理。
这些进程在单个基础模型上并发执行,因为在 Twinkle 生态系统中,模型(Model)与采样器(Sampler)被集成作为任务无关(task-agnostic)的通用组件。同时Twinkle 服务端的实现,具备自动化集群管理与动态扩缩容功能,为构建可定制的规模化训练服务奠定了坚实基础。
实际使用
基于本地一体化部署使用
以下是一个使用 Twinkle 编写的RL代码,从这份约150行的代码中,我们可以清晰地看到:数据加载、rollout、训练过程分别由不同的组件承担,组件和组件之间由标准协议传递参数,而对Ray的引用则没有明显出现,仅在initialize中指定为Ray方式即可。
import os
from typing import List, Tuple, Dict, Any
from peft import LoraConfig
import twinkle
from twinkle import DeviceMesh, DeviceGroup, get_device_placement
from twinkle.advantage import GRPOAdvantage
from twinkle.checkpoint_engine import CheckpointEngineManager
from twinkle.data_format import SamplingParams
from twinkle.dataloader import DataLoader
from twinkle.dataset import Dataset, DatasetMeta
from twinkle.model.megatron import MegatronModel
from twinkle.metric import CompletionRewardMetric
from twinkle.preprocessor.llm import GSM8KProcessor
from twinkle.processor import InputProcessor
from twinkle.reward import GSM8KAccuracyReward, GSM8KFormatReward
from twinkle.sampler import vLLMSampler
from twinkle.template import Template
MODEL_ID = os.environ.get('MODEL_ID', 'Qwen/Qwen3.5-35B-A3B')
MODEL_GPUS = int(os.environ.get('MODEL_GPUS', 4))
SAMPLER_GPUS = int(os.environ.get('SAMPLER_GPUS',4))
NUM_GPUS = MODEL_GPUS + SAMPLER_GPUS
NUM_GENERATIONS = int(os.environ.get('NUM_GENERATIONS', 8))
MAX_NEW_TOKENS = int(os.environ.get('MAX_NEW_TOKENS', 4096))
LEARNING_RATE = float(os.environ.get('LR', 1e-5))
MAX_STEPS = int(os.environ.get('MAX_STEPS', 200))
BATCH_SIZE = int(os.environ.get('BATCH_SIZE', 16)) # global prompt-level, global completion-level batch size = BATCH_SIZE * num_generations * dp_size
MINI_BATCH_SIZE = int(os.environ.get('MINI_BATCH_SIZE', 16)) # global completion-level mini-batch-size
MICRO_BATCH_SIZE = int(os.environ.get('MICRO_BATCH_SIZE', 2)) # per-device-micro-batch-size (completion-level), batch_size in forward_backward
GRADIENT_ACCUMULATION_STEPS = int(os.environ.get('GRADIENT_ACCUMULATION_STEPS', 1))
ADAPTER_NAME = 'default'
def create_gsm8k_dataset():
dataset = Dataset(DatasetMeta('ms://modelscope/gsm8k', subset_name='main', split='train'))
dataset.set_template('Template', model_id=MODEL_ID, max_length=2048)
dataset.map(GSM8KProcessor())
dataset.encode(add_generation_prompt=True)
return dataset
def compute_rewards(
trajectories: List[Dict[str, Any]],
) -> Tuple[List[float], List[float], List[float]]:
accuracy_reward_fn = GSM8KAccuracyReward()
format_reward_fn = GSM8KFormatReward()
accuracy_rewards = accuracy_reward_fn(trajectories)
format_rewards = format_reward_fn(trajectories)
total_rewards = [a + f for a, f in zip(accuracy_rewards, format_rewards)]
return total_rewards, format_rewards, accuracy_rewards
def main():
# set sampler and model separate to use different gpus
device_groups = [
DeviceGroup(name='model',ranks=list(range(MODEL_GPUS)),device_type='GPU'),
DeviceGroup(name='sampler',ranks=list(range(MODEL_GPUS, NUM_GPUS)),device_type='GPU'),
]
model_mesh = DeviceMesh.from_sizes(world_size=MODEL_GPUS, dp_size=MODEL_GPUS)
sampler_mesh = DeviceMesh.from_sizes(world_size=SAMPLER_GPUS, dp_size=SAMPLER_GPUS)
twinkle.initialize(mode='ray', nproc_per_node=NUM_GPUS, groups=device_groups,lazy_collect=False)
lora_config = LoraConfig(target_modules='all-linear', r=32, lora_alpha=64, lora_dropout=0.05)
model = MegatronModel(model_id=MODEL_ID, device_mesh=model_mesh, remote_group='model', mixed_precision='bf16')
model.add_adapter_to_model(ADAPTER_NAME, lora_config, gradient_accumulation_steps=1)
model.set_optimizer('default', lr=LEARNING_RATE)
model.set_lr_scheduler('default', lr_decay_steps=MAX_STEPS, max_lr=LEARNING_RATE)
model.set_loss('GRPOLoss', epsilon=0.2)
model.set_processor(InputProcessor)
model.set_template('Template', model_id=MODEL_ID)
sampler = vLLMSampler(
model_id=MODEL_ID,
engine_args={
'gpu_memory_utilization': 0.8,
'max_model_len': 4096,
'max_lora_rank': 32, # save as lora_config
'enable_lora': True,
},
device_mesh=sampler_mesh,
remote_group='sampler',
)
sampler.set_template(Template, model_id=MODEL_ID)
ckpt_manager = CheckpointEngineManager(model=model, sampler=sampler)
dataloader = DataLoader(
dataset=create_gsm8k_dataset,
batch_size=BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS,
min_batch_size=BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS,
device_mesh=model_mesh,
remote_group='model',
)
advantage_fn = GRPOAdvantage()
metrics = CompletionRewardMetric()
sampling_params = SamplingParams(max_tokens=MAX_NEW_TOKENS)
optim_step = 0
print(get_device_placement())
for batch in dataloader:
if optim_step >= MAX_STEPS:
break
metrics.reset()
global_prompts = batch if isinstance(batch, list) else [batch]
ckpt_manager.sync_weights(merge_and_sync=False)
sampler.reset_prefix_cache()
sample_response = sampler.sample(
global_prompts*NUM_GENERATIONS,
sampling_params,
num_samples=1,
)
all_input_data: List[Dict[str, Any]] = []
all_old_logps: List[List[float]] = []
all_completion_lengths: List[int] = []
for sequence in sample_response.sequences:
all_input_data.append(sequence.new_input_feature)
all_old_logps.append(sequence.logprobs)
all_completion_lengths.append(len(sequence.tokens))
total_rewards, format_rewards, accuracy_rewards = compute_rewards(
all_input_data
)
metrics.accumulate(
completion_lengths=all_completion_lengths,
rewards={
'total': total_rewards,
'format': format_rewards,
'accuracy': accuracy_rewards,
},
)
advantages = advantage_fn(total_rewards, num_generations=NUM_GENERATIONS, scale='group').tolist()
# Split completions into mini-batches and run one optim step per mini-batch.
total_completions = len(all_input_data)
for mb_start in range(0, total_completions, MINI_BATCH_SIZE):
mb_end = min(mb_start + MINI_BATCH_SIZE, total_completions)
mb_inputs = all_input_data[mb_start:mb_end]
mb_old_logps = all_old_logps[mb_start:mb_end]
mb_advantages = advantages[mb_start:mb_end]
model.forward_backward(
inputs=mb_inputs,
old_logps=mb_old_logps,
advantages=mb_advantages,
micro_batch_size=MICRO_BATCH_SIZE,
)
model.clip_grad_and_step()
optim_step += 1
if optim_step >= MAX_STEPS:
break
log_dict = metrics.calculate()
log_dict.update(model.calculate_metric(is_training=True))
metrics.reset()
print(f'[Step {optim_step}/{MAX_STEPS}] {log_dict}')
print(f'Training completed. optim_steps={optim_step}')
model.save('grpo-gsm8k-checkpoint')
if __name__ == '__main__':
main()
Tinker 兼容性
Twinkle 对希望使用Tinker客户端的开发者提供了适配Tinker的兼容性。无论是通过本地单机部署,还是通过Ray上的集群部署,开发者都可以像使用Tinker官方server一样来使用 Twinkle的服务。此外,我们还在ModelScope上提供了可直接使用的Serverless的 Twinkle 训练服务,具体可以参考官方服务文档。以这一服务为例,通过使用Tinker兼容的接口,实现的训练代码如下:
import os
from tinker import types
from tqdm import tqdm
from twinkle import init_tinker_client
from twinkle.dataloader import DataLoader
from twinkle.dataset import Dataset, DatasetMeta
from twinkle.preprocessor import SelfCognitionProcessor
from twinkle.server.tinker.common import input_feature_to_datum
# Initialize the Tinker client before importing ServiceClient
init_tinker_client()
from tinker import ServiceClient
# The base model to fine-tune / evaluate
base_model = 'Qwen/Qwen3-30B-A3B-Instruct-2507'
base_url = 'http://www.modelscope.cn/twinkle'
def train():
# Step 1: Prepare the dataset
# Load the self-cognition dataset from ModelScope (first 500 examples)
dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(500)))
# Apply the chat template matching the base model (max 256 tokens per sample)
dataset.set_template('Template', model_id=f'ms://{base_model}', max_length=256)
# Replace placeholder names with custom model/author identity
dataset.map(SelfCognitionProcessor('twinkle模型', 'twinkle团队'), load_from_cache_file=False)
# Tokenize and encode the dataset into model-ready input features
dataset.encode(batched=True, load_from_cache_file=False)
# Wrap the dataset into a DataLoader that yields batches of size 8
dataloader = DataLoader(dataset=dataset, batch_size=8)
# Step 2: Initialize the training client
service_client = ServiceClient(
base_url=base_url,
api_key=os.environ.get('MODELSCOPE_TOKEN')
)
# Create a LoRA training client for the base model (rank=16 for the LoRA adapter)
training_client = service_client.create_lora_training_client(base_model=base_model, rank=16)
# Step 3: Run the training loop
for epoch in range(3):
print(f'Epoch {epoch}')
for step, batch in tqdm(enumerate(dataloader)):
# Convert each InputFeature into a Datum for the Tinker API
input_datum = [input_feature_to_datum(input_feature) for input_feature in batch]
# Send data to server: forward + backward pass (computes gradients)
fwdbwd_future = training_client.forward_backward(input_datum, 'cross_entropy')
# Optimizer step: update model weights with Adam
optim_future = training_client.optim_step(types.AdamParams(learning_rate=1e-4))
# Wait for both operations to complete
fwdbwd_result = fwdbwd_future.result()
optim_result = optim_future.result()
print(f'Training Metrics: {optim_result}')
# Save a checkpoint after each epoch
save_future = training_client.save_state(f'twinkle-lora-{epoch}')
save_result = save_future.result()
print(f'Saved checkpoint to {save_result.path}')
if __name__ == '__main__':
train()
在这个例子里,我们使用了ModelScope上部署的现有服务。开发者完全可以将相同的服务代码部署到自己的本地机器或者GPU集群上,然后通过修改对应的部署URL来进行使用。
算法组件的使用
Twinkle 的server架构构建在基础算法组件上,这些算法组件基于PyTorch等基础算法库,并针对大模型场景进行了特殊优化。
from twinkle.dataloader import DataLoader
dataloader = DataLoader(dataset, device_mesh=...)
for data in dataloader:
...
上面的代码在不同的模型构型和工作模式下,会有不同的表现:
- 在torchrun的条件下,batch会返回符合当前模型构型的数据集合。例如八张卡以ddp方式训练时,每张卡会获取[0~1],[2~3],...[14~15]的一片。
- 在Ray的条件下,batch则会返回全部的16个样例,可以让driver端集中处理样例,再分发给模型。而后续Twinkle✨的infra部分会按照模型的布局分配数据,例如模型dp=2,tp=2,cp=2时,会将16个样例分为两个dp组分发,则每四个模型会收到[0~7],[8~15]分片中的一片。
通过这样的方式,Twinkle 将大模型训练需要的所有组件全部解耦,极大地减小了封装深度。开发者在使用 Twinkle 时,不需要特别注意数据是如何分发和处理的,从而可以更专心地构建算法、算子本身。
目前已经支持了Dataset、Model、Template、Loss、Advantage等20多个组件,还有更多的组件正在开发中。

我们强烈推荐开发者查看我们的cookbook,并根据其中的训练代码进行二次开发。开发者可以定制数据集/优势函数/奖励/模板等,包括自己特定的训练流程,快速实现新算法的研发或行业模型的训练过程。
训练精度与速度
我们基于 Qwen3-4B 模型和 GSM8K 数据集,采用准确性奖励(accuracy reward)与格式奖励(format reward)相结合的策略,以veRL为基准线,对Twinkle 进行了对比评估。
训练配置:
- 总 batch size 为 128(以completions 计),划分为 8 个mini-batch 进行更新。
- 均采用server异构部署模式:使用 4 张 GPU 进行训练,另 4 张 GPU 专用于推理。
Twinkle 的奖励曲线

veRL 的奖励曲线

可以看到,在训练效果方面,两个框架在训练过程中展现奖励曲线趋势基本一致;而在训练效率方面,Twinkle 完成一个global batch 的平均耗时约为 70 秒,而 veRL约为 80 秒。两者训练速度相近。
开源WorkShop
作为开源社区,魔搭希望能让更多的开发者和技术团队理解和使用大模型技术、对自己所在的行业实现AI迭代式创新。基于这个愿景,魔搭和并与招商银行信息技术部开展了WorkShop模式的合作,Twinkle 框架的产生,得益于这种纯技术式开源开放的交流合作模式。
开发者对可以通过不同的方式来参与workshop:包括提交PR来开发新组件,将组件贡献进魔搭ModelHub并在Twinkle 的cookbook和README中引用该技术,以及在模型训练、智能体、文生视频/文生图等各个领域共建新的开源项目等等。
多硬件支持
为了更好的服务更广泛的用户,Twinkle 还对国产化硬件做了原生支持。在模型训练、vLLM采样、kernels等场景中进行了大量的适配和效率提升。下面我们给出了RL+昇腾硬件上的训练效率对比:
得益于招商银行信息技术部与昇腾团队的贡献,Twinkle 完成了昇腾生态的原生支持,并针对昇腾硬件做了特殊优化,基于 Qwen3-4B 模型和 GSM8K 数据集,采用准确性奖励(accuracy reward)与格式奖励(format reward)相结合的策略,以veRL为基准线,对Twinkle进行了对比评估。
训练配置:
- 总 batch size 为32(以 completions 计)。
- Twinkle 采用异构部署模式:使用 4 张 GPU 进行训练,另 4 张 GPU 专用于推理。
- verRL采用全异步部署模式:使用 4 张 GPU 进行训练,另 4 张 GPU 专用于推理。
Twinkle 的训练曲线

veRL 的曲线

可以看出,两个框架在训练过程中展现奖励曲线趋势基本一致,而在训练效率方面,Twinkle 完成一个global batch 的平均耗时约为20 秒,而 veRL约为30 秒。由于我们在昇腾上的优化,Twinkle 的速度稍占优势。
总结
Twinkle 项目可以以组件化方式承载单卡、多卡、Ray、Client-Server等模式,帮助算法开发者尽量减少训练、采样的复杂开发和维护的成本,并给大模型行业提供一种新的商业模式。然而我们对这个框架的希望还不止于此。比如,我们可以将评测过程API化,这样整个的训-采-评过程都可以细粒度切分随意组合,这带来了非常大的想象空间,例如模型的持续进化。此外,在较新的工作中,API化的训练过程可以融合进Agent框架中,实现大模型的自进化(self-evolve)。我们希望Twinkle 可以起到一个抛砖引玉的作用,引起一定的业界共鸣,共建及推进大模型训练、使用的新的方法论。
更多推荐




所有评论(0)