
Submitting a job remotely to a Flink cluster from Java code


1. Add the dependency to the pom file

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
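
2. Creating the PackagedProgram
2.1 PackagedProgram.newBuilder

A PackagedProgram is created through its builder. You need to set the required information, such as the entry-point class (main class) and the path to the jar.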

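import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings

// jarFile is a java.io.File pointing at the jar of your Flink program
val packagedProgram: PackagedProgram = PackagedProgram
    .newBuilder()
    // fully qualified name of the class that contains your program's main method
    .setEntryPointClassName("entry-point main class of your Flink program")
    // the jar of your Flink program
    .setJarFile(jarFile)
    // savepoint settings; none() starts the job without restoring from a savepoint
    .setSavepointRestoreSettings(SavepointRestoreSettings.none())
    // arguments passed to your program's main method
    .setArguments("arguments your Flink program may need", "1", "2", "3")
    .build()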
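
Before building the PackagedProgram, it can help to check that the jar really contains the entry-point class, so that a wrong path or class name fails early with a clear message. The following is a minimal sketch using only the JDK. `JarPreflight` and `containsClass` are hypothetical names chosen for illustration and are not part of Flink.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper for illustration; not part of Flink.
class JarPreflight {

    /** Returns true if the jar exists and contains the .class entry for the given class name. */
    static boolean containsClass(File jar, String className) throws IOException {
        if (!jar.isFile()) {
            return false;
        }
        // A class com.example.Job is stored in the jar as com/example/Job.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println("Usage: java JarPreflight <path-to-jar> <fully.qualified.MainClass>");
            return;
        }
        boolean found = containsClass(new File(args[0]), args[1]);
        System.out.println(found ? "entry point found" : "entry point missing");
    }
}
```

The check only looks at the jar's entries; it does not load the class or verify that it has a main method.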

2.2 SavepointRestoreSettings

This class holds the savepoint restore settings. Instances are created by calling its static factory methods directly.

// no savepoint to restore from
SavepointRestoreSettings.none()
// restore from a savepoint path
SavepointRestoreSettings.forPath(path, allowNonRestoredState)
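
3. Creating the JobGraph with PackagedProgramUtils
3.1 createJobGraph source code

This class has two static methods that create a JobGraph directly. The difference is that one takes a pre-generated JobID, while the other generates a random JobID.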

/**
  * Creates a {@link JobGraph} with a specified {@link JobID} from the given {@link
  * PackagedProgram}.
  *
  * @param packagedProgram to extract the JobGraph from
  * @param configuration to use for the optimizer and job graph generator
  * @param defaultParallelism for the JobGraph
  * @param jobID the pre-generated job id
  * @return JobGraph extracted from the PackagedProgram
  * @throws ProgramInvocationException if the JobGraph generation failed
  */
public static JobGraph createJobGraph(
         PackagedProgram packagedProgram,
         Configuration configuration,
         int defaultParallelism,
         @Nullable JobID jobID,
         boolean suppressOutput)
         throws ProgramInvocationException {
     final Pipeline pipeline =
             getPipelineFromProgram(
                     packagedProgram, configuration, defaultParallelism, suppressOutput);
     final JobGraph jobGraph =
             FlinkPipelineTranslationUtil.getJobGraphUnderUserClassLoader(
                     packagedProgram.getUserCodeClassLoader(),
                     pipeline,
                     configuration,
                     defaultParallelism);
     if (jobID != null) {
         jobGraph.setJobID(jobID);
     }
     jobGraph.addJars(packagedProgram.getJobJarAndDependencies());
     jobGraph.setClasspaths(packagedProgram.getClasspaths());
     jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());

     return jobGraph;
 }
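
3.2 Creating the JobGraph

The parameters are:
packagedProgram: the PackagedProgram built in step 2
configuration: the Flink configuration
parallelism: the default parallelism; Flink's default value is fine
jobID: the pre-generated JobID; pass null to keep the generated one
suppressOutput: boolean; whether to suppress stdout/stderr output while the JobGraph is being created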

val jobGraph: JobGraph = PackagedProgramUtils.createJobGraph(
    packagedProgram,
    flinkConfig,
    parallelism,
    null,
    false
)

The configuration can also be obtained by loading the local flink-conf.yaml; just pass in the Flink installation path.

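import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import scala.util.Try

// Loads flink-conf.yaml from $flinkHome/conf; falls back to an empty Configuration on failure
private def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
    Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
}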
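
Because the helper above falls back to an empty Configuration when loading fails, a wrong installation path can go unnoticed. A minimal JDK-only sketch of a check that could run first is shown here. `FlinkConfLocator` and `findConfDir` are hypothetical names, `FLINK_HOME` is an assumed environment variable name, and the sketch assumes the file is called flink-conf.yaml as in the text.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Flink.
class FlinkConfLocator {

    /** Returns flinkHome/conf if it contains flink-conf.yaml, otherwise an empty Optional. */
    static Optional<Path> findConfDir(String flinkHome) {
        if (flinkHome == null || flinkHome.isEmpty()) {
            return Optional.empty();
        }
        Path confDir = Paths.get(flinkHome, "conf");
        return Files.isRegularFile(confDir.resolve("flink-conf.yaml"))
                ? Optional.of(confDir)
                : Optional.empty();
    }

    public static void main(String[] args) {
        // Assumption: the installation path comes from the first argument or from FLINK_HOME
        String flinkHome = args.length > 0 ? args[0] : System.getenv("FLINK_HOME");
        System.out.println(
                findConfDir(flinkHome)
                        .map(dir -> "Using configuration directory " + dir)
                        .orElse("No flink-conf.yaml found; check the installation path"));
    }
}
```

The returned directory is the value that would then be passed to GlobalConfiguration.loadConfiguration.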

4. Submitting the job to the remote cluster
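// Assumption: client is a ClusterClient already connected to the cluster, for example
// new RestClusterClient<>(configuration, StandaloneClusterId.getInstance()) for a standalone cluster
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
CompletableFuture<JobID> result = client.submitJob(jobGraph);
// Blocks until the cluster acknowledges the submission
JobID jobId = result.get();
System.out.println("Submission complete");
System.out.println("jobId:" + jobId);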
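
Calling get() without a timeout blocks indefinitely if the cluster never answers. The following is a minimal sketch of waiting with a timeout and exposing the underlying failure, using only the JDK. `SubmitResult` and `await` are hypothetical names, and the helper is generic in the result type so that a plain String can stand in for the JobID returned by client.submitJob.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of Flink.
class SubmitResult {

    /** Waits up to timeoutSeconds for the future and rethrows failures with their real cause. */
    static <T> T await(CompletableFuture<T> future, long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("No response within " + timeoutSeconds + " s", e);
        } catch (ExecutionException e) {
            // The failure is wrapped by the future; expose the underlying cause
            throw new IllegalStateException("Submission failed: " + e.getCause(), e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for submission", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for client.submitJob(jobGraph); a String replaces the JobID here
        CompletableFuture<String> result = CompletableFuture.completedFuture("job-id-placeholder");
        System.out.println("jobId:" + await(result, 30));
    }
}
```

With a real client, the call would look like await(client.submitJob(jobGraph), 30), where the 30-second timeout is an arbitrary choice.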