Submitting jobs to a remote Flink cluster from Java code
Submitting Flink jobs from Java code
1. Add the Maven dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
2. Creating the PackagedProgram
2.1 PackagedProgram.newBuilder
A PackagedProgram is created through its Builder. You need to set the required information, such as the entry-point (main) class and the jar file.
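val packagedProgram: PackagedProgram = PackagedProgram
  .newBuilder()
  // entry point (main class) of your Flink program
  .setEntryPointClassName("<main class of your Flink program>")
  // your Flink program's jar file (a java.io.File)
  .setJarFile(jarFile)
  // savepoint settings
  .setSavepointRestoreSettings(SavepointRestoreSettings.none())
  // arguments your Flink program may need
  .setArguments("<argument>", "1", "2", "3")
  .build()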
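Since the goal is to do this from Java, the same builder call in Java might look like the following. It is a minimal sketch assuming the Flink 1.13 PackagedProgram.Builder API from the flink-clients dependency above; the PackagedProgramFactory class and its create method are hypothetical names used only for illustration.

```java
import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Hypothetical helper: wraps PackagedProgram.newBuilder() for a jar on the local disk.
final class PackagedProgramFactory {

    private PackagedProgramFactory() {}

    /**
     * Builds a PackagedProgram from a job jar.
     *
     * @param jarPath     path to the job jar on the submitting machine
     * @param mainClass   fully qualified entry-point class inside the jar
     * @param flinkConfig configuration handed to the program
     * @param args        program arguments forwarded to main(String[])
     */
    static PackagedProgram create(
            String jarPath, String mainClass, Configuration flinkConfig, String... args)
            throws ProgramInvocationException {
        return PackagedProgram.newBuilder()
                .setJarFile(new File(jarPath))
                .setEntryPointClassName(mainClass)
                .setConfiguration(flinkConfig)
                // no savepoint restore in this sketch
                .setSavepointRestoreSettings(SavepointRestoreSettings.none())
                .setArguments(args)
                .build();
    }
}
```

For example, PackagedProgramFactory.create("/path/to/your-job.jar", "com.example.YourJob", new Configuration(), "1", "2", "3"), where the path and class name are placeholders. build() reports problems such as a missing jar or an unloadable entry class as a ProgramInvocationException.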
2.2 SavepointRestoreSettings
This configures savepoint restoration. Instances are created directly through the class's static factory methods:
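// no savepoint restore
SavepointRestoreSettings.none()
// restore from the savepoint at path; allowNonRestoredState lets the job start even if some savepoint state cannot be mapped to an operator
SavepointRestoreSettings.forPath(path, allowNonRestoredState)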
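In practice the savepoint path is often optional (for example, a command-line argument that may be absent). A small sketch of choosing between the two factory methods; the SavepointSettingsExample class and fromOptionalPath method are hypothetical names, and only none(), forPath(...) and the getters on SavepointRestoreSettings come from Flink.

```java
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

// Hypothetical helper: picks none() or forPath(...) depending on whether a path was given.
final class SavepointSettingsExample {

    private SavepointSettingsExample() {}

    static SavepointRestoreSettings fromOptionalPath(
            String savepointPath, boolean allowNonRestoredState) {
        if (savepointPath == null || savepointPath.trim().isEmpty()) {
            // start the job from scratch
            return SavepointRestoreSettings.none();
        }
        // restore from the given savepoint
        return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
    }
}
```

The returned object is what you pass to setSavepointRestoreSettings on the PackagedProgram builder.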
3. Creating a JobGraph with PackagedProgramUtils
3.1 The createJobGraph source
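PackagedProgramUtils has two static overloads that create a JobGraph directly. The difference is that one takes an explicit JobID, while the other lets Flink generate a random one.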
/**
 * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
 * PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
 * @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID,
        boolean suppressOutput)
        throws ProgramInvocationException {
    final Pipeline pipeline =
            getPipelineFromProgram(
                    packagedProgram, configuration, defaultParallelism, suppressOutput);
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                    packagedProgram.getUserCodeClassLoader(),
                    pipeline,
                    configuration,
                    defaultParallelism);
    if (jobID != null) {
        jobGraph.setJobID(jobID);
    }
    jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
    jobGraph.setClasspaths(packagedProgram.getClasspaths());
    jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

    return jobGraph;
}
3.2 Creating the JobGraph
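The parameters are:
- packagedProgram: the PackagedProgram built above
- configuration: the Flink configuration
- defaultParallelism: the default parallelism of the job; the default value is fine
- jobID: the pre-generated job ID; pass null to have a random one generated
- suppressOutput: boolean; if true, the program's stdout/stderr is suppressed while the JobGraph is being created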
val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
  packagedProgram,
  flinkConfig,
  parallelism,
  null,  // jobID: null means a random JobID is generated
  false  // suppressOutput
)
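If you want a predictable job ID (for example to look the job up later), the five-argument overload accepts one. A minimal Java sketch, assuming Flink 1.13; JobGraphFactory, parseJobIdOrNull and create are hypothetical names, while JobID.fromHexString and PackagedProgramUtils.createJobGraph are Flink APIs.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical helper around PackagedProgramUtils.createJobGraph.
final class JobGraphFactory {

    private JobGraphFactory() {}

    /** Parses a 32-hex-digit JobID, or returns null so that Flink generates a random one. */
    static JobID parseJobIdOrNull(String hexJobId) {
        return (hexJobId == null || hexJobId.isEmpty()) ? null : JobID.fromHexString(hexJobId);
    }

    static JobGraph create(
            PackagedProgram program, Configuration flinkConfig, int parallelism, String hexJobId)
            throws ProgramInvocationException {
        return PackagedProgramUtils.createJobGraph(
                program,
                flinkConfig,
                parallelism,
                parseJobIdOrNull(hexJobId), // null -> random JobID
                false); // suppressOutput: keep the program's stdout/stderr
    }
}
```

JobID.fromHexString rejects strings that are not 32 hexadecimal characters with an IllegalArgumentException, so validating user input early is cheap.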
The configuration can also be obtained by loading the local flink-conf.yaml; just pass in the Flink installation directory.
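import scala.util.Try
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}

// Load $flinkHome/conf/flink-conf.yaml, falling back to an empty Configuration on failure
private def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
  Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
}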
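A Java counterpart of the helper above, extended to point the configuration at a remote cluster. This is a sketch assuming Flink 1.13 option names; FlinkConfigLoader, loadDefault and withRemoteCluster are hypothetical. Unlike Scala's Try, it only catches IllegalConfigurationException, which is what GlobalConfiguration throws when the conf directory or flink-conf.yaml is missing.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;

// Hypothetical helper for building the client-side Configuration.
final class FlinkConfigLoader {

    private FlinkConfigLoader() {}

    /** Loads $flinkHome/conf/flink-conf.yaml, or returns an empty Configuration if it is missing. */
    static Configuration loadDefault(String flinkHome) {
        try {
            return GlobalConfiguration.loadConfiguration(flinkHome + "/conf");
        } catch (IllegalConfigurationException e) {
            return new Configuration();
        }
    }

    /** Returns a copy of base that targets the JobManager REST endpoint at host:restPort. */
    static Configuration withRemoteCluster(Configuration base, String host, int restPort) {
        Configuration conf = new Configuration(base); // copy so the caller's object is untouched
        conf.setString(JobManagerOptions.ADDRESS, host);
        conf.setString(RestOptions.ADDRESS, host);
        conf.setInteger(RestOptions.PORT, restPort);
        return conf;
    }
}
```

The REST port is 8081 unless the cluster overrides rest.port.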
4. Submitting the job to the remote cluster
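// client: a ClusterClient connected to the remote cluster (for example a RestClusterClient)
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
CompletableFuture<JobID> result = client.submitJob(jobGraph);
JobID jobId = result.get(); // blocks until the cluster acknowledges the submission
System.out.println("Submission complete");
System.out.println("jobId: " + jobId.toString());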
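The snippet above assumes an existing client. One way to obtain it for a standalone cluster is a RestClusterClient built from the configuration. This is a sketch assuming Flink 1.13's RestClusterClient(Configuration, T clusterId) constructor and a configuration whose rest.address/rest.port point at the JobManager; RemoteSubmitter and submit are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical helper: submits a JobGraph to a standalone cluster over REST.
final class RemoteSubmitter {

    private RemoteSubmitter() {}

    static JobID submit(Configuration flinkConfig, JobGraph jobGraph) throws Exception {
        // try-with-resources closes the client (and its REST connections) afterwards
        try (RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(flinkConfig, StandaloneClusterId.getInstance())) {
            CompletableFuture<JobID> result = client.submitJob(jobGraph);
            JobID jobId = result.get(); // waits for the submission to be acknowledged
            // ask the cluster for the job's current status once, just for logging
            System.out.println("jobId: " + jobId + ", status: " + client.getJobStatus(jobId).get());
            return jobId;
        }
    }
}
```

The client is closed right after submission; the job keeps running on the cluster, because closing the client does not cancel submitted jobs.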