diff --git a/.gitignore b/.gitignore index cc4c461..60e4c4f 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ hs_err_pid* .idea *.iml +target diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..379771f --- /dev/null +++ b/pom.xml @@ -0,0 +1,125 @@ + + + + 4.0.0 + + com.thinker + flink-test + 1.0-SNAPSHOT + + flink-test + + http://www.example.com + + + UTF-8 + 1.7 + 1.7 + + + + + + org.apache.flink + flink-scala_2.11 + 1.7.0 + + + + org.apache.flink + flink-streaming-scala_2.11 + 1.7.0 + + + + junit + junit + 4.11 + test + + + + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.4.6 + + + + + compile + testCompile + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + maven-clean-plugin + 3.1.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.8.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + diff --git a/src/main/java/com/thinker/App.java b/src/main/java/com/thinker/App.java new file mode 100644 index 0000000..51c286c --- /dev/null +++ b/src/main/java/com/thinker/App.java @@ -0,0 +1,46 @@ +package com.thinker; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.Collector; + +/** + * Hello world! + */ +public class App { + + public static final class LineSplitter implements FlatMapFunction> { + + @Override + public void flatMap(String s, Collector> collector) throws Exception { + String[] tokens = s.toLowerCase().split("\\W+"); + for (String token: tokens){ + if (token.length() > 0){ + collector.collect(new Tuple2(token, 1)); + } + } + } + } + + public static void main(String[] args) throws Exception { + System.out.println("Hello World!"); + if (args.length != 2) { + System.err.println("USAGE:\nSocketTextStreamWordCount "); + return; + } + String hostname = args[0]; + Integer port = Integer.parseInt(args[1]); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource streamSource = env.socketTextStream(hostname, port); + + SingleOutputStreamOperator> sum = streamSource.flatMap(new LineSplitter()) + .keyBy(0).sum(1); + sum.print(); + env.execute("Java WordCount from SocketTextStream Example"); + } +} diff --git a/src/test/java/com/thinker/AppTest.java b/src/test/java/com/thinker/AppTest.java new file mode 100644 index 0000000..5a1e400 --- /dev/null +++ b/src/test/java/com/thinker/AppTest.java @@ -0,0 +1,20 @@ +package com.thinker; + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +/** + * Unit test for simple App. + */ +public class AppTest +{ + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() + { + assertTrue( true ); + } +}