idea方式一:
a)添加flink-dist包
b)代碼
import com.example.demo.flink.impl.SplitWordFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @program: demo * @description: wordcount * @author: yang * @create: 2020-12-28 16:52 */ public class WordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(2); String hostname = "192.168.75.101"; int port = 5000; DataStreamSource<String> dataStream = env.socketTextStream(hostname, port); SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new SplitWordFunction()).keyBy(0).sum(1); result.print(); env.execute("test word count"); } }
c)啟動5000端口
nc -l 5000 -v
d) 啟動flink對應的main函數
e) 打開界面
方式二:
方式二比方式一方便點,但是方式二一般用於本地測試,方式一可用於生產
a) 環境改為web
b)代碼
import com.example.demo.flink.impl.SplitWordFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @program: demo * @description: wordcount * @author: yang * @create: 2020-12-28 16:52 */ public class WordCount { public static void main(String[] args) throws Exception { //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(2); String hostname = "192.168.75.101"; int port = 5000; DataStreamSource<String> dataStream = env.socketTextStream(hostname, port); SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new SplitWordFunction()).keyBy(0).sum(1); result.print(); env.execute("test word count"); } }
方式三:
方式三通過flink安裝包查看界面
./start-cluster.bat 或
./start-cluster.sh