2019独角兽企业重金招聘Python工程师标准>>>
添加依赖:
<!-- Maven dependency on MySQL Connector/J, the JDBC driver for MySQL.
     NOTE(review): 5.1.6 is an old driver release; confirm it is compatible
     with the MySQL server version you connect to. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.6</version>
</dependency>
读取 MySQL 数据:
// Reads a MySQL table through Spark's JDBC data source (one partition per
// predicate), filters the rows and prints the resulting row count.
// try-with-resources closes the session even if the read or the count fails.
try (SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark MYSQL basic example")
        .getOrCreate()) {
    String url = "jdbc:mysql://10.37.154.65:3306/cmallpre2";
    String table = "t_es_order";

    // JDBC connection properties. The table name is passed directly to
    // jdbc(...) below, so no separate "dbtable" property is set here.
    // NOTE(review): credentials are hard-coded in source; consider loading
    // them from configuration or the environment instead.
    Properties connectionProperties = new Properties();
    connectionProperties.setProperty("user", "CMALLusr"); // database user name
    connectionProperties.setProperty("password", "PCy5VJYd1Ysn"); // database password

    // Each predicate produces one partition of the resulting Dataset.
    // NOTE(review): predicates are documented as WHERE-clause conditions; the
    // ORDER BY / LIMIT text here presumably relies on the predicate being
    // appended verbatim to the generated query and may break if Spark combines
    // it with a pushed-down filter - confirm against your Spark version.
    String[] predicates = new String[] {
        "1=1 order by TABLE_ID limit 1,10",
        "1=1 order by TABLE_ID limit 20,30" };

    // Load the data (the original call was missing the comma before
    // connectionProperties and did not compile).
    Dataset<Row> rows = spark.read().jdbc(url, table, predicates, connectionProperties);

    // Apply an additional filter on the loaded rows.
    // NOTE(review): the comparison value is the string "10"; if TABLE_ID is a
    // numeric column, gt(10) may be what is intended - confirm the schema.
    Dataset<Row> filter = rows.filter(col("TABLE_ID").gt("10"));
    System.out.println("mysql count:" + filter.count());
}