围绕 transformers 构建现代 NLP 开发环境
阿里妹导读
Intro
样本处理
核心思路:函数式,流式,组合式,batch 做多路融合,对 datasets 兼容
MaxCompute(ODPS) :无法通过行号快速读取数据,但是有 Tunnel 接口支持从某个下标开始顺序读取数据。
文件系统:包括本地文件,HDFS,以及OSS等对象存储。本地文件虽然能用 lseek() 等函数快速跳转到某个位置(且该操作通常为 O(1)),但是如果每条样本字节数不一样,封装为随机读取还是非常复杂,但是做成流式读取就很容易。其他云上的存储介质更是如此。
消息队列:例如 MetaQ,天然流式的数据,可以主动拉取,也可以以订阅的方式封装流式读取的接口。
positive = Threaded(Map(func, ODPS(access_id, access_key, project,
positive_sample_table_name,
read_once=False)))
negative = Threaded(Map(func, ODPS(access_id, access_key, project,
negative_sample_table_name,
read_once=False)))
combined = Combine([positive, negative], sample_weight=[1.0, 1.0])
# 直接读取数据
for data in combined:
print(data)
# 使用 huggingface datasets 模块
# 之后可以直接用在 transformers.Trainer 类中参与训练
import datasets
train_dataset = datasets.IterableDataset.from_generator(combined,
gen_kwargs={"ranks": [0,1,2,3], "world_size": 4} # 支持分布式训练
)
技术问题:对分布式训练的支持
def _ODPS(access_id, access_key, project, table_name, partition_spec, read_once, retry,
endpoint, ranks=None, world_size=None):
# 加载 ranks + world_size 对应分片数据,实现略(计算读取 range 后,使用 PyODPS 加载数据)
def ODPS(access_id, access_key, project, table_name, partition_spec=None, read_once=True, retry=True, endpoint="http://xxxxxx"):
return partial(_ODPS, access_id, access_key, project, table_name, partition_spec, read_once, retry, endpoint)
注意,这里使用了partial返回了一个原始函数“科里化”后的版本,这是使 generator 可组合的关键设计。
模型开发
核心思路:
继承 PreTrainedModel,PreTrainedCofig,PreTrainedTokenizer 基类,与 transformers 体系打通。
通过 mixin / monkey patching 方式,扩充现有框架功能,例如对 OSS 模型加载/存储的支持。
# 加载我们项目组开发的分类模型(多任务层次分类)
model = BertForMultiTaskHierarchicalClassification.from_pretrained("./local_dir")
# or 从 OSS 直接加载
model = BertForMultiTaskHierarchicalClassification.from_pretrained("oss://model_remote_dir")
# 保存模型
model.save_pretrained("./local_dir")
model.save_pretrained("oss://model_remote_dir")
# 加载我们使用 C++ 开发的 tokenizer
tokenizer = ShieldTokenizer.from_pretrained("oss://model_remote_dir")
# 使用 AutoClass 实现相同功能,不需要指定特定的模型类名,由框架自动推断
model = AutoModel.from_pretrained("oss://model_remote_dir")
tokenizer = AutoTokenizer.from_pretrained("oss://model_remote_dir")
# 扩展 transformers 默认的 pipeline
# 增加 multitask-hierarchical-classification 任务
pipe = pipeline("multitask-hierarchical-classification", model=model, tokenizer=tokenizer)
print(pipe("测试文本"))
可以看出,我们自研的模型使用方式和 transformers 上的开源模型别无二致,甚至还能直接从 OSS 上加载模型,极大降低了模型的使用学习成本。
如何支持集团内的存储介质
class OSSRemoteModelMixin(object):
"""
支持用户在 from_pretrained 和 save_pretrained 时按照 oss://path 的格式指定路径 (bucket, ak, sk 需要在环境变量中指定, 见 util.oss_util 类)
可以用于所有包含 from_pretrained 和 save_pretrained 方法的类 (config or tokenizer or model)
"""
def from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], *model_args, **kwargs):
pretrained_model_name_or_path = convert_oss_to_local_path(cls, pretrained_model_name_or_path,
kwargs.get('cache_dir', risk_shield.CACHE_DIR))
return super(OSSRemoteModelMixin, cls).from_pretrained(
pretrained_model_name_or_path, *model_args, **kwargs)
def save_pretrained(
self,
save_directory: Union[str, os.PathLike],
*args,
**kwargs
):
prefix = "oss://"
oss_path = save_directory
if save_directory.startswith(prefix):
# save to a temp dir
# .......
# 将文件拷贝到 OSS,实现略
return res
else:
res = super(OSSRemoteModelMixin, self).save_pretrained(save_directory, *args, **kwargs)
# 让模型继承自 OSSRemoteModelMixin 就会自动获得 OSS 存取的能力
class BertForMultiTaskHierarchicalClassification(
OSSRemoteModelMixin, BertPreTrainedModel):
config_class = BertForMultiTaskHierarchicalClassificationConfig
def __init__(self, config:BertForMultiTaskHierarchicalClassificationConfig):
# .....
# 对于 AutoModel,则直接覆盖他们的 from_pretrained 方法。
def patch_auto_class(cls):
"""
让 AutoClass 支持 OSS 路径
"""
old_from_pretrained = cls.from_pretrained
def new_from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], *model_args, **kwargs):
pretrained_model_name_or_path = \
convert_oss_to_local_path(cls, pretrained_model_name_or_path,
kwargs.get('cache_dir', risk_shield.CACHE_DIR))
return old_from_pretrained(pretrained_model_name_or_path, *model_args, **kwargs)
cls.from_pretrained = classmethod(new_from_pretrained)
同样,让框架支持 HDFS,ODPS Volumn 等其他存储介质,也可采用相同的方案。
如何使用自己的 Tokenizer
类变量 vocab_files_names
__getstate__, __setstate__,解决 C++ 对象的 pickle 问题
_convert_token_to_id
_convert_id_to_token
convert_ids_to_tokens
__setattr__
vocab_size
get_vocab
tokenize
save_vocabulary
_pad 对 tokenize 后的结果进行 pad,主要用在 trainer 里的 collator
_encode_plus 处理单条文本
_batch_encode_plus 处理 batch 文本
使用我们自研的 tokenizer,在单核下比 huggingface 使用 Rust 开发的 BertTokenizerFast 在多核下更快。
训练代码
def compute_metrics(model, eval_pred):
logits, labels = eval_pred
# 针对层次分类设计的评估指标
metrics = evaluate_multitask_hierarchical_classifier(model, labels)
return metrics
# 定义模型
tokenizer = ShieldTokenizer.from_pretrained("oss://backbone")
config = BertForMultiTaskHierarchicalClassificationConfig.from_pretrained("oss://backbone")
config.multi_task_config = {
# 这里的分类树仅是例子
"main": {
"hierarchical_tree":
["父类别1", "父类别2",
["父类别3",
["父类别3-子类别1", "父类别3-子类别2", "父类别3-子类别3", "父类别3-子类别4"]
]
]
}
}
model = BertForMultiTaskHierarchicalClassification.from_pretrained("./backbone")
# 定义训练数据加载策略
positive = Threaded(Map(func, ODPS(access_id, access_key, project,
positive_sample_table_name,
read_once=False)))
negative = Threaded(Map(func, ODPS(access_id, access_key, project,
negative_sample_table_name,
read_once=False)))
combined = Combine([positive, negative], sample_weight=[1.0, 1.0])
train_ds = datasets.IterableDataset.from_generator(combined)
training_arg = TrainingArguments(
output_dir="./output",
overwrite_output_dir=True,
num_train_epochs=4,
# ...
# 其他训练参数
# ...
dataloader_num_workers=2,
)
trainer = Trainer(
model=model,
args=training_arg,
train_dataset=train_ds,
tokenizer=tokenizer,
eval_dataset=val_ds,
compute_metrics=partial(compute_metrics, model),
# 针对层次分类开发的 collator
data_collator=MultiTaskHierarchicalClassifierCollator(
tokenizer=tokenizer, model=model, max_length=max_length,
task_label_smooth=task_label_smooth
)
)
# 将实验指标写入 tensorboard 并上传到 OSS
trainer.add_callback(OSSTensorboardWriterCallback("experiment/v1/"))
trainer.train()
第一个元素是 loss,trainer 自动优化该值。
后续的元素只能是 python dict / list / tuple / tensor,tensor 第一维的大小必须和 batch size 一致。最理想的情况就是一个二维 logits 矩阵。
模型部署
import risk_shield
from transformers import AutoTokenizer, AutoModel
model = AutoModel.from_pretrained("oss://model.tar.gz")
tokenizer = AutoTokenizer.from_pretrained("oss://model.tar.gz")
# 导出 ONNX
model.export_onnx(output_dir)
# 或导出 TensorRT
model.export_tensorrt(output_dir)
# 或导出 Tensorflow(ODPS 部署)
model.export_tf_saved_model(output_dir)
# 导出切词器到同一目录
tokenizer.save_pretrained(output_dir)
#########################
# 部署时加载对应 pipeline
from risk_shield import ONNXHierarchicalClassifierPipeline
from risk_shield import TensorRTHierarchicalClassifierPipeline
from risk_shield import TFSavedModelHierarchicalClassifierPipeline
pipe = TensorRTHierarchicalClassifierPipeline(output_dir)
result= pipe("测试文本")
实验管理
此外,我们在开发了一个实时查看 OSS 上 tensorboard 的命令行工具,一般我们都会在 GPU 服务器上训练模型和上报指标,在各自的笔记本上查看和分析指标,你看,不需要依赖 wandb 这种外部平台,也能实现类似的指标管理能力。
1. 命令行工具,实时拉取 OSS 上的 tensorboard
2. 超参搜索及指标对比
工具链及可视化
1. 命令行工具,加载 OSS 上保存的模型 checkpoint 并打开浏览器页面
2. 基于 gradio 开发的 debug 工具(分类模型)
3. 基于 gradio 开发的 debug 工具(NER 模型)
“软件2.0”
-- 团队 A 同学发布模型到 OSS
shield_publish ~/checkpoint_dir https://xxxx.aliyuncs.com/../xxxx.tar.gz
WARNING:root:从本地目录上传:~/checkpoint_dir
xxxx.tar.gz: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [01:50<00:00, 1.11s/it]
WARNING:root:已发布OSS(url):https://xxxx.aliyuncs.com/../xxxx.tar.gz
-- 团队 B 同学使用 OSS 上“内部开源”的模型
AutoModel.from_pretrained("https://xxxx.aliyuncs.com/../xxxx.tar.gz")
[1]https://huggingface.co/docs/datasets/v2.13.1/en/package_reference/main_classes#datasets.Dataset.from_generator
[2]https://wandb.ai/site
[3]https://karpathy.medium.com/software-2-0-a64152b37c35
[4]https://huggingface.co/docs/hub/index
[5]https://huggingface.co/spaces
[6]https://modelscope.cn/models
[7]https://huggingface.co/docs/hub/paddlenlp
阿里云开发者社区,千万开发者的选择
阿里云开发者社区,百万精品技术内容、千节免费系统课程、丰富的体验场景、活跃的社群活动、行业专家分享交流,欢迎点击【阅读原文】加入我们。
微信扫码关注该文公众号作者