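Packaging a Flink Scala Project with IntelliJ IDEA
To install Flink on Windows, see here.
Install the Scala plugin
Install the Scala plugin in IntelliJ IDEA (for details, see here).
- Open File -> Settings and select Plugins
- Search for Scala and install it
Install sbt 1.3.0
Download sbt-1.3.0.msi from the official sbt website and install it.
Download the flink-project sample and package it
Get the source code of the sample project from GitHub: tillrohrmann/flink-project.
The project ships with two examples, WordCount and SocketTextStreamWordCount.
Open the project in IntelliJ IDEA, then run sbt from the Terminal tool window: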
sbt clean package
The packaged jar is written to the target\scala-2.12 directory.
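The output directory and jar name follow from the build definition: sbt writes the result of `package` to target\scala-<Scala binary version> and names the jar <name>_<Scala binary version>-<version>.jar. A minimal build.sbt sketch that would produce the jar name used in this post might look like the following. The Scala patch version, the Flink version (1.9.0, taken from the console prompt in the run output), and the dependency list are assumptions for illustration; the build.sbt in the repository is the authoritative source.

```scala
// build.sbt (illustrative sketch, not copied from the sample repository)
name := "flink-project"          // first part of the jar name
version := "0.1-SNAPSHOT"        // last part of the jar name
organization := "org.example"

// Assumed Scala version; any 2.12.x release yields the "_2.12" suffix
// and the target\scala-2.12 output directory.
ThisBuild / scalaVersion := "2.12.8"

// Assumed to match the locally installed Flink distribution.
val flinkVersion = "1.9.0"

// "provided": the Flink runtime supplies these classes when the job is
// submitted, so they do not need to be inside the packaged jar.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
```

Note that `sbt package` only bundles the project's own classes. That is enough for these two examples under the assumption above, because everything else they need comes from the Flink installation.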
Run the WordCount example
WordCount.scala counts the words in a fixed set of strings and prints the result to the console.
package org.example

import org.apache.flink.api.scala._

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram
 * over some sample data
 *
 * This example shows how to:
 *
 *   - write a simple Flink program.
 *   - use Tuple data types.
 *   - write and use user-defined functions.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // split lines into lowercase words, pair each word with 1, then sum per word
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute and print result
    counts.print()
  }
}
From the Flink root directory, run the following command (replace the jar path with the location of the file on your machine).
bin\flink.bat run -c org.example.WordCount D:\liujiajia\github\flink-project\target\scala-2.12\flink-project_2.12-0.1-SNAPSHOT.jar
D:\flink\flink-1.9.0>bin\flink.bat run -c org.example.WordCount D:\liujiajia\git
log4j:WARN No appenders could be found for logger (org.apache.flink.client.cli.C
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more in
Starting execution of program
Program execution finished
Job with JobID 9a2f9b67ea279cc337eb3f944e19e203 has finished.
Job Runtime: 465 ms
Accumulator Results:
- c3775c9a05fc465154267a3392ace514 (java.util.ArrayList) [26 elements]
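The `[26 elements]` in the accumulator result corresponds to the number of distinct words in the four input lines. As a rough check that does not need a Flink cluster, the same split-and-count logic can be sketched on plain Scala collections. `WordCountSketch`, `tokenize`, and `countWords` are hypothetical names used only for this illustration; they are not part of the sample project or of the Flink API.

```scala
object WordCountSketch {
  // The four input lines used by WordCount.scala.
  val lines: Seq[String] = Seq(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,")

  // Lowercase the line and split on runs of non-word characters,
  // dropping any empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  // Group equal words and count each group
  // (a plain-collections stand-in for a grouped sum).
  def countWords(input: Seq[String]): Map[String, Int] =
    input.flatMap(tokenize).groupBy(identity).map { case (word, hits) => (word, hits.size) }

  def main(args: Array[String]): Unit = {
    val counts = countWords(lines)
    println(s"distinct words: ${counts.size}") // prints "distinct words: 26"
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"($word,$n)") }
  }
}
```

Running `WordCountSketch.main` prints one `(word,count)` pair per distinct word, sorted alphabetically, which makes it easy to compare against what the Flink job prints.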
Run the SocketTextStreamWordCount example
SocketTextStreamWordCount.scala reads text from the given socket address, counts the words, and prints the result to standard output.
package org.example

import org.apache.flink.streaming.api.scala._

/**
 * This example shows an implementation of WordCount with data from a text socket.
 * To run the example make sure that the service providing the text data is already up and running.
 *
 * To start an example socket text stream on your local machine run netcat from a command line,
 * where the parameter specifies the port number:
 *
 * {{{
 *   nc -lk 9999
 * }}}
 *
 * Usage:
 * {{{
 *   SocketTextStreamWordCount <hostname> <port>
 * }}}
 *
 * This example shows how to:
 *
 *   - use StreamExecutionEnvironment.socketTextStream
 *   - write a simple Flink Streaming program in scala.
 *   - write and use user-defined functions.
 */
object SocketTextStreamWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read lines of text from the socket
    val text = env.socketTextStream(hostName, port)
    // split into lowercase words, drop empty tokens, and keep a running count per word
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    counts.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
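First start a socket server. This requires netcat, which can be downloaded from GitHub: diegocr/netcat.
Switch to the netcat directory and run the following command to start the socket server.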
nc -l -p 9000
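Then switch to the Flink root directory and run the following command to submit the job (replace the jar path with the location of the file on your machine).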
bin\flink.bat run -c org.example.SocketTextStreamWordCount D:\liujiajia\github\flink-project\target\scala-2.12\flink-project_2.12-0.1-SNAPSHOT.jar localhost 9000
Type words into the netcat window; the word counts are printed in the task manager window that was opened when Flink was started.
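Because the socket input is an unbounded stream, the job cannot wait for a final total; a running count per word emits an updated value each time that word arrives. The following plain-Scala sketch imitates this on a finite list so it can be tried without a socket or a cluster. `RunningCountSketch` and `runningCounts` are hypothetical names, and the sketch assumes the job keeps one running count per word, which is the usual behavior of a streaming word count.

```scala
object RunningCountSketch {
  // For every incoming word, emit (word, number of times seen so far).
  def runningCounts(words: Seq[String]): Seq[(String, Int)] = {
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = words.foldLeft(start) {
      case ((counts, out), word) =>
        val n = counts.getOrElse(word, 0) + 1
        (counts.updated(word, n), out :+ (word -> n))
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    // Words as they might be typed into the netcat window, one after another.
    runningCounts(Seq("hello", "flink", "hello")).foreach(println)
    // prints (hello,1), (flink,1), (hello,2) on separate lines
  }
}
```

So typing the same word twice is expected to produce two output records for it, the second with the count increased by one, rather than a single summary line.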