Packaging a Flink Scala Project with IntelliJ IDEA
To install Flink on Windows, see here.
Install the Scala plugin
Install the Scala plugin in IntelliJ IDEA (for details, see here).
- Open File -> Settings and select Plugins
- Search for Scala and install it
Install sbt 1.3.0
Download sbt-1.3.0.msi from the sbt website and run the installer.
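Download the sample project flink-project and package it
Get the source code of the sample project from tillrohrmann/flink-project on GitHub.
The project ships with two examples: WordCount and SocketTextStreamWordCount.
After opening the project in IntelliJ IDEA, run the following sbt command in the Terminal.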
```bash
sbt clean package
```
The packaged jar is written to the target\scala-2.12 directory.
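The directory and jar names come from sbt's default artifact naming, `<name>_<Scala binary version>-<version>.jar`. As a rough sketch, a build.sbt consistent with the jar name flink-project_2.12-0.1-SNAPSHOT.jar could look like the following. The Scala patch version, the organization, and the dependency list are assumptions for illustration and are not copied from the project, whose actual build file may differ.

```scala
// build.sbt (sketch; values inferred from the jar name, not taken from the project)
name := "flink-project"
version := "0.1-SNAPSHOT"
organization := "org.example" // assumption, chosen to match the package of the examples

// Any 2.12.x release produces the target\scala-2.12 directory; 2.12.8 is an assumption.
scalaVersion := "2.12.8"

// Assumed to match the local Flink installation.
val flinkVersion = "1.9.0"

// "provided": the Flink runtime supplies these classes when the job is submitted.
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)
```

With settings like these, changing `scalaVersion` to another binary version would also change both the output directory and the `_2.12` suffix in the jar name.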
Run the WordCount example
WordCount.scala counts the words in a fixed set of strings and prints the result to the console.
Its code is as follows:
```scala
package org.example

import org.apache.flink.api.scala._

/**
 * Implements the "WordCount" program that computes a simple word occurrence histogram
 * over some sample data
 *
 * This example shows how to:
 *
 * - write a simple Flink program.
 * - use Tuple data types.
 * - write and use user-defined functions.
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // split each line into lowercase words, pair each word with 1,
    // group by the word (tuple field 0) and sum the ones (tuple field 1)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute and print result
    counts.print()
  }
}
```
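To see what the flatMap / map / groupBy / sum pipeline computes without starting Flink, the same logic can be sketched with plain Scala collections. This is only an illustration: `LocalWordCount` and `countWords` are hypothetical names, not part of flink-project, and the extra `filter` is a guard against empty tokens that the batch example itself does not have.

```scala
// Hypothetical stand-alone sketch; it uses Scala collections, not the Flink API.
object LocalWordCount {

  /** Counts how often each lowercase word occurs in the given lines. */
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+")) // same tokenizer as WordCount.scala
      .filter(_.nonEmpty)                   // drop empty tokens, if any
      .groupBy(identity)                    // word -> all of its occurrences
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val text = Seq(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    // Print in alphabetical order so the pairs are easy to scan.
    countWords(text).toSeq.sortBy(_._1).foreach(println)
  }
}
```

For the four sample lines this yields 26 distinct words, with `to` counted 4 times and `the` 3 times.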
Change to the Flink root directory and run the following command (replace the jar path with the location of the file on your machine).
```bash
bin\flink.bat run -c org.example.WordCount D:\liujiajia\github\flink-project\target\scala-2.12\flink-project_2.12-0.1-SNAPSHOT.jar
```
The output looks like this:
```bash
D:\flink\flink-1.9.0>bin\flink.bat run -c org.example.WordCount D:\liujiajia\github\flink-project\target\scala-2.12\flink-project_2.12-0.1-SNAPSHOT.jar
log4j:WARN No appenders could be found for logger (org.apache.flink.client.cli.CliFrontend).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting execution of program
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
Program execution finished
Job with JobID 9a2f9b67ea279cc337eb3f944e19e203 has finished.
Job Runtime: 465 ms
Accumulator Results:
- c3775c9a05fc465154267a3392ace514 (java.util.ArrayList) [26 elements]
```
Run the SocketTextStreamWordCount example
The SocketTextStreamWordCount.scala example reads text from the given socket address, counts the words, and prints the result.
```scala
package org.example

import org.apache.flink.streaming.api.scala._

/**
 * This example shows an implementation of WordCount with data from a text socket.
 * To run the example make sure that the service providing the text data is already up and running.
 *
 * To start an example socket text stream on your local machine run netcat from a command line,
 * where the parameter specifies the port number:
 *
 * {{{
 * nc -lk 9999
 * }}}
 *
 * Usage:
 * {{{
 * SocketTextStreamWordCount <hostname> <port>
 * }}}
 *
 * This example shows how to:
 *
 * - use StreamExecutionEnvironment.socketTextStream
 * - write a simple Flink Streaming program in scala.
 * - write and use user-defined functions.
 */
object SocketTextStreamWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read lines of text from the socket
    val text = env.socketTextStream(hostName, port)

    // split into non-empty lowercase words, then keep a running count per word
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .sum(1)

    counts.print()

    env.execute("Scala SocketTextStreamWordCount Example")
  }
}
```
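First start a socket server. This requires netcat, which can be downloaded from diegocr/netcat on GitHub.
Change to the netcat root directory and run the following command to create the socket server.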
```bash
nc -l -p 9000
```
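If netcat is not at hand, a small Scala program can play the same role. The sketch below is a hypothetical stand-in and not part of flink-project: `LineServer` and `serve` are made-up names, and port 9000 is only a default chosen to match the command above. It waits for one client and forwards every line typed on standard input to it, terminated by `\n`. Like netcat, it should be running before the Flink job is submitted so that the job has something to connect to.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import scala.io.StdIn

// Hypothetical stand-in for `nc -l -p 9000`; not part of flink-project.
object LineServer {

  /** Accepts one client on `server` and sends it every line, each ending in "\n". */
  def serve(server: ServerSocket, lines: Iterator[String]): Unit = {
    val client = server.accept() // blocks until a client (the Flink job) connects
    try {
      val out = new PrintWriter(
        new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
      lines.foreach { line =>
        out.print(line + "\n")
        out.flush() // send each line right away instead of buffering
      }
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    // Optional first argument: port number (defaults to 9000).
    val port = if (args.nonEmpty) args(0).toInt else 9000
    val server = new ServerSocket(port)
    try {
      println(s"Listening on port $port; type lines, finish with end-of-input")
      // readLine returns null at end-of-input, which ends the iterator.
      val typed = Iterator.continually(StdIn.readLine()).takeWhile(_ != null)
      serve(server, typed)
    } finally server.close()
  }
}
```

Keeping the socket handling in `serve` and the console handling in `main` means the forwarding logic can be exercised with any iterator of strings, not only keyboard input.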
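Then change to the Flink root directory and run the following command to start the job (replace the jar path with the location of the file on your machine).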
```bash
bin\flink.bat run -c org.example.SocketTextStreamWordCount D:\liujiajia\github\flink-project\target\scala-2.12\flink-project_2.12-0.1-SNAPSHOT.jar localhost 9000
```
Type words in the netcat window; the counts are printed in the task manager window that opened when Flink was started.
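Unlike the batch example, the streaming job has no final result: as far as I understand the keyed `sum`, it emits an updated count each time a word arrives. The plain Scala sketch below imitates that behavior on a fixed list of lines. `RunningWordCount` and `runningCounts` are hypothetical names, nothing here uses the Flink API, and the real output may carry extra formatting such as a subtask prefix.

```scala
// Hypothetical sketch of the running counts a keyed sum produces; no Flink involved.
object RunningWordCount {

  /** Returns one (word, countSoFar) pair per incoming word, in arrival order. */
  def runningCounts(lines: Seq[String]): Seq[(String, Int)] = {
    val words = lines.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty)
    val start = (Map.empty[String, Int], Vector.empty[(String, Int)])
    val (_, emitted) = words.foldLeft(start) {
      case ((totals, out), word) =>
        val updated = totals.getOrElse(word, 0) + 1 // state kept per key
        (totals.updated(word, updated), out :+ (word -> updated))
    }
    emitted
  }

  def main(args: Array[String]): Unit =
    runningCounts(Seq("hello world", "hello flink")).foreach(println)
}
```

Typing `hello world` and then `hello flink` would therefore show `hello` twice, first with a count of 1 and then with a count of 2.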