当前位置: 首页 > news >正文

微服务-流量染色

1. 功能目的

通过设置请求头的方式将http请求优先打到指定的服务上,为微服务开发调试工作提供便利

  1. 请求报文难模拟:可以直接在测试环境页面上操作,流量直接打到本地IDEA进行debug
  2. 请求链路较长:本地开发无需启动所有服务,仅需要启动目标服务
  3. 协同开发:与其他人一同开发,并且依赖对方开发的接口,可以直接将自己本地服务的请求发到对方本地服务上
    这是目前主要的使用目的,当然也可以调整负载均衡逻辑,配合网关的一些自定义配置,扩充为灰度发布的效果

2. 实现原理

通过在网关以及Ribbon,实现自定义的负载均衡策略,将请求引流到本地。
PS:需要服务器能访问本地,需要类似OpenVPN这样赋予本地一个IP,供服务器网关能请求到本地
开启OpenVPN之后本机会有多个IP,通过配置指定注册到nacos上的IP

spring.cloud.nacos.discovery.network-interface: 10.0

2.1 本地服务调整

本地启动时,配置文件添加参数,设置一个元数据作为服务的流量标识
比如mdm服务配置参数

spring.cloud.nacos.discovery.metadata.request-mark: azhuzhu

服务启动之后,我们可以在nacos的服务列表里看到元数据
在这里插入图片描述

2.2 网关负载均衡

服务分类:

  1. 服务器服务:metadata中没有 requestMark 参数
  2. 本地服务:metadata中带有 requestMark 参数

网关实现自定义负载均衡策略:

  • 判断请求头中是否带有本地流量标识:requestMark
    • 有标识:判断有无metadata匹配的服务实例
      • 有:调用匹配的服务实例
      • 无:判断目标请求有没有服务器服务实例
        • 有:服务器服务随机数负载
        • 无:可用实例随机负载
    • 无标识:判断目标请求有没有服务器服务实例
      • 有:服务器服务随机数负载
      • 无:可用实例随机负载
        网关负载处理了第一目标服务,假如调用链路为 mdm -> commom-api,我们启动的是common-api,则需要在服务端的ribbon中做负载
        在这里插入图片描述

2.3 请求发起

比如,约定request-mark作为本地流量标识
请求头 request-mark=azhuzhu 表示流量优先打到带有元数据 request-mark=azhuzhu 的服务

  • 浏览器操作:通过浏览器插件ModHeader,在浏览器发起请求时,带上请求头
  • HTTP工具:带上自定义请求头

3. 具体代码介绍

以下所有注册的bean, 都通过指定的配置参数开启

@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")

3.1 自定义负载均衡器

网关及其他客户端的流量染色具体的负载逻辑实现

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import reactor.core.publisher.Mono;import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;/*** 自定义负载均衡器 用于开发环境的流量染色** @author 阿猪 2024-08-09 11:45*/
@Slf4j
@Configuration
@SuppressWarnings("deprecation")
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
@LoadBalancerClients(defaultConfiguration = {ReqMarkLoadBalancer.class})
public class ReqMarkLoadBalancer {public static final String REQUEST_MARK = "request-mark";/*** 开启流量染色时 替换默认的负载器** @param environment               环境信息* @param loadBalancerClientFactory 负载器工厂* @return 自定义负载器*/@Bean@Primarypublic ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,LoadBalancerClientFactory loadBalancerClientFactory) {String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);return new RandomLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);}static class RandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;private final String serviceId;public RandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,String serviceId) {this.serviceId = serviceId;this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;}@Overridepublic Mono<Response<ServiceInstance>> choose(Request request) {ServiceInstanceListSupplier supplier =serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);String requestMark = getRequestMark(request);return supplier.get().next().map(serviceInstances -> processInstanceResponseByReqMark(serviceInstances, requestMark));}private String getRequestMark(Request<?> request) {// 客户端的负载 直接从 RequestContextHolder 拿请求头ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes != null) {return attributes.getRequest().getHeader(REQUEST_MARK);}// 网关的负载从 request 取值(网关覆盖了默认实现 把context塞进去了 不然拿到是 null 跟spring boot版本有关系)if (!(request.getContext() instanceof ServerHttpRequest)) {return null;}ServerHttpRequest context = (ServerHttpRequest) request.getContext();if (context.getHeaders().containsKey(REQUEST_MARK)) {return context.getHeaders().getFirst(REQUEST_MARK);}return null;}/*** 默认的随机数负载** @param instances 可用服务实例* @return 命中实例*/private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (CollectionUtils.isEmpty(instances)) {log.warn("No instance available {}", serviceId);return new EmptyResponse();}Random random = new Random();ServiceInstance instance = instances.get(random.nextInt(instances.size()));return new DefaultResponse(instance);}private Response<ServiceInstance> processInstanceResponseByReqMark(List<ServiceInstance> instances, String requestMark) {if (instances.isEmpty()) {return new EmptyResponse();}ServiceInstance sameClusterNameInst = selectInstanceByReqMark(instances, requestMark);return new DefaultResponse(sameClusterNameInst);}private ServiceInstance selectInstanceByReqMark(List<ServiceInstance> instances, String requestMark) {// 元数据不带请求标识的服务, 标识为服务器上的服务List<ServiceInstance> serverInstances = instances.stream().filter(instance -> {Map<String, String> metadata = instance.getMetadata();return MapUtils.isEmpty(metadata) || !metadata.containsKey(REQUEST_MARK);}).collect(Collectors.toList());if (StringUtils.isBlank(requestMark)) {if (CollectionUtils.isEmpty(serverInstances)) {return instances.get(new Random().nextInt(instances.size()));}return serverInstances.get(new Random().nextInt(serverInstances.size()));}List<ServiceInstance> matchInstances = Lists.newArrayList();for (ServiceInstance instance : instances) {Map<String, String> metadata = instance.getMetadata();if (MapUtils.isEmpty(metadata)) {continue;}if (metadata.containsKey(REQUEST_MARK) && requestMark.equals(metadata.get(REQUEST_MARK))) {matchInstances.add(instance);}}Random random = new Random();// 优先匹配到的服务  最后是随机if (CollectionUtils.isNotEmpty(matchInstances)) {return matchInstances.get(random.nextInt(matchInstances.size()));}// 然后是无标识服务(服务器上的服务)if (CollectionUtils.isNotEmpty(serverInstances)) {return serverInstances.get(random.nextInt(serverInstances.size()));}// 前两者都没有就随机获取return instances.get(random.nextInt(instances.size()));}}}

3.2 流量标识请求头透传

这里使用Feign进行内部服务调用,需要将原请求的流量标识 请求头继续传递下去,保证后续的服务链路也能有流量染色的效果。

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;/*** 流量染色 - 流量标识请求头透传** @author 阿猪 2024-09-25 17:11*/
@Component
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public class ReqMarkRequestInterceptor implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {// 从 request 中获取流量标识, 设置到 feign 的 requestTemplate中ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (attributes != null) {requestTemplate.header(ReqMarkLoadBalancer.REQUEST_MARK, attributes.getRequest().getHeader(ReqMarkLoadBalancer.REQUEST_MARK));}}}

3.3 网关负载均衡-请求信息获取

由于这个方案中,负载均衡是依靠 请求头 判断的,详见上面请求头的获取ReqMarkLoadBalancer.getRequestMark
在spring boot 2.3.2 版本中 request.getContext是个空的,没法获取请求信息
2.6.x 后面没有这个问题,但需要关注下这个context的类型,调整代码
以下是覆盖默认实现,为 request 的 context 设置请求信息。
实际上是复制 ReactiveLoadBalancerClientFilter的源码稍作修改,看倒数最后两行

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;import java.net.URI;import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;/*** 覆盖默认的负载均衡器 改了choose方法 将request信息传入** @see ReactiveLoadBalancerClientFilter* @author 阿猪 2024-08-09 17:06*/
@Slf4j
@Component
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public class CustomLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter {private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;private final LoadBalancerClientFactory clientFactory;private final LoadBalancerProperties properties;public CustomLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,LoadBalancerProperties properties) {super(null, null);this.clientFactory = clientFactory;this.properties = properties;}@Overridepublic int getOrder() {return LOAD_BALANCER_CLIENT_FILTER_ORDER;}@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// preserve the original urladdOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()+ " url before: " + url);}return choose(exchange).doOnNext(response -> {if (!response.hasServer()) {throw NotFoundException.create(properties.isUse404(),"Unable to find instance for " + url.getHost());}ServiceInstance retrievedInstance = response.getServer();URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";if (schemePrefix != null) {overrideScheme = url.getScheme();}DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);URI requestUrl = reconstructURI(serviceInstance, uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);}).then(chain.filter(exchange));}@SuppressWarnings("deprecation")private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);if (loadBalancer == null) {throw new NotFoundException("No loadbalancer available for " + uri.getHost());}// 就改了这里 仅调整参数传入 保持原有逻辑(原代码传入了空的request)Request<?> request = new DefaultRequest<>(exchange.getRequest());return loadBalancer.choose(request);}}

将bean覆盖掉,替换掉原本的 ReactiveLoadBalancerClientFilter

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 流量染色 - 将ReactiveLoadBalancerClientFilter覆盖掉 为了获取到http请求头** @author 阿猪 2024-08-09 17:12*/
@Configuration
public class CustomLoadBalancerConfig {@Bean@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")public ReactiveLoadBalancerClientFilter gatewayLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,LoadBalancerProperties properties){return new CustomLoadBalancerClientFilter(clientFactory, properties);}}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Windows安装openssl开发库
  • 『功能项目』宠物的攻击巨型化【80】
  • 手机也可以更换任意IP地址吗?
  • Linux中修改MySQL密码
  • 自学网络安全(黑客技术)2024年 —90天学习计划
  • java中的动态代理
  • 驱动开发系列18 - PAGE_SHIFT 解释
  • Golang | Leetcode Golang题解之第438题找到字符串中所有字母异位词
  • 智能监控,守护绿色能源:EasyCVR在电站视频监控中心的一站式解决方案
  • 华为GaussDB数据库(单机版)在ARM环境下的安装指南
  • 【C++笔试强训】如何成为算法糕手Day5
  • Html jquery下拉select美化插件——selectFilter.js
  • 街头摊贩检测系统源码分享
  • 微信小程序showLoading ,showToast ,hideLoading连续调用出现showLoading 不关闭的情况记录
  • Linux云计算 |【第四阶段】NOSQL-DAY2
  • JavaScript 如何正确处理 Unicode 编码问题!
  • 【技术性】Search知识
  • chrome扩展demo1-小时钟
  • CSS 三角实现
  • Date型的使用
  • Flex布局到底解决了什么问题
  • mongo索引构建
  • Ruby 2.x 源代码分析:扩展 概述
  • TiDB 源码阅读系列文章(十)Chunk 和执行框架简介
  • Vue.js-Day01
  • vue.js框架原理浅析
  • 聚簇索引和非聚簇索引
  • 聊聊hikari连接池的leakDetectionThreshold
  • 驱动程序原理
  • 使用common-codec进行md5加密
  • ​​​【收录 Hello 算法】10.4 哈希优化策略
  • ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTr
  • #vue3 实现前端下载excel文件模板功能
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • #如何使用 Qt 5.6 在 Android 上启用 NFC
  • #我与Java虚拟机的故事#连载03:面试过的百度,滴滴,快手都问了这些问题
  • (13)Hive调优——动态分区导致的小文件问题
  • (2)Java 简介
  • (C#)一个最简单的链表类
  • (k8s)Kubernetes 从0到1容器编排之旅
  • (二)构建dubbo分布式平台-平台功能导图
  • (生成器)yield与(迭代器)generator
  • (算法)求1到1亿间的质数或素数
  • (转)c++ std::pair 与 std::make
  • .NET CLR基本术语
  • .net on S60 ---- Net60 1.1发布 支持VS2008以及新的特性
  • .NET开源项目介绍及资源推荐:数据持久层
  • .NET命令行(CLI)常用命令
  • .Net下C#针对Excel开发控件汇总(ClosedXML,EPPlus,NPOI)
  • ::
  • @Import注解详解
  • @SpringBootApplication 注解
  • [ 云计算 | AWS ] AI 编程助手新势力 Amazon CodeWhisperer:优势功能及实用技巧
  • [100天算法】-实现 strStr()(day 52)
  • [2]十道算法题【Java实现】