SpringBoot下调用kettle脚本
一、pom引入kettle相关依赖
<!-- Single place to set the Kettle version shared by the Pentaho artifacts below. -->
<properties><kettle.version>7.1.0.9-101</kettle.version>
</properties>
<!-- Core Kettle packages -->
<!-- NOTE(review): Pentaho artifacts are usually resolved from the Pentaho public repository rather than Maven Central; confirm a matching <repository> entry exists in the pom. -->
<dependency><groupId>pentaho-kettle</groupId><artifactId>kettle-core</artifactId><version>${kettle.version}</version>
</dependency>
<dependency><groupId>pentaho-kettle</groupId><artifactId>kettle-engine</artifactId><version>${kettle.version}</version>
</dependency>
<dependency><groupId>pentaho</groupId><artifactId>metastore</artifactId><version>${kettle.version}</version>
</dependency>
<!-- Other supporting dependencies; the required versions may differ between Kettle releases. -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-vfs2</artifactId><version>2.1</version>
</dependency>
二、设置脚本参数
打开转换或者作业脚本,右键,选择“转换设置”或者“作业设置”,设置命名参数。参数可以在脚本中以 ${name} 的方式引用,数据库连接的配置中同样可以使用。
在 Java 程序调用脚本时,可以把参数值传递给脚本,从而实现动态配置。
三、Java中调用脚本
import java.lang.reflect.Field;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** kettle 工具类* @author 锁战强**/public class KettleUtils {private static String KETTLE_FILE_PATH="";static {try {KettleEnvironment.init();//初始化kettle环境EnvUtil.environmentInit();//读取kettle.properties文件加入环境变量} catch (KettleException e) {throw new AppException(e, ErrorCode.KETTLE_INIT_FAIL);}}/** * 运行转换文件方法* @param ktrPath 转换文件的路径,后缀ktr* @param params 多个参数变量值*/public static void runTransfer(String ktrPath, Map<String,String> variables, String[] params) {Trans trans = null;try {TransMeta transMeta = new TransMeta(KETTLE_FILE_PATH + ktrPath);// 转换trans = new Trans(transMeta);if(variables != null) {for(Entry<String,String> entry : variables.entrySet()) {trans.setParameterValue(entry.getKey(), entry.getValue());}}// 执行转换trans.execute(params);// 等待转换执行结束trans.waitUntilFinished();// 抛出异常if (trans.getErrors() > 0) {throw new AppException(ErrorCode.KETTLE_RUN_JOB_ERROR);}} catch (Exception e) {throw new AppException(e,ErrorCode.KETTLE_RUN_JOB_ERROR);} }/*** java 调用 kettle 的job* */public static void runJob(String jobPath,Map<String,String> variables, String[] params) {try {// jobname 是Job脚本的路径及名称JobMeta jobMeta = new JobMeta(KETTLE_FILE_PATH + jobPath, null); // 向Job 脚本传递参数,脚本中获取参数值:${参数名}for(Entry<String,String> entry : variables.entrySet()) {jobMeta.setParameterValue(entry.getKey(), entry.getValue());}Job job = new Job(null, jobMeta);job.start();job.waitUntilFinished();if (job.getErrors() > 0) {throw new AppException(ErrorCode.KETTLE_RUN_JOB_ERROR);}} catch (Exception e) {throw new AppException(e,ErrorCode.KETTLE_RUN_JOB_ERROR);}}public static void main(String[] args) {String kettlePath = "f:/a.ktr";Map<String,String> map = new HashMap<>();map.put("db.host", "127.0.0.1");map.put("db.name", "compass");map.put("db.port", "1433");map.put("db.user", "sa");map.put("db.pwd", "******"); KettleUtils.runTransfer(kettlePath, map, null);}
}