当前位置: 首页 > news >正文

Zilliz 推出 Spark Connector:简化非结构化数据处理流程

随着人工智能(AI)和深度学习(Deep Learning)技术的高速发展,使用神经网络模型将数据转化为 Embedding 向量 已成为处理非结构化数据并实现语义检索的首选方法,广泛应用于搜索、推荐系统等 AI 业务中。

以生产级别的搜索系统为例,该系统通常包含两个部分:离线数据索引和在线查询服务。实现该系统需要使用多种技术栈。例如,在离线处理中,如何将来源于多种渠道的非结构化数据数据高效、方便地处理并推送到向量数据库以实现在线查询,是一个充满挑战的问题。Apache Spark 和 Databricks 是应用广泛的大批量数据处理方案。Zilliz Cloud 推出了 Spark Connector。该工具将 Milvus 和 Zilliz Cloud 向量数据库 API 集成于 Apache Spark 和 Databricks 任务,大大简化数据处理和推送的实现难度。

本文将介绍 Spark Connector 及其应用场景,并手把手教你如何使用它实现数据推送。

Spark Connector 工作原理及使用场景

Apache Spark 和 Databricks 适合处理海量数据,例如以批量的方式进行非结构化数据清洗并调用模型生成 Embedding 向量。而 Milvus 则擅长存储模型生成的 Embedding 向量数据,并构建索引支持在线服务中的高效查询。这两大工具的强强联合可以实现轻松开发生成式 AI、推荐系统、图像和视频搜索等应用。

当用户在搭建 AI 应用时,很多用户都会遇到如何将数据从 Apache Spark 或 Databricks 导入到 Milvus 或 Zilliz Cloud (全托管的 Milvus 服务) 中的问题。使用 Spark Connector,用户能够在 Apache Spark 或 Databricks 任务中直接调用函数,完成数据向 Milvus 的增量插入或者批量导入,不需要再额外实现“胶水”业务逻辑,简化了数据推送流程。

批量导入数据

由于深度学习进展日新月异,专注于深度学习的团队通常需要频繁更新 Embedding 模型。在第一次批量建库,或者每次更新模型后,都需要处理全量数据、生成一套新的向量数据集。这样一来,就需要启动一个新的 Spark 任务来执行这一次处理,并将新的向量数据集重新插入到向量数据库中以供在线服务使用。有了 Databricks Connector,您只需要授予 Spark 任务写入 Milvus S3 bucket (或者授予 Zilliz Cloud 访问临时的数据源 bucket)的权限即可。简化后的数据处理流程允许您仅仅通过一个简单的函数调用将 Spark 任务生成的向量直接加载到 Milvus 或 Zilliz Cloud 实例中。

增量插入数据

// Specify the target Milvus instance and vector data collection
df.write.format("milvus").option(MILVUS_URI, "https://in01-xxxxxxxxx.aws-us-west-2.vectordb.zillizcloud.com:19535").option(MILVUS_TOKEN, dbutils.secrets.get(scope = "zillizcloud", key = "token")).option(MILVUS_COLLECTION_NAME, "text_embedding").option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding").option(MILVUS_COLLECTION_VECTOR_DIM, "128").option(MILVUS_COLLECTION_PRIMARY_KEY, "id").mode(SaveMode.Append).save()

对于数据量相对较小的用户而言,使用 Spark Connector 也能简化开发工作。您的任务中无需再实现建立服务端连接以及插入数据的代码,只需调用 Connector 中提供的函数即可。

如何使用 Spark Connector

下面,我们将介绍如何使用 Spark Connector 简化数据迁移和处理流程。

使用 Dataframe 直接进行增量插入

使用 Spark Connector,您可以直接利用 Apache Spark 中 Dataframe 的 write API 将数据以增量方式插入到 Milvus 中,大幅降低数据插入流程的实现成本。同理,您也可以直接将数据从 Apache Spark 或 Databricks 导入到 Zilliz Cloud(全托管的 Milvus 服务)中。以下为示例代码:

将数据批量导入到 Collection 中

如果您需要将大量数据高效导入 Collection 中,我们推荐使用 MilvusUtils. bulkInsertFromSpark() 函数。

  • 将数据加载到 Milvus Collection 中

这个过程中需要使用 S3 或 MinIO bucket 作为 Milvus 实例的内部存储。Spark 或 Databricks 任务获取 bucket 的写入权限后,就可以使用 Connector 将数据批量写入 bucket 中,最终一次操作批量插入到向量 Collection 中以供查询使用。

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3a://milvus-bucket/result"
df.write.mode("overwrite").format("parquet").save(outputPath)
// Specify Milvus options.
val targetProperties = Map(MilvusOptions.MILVUS_HOST -> host,MilvusOptions.MILVUS_PORT -> port.toString,MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,MilvusOptions.MILVUS_BUCKET -> bucketName,MilvusOptions.MILVUS_ROOTPATH -> rootPath,MilvusOptions.MILVUS_FS -> fs,MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,MilvusOptions.MILVUS_STORAGE_USER -> minioAK,MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
  • 将数据加载到 Zilliz Cloud Collection 中

如果您使用的是全托管 Milvus 服务——Zilliz Cloud,您可以使用 Zilliz Cloud 提供的数据导入 API 。Zilliz Cloud 提供多样的工具和完整的文档,从而帮助您将各种来源(如 Spark)的数据高效导入 Zilliz Cloud 中。您需要设置一个 S3 bucket 作为媒介,然后授权 Zilliz Cloud 读取 bucket 中的数据。这样一来,Zilliz Cloud 数据导入 API 便可无缝将数据从 S3 bucket 加载到向量数据库中。

以 Databricks 为例,开始前,您需要先通过在 Databricks 集群中添加 jar 文件来加载带有Spark Connector 的 Runtime 库。有多种安装库的方法。下图展示了如何从本地上传 jar 至集群。

如需了解更多如何在 Databricks Workspace 中安装库的信息,请参阅 Databrick 官方文档。

批量插入数据时需要将数据存储在一个临时的 bucket 中,随后再批量导入至 Zilliz Cloud 中。您可以先创建一个 S3 bucket,点击此处了解详情。为了保护您的 Zilliz Cloud 鉴权用户名密码安全,您可以跟随指南在 Databricks 上安全管理密码。

以下为批量数据迁移的示例代码。和前文的 Milvus 例子一样,您只需要填写用于鉴权的向量数据库 URI、Token 以及 S3 bucket 的地址、AK、SK。

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write.mode("overwrite").format("mjson").save(outputPath)// Specify Milvus options.
val properties = Map(MILVUS_URI -> uri,MILVUS_TOKEN -> token,MILVUS_COLLECTION_NAME -> collectionName,MILVUS_STORAGE_ENDPOINT -> s3Endpoint,MILVUS_STORAGE_USER -> s3ak,MILVUS_STORAGE_PASSWORD -> s3sk,ZILLIZCLOUD_API_KEY -> apiKey,ZILLIZCLOUD_REGION -> region,ZILLIZCLOUD_INSTANCE_ID -> clusterId,
)
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))// Call util func to bulkinsert data into Zilliz Cloud through Import Data API.
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputDir, "json")

Connector 使用全流程:Notebook 示例

为帮助您快速上手,我们准备了一个 Notebook 示例 完整地介绍了如何使用 Connector 简化数据增量或批式导入至 Milvus 或 Zilliz Cloud 的流程。

总结

Apache Spark 和 Databricks 与 Milvus 和 Zilliz Cloud(全托管的 Milvus 服务)的整合为 AI 应用开发进一步带来了便利。开发人员可以轻松将数据以增量或批量的形式从数据处理端导入 Milvus 和 Zilliz Cloud 中,实现高效的检索。Spark Connector 助力高效开发可扩展的 AI 解决方案,充分释放非结构化数据的潜能。

准备好开启您的 AI 之旅了吗?立刻免费使用 Zilliz Cloud。

本文作者:陈将,Zilliz 生态和 AI 平台负责人。

相关文章:

  • C++类与对象(中)
  • 软件测试中的压力测试和性能测试区别
  • Python装饰器:让函数更强大
  • java-享元模式
  • 从0到1,AI我来了- (3)AI图片识别的理论知识-I
  • 【Linux】从零开始认识多线程 --- 线程ID
  • .NET Core 发展历程和版本迭代
  • 人工蜂鸟算法(Artificial Hummingbird Algorithm,AHA)及其Python和MATLAB实现
  • HTML前端 盒模型及常见的布局 流式布局 弹性布局 网格布局
  • Linux下git入门操作
  • springboot在加了mapper之后报错
  • 第六章:支持向量机
  • 国科大作业考试资料-人工智能原理与算法-2024新编-第十二次作业整理
  • opencv 按键开启连续截图,并加载提示图片
  • 论文写作之latex配置(VSCODE+TEXT LIVE)
  • 2017 前端面试准备 - 收藏集 - 掘金
  • CentOS6 编译安装 redis-3.2.3
  • Django 博客开发教程 8 - 博客文章详情页
  • JavaScript新鲜事·第5期
  • JavaScript中的对象个人分享
  • Linux编程学习笔记 | Linux多线程学习[2] - 线程的同步
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • nodejs:开发并发布一个nodejs包
  • PHP 小技巧
  • python docx文档转html页面
  • SpiderData 2019年2月23日 DApp数据排行榜
  • vue中实现单选
  • Zsh 开发指南(第十四篇 文件读写)
  • 基于web的全景—— Pannellum小试
  • 如何编写一个可升级的智能合约
  • 智能合约Solidity教程-事件和日志(一)
  • Java性能优化之JVM GC(垃圾回收机制)
  • mysql面试题分组并合并列
  • ​LeetCode解法汇总1410. HTML 实体解析器
  • ​LeetCode解法汇总2182. 构造限制重复的字符串
  • #APPINVENTOR学习记录
  • #define,static,const,三种常量的区别
  • $.ajax中的eval及dataType
  • (160)时序收敛--->(10)时序收敛十
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (草履虫都可以看懂的)PyQt子窗口向主窗口传递参数,主窗口接收子窗口信号、参数。
  • (二十三)Flask之高频面试点
  • (附源码)springboot车辆管理系统 毕业设计 031034
  • (附源码)基于SSM多源异构数据关联技术构建智能校园-计算机毕设 64366
  • (论文阅读31/100)Stacked hourglass networks for human pose estimation
  • (七)Appdesigner-初步入门及常用组件的使用方法说明
  • (七)glDrawArry绘制
  • (十八)用JAVA编写MP3解码器——迷你播放器
  • (四)鸿鹄云架构一服务注册中心
  • (五)IO流之ByteArrayInput/OutputStream
  • (五)关系数据库标准语言SQL
  • (限时免费)震惊!流落人间的haproxy宝典被找到了!一切玄妙尽在此处!
  • (一)Dubbo快速入门、介绍、使用
  • (转)ObjectiveC 深浅拷贝学习
  • (转载)深入super,看Python如何解决钻石继承难题