Submitting jobs to a remote Flink cluster from Java code
1. Add the Maven dependency (pom.xml)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
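2. Create a PackagedProgram
2.1 PackagedProgram.newBuilder
A PackagedProgram is created through its Builder. You need to set some required information, such as the entry-point (main) class and the job jar file.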
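val packagedProgram: PackagedProgram = PackagedProgram
  .newBuilder()
  // fully qualified name of your Flink program's main class
  .setEntryPointClassName("<main class of your Flink program>")
  // your Flink program's jar (a java.io.File)
  .setJarFile(jarFile)
  // savepoint restore settings
  .setSavepointRestoreSettings(SavepointRestoreSettings.none())
  // arguments your Flink program may need
  .setArguments("<arguments for your Flink program>", "1", "2", "3")
  .build()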
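To see what setEntryPointClassName and setArguments amount to: PackagedProgram loads the entry-point class through a user-code classloader and calls its static main(String[]) with the configured arguments. While the JobGraph is created in step 3, Flink roughly runs this main method in an environment that captures the job's pipeline instead of executing it, which is why the suppressOutput flag exists. The stdlib-only sketch below illustrates only the load-and-invoke part. MainInvoker, loaderForJar, invokeMain and DemoJob are hypothetical names, and this is not Flink's implementation (for one thing, Flink's user-code classloader is child-first by default, whereas the URLClassLoader here is parent-first).

```java
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.Arrays;

public class MainInvoker {

    // Hypothetical helper: a classloader for a job jar. Unlike Flink's default
    // child-first user-code classloader, URLClassLoader asks its parent first.
    static URLClassLoader loaderForJar(Path jarPath) throws MalformedURLException {
        return new URLClassLoader(new URL[] {jarPath.toUri().toURL()}, MainInvoker.class.getClassLoader());
    }

    // Loads the entry-point class by name and calls its static main(String[]) with the arguments.
    static void invokeMain(ClassLoader loader, String entryClassName, String... args)
            throws ReflectiveOperationException {
        Class<?> entryClass = Class.forName(entryClassName, true, loader);
        Method main = entryClass.getMethod("main", String[].class);
        // Cast to Object so the array is passed as main's single String[] argument.
        main.invoke(null, (Object) args);
    }

    // Hypothetical stand-in for "the main class of your Flink program".
    public static class DemoJob {
        static String[] lastArgs;

        public static void main(String[] args) {
            lastArgs = args;
            System.out.println("DemoJob called with " + Arrays.toString(args));
        }
    }

    public static void main(String[] args) throws Exception {
        // DemoJob is already on the classpath; for a real job jar use loaderForJar(...)
        // instead, and close the loader when you are done with it.
        invokeMain(MainInvoker.class.getClassLoader(), DemoJob.class.getName(), "1", "2", "3");
    }
}
```

Running MainInvoker.main prints "DemoJob called with [1, 2, 3]", the same arguments passed to setArguments in the builder above.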
2.2 SavepointRestoreSettings
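These are the savepoint restore settings. Create them with the static factory methods of the class:
// no savepoint restore
SavepointRestoreSettings.none()
// restore from the savepoint at path; allowNonRestoredState = true lets the job start even if some savepoint state cannot be mapped to it
SavepointRestoreSettings.forPath(path, allowNonRestoredState)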
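3. Create the JobGraph with PackagedProgramUtils
3.1 createJobGraph source
PackagedProgramUtils has two static methods that create a JobGraph directly. The difference is that one uses a caller-specified JobID and the other generates a random JobID. The variant that accepts a JobID looks like this: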
/**
 * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
 * PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
 * @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism,
        @Nullable JobID jobID,
        boolean suppressOutput)
        throws ProgramInvocationException {
    final Pipeline pipeline =
            getPipelineFromProgram(
                    packagedProgram, configuration, defaultParallelism, suppressOutput);
    final JobGraph jobGraph =
            FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                    packagedProgram.getUserCodeClassLoader(),
                    pipeline,
                    configuration,
                    defaultParallelism);
    if (jobID != null) {
        jobGraph.setJobID(jobID);
    }
    jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
    jobGraph.setClasspaths(packagedProgram.getClasspaths());
    jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
    return jobGraph;
}
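3.2 Create the JobGraph
Parameters:
packagedProgram: the PackagedProgram built in step 2
configuration: the Flink configuration
parallelism: the job's default parallelism; passing the default value is fine
jobID: the JobID to assign; pass null to have a random one generated
suppressOutput: boolean; if true, stdout/stderr printed by the user program while the JobGraph is being created is suppressed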
val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
  packagedProgram,
  flinkConfig,
  parallelism,
  null,  // jobID: null means a random JobID is generated
  false  // suppressOutput
)
The configuration can also be loaded from the local flink-conf.yaml by passing in the Flink installation directory:
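import scala.util.Try
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
// loads $flinkHome/conf/flink-conf.yaml; falls back to an empty Configuration if loading fails
private def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
  Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
}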
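GlobalConfiguration.loadConfiguration fails when the conf directory or its flink-conf.yaml is missing, which is why the snippet above wraps it in Try. For readers unfamiliar with the file, below is a simplified, stdlib-only Java reader showing the format it consumes in Flink 1.13 (one "key: value" pair per line, "#" starting a comment) and the same fall-back-to-empty behaviour. It also reads parallelism.default, the key behind the default parallelism from step 3.2, whose built-in default is 1. FlinkConfSketch, loadFlinkConf and defaultParallelism are hypothetical names; in real code keep using GlobalConfiguration, which returns a Flink Configuration object.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlinkConfSketch {

    // Simplified reader for $flinkHome/conf/flink-conf.yaml: one "key: value" pair per line,
    // '#' starts a comment. Returns an empty map if the file is missing or unreadable.
    static Map<String, String> loadFlinkConf(String flinkHome) {
        Path confFile = Paths.get(flinkHome, "conf", "flink-conf.yaml");
        Map<String, String> conf = new HashMap<>();
        List<String> lines;
        try {
            lines = Files.readAllLines(confFile, StandardCharsets.UTF_8);
        } catch (IOException e) {
            return conf; // missing Flink home or conf file: use an empty configuration
        }
        for (String raw : lines) {
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            int sep = line.indexOf(": ");
            if (sep <= 0) {
                continue; // blank line, comment-only line, or not a "key: value" entry
            }
            conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
        }
        return conf;
    }

    // Reads "parallelism.default", falling back to Flink's built-in default of 1.
    static int defaultParallelism(Map<String, String> conf) {
        return Integer.parseInt(conf.getOrDefault("parallelism.default", "1"));
    }
}
```

With a conf file containing "parallelism.default: 4", defaultParallelism returns 4; if the file does not exist, loadFlinkConf returns an empty map and defaultParallelism returns 1.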
4. Submit the job to the remote cluster
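// client: a RestClusterClient talking to the remote cluster's REST endpoint (rest.address / rest.port in configuration)
RestClusterClient<StandaloneClusterId> client =
        new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
CompletableFuture<JobID> result = client.submitJob(jobGraph);
JobID jobId = result.get();
System.out.println("Submission complete");
System.out.println("jobId: " + jobId);
client.close();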
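client.submitJob returns a CompletableFuture<JobID>, and result.get() waits without any bound. If you would rather fail fast, a bounded wait can be sketched with plain java.util.concurrent. AwaitSubmission and await are hypothetical names, the method is generic so it works for CompletableFuture<JobID>, and the timeout value is your choice. A timeout only means the answer did not arrive in time; the job may still have been submitted.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AwaitSubmission {

    // Waits at most `timeout` for an asynchronous result instead of blocking indefinitely with get().
    static <T> T await(CompletableFuture<T> future, Duration timeout) throws TimeoutException {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new IllegalStateException("interrupted while waiting for the submission", e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the real reason the submission failed.
            throw new IllegalStateException("submission failed", e.getCause());
        }
    }

    public static void main(String[] args) throws Exception {
        // In the real code, pass client.submitJob(jobGraph) here.
        CompletableFuture<String> submitted = CompletableFuture.completedFuture("a-job-id");
        System.out.println(await(submitted, Duration.ofSeconds(30)));
    }
}
```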
