当前位置: 首页 > news >正文

基于配置实现RoaringBitMap的交、差、并集处理

 本文项目github链接:baiye21/RoaringbitmapDemo · GitHub

系列文章:

RoaringBitMap处理海量数据内存diff_roaringbitmap实践-CSDN博客

Roaringbitmap+Mysql构建标签--实际使用问题-CSDN博客

一、背景

   前文提到,使用Roaringbitmap解决了大数据量级别的客户标签检索问题,在实际应用后,经常会有原子标签通过交、并、差集逻辑,组合成业务标签再提供给业务使用的逻辑,如果通过代码实现,无法避免需要频繁修改代码,于是,进一步迭代,是否可以将原子标签的逻辑组合转为配置,再通过代码解析配置,即可执行原子标签的逻辑组合呢?这样,只要生产原子标签,那么基于原子标签的各种组合逻辑,基于配置实现后,无论业务标签定义如何修改,都可以通过修改配置,零代码实现。

二、代码设计简述

   首先定义了一个ITagProcessor接口,用于定义标签数据处理方法。TagFetcherProcessor类则负责根据指定的标签类型和ID列表获取标签原始数据。TagAndLogicProcessor和TagOrLogicProcessor,实现了交集(标签AND)和并集(标签OR)逻辑运算。TagNotLogicProcessor类,用于执行差集(标签NOT)操作。TagProcessorFactory标签处理器解析工厂,负责解析配置,创建具体的处理器实例。

2.1、标签处理接口:

   定义标签处理方法以及标签处理器初始化校验方法

public interface ITagProcessor {/*** 标签处理** @return RoaringBitmap*/RoaringBitmap process();/*** 处理器初始化校验*/void initCheck();
}

2.2、标签取数处理器:

   从数据库或者缓存中取出标签原始数据,这里注意的是CustomerTagService是从Spring容器取得的,因为基于配置的处理器都是通过反射实例化出来的,所以这里单独定义了一个注解,表明需要从容器中注入的bean,这个注解后续处理器解析工厂会再次使用。

@Data
public class TagFetcherProcessor implements ITagProcessor {/*** 标签类型*/private String tagType;/*** 标签ID列表*/private List<String> tagTypeIds;@TagProcessorSpringBean(beanName = "customerTagServiceImpl")private CustomerTagService customerTagService;/*** 返回取数结果** @return RoaringBitmap*/@Overridepublic RoaringBitmap process() {// 改为缓存时,此处需要做复制处理return customerTagService.roaringBitMapClass(tagType, tagTypeIds, BitmapOperateEnum.OR);}@Overridepublic void initCheck() {Preconditions.checkNotNull(tagType, "tagType不能为空");Preconditions.checkNotNull(tagTypeIds, "tagTypeIds不能为空");Preconditions.checkNotNull(customerTagService, "customerTagService不能为空");}
}

2.3、标签逻辑与(交集)或(并集)处理器:

   内含标签处理器列表,可进行系列标签处理器的逻辑与或操作,当处理器列表元素为一时,实际变为标签取数处理器。注:与或处理器,需要改变的只有andOrLogic方法里的操作符

public abstract class AbstractAndOrTagLogic implements ITagProcessor {abstract RoaringBitmap andOrLogic();public abstract void addTagProcess(ITagProcessor tagProcess);public static AbstractAndOrTagLogic getAndOrLogicProcess(String operate) {if (BitmapOperateEnum.AND.getCode().equals(operate)) {return new TagAndLogicProcessor();} else if (BitmapOperateEnum.OR.getCode().equals(operate)) {return new TagOrLogicProcessor();} else {throw new IllegalArgumentException("异常标签操作类型" + operate);}}@Overridepublic RoaringBitmap process() {return andOrLogic();}}
@Data
public class TagAndLogicProcessor extends AbstractAndOrTagLogic {/*** 处理标签处理器*/private List<ITagProcessor> processorList;@Overridepublic void addTagProcess(ITagProcessor tagProcess) {if (CollectionUtils.isEmpty(processorList)) {processorList = new ArrayList<>();}processorList.add(tagProcess);}@OverrideRoaringBitmap andOrLogic() {RoaringBitmap result = initResult();int i = 1;while (i < processorList.size()) {result = BitOperationUtil.roaringBitMapOperate(result, processorList.get(i).process(),BitmapOperateEnum.AND);i++;}return result;}private RoaringBitmap initResult() {return processorList.get(1).process();}@Overridepublic void initCheck() {Preconditions.checkNotNull(processorList, "processorList不能为空");Preconditions.checkArgument(processorList.size() >= 1, "processorList至少要有一个元素");}
}

2.4、标签取反(差集)处理器:

   两个标签的取反处理,接收source和target两个处理器参数。 

@Data
public class TagNotLogicProcessor implements ITagProcessor {/*** 处理标签处理器Source*/private ITagProcessor processorSource;/*** 处理标签处理器Target*/private ITagProcessor processorTarget;@Overridepublic RoaringBitmap process() {return BitOperationUtil.roaringBitMapOperate(processorSource.process(), processorTarget.process(),BitmapOperateEnum.NOT);}@Overridepublic void initCheck() {Preconditions.checkNotNull(processorSource, "processorSource不能为空");Preconditions.checkNotNull(processorTarget, "processorTarget不能为空");}
}

    

2.5、标签配置解析工厂:

   解析标签处理器json串,通过反射机制,实例化各个标签处理器,组合成标签处理流程。

@Slf4j
@Component
public class TagProcessorFactory {public ITagProcessor createTagProcessorFlow(String config)throws JsonProcessingException, ClassNotFoundException, InstantiationException, IllegalAccessException {Map<String, Object> configMap = new ObjectMapper().readValue(config, new TypeReference<Map<String, Object>>() {});// processor_createreturn createProcessor(configMap);}private ITagProcessor createProcessor(Map<String, Object> processorConfig)throws ClassNotFoundException, InstantiationException, IllegalAccessException, JsonProcessingException {// 实例化ITagProcessor processor = createProcessorByType((String) processorConfig.get("class_name"));// 初始化 - 属性赋值populateFields(processor, processorConfig);// 处理器初始化校验processor.initCheck();return processor;}private ITagProcessor createProcessorByType(String className)throws ClassNotFoundException, InstantiationException, IllegalAccessException {Class<?> clazz = Class.forName(className);if (!ITagProcessor.class.isAssignableFrom(clazz)) {throw new IllegalArgumentException(clazz.getTypeName() + " class is not implemented ClueDataProcessor");}return (ITagProcessor) clazz.newInstance();}private void populateFields(ITagProcessor processor, Map<String, Object> configMap)throws IllegalAccessException, JsonProcessingException, ClassNotFoundException, InstantiationException {Field[] fields = processor.getClass().getDeclaredFields();Map<String, Object> params = (Map<String, Object>) configMap.get("params");for (Field field : fields) {field.setAccessible(Boolean.TRUE);// spring_bean注入if (field.isAnnotationPresent(TagProcessorSpringBean.class)) {injectSpringBean(processor, field);// 属性赋值} else {injectFieldValue(processor, field, params);}}}/*** 注入spring_bean** @param processor processor* @param field     field* @throws IllegalAccessException ..*/private void injectSpringBean(ITagProcessor processor, Field field) throws IllegalAccessException {// 如果有spring_bean 注解TagProcessorSpringBean springBeanAnnotation = AnnotationUtils.getAnnotation(field, TagProcessorSpringBean.class);String beanName = springBeanAnnotation.beanName();if (StringUtils.isBlank(beanName)) {beanName = field.getName();}field.set(processor, SpringContextUtil.getBean(beanName));}private void injectFieldValue(ITagProcessor processor, Field field, Map<String, Object> paramMap)throws IllegalAccessException, JsonProcessingException, ClassNotFoundException, InstantiationException {// 集合类型if (Collection.class.isAssignableFrom(field.getType())) {Collection<Object> paramCollection = initParamCollection(field);// 判断是否是 tag_processif (checkProcessCollectionParam(field)) {List<Map<String, Object>> paramMapList = (List<Map<String, Object>>) paramMap.get(field.getName());for (Map<String, Object> tempMap : paramMapList) {paramCollection.add(createProcessor(tempMap));}} else {List<Object> paramMapList = (List<Object>) paramMap.get(field.getName());for (Object obj : paramMapList) {paramCollection.add(obj);}}field.set(processor, paramCollection);// 其他} else {if (ITagProcessor.class.isAssignableFrom(field.getType())) {Map<String, Object> tempParamMap = (Map<String, Object>) paramMap.get(field.getName());field.set(processor, createProcessor(tempParamMap));} else {field.set(processor, paramMap.get(field.getName()));// TODO 校验指定了枚举的filed是否取值正常// checkFieldEnumValue(processor, field);}}}/*** 返回集合类型的初始化集合* TODO: 当前只有list,Set类型** @param field field* @return Collection*/private Collection initParamCollection(Field field) {if (List.class.isAssignableFrom(field.getType())) {return new ArrayList();} else if (Set.class.isAssignableFrom(field.getType())) {return new HashSet();} else {throw new IllegalArgumentException("Unsupported Collection type");}}/*** 判断 Collection 类型参数 是否为 processor类型** @param field Field* @return true/false*/private boolean checkProcessCollectionParam(Field field) {Type genericType = field.getGenericType();if (!(genericType instanceof ParameterizedType)) {throw new IllegalArgumentException("Field genericType Source Value " + genericType);}// List<T>ParameterizedType parameterizedType = (ParameterizedType) genericType;Type actualTypeArgument = parameterizedType.getActualTypeArguments()[0];if (actualTypeArgument instanceof Class) {Class<?> actualClass = (Class<?>) actualTypeArgument;if (ITagProcessor.class.isAssignableFrom(actualClass)) {return true;}}try {// 基础类型无需进一步转换ParameterizedType rowType = (ParameterizedType) actualTypeArgument;log.info("rowType " + rowType.getClass());Class<?> typeClass = (Class<?>) rowType.getRawType();return ITagProcessor.class.isAssignableFrom(typeClass);} catch (ClassCastException e) {log.error("checkProcessCollectionParam ClassCastException type:{}", actualTypeArgument, e);}return false;}
}

2.6、配置案例: 

{"type": "TagNotLogicProcessor","class_name": "com.demo.roaringbitmap.processor.TagNotLogicProcessor","params": {"processorSource": {"type": "TagAndLogicProcessor","class_name": "com.demo.roaringbitmap.processor.TagAndLogicProcessor","params": {"processorList": [{"type": "TagFetcherProcessor","class_name": "com.demo.roaringbitmap.processor.TagFetcherProcessor","params": {"tagType": "cus_class","tagTypeIds": ["D"]}},{"type": "TagFetcherProcessor","class_name": "com.demo.roaringbitmap.processor.TagFetcherProcessor","params": {"tagType": "cus_activity","tagTypeIds": ["1","2","3"]}}]}},"processorTarget": {"type": "TagFetcherProcessor","class_name": "com.demo.roaringbitmap.processor.TagFetcherProcessor","params": {"tagType": "cus_risk_level","tagTypeIds": ["R1","R2","R3","R4","R5"]}}}
}

三、关键代码逻辑

3.1、处理器配置解析工厂

/**
* 配置文件解析后,实例化处理器流程,实例化、属性赋值、初始化校验
*/
private ITagProcessor createProcessor(Map<String, Object> processorConfig)throws ClassNotFoundException, InstantiationException, IllegalAccessException, JsonProcessingException {// 实例化ITagProcessor processor = createProcessorByType((String) processorConfig.get("class_name"));// 初始化 - 属性赋值populateFields(processor, processorConfig);// 处理器初始化校验processor.initCheck();return processor;}/**
* 通过反射机制,实例化处理器
*/
private ITagProcessor createProcessorByType(String className)throws ClassNotFoundException, InstantiationException, IllegalAccessException {Class<?> clazz = Class.forName(className);if (!ITagProcessor.class.isAssignableFrom(clazz)) {throw new IllegalArgumentException(clazz.getTypeName() + " class is not implemented ClueDataProcessor");}return (ITagProcessor) clazz.newInstance();}/**
* 处理器的属性赋值,区分从Spring容器获取bean还是从配置文件读取
* TagProcessorSpringBean 注解在此处区分是否需要从Spring容器取bean
*/
private void populateFields(ITagProcessor processor, Map<String, Object> configMap)throws IllegalAccessException, JsonProcessingException, ClassNotFoundException, InstantiationException {Field[] fields = processor.getClass().getDeclaredFields();Map<String, Object> params = (Map<String, Object>) configMap.get("params");for (Field field : fields) {field.setAccessible(Boolean.TRUE);// spring_bean注入if (field.isAnnotationPresent(TagProcessorSpringBean.class)) {injectSpringBean(processor, field);// 属性赋值} else {injectFieldValue(processor, field, params);}}}/*** 注入spring_bean** @param processor processor* @param field     field* @throws IllegalAccessException ..*/private void injectSpringBean(ITagProcessor processor, Field field) throws IllegalAccessException {// 如果有spring_bean 注解TagProcessorSpringBean springBeanAnnotation = AnnotationUtils.getAnnotation(field, TagProcessorSpringBean.class);String beanName = springBeanAnnotation.beanName();if (StringUtils.isBlank(beanName)) {beanName = field.getName();}field.set(processor, SpringContextUtil.getBean(beanName));}/**
* 处理器属性填充,以下两点注意:
*1、集合类型的赋值需要特殊处理
*2、处理器参数仍然是处理器类型,则需要递归创建
*/
private void injectFieldValue(ITagProcessor processor, Field field, Map<String, Object> paramMap)throws IllegalAccessException, JsonProcessingException, ClassNotFoundException, InstantiationException {// 集合类型if (Collection.class.isAssignableFrom(field.getType())) {Collection<Object> paramCollection = initParamCollection(field);// 判断是否是 tag_processif (checkProcessCollectionParam(field)) {List<Map<String, Object>> paramMapList = (List<Map<String, Object>>) paramMap.get(field.getName());for (Map<String, Object> tempMap : paramMapList) {paramCollection.add(createProcessor(tempMap));}} else {List<Object> paramMapList = (List<Object>) paramMap.get(field.getName());for (Object obj : paramMapList) {paramCollection.add(obj);}}field.set(processor, paramCollection);// 其他} else {// 如果处理器属性仍然是处理器类型,则进入递归,再次创建处理器if (ITagProcessor.class.isAssignableFrom(field.getType())) {Map<String, Object> tempParamMap = (Map<String, Object>) paramMap.get(field.getName());field.set(processor, createProcessor(tempParamMap));} else {field.set(processor, paramMap.get(field.getName()));}}}

3.2、处理器的嵌套

  因为一个完成的标签处理流程,最底层一定是有一个标签取数处理器,取数处理的处理结果是一个标签,可以继续提供给逻辑处理器,而逻辑处理器的输出,也是一个标签,所以,ITagProcessor接口定义的process()方法是一个无参方法,返回结果是一个Roaringbitmap,取数和逻辑处理器均实现该方法,而逻辑处理器的不同之处在于,其内部还有ITagProcessor的成员变量,也就意味着,逻辑处理器可以接受取数处理器的结果或者同样为逻辑处理器的结果,这样,也就实现了处理器的嵌套,这一点,也可以从配置文件看出,各个处理器的嵌套关系。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • JavaSE第11篇:设计模式
  • Servlet(2)
  • C语言 | Leetcode C语言题解之第321题拼接最大数
  • 免费自动化AI视频剪辑工具
  • 深入解析Apache Flink中的事件时间与处理时间
  • 使用TensorRT对YOLOv8模型进行加速推理
  • 解决方案:Cannot write to ‘torch-2.0.1+cu118-cp310-cp310-linux_x86_64.whl.3’ (成功).
  • 我的256天创作纪念日
  • 《学会 SpringMVC 系列 · 剖析初始化》
  • 学习分享:电商平台 API 接入技术要点深度剖析
  • 分享一个简单线性dp
  • 2024 年华数杯全国大学生数学建模竞赛题目B 题 VLSI 电路单元的自动布局完整成品文章分享
  • C++——哈希结构
  • 中国县城建设统计年鉴(2015-2022年)
  • 基础算法之模拟
  • ES6指北【2】—— 箭头函数
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • exif信息对照
  • isset在php5.6-和php7.0+的一些差异
  • JSDuck 与 AngularJS 融合技巧
  • php的插入排序,通过双层for循环
  • Shell编程
  • thinkphp5.1 easywechat4 微信第三方开放平台
  • TypeScript迭代器
  • 编写符合Python风格的对象
  • 容器化应用: 在阿里云搭建多节点 Openshift 集群
  • 一个普通的 5 年iOS开发者的自我总结,以及5年开发经历和感想!
  • 一起参Ember.js讨论、问答社区。
  • 自制字幕遮挡器
  • 扩展资源服务器解决oauth2 性能瓶颈
  • ​Python 3 新特性:类型注解
  • ​如何使用QGIS制作三维建筑
  • #LLM入门|Prompt#3.3_存储_Memory
  • (12)Hive调优——count distinct去重优化
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (安卓)跳转应用市场APP详情页的方式
  • (附源码)springboot美食分享系统 毕业设计 612231
  • (南京观海微电子)——示波器使用介绍
  • (七)Appdesigner-初步入门及常用组件的使用方法说明
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (四) Graphivz 颜色选择
  • (正则)提取页面里的img标签
  • (轉)JSON.stringify 语法实例讲解
  • .net core webapi 大文件上传到wwwroot文件夹
  • .NET Standard 的管理策略
  • .net 流——流的类型体系简单介绍
  • :O)修改linux硬件时间
  • ??eclipse的安装配置问题!??
  • @拔赤:Web前端开发十日谈
  • [000-002-01].数据库调优相关学习
  • [20171102]视图v$session中process字段含义
  • [ACM独立出版]2024年虚拟现实、图像和信号处理国际学术会议(ICVISP 2024)
  • [ACTF2020 新生赛]Upload 1
  • [BZOJ1053][HAOI2007]反素数ant