Flink (8): Parallelism in Flink
I. Parallel Execution in Flink
Examples
1.Operator Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

Operators, data sources, and data sinks can each call setParallelism() to set their own parallelism. In the example above, only the sum operator is given a parallelism of 5.
2.Execution Environment Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

Calling setParallelism on the ExecutionEnvironment sets the default parallelism for all operators, data sources, and data sinks in that environment. If an operator, data source, or data sink sets its own parallelism, that value overrides the one set on the ExecutionEnvironment.
3.Client Level
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
Or, from a program:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

With the CLI client, the parallelism is specified on the command line with -p. When submitting from Java/Scala, it is passed as an argument to Client.run.
4.System Level
# The parallelism used for programs that did not specify any other parallelism.
parallelism.default: 1

The parallelism.default entry in flink-conf.yaml sets the system-level default parallelism for all execution environments.
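To make the fallback role of parallelism.default concrete, here is a minimal sketch in plain Java that looks up the key in flink-conf.yaml-style "key: value" lines and falls back to a supplied value when the key is absent. It is not Flink's own configuration loader. The class DefaultParallelismReader and the method readDefaultParallelism are hypothetical names introduced only for illustration, and the sketch assumes flat "key: value" lines with "#" comments.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; Flink loads its configuration differently.
class DefaultParallelismReader {

    // Returns the value of parallelism.default found in the given lines,
    // or the supplied fallback when the key is not present.
    static int readDefaultParallelism(List<String> lines, int fallback) {
        for (String raw : lines) {
            String line = raw.trim();
            // Skip blank lines and full-line comments.
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int sep = line.indexOf(':');
            if (sep < 0) {
                continue;
            }
            String key = line.substring(0, sep).trim();
            String value = line.substring(sep + 1).trim();
            // Drop a trailing comment on the same line, if any.
            int hash = value.indexOf('#');
            if (hash >= 0) {
                value = value.substring(0, hash).trim();
            }
            if (key.equals("parallelism.default")) {
                return Integer.parseInt(value);
            }
        }
        return fallback;
    }

    public static void main(String[] args) {
        List<String> conf = Arrays.asList(
            "# The parallelism used for programs that did not specify any other parallelism.",
            "parallelism.default: 4");
        System.out.println(readDefaultParallelism(conf, 1)); // prints 4
    }
}
```

The point of the sketch is only that the configured value is a default: it matters when nothing closer to the job (client, environment, or operator) has set a parallelism.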
Summary
Flink lets you set parallelism at several levels: Operator Level, Execution Environment Level, Client Level, and System Level.
The parallelism.default entry in flink-conf.yaml sets the system-level default parallelism for all execution environments. Calling setParallelism on the ExecutionEnvironment sets the default parallelism for its operators, data sources, and data sinks. If an operator, data source, or data sink sets its own parallelism, that value overrides the one set on the ExecutionEnvironment.
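The override rules can be sketched as a simple lookup from the most specific level to the least specific one. The sketch below is a plain-Java model, not Flink code: the class ParallelismResolver and the method resolve are hypothetical names. It also assumes that the client-level value sits between the environment level and the system level, which matches the order in which the levels are listed here but is not stated explicitly in the text above.

```java
import java.util.OptionalInt;

// Hypothetical model for illustration only; it does not use any Flink API.
class ParallelismResolver {

    // Picks the effective parallelism for one operator.
    // Assumed order: operator, then environment, then client, then system default.
    static int resolve(OptionalInt operator,
                       OptionalInt environment,
                       OptionalInt client,
                       int systemDefault) {
        if (operator.isPresent()) {
            return operator.getAsInt();    // e.g. .sum(1).setParallelism(5)
        }
        if (environment.isPresent()) {
            return environment.getAsInt(); // e.g. env.setParallelism(3)
        }
        if (client.isPresent()) {
            return client.getAsInt();      // e.g. ./bin/flink run -p 10
        }
        return systemDefault;              // e.g. parallelism.default: 1
    }

    public static void main(String[] args) {
        // Operator-level setting wins over everything else.
        System.out.println(resolve(OptionalInt.of(5), OptionalInt.of(3), OptionalInt.of(10), 1)); // prints 5
        // No operator-level setting: the environment default applies.
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.of(3), OptionalInt.of(10), 1)); // prints 3
        // Nothing set anywhere in the job or on the client: the system default applies.
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.empty(), OptionalInt.empty(), 1)); // prints 1
    }
}
```

Using the values from the examples in this post, an operator with setParallelism(5) runs with parallelism 5 even when the environment is set to 3 and the job is submitted with -p 10.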