Flink 从 RabbitMQ 读取并计算后输出到 MySQL
实现的需求是从 RabbitMQ 读取 JSON 格式的消息,处理结果输出到 MySQL。
主要参考了 这篇博客 和 Apache Flink 中文文档 。
编程语言: Scala 2.12.10
构建工具: sbt 1.3.0
IDE:IntelliJ IDEA Community 2019.1
开发环境的搭建可以参考 这篇博客 => 通过 IntelliJ IDEA 打包 Flink Scala 项目 。
1. 通过 IntelliJ IDEA 创建 Scala -> sbt 项目
sbt 选择 1.3.0
Scala 选择 2.12.10
2. 在 build.sbt 中添加引用
主要使用如下几个包:
- genson-scala:Json 序列化/反序列化
- druid:阿里的数据库连接池
- mysql-connector-java:MySQL 的 Connector
- flink-connector-rabbitmq:RabbitMQ 的 Connector
build.sbt :
sbtPlugin := true
name := "octopus-behavior-analysis"
version := "0.1"
//scalaVersion := "2.12.10"
val flinkVersion = "1.9.0"
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"com.owlike" %% "genson-scala" % "1.6" % "compile",
"com.alibaba" % "druid" % "1.1.20" % "compile",
"mysql" % "mysql-connector-java" % "8.0.17" % "compile",
"org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion % "compile")
lazy val root = (project in file(".")).
settings(
libraryDependencies ++= flinkDependencies
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
关于 scalaVersion 的设置为什么要注释掉详见 这篇博客。
3. RabbitMQStreamWordCount.scala
使用 RMQSource
从 RabbitMQ 队列中读取消息。
由于消息格式为 Json,这里使用的 AbstractDeserializationSchema
自定义了反序列化处理(如果是简单的字符串可使用 SimpleStringSchema
)。
反序列化处理使用了 genson 的 fromJson
方法。
之后就可以使用 DataStream
的 API 做各种转换了,这里仅统计了消息中 id 出现的次数,比较简单,仅作参考。
package octopus.ba
import com.owlike.genson.defaultGenson._
import octopus.ba.config.RabbitMQConfig
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
object RabbitMQStreamWordCount {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val connectionConfig = new RMQConnectionConfig.Builder()
.setHost(RabbitMQConfig.host) // 例:192.168.0.1
.setPort(RabbitMQConfig.port) // 一般使用默认端口 5672
.setUserName(RabbitMQConfig.userName)
.setPassword(RabbitMQConfig.password)
.setVirtualHost(RabbitMQConfig.virtualHost) // 如果没有配置的话,则设置为默认的虚拟 Host "/"
.build()
val stream = env
.addSource(new RMQSource[RabbitMQMessageModel](
connectionConfig,
"queue_name",
true,
new AbstractDeserializationSchema[RabbitMQMessageModel]() {
override def deserialize(bytes: Array[Byte]): RabbitMQMessageModel = fromJson[RabbitMQMessageModel](new String(bytes))
} ))
.setParallelism(1)
stream.addSink(new SinkVisitLineLog)
val counts = stream
.map(x => (x.id, 1))
.keyBy(0)
counts.addSink(new SinkVisitLineStatistics)
counts.print()
env.execute("Scala RabbitMQStreamWordCount Example")
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
4. SinkVisitLineStatistics.scala
导出到 MySQL 需要自定义继承 RichSinkFunction
的类。
package octopus.ba
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import java.sql._
class SinkVisitLineStatistics extends RichSinkFunction[(String, Int)] {
private var ps: PreparedStatement = null
private var connection:Connection = null
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
*
* @param parameters
* @throws Exception
*/
@throws[Exception]
override def open(parameters: Configuration): Unit = {
super.open(parameters)
connection = MySqlConnection.getConnection()
}
@throws[Exception]
override def close(): Unit = {
super.close()
// 关闭连接和释放资源
if (connection != null) connection.close()
if (ps != null) ps.close()
}
/**
* 每条数据的插入都要调用一次 invoke() 方法
*
* @param value
* @param context
* @throws Exception
*/
@throws[Exception]
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
ps = connection.prepareStatement("INSERT INTO user_visit_line_statistics (LineGuid, VisitCount) VALUES (?,?) ON DUPLICATE KEY UPDATE VisitCount = VisitCount + VALUES(VisitCount);")
ps.setString(1, value._1)
ps.setInt(2, value._2)
ps.executeUpdate
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
另外还用到了一个 SinkVisitLineLog.scala,写法也类似,只有接受的参数类型不一样。
package octopus.ba
import java.sql._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class SinkVisitLineLog extends RichSinkFunction[RabbitMQMessageModel] {
private var ps: PreparedStatement = null
private var connection:Connection = null
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
*
* @param parameters
* @throws Exception
*/
@throws[Exception]
override def open(parameters: Configuration): Unit = {
super.open(parameters)
connection = MySqlConnection.getConnection
}
@throws[Exception]
override def close(): Unit = {
super.close()
//关闭连接和释放资源
if (connection != null) connection.close()
if (ps != null) ps.close()
}
/**
* 每条数据的插入都要调用一次 invoke() 方法
*
* @param value
* @param context
* @throws Exception
*/
@throws[Exception]
override def invoke(value: RabbitMQMessageModel, context: SinkFunction.Context[_]): Unit = {
ps = connection.prepareStatement("insert into user_visit_line_log (LineGuid) values(?);")
ps.setString(1, value.id)
ps.executeUpdate
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
5. MySqlConnection.scala MySql 连接工厂
创建链接池提取成一个静态类。
关于 MySQL 的 URL 后面为什么加上 ?serverTimezone=UTC
详见 这篇博客 。
package octopus.ba
import java.sql.Connection
import com.alibaba.druid.pool.DruidDataSource
import octopus.ba.config.MySqlConfig
/**
* MySql 连接工厂
*/
object MySqlConnection {
private var druidDataSource = new DruidDataSource
def getConnection() = {
druidDataSource.setDriverClassName("com.mysql.jdbc.Driver")
// 例:"jdbc:mysql://192.168.0.1:3306/mydbname?serverTimezone=UTC"
druidDataSource.setUrl(MySqlConfig.url)
druidDataSource.setUsername(MySqlConfig.username)
druidDataSource.setPassword(MySqlConfig.password)
// 设置连接池的一些参数
// 1.数据库连接池初始化的连接个数
druidDataSource.setInitialSize(50)
// 2.指定最大的连接数,同一时刻可以同时向数据库申请的连接数
druidDataSource.setMaxActive(200)
// 3.指定小连接数:在数据库连接池空闲状态下,连接池中保存的最少的空闲连接数
druidDataSource.setMinIdle(30)
var con:Connection = null
try {
con = druidDataSource.getConnection
System.out.println("创建连接池:" + con)
} catch {
case e: Exception =>
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage)
}
con
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
6. 添加 sbt-assembly 插件
在 project 目录下创建 plugins.sbt 文件。
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
如果没有自动更新的话,可以通过在 Terminal 窗口执行 sbt update
命令更新。
7. 在 IntelliJ IDEA 中通过右键 RabbitMQStreamWordCount.scala 选择 Run/Debug 运行
若出现 java.lang.NoClassDefFoundError: org/apache/flink/api/common... 错误需要在 Run/Debug Configurations 中勾选 Include dependencies with “Provided” scope。
具体见 这篇博客
8. 打包项目
因为包含一些 compile 的依赖,所以需要打包成 fat jar。
通过 sbt assembly
可以生成包含 compile scope 的依赖。
同样的在 Terminal 窗口执行 sbt clean assembly
命令。
打包后的 jar 文件保存在 \target\scala-2.12\sbt-1.0 目录下。
详情见 这篇博客
9. 发布 Job 到 Flink 运行
本机环境 Flink 是安装在 D:\flink\flink-1.9.0 文件的。示例的批处理文件如下:
D:
cd D:\flink\flink-1.9.0
bin\flink.bat run -c octopus.ba.RabbitMQStreamWordCount C:\Users\liujiajia\Documents\octopus-behavior-analysis-flink-jobs\target\scala-2.12\sbt-1.0\octopus-behavior-analysis-assembly-0.1.jar
2
3
成功运行后可以在 Apache Flink Dashboard 中查看 Job 的状态,也可以手动取消任务。
由于从 RabbitMQ 中订阅的方式,这种 Job 不会自动结束,所以上面代码中的数据库连接方式需要调整为在 Sink 类的 invoke
方法中获取,否则会因为数据库关闭空闲连接而导致 Job 失败。