国内大量揄拍人妻精品視頻,国产午夜亚洲精品久久,国产精品视频色尤物yw,国产精品爽爽ⅴa在线观看,国模大尺度私拍

  • 鄭州
您的位置: 法制網(wǎng) > 綜合 > > 詳情

環(huán)球消息!大數(shù)據(jù)NiFi(二十):實時同步MySQL數(shù)據(jù)到Hive

來源: 騰訊云 2023-02-27 15:20:21

?實時同步MySQL數(shù)據(jù)到Hive

案例:將mysql中新增的數(shù)據(jù)實時同步到Hive中。

以上案例需要用到的處理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。

首先通過“CaptureChangeMySQL”讀取MySQL中數(shù)據(jù)的變化(需要開啟MySQL binlog日志),將Binlog中變化的數(shù)據(jù)同步到“RouteOnAttribute”處理器,通過此處理器獲取上游數(shù)據(jù)屬性,獲取對應(yīng)binlog操作類型,再將想要處理的數(shù)據(jù)路由到“EvaluateJsonPath”處理器,該處理器可以將json格式的binlog數(shù)據(jù)解析,通過自定義json 表達式獲取json數(shù)據(jù)中的屬性放入FlowFile屬性,將FlowFile通過“ReplaceText”處理器獲取上游FowFile屬性,動態(tài)拼接sql替換所有的FlowFile內(nèi)容,將拼接好的sql組成FlowFile路由到“PutHiveQL”將數(shù)據(jù)寫入到Hive表。


(資料圖片)

一、開啟MySQL的binlog日志

mysql-binlog是MySQL數(shù)據(jù)庫的二進制日志,記錄了所有的DDL和DML(除了數(shù)據(jù)查詢語句)語句信息。一般來說開啟二進制日志大概會有1%的性能損耗。這里需要開啟MySQL的binlog日志方便后期使用“CaptureChangeMySQL”處理器來獲取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。

1、登錄mysql查看MySQL是否開啟binlog日志

[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";

2 、開啟mysql binlog日志

在/etc/my.cnf文件中[mysqld]下寫入以下內(nèi)容:

[mysqld]#隨機指定一個不能和其他集群中機器重名的字符串server-id=123#配置binlog日志目錄,配置后會自動開啟binlog日志,并寫入該目錄log-bin=/var/lib/mysql/mysql-bin

3、重啟mysql 服務(wù),重新查看binlog日志情況

[root@node2 ~]# service mysqld restart[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";

二、???????配置“CaptureChangeMySQL”處理器

“CaptureChangeMySQL”主要是從MySQL數(shù)據(jù)庫捕獲CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作發(fā)生時的順序輸出為單獨的FlowFile文件。

關(guān)于“CaptureChangeMySQL”處理器的“Properties”主要配置的說明如下:

配置項

默認(rèn)值

允許值

描述

MySQL Hosts(MySQL 節(jié)點)

MySQL集群節(jié)點相對應(yīng)的主機名/端口項的列表。多個節(jié)點使用逗號分隔,格式為:host1:port、host2:port…,處理器將嘗試按順序連接到列表中的主機。如果一個節(jié)點關(guān)閉,并且群集啟用了故障轉(zhuǎn)移,那么處理器將連接到活動節(jié)點。

MySQL Driver Class Name(MySQL驅(qū)動名稱)

com.mysql.jdbc.Driver

MySQL數(shù)據(jù)庫驅(qū)動程序類的類名。

MySQL Driver Location(s)(MySQL驅(qū)動的位置)

包含MySQL驅(qū)動程序包及其依賴項的文件/文件夾和/或url的逗號分隔列表(如果有),例如"/var/tmp/mysql-connector-java-5.1.38-bin.jar文件"。

Username(用戶名)

訪問MySQL集群的用戶名。

Password(密碼)

訪問MySQL集群的密碼。

Database/Schema Name Pattern(匹配數(shù)據(jù)庫/Schema)

用于根據(jù)CDC事件列表匹配數(shù)據(jù)庫(或模式,具體取決于RDBMS類型)的正則表達式。正則表達式必須與存儲在RDBMS中的數(shù)據(jù)庫名稱匹配。如果未設(shè)置屬性,則數(shù)據(jù)庫名稱將不會用于篩選CDC事件。

Table Name Pattern(匹配表)

用于匹配影響匹配表的CDC事件的正則表達式(regex)。regex必須與存儲在數(shù)據(jù)庫中的表名匹配。如果未設(shè)置屬性,則不會根據(jù)表名篩選任何事件。

Max Wait Time(最大連接等待時長)

30 seconds

允許建立連接的最長時間,零表示實際上沒有限制。

Distributed Map Cache Client(分布式緩存客戶端)

指定用于保存處理器所需的各種表、列等信息的分布式映射緩存客戶端控制器服務(wù)。如果未指定,則生成的事件將不包括列類型或名稱等信息。

Retrieve All Records(檢索所有記錄)

true

?true?false

指定是否獲取所有可用的CDC事件,而不考慮當(dāng)前的binlog文件名或位置。如果處理器狀態(tài)中存在binlog文件名和位置值,則忽略此屬性的值。這允許4種不同的配置:1).如果處理器State中存在binlog數(shù)據(jù),則State用來確定開始位置,并忽略Retrieve All Records的值。(目前NiFi版本測試有問題)2).如果處理器State中不存在binlog數(shù)據(jù),此值設(shè)置為true意味著從頭開始讀取Binlog 數(shù)據(jù)。3).如果處理器State中不存在binlog數(shù)據(jù),并且沒有指定binlog文件名和位置,此值設(shè)置為false意味著從binlog尾部開始讀取數(shù)據(jù)。4).如果處理器State中不存在binlog數(shù)據(jù),并指定binlog文件名和位置,此值設(shè)置為false意味著從指定binlog尾部開始讀取數(shù)據(jù)。

Include Begin/Commit Events(包含開始/提交事件)

false

?true?false

指定是否發(fā)出與二進制日志中的開始或提交事件相對應(yīng)的事件。如果下游流中需要開始/提交事件,則設(shè)置為true,否則設(shè)置為false,這將抑制這些事件的生成并可以提高流性能。

Include DDL Events(標(biāo)準(zhǔn)表/列名)

false

?true?false

指定是否發(fā)出與數(shù)據(jù)定義語言(DDL)事件對應(yīng)的事件,如ALTER TABLE、TRUNCATE TABLE。如果下游流中需要DDL事件,則設(shè)置為true,否則設(shè)置為false。為false時這將抑制這些事件的生成,并可以提高流性能。

配置步驟如下:

1、創(chuàng)建“CaptureChangeMySQL”處理器

2、配置“DistributeMapCacheServer”控制服務(wù)

監(jiān)控mysql變化需要設(shè)置“DistributedMapCacheClient”控制服務(wù),其對應(yīng)的Server中存儲處理器所需的各種表、列等信息,所以這里需要首先配置“DistributeMapCacheServer”控制服務(wù)。

?

?

3、配置“SCHEDULING”

由于這里使用“CaptureChangeMySQL”處理器監(jiān)控“MySQL”中的數(shù)據(jù),所以設(shè)置調(diào)度訪問周期為“10s”,防止一直監(jiān)聽MySQL binlog數(shù)據(jù),帶來性能消耗。

?

4、配置“PROPERTIES”

在“CaptureChangeMySQL”處理器中配置“PROPERTIES”,配置如下:

MySQL Host : 192.168.179.5:3306MySQL Driver Class Name:com.mysql.jdbc.DriverMySQL Driver Location(s):/root/test/mysql-connector-java-5.1.47.jar

注意:這里需要在每臺NiFi節(jié)點上創(chuàng)建對應(yīng)目錄,上傳mysql驅(qū)動包。

“PROPERTIES”配置如下:

此外,在“PROPERTIES”中還需要配置“Distributed Map Cache Client”控制服務(wù),來讀取“DistributeMapCacheServer”控制服務(wù)中的緩存數(shù)據(jù):

?

另外,這里我們只是監(jiān)控表“test2”對應(yīng)的CDC事件,這里設(shè)置匹配表名為“test2”,最終“PROPERTIES”的配置如下:

注意:以上“Table Name Pattern”這里配置對應(yīng)的Value值為:test2,也可以不配置,不配置會監(jiān)控所有MySQL表的變化對應(yīng)的binlog事件。當(dāng)后面向Hive表中插入新增和更新數(shù)據(jù)時,對應(yīng)MySQL中的元數(shù)據(jù)表也會變化,也會監(jiān)控到對應(yīng)的binlog事件。為了避免后期出現(xiàn)監(jiān)控到其他表的binlog日志,這里建議配置上“test2”。

5、啟動MySQL,創(chuàng)建表“test2”測試“CaptureChangeMySQL”處理器

登錄mysql ,使用“mynifi”庫,創(chuàng)建表“test2”。暫時設(shè)置“CaptureChangeMySQL”處理器“success”事件自動終止并啟動,向表中插入對應(yīng)的數(shù)據(jù)查看“CaptureChangeMySQL”處理器能否正常監(jiān)控事件。

在mysql中創(chuàng)建對應(yīng)的表:

use mynifi;create table test2 (id int,name varchar(255),age int);

啟動“CaptureChangeMySQL”處理器:

向表“test2”中插入以下數(shù)據(jù):

insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;

可以在“CaptureChangeMySQL”處理器中右鍵“View data provenance”查看捕獲到的“insert”、“update”、“delete”事件:

注意問題:在配置好“CaptureChangeMySQL”處理器啟動后,當(dāng)MySQL中有數(shù)據(jù)插入、修改、刪除時當(dāng)前處理器會讀取MySql binlog日志,并在當(dāng)前處理器中記錄讀取binlog的位置狀態(tài)。正常來說這里關(guān)閉“CaptureChangeMySQL”處理器后再次啟動,會接著保存的binlog位置繼續(xù)讀?。梢詤⒄铡癙ROPERTIES”屬性中“Retrieve All Records”配置說明),但是經(jīng)過測試,此NiFi版本出現(xiàn)以下錯誤(無效的binlog位置,目測是一個版本bug錯誤):

所以在之后的測試中,我們可以將“CaptureChangeMysql”處理器讀取binlog的狀態(tài)清空,然后再次啟動即可,這里會重復(fù)讀取MySQL之前已經(jīng)檢測到的新增、修改、刪除數(shù)據(jù)。

清空“CaptureChangeMysql”讀取binlog狀態(tài):

三、??????????????配置“RouteOnAttribute”處理器

“RouteOnAttribute”是根據(jù)FlowFile的屬性使用屬性表達式進行數(shù)據(jù)路由。

關(guān)于“RouteOnAttribute”處理器的“Properties”主要配置的說明如下:

配置項

默認(rèn)值

描述

Routing Strategy(路由策略)

Route to Property name

指定在計算表達式語言時如何使用哪個關(guān)系。有如下幾個關(guān)系可選擇:?Route to Property nameFlowFile的副本將被路由到對應(yīng)的表達式計算結(jié)果為"true"的每個關(guān)系。?Route to "matched" if all match要求所有用戶定義的表達式求值都為"true",才認(rèn)為FlowFile是匹配的。?Route to "matched" if any matches至少有一個用戶定義的表達式求值為"true",才能認(rèn)為FlowFile是匹配的。

注意:該處理器允許用戶自定義屬性并指定該屬性的匹配表達式。屬性與動態(tài)屬性指定的屬性表達式相匹配的FileFlow,映射到動態(tài)屬性上。

配置如下:

1、創(chuàng)建“RouteOnAttribute”處理器

2、配置“PROPERTIES”自定義屬性

注意:以上自定義的屬性中update、insert、delete對應(yīng)的json 表達式寫法為:${cdc.event.type:equals("delete")},代表匹配對應(yīng)類型的FlowFile,“cdc.event.type”是上游FlowFile中的屬性,“equales”是對應(yīng)的方法,“delete”使用單引號引起,表示匹配的CDC事件。

3、連接“CaptureChangeMySQL”處理器與“RouteOnAttribute”處理器

四、配置“EvaluatejsonPath”處理器

“EvaluatejsonPath”處理器將根據(jù)上游“RouteOnAttribute”匹配的事件將內(nèi)容映射成FlowFile屬性,方便后期拼接SQL獲取數(shù)據(jù),上游匹配到的FlowFile中的數(shù)據(jù)格式為:

EvaluatejsonPath”處理器配置如下:

1、配置“EvaluatejsonPath”的“PROPERTIES”屬性

2、連接“RouteOnAttribute”處理器和“EvaluatejsonPath”處理器

連接關(guān)系中,我們這里只關(guān)注“insert”和“update”的數(shù)據(jù),后期獲取對應(yīng)的屬性將插入和更新的數(shù)據(jù)插入到Hive表中,對于“delete”的數(shù)據(jù)可以路由到其他關(guān)系中,例如需要將刪除數(shù)據(jù)插入到另外的Hive表中,可以再設(shè)置個分支處理。這里我們將“delete”和“failure”的數(shù)據(jù)設(shè)置自動終止關(guān)系。

設(shè)置“RouteOnAttribute”處理器其他匹配路由關(guān)系為自動終止:

五、??????????????配置“ReplaceText”處理器

“ReplaceText”處理器可以獲取“EvaluatejsonPath”轉(zhuǎn)換后FlowFile中的屬性來替換原有數(shù)據(jù)組成一個“insert into ... values (... ...)”語句,方便后續(xù)將數(shù)據(jù)插入到Hive中?!癛eplaceText”處理器的配置如下:

1、配置“RelaceText”處理器“PROPERTIES”屬性

在“Replacement Value”中配置“insert into ${tablename} values (${id},"${name}",${age})”

注意:

以上獲取的tablename名稱為“test2”,后面這個sql是要將數(shù)據(jù)插入到Hive中的,所以這里在Hive中也應(yīng)該創(chuàng)建“test2”的表名稱,或者將表名稱寫成固定表,后期在Hive中創(chuàng)建對應(yīng)的表即可。

另外,需要注意${name}在插入Hive中時對應(yīng)的列為字符串,這里需要加上單引號。

2、連接“EvaluatejsonPath”處理器與“ReplaceText”處理器

配置“EvaluatjsonPath”處理器“failure”和“unmatch”路由關(guān)系為自動終止。

六、??????????????配置Hive 支持HiveServer2

訪問Hive有兩種方式:HiveServer2和Hive Client,Hive Client需要Hive和Hadoop的jar包,配置環(huán)境。HiveServer2使得連接Hive的Client從Yarn和HDFS集群中獨立出來,不需要每個幾點都配置Hive和Hadoop的jar包和一系列環(huán)境。

NiFi連接Hive就是使用了HiveServer2方式連接,所以這里需要配置HiveServer2。

配置HiveServer2步驟如下:

1、在Hive服務(wù)端配置hive-site.xml

#在Hive 服務(wù)端 $HIVE_HOME/etc/hive-site.xml中配置: hive.server2.thrift.port 10000hive.server2.thrift.bind.host192.168.179.4

2、在每臺Hadoop 節(jié)點配置core-site.xml

     hadoop.proxyuser.root.hosts     *       hadoop.proxyuser.root.groups        * 

3、重啟HDFS ,Hive ,在Hive服務(wù)端啟動Metastore和HiveServer2服務(wù)

nohup hive --service metastore >> ./nohup.out 2>&1 &nohup hive --service hiveserver2 >> ./nohup.out 2>&1 &

4、在客戶端通過beeline連接Hive

[root@node3 test]# beelinebeeline> !connect jdbc:hive2://node1:10000 rootEnter password for jdbc:hive2://node1:10000: 沒有密碼直接跳過即可0: jdbc:hive2://node1:10000> show tables;+------------------------------------+|              tab_name              |+------------------------------------+| personinfo                         || test2                              |+------------------------------------+

以上配置完成后,還需要將配置好的core-site.xml文件發(fā)送到各個NiFi節(jié)點對應(yīng)的路徑/root/test下替換原有的core-site.xml文件。之后重啟NiFi集群,各個NiFi節(jié)點上執(zhí)行命令:

service nifi restart

七、配置“PutHiveQL”處理器

“PutHiveQL”主要執(zhí)行HiveQL的DDL/DML命令,傳入給該處理器的FlowFile內(nèi)容是要執(zhí)行的HiveQL命令。HiveQL命令可以使用“?”來指定參數(shù),這種情況下,參數(shù)必須存在于FlowFile的屬性中,命名約定為hiveql.args.N.type和hiveql.args.N.value,其中N為正整數(shù)。

關(guān)于“PutHiveQL”處理器的“Properties”主要配置的說明如下:

配置項

默認(rèn)值

允許值

描述

Hive Database Connection Pooling Servic(Hive數(shù)據(jù)庫連接池服務(wù))

Hive Controller服務(wù),用于獲取與Hive數(shù)據(jù)庫的連接。

Batch Size(批次大?。?/p>

100

一批次讀取FlowFile的個數(shù)。

Character Set(編碼)

UTF-8

指定數(shù)據(jù)的編碼格式。

Statement Delimiter(語句分隔符)

;

語句分隔符,用于分隔多個語句腳本中的SQL語句。

Rollback On Failure(失敗時回滾)

false

?true?false

指定如何處理錯誤。默認(rèn)false指的是如果在處理FlowFile時發(fā)生錯誤,則FlowFile將根據(jù)錯誤類型路由到“failure”或“retry”關(guān)系,處理器繼續(xù)處理下一個FlowFile。相反,可以設(shè)置為true回滾當(dāng)前已處理的FlowFile,并立即停止進一步的處理。如果設(shè)置為true啟用,失敗的FlowFiles將停留在輸入關(guān)系中并會反復(fù)處理,直到成功處理或通過其他方式將其刪除為止??梢栽O(shè)置足夠大的“Yield Duration”避免重試次數(shù)過多。

“PutHiveQL”處理器的配置如下:

1、創(chuàng)建“PutHiveQL”處理器

?

2、 配置“PROPERTIES”

?

點擊之后,配置“HiveConnectionPool”控制服務(wù):

注意以上需要配置:

“Database Connection URL” :這里是Hive的HiveServer2啟動的節(jié)點,也就是服務(wù)端節(jié)點?!癹dbc:hive2://192.168.179.4:10000”“Hive Configuration Resources”:“/root/test/hive-site.xml,/root/test/core-site.xml,/root/test/hdfs-site.xml”,這里需要將以上各個文件在NiFi集群各個節(jié)點對應(yīng)位置準(zhǔn)備好?!癉atabase User”:root,這里防止操作Hive對應(yīng)的HDFS時權(quán)限問題。

配置完成后,需要啟用對應(yīng)的“HiveConnectionPool”控制服務(wù):

最終配置“PROPERTIES”為:

3、連接“ReplaceText”處理器與“PutHiveQL”處理器并設(shè)置關(guān)系

?

設(shè)置“ReplaceText”處理器“failure”路由關(guān)系為自動終止:

設(shè)置“PutHiveQL”處理器路由關(guān)系為自動終止:

?

八、??????????????運行測試

1、在Hive中創(chuàng)建表“test2”

動HDFS,啟動Hive服務(wù)端和客戶端,創(chuàng)建表“test2”

create table test2 (id int,name string,age int )row format delimited fields terminated by "\t";

2、啟動NiFi處理數(shù)據(jù)流程,向MySQL中寫入數(shù)據(jù),查看Hive中表數(shù)據(jù)

首先清空“CaptureChangeMySQL”處理器的狀態(tài),單獨啟動“CaptureChangeMySQL”處理器,清空重新消費的數(shù)據(jù)(以上主要就是避免此版本NiFi bug問題),啟動當(dāng)前案例中其他NiFi處理器。

然后向MySQL中插入以下數(shù)據(jù):

insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;

NiFi頁面:

Hive表test2中的結(jié)果:

標(biāo)簽: Hive 云數(shù)據(jù)庫 Server Java
溫馨提示:

在實際法律問題情景中,個案情況都有所差異,為了高效解決您的問題,保障合法權(quán)益,建議您直接向?qū)I(yè)律師說明情況,解決您的實際問題。 立即在線咨詢 >

上一篇
下一篇
相關(guān)知識推薦
操作
分享
15037178970

公眾服務(wù)

法制網(wǎng)公眾號

快速找律師 / 免費咨詢

查法律知識 / 查看解答 / 隨時追問

律師服務(wù)(工作日8:30-18:00 ,非工作日請QQ留言)

律師加盟

律師營銷服務(wù)

在線客服:

加盟熱線:

律師營銷診斷

營銷分析 / 回復(fù)咨詢

案件接洽 / 合作加盟

法制法律網(wǎng),中國知名的 法律咨詢網(wǎng)站,能夠為廣大用戶提供在線 免費法律咨詢服務(wù)。
CopyRight@2003-2022 fazhi.net ALL Rights Reservrd 版權(quán)所有
豫ICP備2022016495號-26
違法和不良信息舉報電話: