当前位置: 首页 > news >正文

Flink 1.18.1的基本使用

系统示例应用
/usr/local/flink-1.18.1/bin/flink run /usr/local/flies/streaming/SocketWindowWordCount.jar --port 9010
nc -l 9010
asd asd sdfsf sdf sdfsdagd sdf

在这里插入图片描述

在这里插入图片描述


单次统计示例工程
cd C:\Dev\IdeaProjectsmvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.18.1
 Define value for property 'groupId':Define value for property 'artifactId':Define value for property 'version' 1.0-SNAPSHOT: :Define value for property 'package' : :com.eduflink-example1.0.0com.edu.flink
package com.edu.flink;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;public class WindowWordCount {public static void main(String[] args) throws Exception {//设置运行时环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();//设置输入流,并执行数据流的处理和转换env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("192.168.18.128", 9000).flatMap(new Splitter()).keyBy(0).timeWindow(Time.seconds(5)).sum(1);dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));//设置输出流dataStream.print();//执行程序env.execute("Window WordCount");System.out.print("finished...");}public static class Splitter implements FlatMapFunction<String, Tuple2<String,Integer>> {@Overridepublic void flatMap(String sentence, Collector<Tuple2<String, Integer>> out)throws Exception {for (String word : sentence.split(" ")) {out.collect(new Tuple2<String, Integer>(word, 1));}}}}

在这里插入图片描述

在这里插入图片描述

相关文章:

  • 全面理解jvm
  • 板块零 IDEA编译器基础:第二节 创建JAVA WEB项目与IDEA基本设置 来自【汤米尼克的JAVAEE全套教程专栏】
  • 网络异常案例六_IP冲突
  • 【C语言】三子棋游戏实现代码
  • Java赋能:大学生成绩量化新篇章
  • 【机器学习】AAAI 会议论文聚类分析
  • Antd+React+react-resizable实现表格拖拽功能
  • 通过docker-compose部署NGINX服务,并使该服务开机自启
  • DQN的理论研究回顾
  • nvm安装node后,npm无效
  • vue - 指令(一)
  • 在 CentOS 7上使用 Apache 和 mod_wsgi 部署 Django 应用的方法
  • ‘javax.sql.DataSource‘ that could not be found的问题
  • 什么是冒烟测试,UT测试,IT测试,如何来开展这些测试
  • iPhone搞机记录
  • 【跃迁之路】【463天】刻意练习系列222(2018.05.14)
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • Angular 响应式表单之下拉框
  • CentOS从零开始部署Nodejs项目
  • CSS实用技巧
  • flutter的key在widget list的作用以及必要性
  • Git学习与使用心得(1)—— 初始化
  • Go 语言编译器的 //go: 详解
  • Idea+maven+scala构建包并在spark on yarn 运行
  • iOS高仿微信项目、阴影圆角渐变色效果、卡片动画、波浪动画、路由框架等源码...
  • JavaScript工作原理(五):深入了解WebSockets,HTTP/2和SSE,以及如何选择
  • Laravel Telescope:优雅的应用调试工具
  • mysql_config not found
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • PyCharm搭建GO开发环境(GO语言学习第1课)
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 关于List、List?、ListObject的区别
  • 基于webpack 的 vue 多页架构
  • 浅谈web中前端模板引擎的使用
  • 区块链技术特点之去中心化特性
  • 如何优雅地使用 Sublime Text
  • 深入浅出webpack学习(1)--核心概念
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 手写双向链表LinkedList的几个常用功能
  • 智能合约开发环境搭建及Hello World合约
  • PostgreSQL 快速给指定表每个字段创建索引 - 1
  • 阿里云重庆大学大数据训练营落地分享
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • 直播平台建设千万不要忘记流媒体服务器的存在 ...
  • #gStore-weekly | gStore最新版本1.0之三角形计数函数的使用
  • #NOIP 2014#day.2 T1 无限网络发射器选址
  • (2)Java 简介
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (附源码)计算机毕业设计SSM基于健身房管理系统
  • (原)本想说脏话,奈何已放下
  • (原创)boost.property_tree解析xml的帮助类以及中文解析问题的解决
  • (转)es进行聚合操作时提示Fielddata is disabled on text fields by default
  • ***测试-HTTP方法
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .gitignore文件_Git:.gitignore