当前位置: 首页 > news >正文

Analytics Zoo:在Apache Spark上实现分布式Tensorflow和BigDL管道的统一分析和AI平台

  • 如今,将深度学习应用于大数据管道往往需要手工“拼接”许多独立的组件(如TensorFlow、Apache Spark、Apache HDFS等),这个过程可能非常复杂,而且容易出错。
  • Analytics Zoo提供了一个在Apache Spark上实现分布式TensorFlow、Keras和BigDL管道的统一分析和AI平台,简化了这个过程。
  • 它将Spark、TensorFlow、Keras和BigDL程序无缝地合并到一个集成管道中,可以透明地扩展到大型Apache Hadoop/Spark集群,用于分布式训练或推理。
  • 早期用户(如世界银行、Cray、Talroo、Baosight、美的/库卡等)已经基于Analytics Zoo构建了分析+AI应用程序,它可以应用于范围广泛的工作负载(包括基于迁移学习的图像分类、用于短时降水预测的sequence-to-sequence预测、用于推荐工作的神经协同过滤、无监督时序异常检测等等)。
  • 本文提供了几个具体的教程,介绍如何使用Analytics Zoo在Apache Spark上实现分布式TensorFlow管道,以及在实际的用例中使用Analytics Zoo实现端到端的文本分类管道。

人工智能应用程序的不断进步将深度学习带到新一代数据分析开发的前沿。特别是,我们看到越来越多的组织需要将深度学习技术(如计算机视觉、自然语言处理、生成对抗神经网络等)应用到他们的大数据平台和管道。如今,这常常需要手工“拼接”许多独立的组件(例如Apache Spark、TensorFlow、Caffe、Apache Hadoop分布式文件系统HDFS、Apache Storm/Kafka等),这可能是一个复杂且容易出错的过程。

在英特尔,我们一直与开源社区用户以及京东、UCSF、Mastercard等合作伙伴和客户广泛合作,在Apache Spark上构建深度学习(DL)和AI应用程序。为了简化端到端的开发和部署,我们开发了Analytics Zoo,这是一个统一的分析+ AI平台,它将Spark、TensorFlow、Keras和BigDL程序无缝地合并到一个集成管道中,可以透明地扩展到大型Apache Hadoop/Spark集群,用于分布式训练或推理。

早期用户(如世界银行、Cray、Talroo、Baosight、美的/库卡等)已经基于Analytics Zoo构建了分析+AI应用程序,它可以应用于范围广泛的工作负载,其中包括基于迁移学习的图像分类、用于短时降水预测的sequence-to-sequence预测、用于推荐工作的神经协同过滤、无监督时序异常检测等等。

本文提供了几个具体的教程,介绍如何使用Analytics Zoo在Apache Spark上实现分布式TensorFlow管道,以及在实际的用例中使用Analytics Zoo实现端到端的文本分类管道。

Apache Spark上的分布式TensorFlow

使用Analytics Zoo,用户可以方便地使用Spark和TensorFlow在大型集群上构建端到端的深度学习管道,如下所述。

使用PySpark进行数据预处理和分析

举例来说,要使用分布式的方式处理对象检测管道的训练数据,可以使用PySpark简单地把原始图像数据读入一个RDD(弹性分布式数据集),这是一个跨集群分区的不可变记录集合,然后运用一些转换解码图像,并提取边界框和类标签,如下所示。

train_rdd = sc.parallelize(examples_list)  .map(lambda x: read_image_and_label(x))  .map(lambda image: decode_to_ndarrays(image))

结果RDD (train_rdd)中的每条记录都包含一个NumPy ndrray列表(即图像、边界框、类和检测到的框的数量),然后可以直接在Analytics Zoo上用于TensorFlow模型的分布式训练;这是通过从结果RDD创建TFDataset来完成的(如下所示)。

dataset = TFDataset.from_rdd(train_rdd,             names=[\u0026quot;images\u0026quot;, \u0026quot;bbox\u0026quot;, \u0026quot;classes\u0026quot;, \u0026quot;num_detections\u0026quot;],             shapes=[[300, 300, 3],[None, 4], [None], [1)]],             types=[tf.float32, tf.float32, tf.int32, tf.int32],             batch_size=BATCH_SIZE,             hard_code_batch_size=True)

使用TensorFlow开发深度学习模型

在Analytics Zoo中,TFDataset表示一个分布式元素集,其中每个元素包含一个或多个Tensorflow Tensor对象。然后,我们可以直接使用这些Tensor(作为输入)来构建Tensorflow模型;例如,我们可以使用Tensorflow Object Detection API构建一个SSDLite+MobileNet V2模型(如下所示)。

# 使用tensorflow对象检测api来构造模型# https://github.com/tensorflow/models/tree/master/research/object_detectionfrom object_detection.builders import model_builderimages, bbox, classes, num_detections = dataset.tensorsdetection_model = model_builder.build(model_config, is_training=True)resized_images, true_image_shapes = detection_model.preprocess(images)detection_model.provide_groundtruth(bbox, classes)prediction_dict = detection_model.predict(resized_images, true_image_shapes)losses = detection_model.loss(prediction_dict, true_image_shapes)total_loss = tf.add_n(losses.values())

在Spark和BigDL上进行分布式训练/推理

在构造好模型之后,我们可以直接在Spark上(利用BigDL框架)以分布式的方式训练模型。例如,在下面的代码片段中,我们应用迁移学习技术来训练一个在MS COCO数据集上预训练过的Tensoflow模型。

with tf.Session() as sess:    init_from_checkpoint(sess, CHECKPOINT_PATH)    optimizer = TFOptimizer(total_loss, RMSprop(LR), sess)    optimizer.optimize(end_trigger=MaxEpoch(20))    save_to_new_checkpoint(sess, NEW_CHEKCPOINT_PATH)

在后台,从磁盘读取输入数据并进行预处理,利用PySpark生成Tensorflow Tensor的RDD;然后,在BigDL和Spark(如BigDL技术报告所述)上以分布式的方式对Tensorflow模型进行训练。整个训练管道可以自动从单个节点扩展到基于Xeon的大规模Hadoop/Spark集群(无需修改代码或手动配置)。

另外,模型训练好以后,我们可以使用PySpark、TensorFlow和BigDL(类似于上面的训练管道)在Analytics Zoo上执行大规模的分布式评估/推断。或者,我们也可以使用Analytics Zoo提供的POJO风格的服务API来部署低延迟的在线服务(例如,Web服务、Apache Storm、Apache Flink等)模型,如下所示。

AbstractInferenceModel model = new AbstractInferenceModel(){};model.loadTF(modelPath, 0, 0, false);List\u0026lt;List\u0026lt;JTensor\u0026gt;\u0026gt; output = model.predict(inputs);

下图显示了Analytics Zoo中Apache Spark管道上的分布式TensorFlow的整个工作流(包括训练、评估/推断和在线服务)。

\"image\"

端到端分析和AI管道

Analytics Zoo还为用户提供了丰富的端到端管道分析和AI支持,包括:

  • 易于使用的抽象,如Spark Dataframe和ML管道支持、迁移学习支持、Keras风格的API、POJO风格的模型服务API等等;
  • 面向图象、文本和3D图象的常见特征工程操作*;*
  • 内置的深度学习模型,如文本分类、推荐、对象检测、图象分类等;
  • 参考用例,如时间序列异常检测、欺诈检测、图像相似性搜索等。

使用这些高级管道支持,用户可以在几行代码中轻松构建复杂的数据分析和深度学习应用程序,如下所述。

使用NNImageReader将图象加载到Spark DataFrames中

from zoo.common.nncontext import *from zoo.pipeline.nnframes import *sc = init_nncontext()imageDF = NNImageReader.readImages(image_path, sc)

使用DataFrames转换处理加载的数据

getName = udf(lambda row: ...)getLabel = udf(lambda name: ...)df = imageDF.withColumn(\u0026quot;name\u0026quot;, getName(col(\u0026quot;image\u0026quot;))) \\     .withColumn(\u0026quot;label\u0026quot;, getLabel(col('name')))

使用内置的特征工程操作处理图像

from zoo.feature.imageimport *transformer = ChainedPreprocessing(        [RowToImageFeature(), ImageChannelNormalize(123.0, 117.0, 104.0),         ImageMatToTensor(), ImageFeatureToTensor()])

使用迁移学习API加载已有的Caffe模型,删除最后几层,冻结开始几层,追加几个新层(使用Keras风格的API)

from zoo.pipeline.api.netimport *full_model = Net.load_caffe(def_path, model_path)# 删除pool5之后的层Remove layers after pool5model = full_model.new_graph(outputs=[\u0026quot;pool5\u0026quot;])# 冻结从输入到res4f 之间的层,包括res4fmodel.freeze_up_to([\u0026quot;res4f\u0026quot;])# 追加几个层image = Input(name=\u0026quot;input\u0026quot;, shape=(3, 224, 224))resnet = model.to_keras()(image)resnet50 = Flatten()(resnet)logits = Dense(2)(flatten)newModel = Model(inputs, logits)

使用Spark ML管道训练模型

estimater = NNEstimater(newModel, CrossEntropyCriterion(), transformer) \\                .setLearningRate(0.003).setBatchSize(40).setMaxEpoch(2) \\                .setFeaturesCol(\u0026quot;image\u0026quot;).setCachingSample(False)nnModel = estimater.fit(df)

基于Analytics Zoo的真实AI案例

如上所述,有许多早期用户已经在Analytics Zoo上构建了真实的应用程序。在本节中,我们将更详细地描述如何在Microsoft Azure的Analytics Zoo上使用NLP技术构建端到端的文本分类管道。

文本分类概述

文本分类是一种常见的自然语言处理任务,其目的是将输入文本语料库分类为一个或多个类别。例如,垃圾邮件检测将电子邮件的内容分为垃圾邮件或非垃圾邮件类别。

一般来说,文本分类模型的训练包括以下步骤:收集和准备训练数据集及验证数据集、数据清理和预处理、训练模型、验证和评估模型、优化模型(包括但不限于添加数据、调整超参数、调整模型)。

Analytics Zoo中有几个预定义的文本分类器可以开箱即用,即CNN、LSTM、GRU。我们选择从CNN开始。我们在下面的文本中使用Python API来说明训练过程。

from zoo.models.textclassificationimport TextClassifiertext_classifier = TextClassifier(class_num, embedding_file, \\               sequence_length=500, encoder=\u0026quot;cnn\u0026quot;, encoder_output_dim=256)

在上面的API中,class_num是这个问题中的类别数量,embedding_fileis是预训练词向量文件的路径(目前只支持Glove ),sequence_length是每个文本记录中包含的单词数,encoder 是词编码器的类型(可以是CNN、LSTM或GRU),encoder_output_dim是这个编码器的输出。该模型接收词索引序列作为输入,输出标签。

数据收集和预处理

训练数据集中的每个记录包含两个字段,一个是dialogue和一个是label。我们收集了数千条这样的记录,并通过手动和半自动的方式收集标签。然后,我们对原始文本进行数据清理,去掉无意义的标记和混淆的部分,并将它们转换为文本RDD,每个记录的格式为一个(文本、标签)对。接下来,我们对文本RDD进行预处理,并输出我们的模型可以接收的正确形式。请确保你的数据清洗和处理对训练和预测都是一样的!

(How to get invoice …, 1)
(Can you send invoice to me…,1)
(Remote service connection failure…,2)
(How to buy…, 3)

上面是数据清理只有的文本RDD示例(每条记录是一个文本-标签对)。

1.数据读取

我们可以使用Analytics Zoo提供的TextSet以分布式方式读取文本数据,如下所示。

from zoo.feature.text import TextSetfrom zoo.common.nncontext import init_nncontextsc = init_nncontext(\u0026quot;Text Classification\u0026quot;)text_set = TextSet.read(data_path, sc)

2.分词

然后我们将句子分解为单词,将每个输入文本转换为一个标记(单词)数组,并对标记进行规范化(例如,删除未知字符并转换为小写)。

text_set = text_set.tokenize()   \\                   .normalize()

3.序列对齐

不同的文本可能会生成不同大小的标记数组。但是,文本分类模型要求输入的所有记录大小固定。因此,我们必须将标记数组对齐到相同的大小(在parametersequence_lengthin文本分类器中指定)。如果标记数组的大小大于所需的大小,则从开头或结尾对单词进行删除;否则,我们将无意义的单词填充到数组的末尾(例如“##”)。

text_set= text_set.shape_sequence(sequence_length)

4.词索引

标记数组大小对齐后,需要将每个标记(单词)转换为索引,可用来查找其词向量(在文本分类器模型中)。在单词转换为索引的过程中,我们还通过删除文本中出现频率最高N个单词来移除停用词(即经常出现在文本中但无助于语义理解的单词,如“the”、“of”等)。

text_set= text_set.word2idx(remove_topN=10, max_words_num)

5.转换成样本

经过以上步骤,每个文本都变成一个有形状的张量(sequence_length, 1),然后我们从每个记录构造一个BigDL样本,以生成的张量作为特征,以标签整数作为标签。

text_set = text_set.generate_sample()

模型训练、测试、评估和优化

在以相同的方式准备好训练数据集(train_rdd)和验证数据集(val_rdd)之后,我们实例化一个新的TextClassifier模型(text_classifier),然后创建一个优化器以分布式方式训练模型。我们使用稀疏分类交叉熵(Sparse Categorical Cross Entropy)作为损失函数。

train_set, val_set= text_set.random_split( \\    [training_split, 1 - training_split])model.compile(optimizer=Adagrad(learningrate=float(options.learning_rate), \\                                learningrate_decay=0.001),              loss=\u0026quot;sparse_categorical_crossentropy\u0026quot;, \\              metrics=['accuracy'])model.fit(train_set,batch_size=int(options.batch_size), \\          nb_epoch=max_epoch, validation_data=val_set)

训练时可调整的参数包括轮数、批大小、学习率等。你可以指定输出指标的验证选项,如在训练过程中设置准确度验证,以检测过拟合或欠拟合。

如果在验证数据集上获得的结果不好,我们就必须优化模型。通常,这是指重复调整超参数/数据/模型、训练和验证的过程,直到结果足够好为止。通过调整学习速率、添加新数据和扩充停用词字典,我们的准确率得到了显著提高。

有关Analytics Zoo中的文本处理和分类支持的更多细节,请参阅这些文档。

作者简介Jason Dai是英特尔大数据技术高级首席工程师兼首席技术官,负责领导全球工程团队(包括硅谷和上海)开发先进的大数据分析和机器学习技术。他是Apache Spark的提交者、PMC成员、Apache MXNet导师、Strata Data Conference北京站的联合主席以及BigDL(Apache Spark的分布式深度学习框架)的创建者。

查看英文原文:Analytics Zoo: Unified Analytics + AI Platform for Distributed Tensorflow, and BigDL on Apache Spark

相关文章:

  • 摩拜创始人胡玮炜也彻底离开了,共享单车行业还有未来吗? ...
  • oracle问题 ORA-01843:not a valid month
  • 退款证书相关
  • 如何将本地文件上传至阿里云ECS中
  • React 作者关于 Hooks 的深度 issue,值得你阅读
  • 【AIX】在命令前显示完整路径
  • Promise,我是这么理解的!
  • python 怎样使用单个反斜杠\
  • 在IPv6之前
  • 数据挖掘领域经典算法——CART算法
  • JavaScript原型的实际应用
  • 微软宣布Azure Function支持Python
  • adb
  • java监控工具VisualVM
  • linux学习day3
  • 0基础学习移动端适配
  • canvas 高仿 Apple Watch 表盘
  • Java-详解HashMap
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • Python学习之路16-使用API
  • React-flux杂记
  • 创建一种深思熟虑的文化
  • 机器学习中为什么要做归一化normalization
  • 那些被忽略的 JavaScript 数组方法细节
  • 如何在 Tornado 中实现 Middleware
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 选择阿里云数据库HBase版十大理由
  • ​ssh-keyscan命令--Linux命令应用大词典729个命令解读
  • ​卜东波研究员:高观点下的少儿计算思维
  • ​总结MySQL 的一些知识点:MySQL 选择数据库​
  • #LLM入门|Prompt#2.3_对查询任务进行分类|意图分析_Classification
  • (AngularJS)Angular 控制器之间通信初探
  • (C)一些题4
  • (C++17) optional的使用
  • (附源码)springboot学生选课系统 毕业设计 612555
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (三)Honghu Cloud云架构一定时调度平台
  • (转)linux下的时间函数使用
  • (转载)Linux 多线程条件变量同步
  • (轉貼) 蒼井そら挑戰筋肉擂台 (Misc)
  • *(长期更新)软考网络工程师学习笔记——Section 22 无线局域网
  • ./include/caffe/util/cudnn.hpp: In function ‘const char* cudnnGetErrorString(cudnnStatus_t)’: ./incl
  • .mysql secret在哪_MYSQL基本操作(上)
  • .NET Core IdentityServer4实战-开篇介绍与规划
  • .net core控制台应用程序初识
  • .NetCore实践篇:分布式监控Zipkin持久化之殇
  • .NET大文件上传知识整理
  • @DataRedisTest测试redis从未如此丝滑
  • @value 静态变量_Python彻底搞懂:变量、对象、赋值、引用、拷贝
  • [ IOS ] iOS-控制器View的创建和生命周期
  • [20171113]修改表结构删除列相关问题4.txt
  • [Angularjs]asp.net mvc+angularjs+web api单页应用之CRUD操作
  • [asp.net core]project.json(2)
  • [BZOJ1877][SDOI2009]晨跑[最大流+费用流]