Implement PP-related logic

Change-Id: I065a933e43a3d578e35687c8da471e41961427b6
tianyutong
2025-06-23 23:13:03 +08:00
parent b2e940a01b
commit c4939227fe
3 changed files with 304 additions and 0 deletions

USAGE.md Normal file

@@ -0,0 +1,68 @@
# Heterogeneous Configuration Guide
## Heterogeneous DP configuration
Example:
```shell
--use-tp-pp-dp-mapping
--micro-batch-size-per-dp 1 2 1 6
--num-micro-batches-per-dp 1 1 1 1
```
### --use-tp-pp-dp-mapping
Changes the communication-group ordering so that the data-parallel dimension spans the heterogeneous devices.
### --micro-batch-size-per-dp
Sets a different micro-batch size for each group of data-parallel ranks.
- Format: `n0 mbs0 n1 mbs1 ...`
- `n0, n1, ...`: number of consecutive devices in each data-parallel group
- `mbs0, mbs1, ...`: micro-batch size of the corresponding device group
- Constraints (see the worked check below):
$$
\sum_{i} n_i = \text{data-parallel-size}
$$
$$
\text{GBS} \mod \left( \sum_{i} n_i \times \text{mbs}_i \right) = 0
$$
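
As a quick check with the example flags above and the `GBS=128` used in run.sh: the data-parallel size is $1 + 1 = 2$, and
$$
\sum_{i} n_i \times \text{mbs}_i = 1 \times 2 + 1 \times 6 = 8, \qquad 128 \bmod 8 = 0,
$$
so both constraints are satisfied.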
### --num-micro-batches-per-dp
Sets a different number of micro-batches for each group of data-parallel ranks.
- Format: `n0 nmb0 n1 nmb1 ...`
- `n0, n1, ...`: number of consecutive devices in each data-parallel group
- `nmb0, nmb1, ...`: number of micro-batches of the corresponding device group
- Constraints (see the worked example below):
$$
\sum_{i} n_i = \text{data-parallel-size}
$$
$$
\text{global\_batch\_size} = \sum_{i} n_i \times \text{mbs}_i \times \text{nmb}_i
$$
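
Applying this formula to the example flags above (`mbs` of 2 and 6, one micro-batch per group) gives $\text{global\_batch\_size} = 1 \times 2 \times 1 + 1 \times 6 \times 1 = 8$. To reach, say, the `GBS=128` used in run.sh, the micro-batch counts would have to be scaled up accordingly, e.g. `--num-micro-batches-per-dp 1 16 1 16`, since $1 \times 2 \times 16 + 1 \times 6 \times 16 = 128$.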
***
## Heterogeneous PP configuration
Example:
```shell
--hetero-pipeline-stages 1 2 1 6
```
### --hetero-pipeline-stages
Assigns a different number of layers to each pipeline stage.
Format: `n0 layers_0_0 layers_0_1 ... n1 layers_1_0 layers_1_1 ...`
`n0` is the number of devices (stages) in heterogeneous group 0, followed by the layer count of each stage in that group;
`n1` is the number of devices (stages) in heterogeneous group 1, followed by the layer count of each stage in that group, and so on.
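
For the example above, the flag describes two heterogeneous groups: `1 2` puts 1 stage with 2 layers in group 0, and `1 6` puts 1 stage with 6 layers in group 1. The pipeline therefore has $1 + 1 = 2$ stages and $2 + 6 = 8$ layers in total, which must match `--pipeline-model-parallel-size` and `--num-layers` (exactly the `PP=2`, `NUM_LAYERS=8` setting in run.sh).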


@@ -475,6 +475,102 @@ def validate_args(args, defaults={}):
        args.recompute_method_per_stage = recompute_method_per_stage
    if args.hetero_pipeline_stages is not None:
        assert args.micro_batch_size_per_dp is None, \
            "micro_batch_size_per_dp should be None when hetero_pipeline_stages is used"
        args.hetero_data_parallel_splits = None
        # Parse the flat "n0 layers_0_0 ... n1 layers_1_0 ..." list into
        # per-group stage counts and per-stage layer counts.
        stages = []
        hetero_pipeline_stages = []
        hetero_pipeline_stage_splits = []
        counter = 0
        num_layers = 0
        for item in args.hetero_pipeline_stages:
            if counter == 0:
                hetero_pipeline_stage_splits.append(item)
                counter = item
            else:
                stages.append(item)
                num_layers += item
                counter -= 1
                if counter == 0:
                    hetero_pipeline_stages.append(stages)
                    stages = []
        args.hetero_pipeline_stages = hetero_pipeline_stages
        args.hetero_pipeline_stage_splits = hetero_pipeline_stage_splits
        for split, stages in zip(args.hetero_pipeline_stage_splits, args.hetero_pipeline_stages):
            assert split == len(stages), \
                f"hetero_pipeline_stage_split {split} should be equal to the length of hetero_pipeline_stage {stages}"
        assert num_layers == args.num_layers, \
            f"sum of hetero_pipeline_stages {num_layers} should be equal to num_layers {args.num_layers}"
        assert args.pipeline_model_parallel_size == sum(args.hetero_pipeline_stage_splits), \
            f"pipeline_model_parallel_size {args.pipeline_model_parallel_size} should be equal to the sum of hetero_pipeline_stage_splits {args.hetero_pipeline_stage_splits}"
        # assert len(args.hetero_pipeline_stage_splits) == len(args.hetero_device_types), \
        #     f"length of hetero_pipeline_stage_splits {args.hetero_pipeline_stage_splits} should be equal to the length of hetero_device_types {args.hetero_device_types}"
    if args.recompute_granularity_per_stage is not None:
        assert args.recompute_granularity == 'full', \
            'recompute-granularity-per-stage is only '\
            'applicable to full recompute granularity mode'
        assert args.recompute_method is not None, \
            'for distributed recompute activations to work you '\
            'need to use a recompute method '
        pipeline_size_split = args.recompute_granularity_per_stage[::2]
        recompute_granularity_split = args.recompute_granularity_per_stage[1::2]
        for i in recompute_granularity_split:
            assert i == 1 or i == 0, 'element of recompute-granularity-per-stage must be 0 or 1.'
        assert sum(pipeline_size_split) == args.pipeline_model_parallel_size, \
            'recompute-granularity-per-stage setting: ' \
            'the sum of n0, n1, ... should be equal to pipeline-model-parallel-size.'
        # Expand the "n_i, flag_i" pairs into one granularity flag per pipeline stage.
        args.recompute_granularity_per_stage = [recompute_granularity_split[i] for i, j in enumerate(pipeline_size_split) for _ in range(j)]
    if args.recompute_num_layers_per_stage is not None:
        assert args.recompute_granularity == 'full', \
            'recompute-num-layers-per-stage is only '\
            'applicable to full recompute granularity'
        assert args.recompute_method_per_stage is not None, \
            'recompute_method_per_stage must be used with '\
            'recompute_num_layers_per_stage '
        recompute_num_layers_stage_split = args.recompute_num_layers_per_stage[::2]
        recompute_num_layers_layer_split = args.recompute_num_layers_per_stage[1::2]
        recompute_methods_stage_split = args.recompute_method_per_stage[::2]
        recompute_methods_method_split = args.recompute_method_per_stage[1::2]
        assert len(recompute_num_layers_stage_split) == len(recompute_num_layers_layer_split), \
            'args.recompute_num_layers_per_stage setting must match form: n0, layers0, n1, layers1, ...'
        assert len(recompute_methods_stage_split) == len(recompute_methods_method_split), \
            'args.recompute_method_per_stage setting must match form: n0, method0, n1, method1, ...'
        if args.virtual_pipeline_model_parallel_size is not None:
            assert args.pipeline_model_parallel_size * args.virtual_pipeline_model_parallel_size == sum(recompute_num_layers_stage_split), \
                'args.recompute_num_layers_per_stage setting: ' \
                'the sum of n0, n1, ... should be equal to pipeline-model-parallel-size * virtual_pipeline_model_parallel_size'
            assert args.pipeline_model_parallel_size * args.virtual_pipeline_model_parallel_size == sum(recompute_methods_stage_split), \
                'args.recompute_method_per_stage setting: ' \
                'the sum of n0, n1, ... should be equal to pipeline-model-parallel-size * virtual_pipeline_model_parallel_size'
        else:
            assert args.pipeline_model_parallel_size == sum(recompute_num_layers_stage_split), \
                'args.recompute_num_layers_per_stage setting: ' \
                'the sum of n0, n1, ... should be equal to pipeline-model-parallel-size.'
            assert args.pipeline_model_parallel_size == sum(recompute_methods_stage_split), \
                'args.recompute_method_per_stage setting: ' \
                'the sum of n0, n1, ... should be equal to pipeline-model-parallel-size.'
        # Expand the per-group settings into one value per (virtual) pipeline stage.
        recompute_num_layers_per_stage = []
        for i in range(len(recompute_num_layers_stage_split)):
            for j in range(recompute_num_layers_stage_split[i]):
                recompute_num_layers_per_stage.append(recompute_num_layers_layer_split[i])
        recompute_method_per_stage = []
        for i in range(len(recompute_methods_stage_split)):
            for j in range(recompute_methods_stage_split[i]):
                recompute_method_per_stage.append(recompute_methods_method_split[i])
        args.recompute_num_layers_per_stage = recompute_num_layers_per_stage
        args.recompute_method_per_stage = recompute_method_per_stage
    # Batch size.
    assert args.micro_batch_size is not None
    assert args.micro_batch_size > 0
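
For reference, here is a minimal standalone sketch of the two parsing patterns validated above: the interleaved `n_i, value_i` format used by `--hetero-pipeline-stages`, and the per-stage expansion used by the recompute flags. The helper names below are illustrative only and are not part of the actual arguments module.

```python
def parse_hetero_pipeline_stages(values):
    """Split a flat [n0, l_0_0, ..., n1, l_1_0, ...] list into
    per-group stage counts and per-stage layer counts."""
    splits, stages, group = [], [], []
    counter = 0
    for item in values:
        if counter == 0:
            # Start of a new group: item is the number of stages in it.
            splits.append(item)
            counter = item
        else:
            # Inside a group: item is the layer count of one stage.
            group.append(item)
            counter -= 1
            if counter == 0:
                stages.append(group)
                group = []
    return splits, stages


def expand_per_stage(pairs):
    """Expand [n0, v0, n1, v1, ...] into one value per pipeline stage."""
    counts, values = pairs[::2], pairs[1::2]
    return [v for c, v in zip(counts, values) for _ in range(c)]


# --hetero-pipeline-stages 1 2 1 6  ->  splits [1, 1], stages [[2], [6]]
assert parse_hetero_pipeline_stages([1, 2, 1, 6]) == ([1, 1], [[2], [6]])
# --recompute-granularity-per-stage 1 0 1 1  ->  [0, 1] (one flag per stage)
assert expand_per_stage([1, 0, 1, 1]) == [0, 1]
```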

run.sh Normal file

@@ -0,0 +1,140 @@
#!/bin/bash
# Runs a LLaMA 7B-style model (NUM_LAYERS reduced to 8)
export CUDA_DEVICE_MAX_CONNECTIONS=1
export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
# export CUDA_VISIBLE_DEVICES=0,1,2,3
# export CUDA_VISIBLE_DEVICES=0,1
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6007
NNODES=1
NODE_RANK=0
CHECKPOINT_PATH=/data2/share/llama-dataset/cp
TENSORBOARD_LOGS_PATH=/data2/share/llama-dataset/tb
TOKENIZER_PATH=/data2/nfs/llama-dataset/tokenizer.model
DATA_PATH=/data2/nfs/llama-dataset/merged-1t/merged-1t
# 7B-scale model dimensions
HIDDEN_SIZE=4096
FFN_HIDDEN_SIZE=11008
NUM_LAYERS=8
NUM_HEADS=32
SEQ_LENGTH=4096
TRAIN_STEPS=5
# LR=3e-4
# MIN_LR=3e-5
# LR_WARMUP_STEPS=1
# WEIGHT_DECAY=0.1
# GRAD_CLIP=1
TP=2
PP=2
MBS=2
GBS=128
DISTRIBUTED_ARGS=(
--nproc_per_node $GPUS_PER_NODE
--nnodes $NNODES
--node_rank $NODE_RANK
--master_addr $MASTER_ADDR
--master_port $MASTER_PORT
)
LLAMA_MODEL_ARGS=(
--micro-batch-size ${MBS}
--num-layers ${NUM_LAYERS}
--hidden-size ${HIDDEN_SIZE}
--ffn-hidden-size $FFN_HIDDEN_SIZE
--num-attention-heads ${NUM_HEADS}
--seq-length ${SEQ_LENGTH}
--max-position-embeddings ${SEQ_LENGTH}
--num-query-groups 8
--tokenizer-type Llama2Tokenizer
--tokenizer-model $TOKENIZER_PATH
--swiglu
--use-flash-attn
--use-rotary-position-embeddings
--no-position-embedding
--disable-bias-linear
)
HETERO_ARGS=(
# Hetero PP config
--hetero-pipeline-stages 1 2 1 6
# Hetero DP config
# --use-tp-pp-dp-mapping
# --micro-batch-size-per-dp 1 2 1 6
# --num-micro-batches-per-dp 1 1 1 1
)
TRAINING_ARGS=(
--global-batch-size ${GBS}
--train-iters ${TRAIN_STEPS}
--weight-decay 1e-2
--use-distributed-optimizer
--clip-grad 1.0
# --fp16
--bf16
--attention-softmax-in-fp32
--lr 0.00015
--lr-decay-style cosine
--min-lr 6.0e-6
--lr-warmup-fraction .01
--adam-beta1 0.9
--adam-beta2 0.95
--attention-dropout 0
--hidden-dropout 0
--untie-embeddings-and-output-weights
--sequence-parallel
--distributed-backend nccl
--initial-loss-scale 65536
--min-loss-scale 1.0
--loss-scale-window 1024
--transformer-impl transformer_engine
# --use-legacy-models
# --use-tp-pp-dp-mapping
)
MODEL_PARALLEL_ARGS=(
--tensor-model-parallel-size ${TP}
--pipeline-model-parallel-size ${PP}
)
DATA_ARGS=(
--data-path $DATA_PATH
--split 1
)
EVAL_AND_LOGGING_ARGS=(
--log-interval 1
--timing-log-level 1
)
INITIALIZATION_ARGS=(
--init-method-std 0.02
--seed 1234
)
SCRIPT_FILE=$(pwd)/pretrain_gpt.py
cmd="
torchrun ${DISTRIBUTED_ARGS[@]} ${SCRIPT_FILE} \
${LLAMA_MODEL_ARGS[@]} \
${HETERO_ARGS[@]} \
${TRAINING_ARGS[@]} \
${MODEL_PARALLEL_ARGS[@]} \
${DATA_ARGS[@]} \
${INITIALIZATION_ARGS[@]} \
${EVAL_AND_LOGGING_ARGS[@]}
"
echo $cmd
eval $cmd