Flink Study Notes 17: The flatMap Operator
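1. flatMap consists of two core steps:
1. Split each input element into pieces
2. Flatten the pieces into a single stream of elements
Example: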
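The two steps can be seen in isolation on plain Scala collections, without Flink. The following is a minimal sketch; the object name FlatMapSketch and its fields are hypothetical and only serve the illustration. It assumes that Flink's flatMap on a DataStream behaves like the collection version with respect to splitting and flattening, which matches the description above.

```scala
// Plain Scala collections only, no Flink dependency.
// FlatMapSketch is a hypothetical name used for illustration.
object FlatMapSketch {
  val lines: List[String] = List("hadoop is good", "spark is fast", "flink is better")

  // Step 1: split each line into words, giving one inner list per line.
  val split: List[List[String]] = lines.map(_.split(" ").toList)

  // Step 2: flatten the nested lists into a single list of words.
  val flattened: List[String] = split.flatten

  // flatMap performs both steps in one call.
  val viaFlatMap: List[String] = lines.flatMap(_.split(" "))

  def main(args: Array[String]): Unit = {
    println(flattened)               // List(hadoop, is, good, spark, is, fast, flink, is, better)
    println(flattened == viaFlatMap) // true
  }
}
```

Three input strings become nine output elements, which is the main difference from map: map always yields exactly one output per input.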
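import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object flatMapTest {
  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create the data stream
    val ds = env.fromElements("hadoop is good", "spark is fast", "flink is better")

    // split each line on spaces; every word becomes its own element
    val flatMapedDs = ds.flatMap(_.split(" "))

    flatMapedDs.print()

    env.execute()
  }
}
Output: each of the nine words (hadoop, is, good, spark, is, fast, flink, is, better) is printed on its own line. With parallelism greater than 1, each line carries a subtask prefix such as "3>" and the order of lines may vary between runs.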
2. Custom flatMap function
Core steps:
1. Implement the FlatMapFunction interface (with extends in Scala)
2. Override the flatMap method
Example:
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object myFlatMapTest {

  // Custom flatMap function.
  // If the input string is longer than maxWordLength characters, split it
  // into words and emit each one; otherwise emit nothing.
  class myFlatMap(maxWordLength: Int) extends FlatMapFunction[String, String] {
    override def flatMap(t: String, collector: Collector[String]): Unit = {
      if (t.length > maxWordLength) {
        t.split(" ").foreach(collector.collect)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create the data stream
    val ds = env.fromElements("this is a good morning, long long long and long", "that day is friday")

    // apply the custom flatMap function
    val flatMapedDs = ds.flatMap(new myFlatMap(25))

    flatMapedDs.print()

    env.execute()
  }
}
Output: only the first string is longer than 25 characters, so it is split into its ten words (this, is, a, good, "morning,", long, long, long, and, long), each printed on its own line. The second string, "that day is friday", has 18 characters and produces no output at all.
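To see which inputs the length check in the example above lets through, the same logic can be sketched on plain Scala collections. This is a minimal sketch, not the Flink job itself: the object LengthGateSketch and the helper splitIfLong are hypothetical names, and the returned list stands in for the elements that would be passed to collector.collect.

```scala
// Plain Scala only, no Flink dependency.
// LengthGateSketch and splitIfLong are hypothetical names for illustration.
object LengthGateSketch {
  // Mirrors the check in myFlatMap: strings longer than maxLength are split
  // on spaces, shorter ones yield an empty list (nothing is emitted).
  def splitIfLong(line: String, maxLength: Int): List[String] =
    if (line.length > maxLength) line.split(" ").toList else Nil

  val inputs: List[String] =
    List("this is a good morning, long long long and long", "that day is friday")

  // Same threshold as in the example: 25 characters.
  val result: List[String] = inputs.flatMap(splitIfLong(_, 25))

  def main(args: Array[String]): Unit = {
    println(inputs.map(_.length)) // List(47, 18)
    println(result)               // List(this, is, a, good, morning,, long, long, long, and, long)
  }
}
```

The sketch makes two details visible: a flatMap function may emit zero elements for an input, which is how the short string disappears, and "morning," keeps its comma because the split is on spaces only.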