package com.thinker;

import java.util.Locale;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word count over a text socket.
 *
 * <p>Connects to the host and port given on the command line, splits every
 * received line into lower-cased words, and prints a running count per word.
 */
public class App {

    /** Usage text printed to stderr when the command-line arguments are invalid. */
    private static final String USAGE = "USAGE:\nSocketTextStreamWordCount <hostname> <port>";

    /**
     * Splits a line of text into lower-cased words and emits one
     * {@code (word, 1)} pair per word.
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /** One or more non-word characters; compiled once instead of on every record. */
        private static final Pattern NON_WORD = Pattern.compile("\\W+");

        /**
         * Tokenizes {@code s} and collects a {@code (token, 1)} tuple for each non-empty token.
         *
         * @param s         the input line
         * @param collector receives one tuple per word found in {@code s}
         */
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
            String[] tokens = NON_WORD.split(s.toLowerCase(Locale.ROOT));
            for (String token : tokens) {
                // A line that starts with a separator yields a leading empty token; skip it.
                if (token.length() > 0) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    /**
     * Entry point: builds and runs the socket word-count job.
     *
     * @param args exactly two arguments: the hostname and the port of the text socket
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");

        if (args.length != 2) {
            System.err.println(USAGE);
            return;
        }

        final String hostname = args[0];
        final int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            // Report a bad port the same way as a wrong argument count instead of a stack trace.
            System.err.println("Invalid port '" + args[1] + "': expected an integer.");
            System.err.println(USAGE);
            return;
        }

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> streamSource = env.socketTextStream(hostname, port);

        // Key by the word (tuple field 0) and sum the counts (tuple field 1).
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamSource
                .flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }
}