当前位置: 首页 > news >正文

Flink Rest Basic Auth - 安全认证

背景

公司目前需要将Flink实时作业云化,构建多租户实时计算平台。目前考虑为了资源高效利用,并不打算为每个租户部署一套独立的Kubernetes集群。也就意味着多个租户的作业可能会运行在同一套kubernets集群中。此时实时作业的任务就变的很危险,因为网络可能是通的,就会存在危险的REST API暴露出去,被一些不坏好意的人利用,从而影响其他租户的作业。鉴于此考虑给Flink的作业添加一个认证方式,可以是Kerberos或者是Http 用户名密码Baisc认证。各种搜索和询问,最终发现了一些线索FLIP-181: Custom netty HTTP request inbound/outbound handlers 这里描述了为何flink官方否定这个诉求。当然不要着急,笔者在flink-basic-auth-handler上找到了方案,并且成功将方案迁移到了flink-1.17.2版本中。

改造步骤

Flink 的JobManager/SQLGateway是基于Netty实现的一套轻量级的web服务接口,这些接口都实现了RestServerEndpoint抽象类。因此我们可以看看这个类start方法中可以看到在启动的代码中可以看到InboundChannelHandlerFactory这个东西,通过改Factory创建一个Inbound的hander。

public final void start() throws Exception {synchronized (lock) {Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");log.info("Starting rest endpoint.");final Router router = new Router();final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();handlers = initializeHandlers(restAddressFuture);/* sort the handlers such that they are ordered the following:* /jobs* /jobs/overview* /jobs/:jobid* /jobs/:jobid/config* /:**/Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);checkAllEndpointsAndHandlersAreUnique(handlers);handlers.forEach(handler -> registerHandler(router, handler, log));ChannelInitializer<SocketChannel> initializer =new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws ConfigurationException {RouterHandler handler = new RouterHandler(router, responseHeaders);// SSL should be the first handler in the pipelineif (isHttpsEnabled()) {ch.pipeline().addLast("ssl",new RedirectingSslHandler(restAddress,restAddressFuture,sslHandlerFactory));}ch.pipeline().addLast(new HttpServerCodec()).addLast(new FileUploadHandler(uploadDir)).addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders));for (InboundChannelHandlerFactory factory :inboundChannelHandlerFactories) {Optional<ChannelHandler> channelHandler =factory.createHandler(configuration, responseHeaders);if (channelHandler.isPresent()) {ch.pipeline().addLast(channelHandler.get());}}ch.pipeline().addLast(new ChunkedWriteHandler()).addLast(handler.getName(), handler).addLast(new PipelineErrorHandler(log, responseHeaders));}};NioEventLoopGroup bossGroup =new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));NioEventLoopGroup workerGroup =new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(initializer);Iterator<Integer> portsIterator;try {portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);} catch (IllegalConfigurationException e) {throw e;} catch (Exception e) {throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);}int chosenPort = 0;while (portsIterator.hasNext()) {try {chosenPort = portsIterator.next();final ChannelFuture channel;if (restBindAddress == null) {channel = bootstrap.bind(chosenPort);} else {channel = bootstrap.bind(restBindAddress, chosenPort);}serverChannel = channel.syncUninterruptibly().channel();break;} catch (final Exception e) {// syncUninterruptibly() throws checked exceptions via Unsafe// continue if the exception is due to the port being in use, fail early// otherwiseif (!(e instanceof java.net.BindException)) {throw e;}}}if (serverChannel == null) {throw new BindException("Could not start rest endpoint on any port in port range "+ restBindPortRange);}log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();final String advertisedAddress;if (bindAddress.getAddress().isAnyLocalAddress()) {advertisedAddress = this.restAddress;} else {advertisedAddress = bindAddress.getAddress().getHostAddress();}port = bindAddress.getPort();log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();restAddressFuture.complete(restBaseUrl);state = State.RUNNING;startInternal();}}

然后在构造函数中可以发现inboundChannelHandlerFactories对象是通过SPI方案加载进来的。

 public RestServerEndpoint(Configuration configuration)throws IOException, ConfigurationException {Preconditions.checkNotNull(configuration);RestServerEndpointConfiguration restConfiguration =RestServerEndpointConfiguration.fromConfiguration(configuration);Preconditions.checkNotNull(restConfiguration);this.configuration = configuration;this.restAddress = restConfiguration.getRestAddress();this.restBindAddress = restConfiguration.getRestBindAddress();this.restBindPortRange = restConfiguration.getRestBindPortRange();this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();this.uploadDir = restConfiguration.getUploadDir();

相关文章:

  • 使用 GPT-4 创作高考作文 2024年
  • 想在VBA软件中做个登录验证会员授权,用什么云服务器好?
  • Python Flask 入门开发
  • Invalid JSON text:“Invalid value.“ at position 0 in value for column ‘user.info
  • 引擎:UI
  • 用爬虫实现---模拟填志愿
  • SmartEDA VS Multisim/Proteus:电子设计江湖,谁主沉浮?
  • Aws EC2,kubeadm方式安装kubernetes(k8s)
  • 【QT】将字符串条件转换为一个可以编程检查的条件
  • 技术周总结2024.06.03~06.09(K8S HikariCP数据库连接池)
  • gorse修改开源项目后,如何使用Docker compose发布
  • 光猫、路由器的路由模式、桥接模式、拨号上网
  • k8s-mysql主从部署
  • 部署kubesphere报错
  • 【运维】如何更换Ubuntu默认的Python版本,update-alternatives如何使用
  • [iOS]Core Data浅析一 -- 启用Core Data
  • 【翻译】babel对TC39装饰器草案的实现
  • 2018天猫双11|这就是阿里云!不止有新技术,更有温暖的社会力量
  • css属性的继承、初识值、计算值、当前值、应用值
  • golang 发送GET和POST示例
  • Java多态
  • MySQL Access denied for user 'root'@'localhost' 解决方法
  • php面试题 汇集2
  • zookeeper系列(七)实战分布式命名服务
  • 阿里云爬虫风险管理产品商业化,为云端流量保驾护航
  • 我建了一个叫Hello World的项目
  • 小程序滚动组件,左边导航栏与右边内容联动效果实现
  • 学习使用ExpressJS 4.0中的新Router
  • 原生Ajax
  • 职业生涯 一个六年开发经验的女程序员的心声。
  • 专访Pony.ai 楼天城:自动驾驶已经走过了“从0到1”,“规模”是行业的分水岭| 自动驾驶这十年 ...
  • ​【数据结构与算法】冒泡排序:简单易懂的排序算法解析
  • #pragma 指令
  • $forceUpdate()函数
  • (C#)一个最简单的链表类
  • (delphi11最新学习资料) Object Pascal 学习笔记---第7章第3节(封装和窗体)
  • (附源码)springboot美食分享系统 毕业设计 612231
  • (每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(七)
  • (转)一些感悟
  • *++p:p先自+,然后*p,最终为3 ++*p:先*p,即arr[0]=1,然后再++,最终为2 *p++:值为arr[0],即1,该语句执行完毕后,p指向arr[1]
  • ... fatal error LINK1120:1个无法解析的外部命令 的解决办法
  • .Family_物联网
  • .FileZilla的使用和主动模式被动模式介绍
  • .Net Core 生成管理员权限的应用程序
  • .Net Core中Quartz的使用方法
  • .NET/C# 解压 Zip 文件时出现异常:System.IO.InvalidDataException: 找不到中央目录结尾记录。
  • .Net多线程总结
  • .NET中 MVC 工厂模式浅析
  • :O)修改linux硬件时间
  • [ vulhub漏洞复现篇 ] ThinkPHP 5.0.23-Rce
  • [Algorithm][动态规划][路径问题][不同路径][不同路径Ⅱ][珠宝的最高价值]详细讲解
  • [BUG] Authentication Error
  • [C++] vector list 等容器的迭代器失效问题
  • [Go]-抢购类业务方案
  • [IDEA插件] JarEditor 编辑jar包(直接新增、修改、删除jar包内的class文件)