Flink Tutorial (10): keyBy on Multiple Fields for Tuples and POJOs with KeySelector
I. Tuples
Suppose we have the following stream:
DataStream<Tuple2<String, Integer>> wordAndOne = ....
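1. keyBy on a single field
By field position (the index-based overload is deprecated in newer Flink releases in favor of KeySelector):
    wordAndOne.keyBy(0)
With a KeySelector written as a lambda:
    wordAndOne.keyBy(v -> v.f0)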
2. keyBy on multiple fields
By field positions:
    wordAndOne.keyBy(0, 1)
With a KeySelector that returns both fields as a Tuple2:
    wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
            return Tuple2.of(value.f0, value.f1);
        }
    });
The anonymous class above can be simplified with a lambda:
    wordAndOne.keyBy(
        (KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>) value ->
            Tuple2.of(value.f0, value.f1)
    );
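To make the difference between a single-field key and a multi-field key concrete, here is a minimal plain-Java sketch. It does not use Flink: `CompositeKeySketch`, `groupByKey` and `sampleInput` are hypothetical names, `Map.Entry` stands in for `Tuple2`, and `java.util.function.Function` stands in for `KeySelector`. It only imitates the grouping a key induces, under the assumption that records with equal keys land in the same group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java stand-in, NOT Flink code: it only imitates how a key partitions records.
class CompositeKeySketch {

    // Groups records by the key the selector returns, relying on the key's equals()/hashCode().
    static <T, K> Map<K, List<T>> groupByKey(List<T> records, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T record : records) {
            groups.computeIfAbsent(keySelector.apply(record), k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    // Hypothetical sample data; Map.Entry<String, Integer> plays the role of Tuple2<String, Integer>.
    static List<Map.Entry<String, Integer>> sampleInput() {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        input.add(Map.entry("flink", 1));
        input.add(Map.entry("flink", 1));
        input.add(Map.entry("flink", 2));
        input.add(Map.entry("spark", 1));
        return input;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = sampleInput();

        // Single-field key, analogous to keyBy(v -> v.f0).
        Map<String, List<Map.Entry<String, Integer>>> byWord =
                groupByKey(input, e -> e.getKey());

        // Two-field key, analogous to keyBy(v -> Tuple2.of(v.f0, v.f1)).
        Map<Map.Entry<String, Integer>, List<Map.Entry<String, Integer>>> byWordAndCount =
                groupByKey(input, e -> Map.entry(e.getKey(), e.getValue()));

        System.out.println(byWord.size());         // prints 2
        System.out.println(byWordAndCount.size()); // prints 3
    }
}
```

With this sample input, keying by the word alone yields two groups, while keying by (word, count) yields three, because ("flink", 1) and ("flink", 2) no longer share a key.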
II. POJO
Suppose we have the following stream:
DataStream<PeopleCount> source = ...
The PeopleCount class is defined as:
    public class PeopleCount {
        private String province;
        private String city;
        private Integer counts;

        public PeopleCount() {
        }

        // remaining code omitted...
    }
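For reference, a fuller version of such a class might look like the sketch below. The three-argument constructor and the getters and setters are assumptions about the omitted code, not the original author's implementation; `PeopleCountSketch` is a hypothetical wrapper used only to keep the example in a single file. The shape follows Flink's POJO rules: a public class, a public no-argument constructor, and private fields exposed through getters and setters.

```java
// Sketch only: the accessors and the three-argument constructor are assumptions
// about the code the original class omits.
class PeopleCountSketch {

    // A public static nested class can also satisfy the POJO rules.
    public static class PeopleCount {
        private String province;
        private String city;
        private Integer counts;

        // Public no-argument constructor, needed for Flink to treat the class as a POJO.
        public PeopleCount() {
        }

        // Convenience constructor (assumed, not part of the original snippet).
        public PeopleCount(String province, String city, Integer counts) {
            this.province = province;
            this.city = city;
            this.counts = counts;
        }

        public String getProvince() { return province; }
        public void setProvince(String province) { this.province = province; }

        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }

        public Integer getCounts() { return counts; }
        public void setCounts(Integer counts) { this.counts = counts; }
    }

    public static void main(String[] args) {
        PeopleCount pc = new PeopleCount("Zhejiang", "Hangzhou", 10);
        System.out.println(pc.getProvince() + "/" + pc.getCity() + " = " + pc.getCounts());
    }
}
```

The getters are what the key selectors in the following sections rely on, for example `PeopleCount::getProvince`.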
1. keyBy on a single field
With a lambda:
    source.keyBy(a -> a.getProvince())
Or, equivalently, with a method reference:
    source.keyBy(PeopleCount::getProvince)
2. keyBy on multiple fields
Use a KeySelector that combines the fields into a Tuple2:
    source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
            return Tuple2.of(value.getProvince(), value.getCity());
        }
    });
The anonymous class above can be simplified with a lambda:
    source.keyBy(
        (KeySelector<PeopleCount, Tuple2<String, String>>) value ->
            Tuple2.of(value.getProvince(), value.getCity())
    );
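A note on why the composite key is built as a Tuple2 rather than, say, a String[]: keyed partitioning depends on the key's hashCode(), and the Flink documentation states that arrays, as well as POJOs that do not override hashCode(), cannot be used as keys. The plain-Java sketch below illustrates the underlying issue without Flink. `KeyEqualitySketch`, `countGroups` and `sampleRows` are hypothetical names, and a HashSet stands in for keyed partitioning.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Plain-Java stand-in, NOT Flink code: shows why a composite key needs value-based equality.
class KeyEqualitySketch {

    // Counts the distinct keys the selector produces, using the key's equals()/hashCode().
    static <K> int countGroups(List<String[]> rows, Function<String[], K> keySelector) {
        Set<K> keys = new HashSet<>();
        for (String[] row : rows) {
            keys.add(keySelector.apply(row));
        }
        return keys.size();
    }

    // Hypothetical (province, city) rows; the first two are logically the same key.
    static List<String[]> sampleRows() {
        return Arrays.asList(
                new String[] {"Zhejiang", "Hangzhou"},
                new String[] {"Zhejiang", "Hangzhou"},
                new String[] {"Zhejiang", "Ningbo"});
    }

    public static void main(String[] args) {
        List<String[]> rows = sampleRows();

        // Value-based key: a List compares its elements, much like Tuple2 compares its fields.
        int valueBased = countGroups(rows, row -> Arrays.asList(row[0], row[1]));

        // Identity-based key: every freshly created array is distinct from all others.
        int identityBased = countGroups(rows, row -> new String[] {row[0], row[1]});

        System.out.println(valueBased);    // prints 2
        System.out.println(identityBased); // prints 3
    }
}
```

The value-based key merges the two Hangzhou rows into one group, while the array key treats every row as its own group, which is not what a (province, city) grouping intends.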