訂閱
糾錯(cuò)
加入自媒體

基于XML描述的可編程函數(shù)式ETL實(shí)現(xiàn)

2019-07-02 10:24
EAWorld
關(guān)注

轉(zhuǎn)載本文需注明出處:微信公眾號(hào)EAWorld,違者必究。

引言:

傳統(tǒng) ETL 主要以 SQL 為主要技術(shù)手段,把數(shù)據(jù)經(jīng)抽取、清洗轉(zhuǎn)換之后加載到數(shù)據(jù)倉(cāng)庫(kù)。但是在如今移動(dòng)互聯(lián)網(wǎng)大力發(fā)展的場(chǎng)景下,產(chǎn)生大量碎片化和不規(guī)則的數(shù)據(jù)。政府,公安等行業(yè),傳統(tǒng)數(shù)據(jù)庫(kù)已經(jīng)遠(yuǎn)遠(yuǎn)無(wú)法滿足需求。數(shù)據(jù)原始文件通過(guò)文件導(dǎo)入到基礎(chǔ)庫(kù),再通過(guò)大數(shù)據(jù) HQL等技術(shù)手段提取出二級(jí)庫(kù),這中間的數(shù)據(jù)導(dǎo)入和 SQL ETL 的提取的過(guò)程,大量消耗 IO 性能和計(jì)算資源,在很多場(chǎng)景下已經(jīng)是數(shù)據(jù)處理的瓶頸所在。

普元在實(shí)施公安項(xiàng)目過(guò)程中開(kāi)發(fā)了一種基于 XML 描述的可編程的函數(shù) ETL 轉(zhuǎn)換方法。主要用于大數(shù)據(jù)文件處理領(lǐng)域,能從原始數(shù)據(jù)文件直接、快速加載到專題庫(kù)的技術(shù)手段。技術(shù)方案主要解決了用 XML 的技術(shù)手段描述數(shù)據(jù)文件的格式,包含文件字段切分、字段類型、默認(rèn)值、異常值校驗(yàn)、時(shí)間格式校驗(yàn)。在處理時(shí)可添加自行開(kāi)發(fā)的 JAVA UDF 函數(shù),函數(shù)實(shí)參支持變量、常量、表達(dá)式、函數(shù)和運(yùn)算符重載。同時(shí)函數(shù)支持多層嵌套,即內(nèi)部函數(shù)的返回值最為外部函數(shù)的實(shí)參。該方案實(shí)現(xiàn)了 XML 內(nèi)函數(shù)體的語(yǔ)法解析并在運(yùn)行過(guò)程中直接編譯為 Java 字節(jié)碼的技術(shù)。有效的解決了政府、公安、電信行業(yè)巨量的數(shù)據(jù)處理需要的大量計(jì)算資源和 IO 性能瓶頸,有效的提高了數(shù)據(jù)處理效率和降低了數(shù)據(jù)處理開(kāi)發(fā)難度。

目錄:

一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹

二、XML 控制文件結(jié)構(gòu)和語(yǔ)法

三、函數(shù)和多層嵌套函數(shù)傳參

四、UDF 函數(shù)編寫(xiě)方法

五、數(shù)據(jù)測(cè)試工具

六、FlumeOnYarn 架構(gòu)和分布式部署

一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹

對(duì)于數(shù)據(jù)開(kāi)發(fā)項(xiàng)目,我們常常會(huì)面臨眾多的數(shù)據(jù)對(duì)接,部分場(chǎng)景不僅數(shù)據(jù)量大,且數(shù)據(jù)種類多,數(shù)據(jù)解析開(kāi)發(fā)工作量巨大。對(duì)于大量數(shù)據(jù)對(duì)接,一般設(shè)計(jì)的 RPC 接口和 WebService 一般都達(dá)不到數(shù)據(jù)性能要求的。并且他們都是點(diǎn)對(duì)點(diǎn)的服務(wù),一旦上下游系統(tǒng)故障,都會(huì)造成整個(gè)數(shù)據(jù)對(duì)接異常。因此大部分都會(huì)選擇使用文件的方式進(jìn)行數(shù)據(jù)對(duì)接。

對(duì)于非實(shí)時(shí)數(shù)據(jù)對(duì)接需求,這種方式的優(yōu)點(diǎn):

在數(shù)據(jù)量大的情況下,可以通過(guò)文件傳輸,上游只寫(xiě)入,無(wú)需關(guān)心數(shù)據(jù)業(yè)務(wù)和故障;

方案簡(jiǎn)單,避免了網(wǎng)絡(luò)協(xié)議相關(guān)的概念;

維護(hù)簡(jiǎn)單,只需保證磁盤(pán)寫(xiě)入穩(wěn)定性即可;

我們常常會(huì)面臨基于此架構(gòu)的數(shù)據(jù)對(duì)接。但基于此架構(gòu)數(shù)據(jù)處理工作都在下游(即數(shù)據(jù)使用方)。

面對(duì)大量數(shù)據(jù)對(duì)接和眾多的數(shù)據(jù)類型,我們對(duì)于每種數(shù)據(jù)文件解析、解碼、清洗消耗大量的人力,并且基于編碼的方式對(duì)于較多數(shù)據(jù)類型的場(chǎng)景代碼量大,且難以管理。因此經(jīng)過(guò)多次數(shù)據(jù)開(kāi)發(fā)實(shí)踐,我們開(kāi)發(fā)了一種基于 XML 描述的方式來(lái)解析和清洗數(shù)據(jù)文件的實(shí)現(xiàn)。

本架構(gòu)實(shí)現(xiàn)適合以下幾個(gè)方面:

基于文件的數(shù)據(jù)對(duì)接;

文件無(wú)法直接導(dǎo)入到目標(biāo)數(shù)據(jù)庫(kù),需要做轉(zhuǎn)換,清洗為目標(biāo)格式;

如上數(shù)據(jù)對(duì)接架構(gòu)圖,F(xiàn)lume 基本實(shí)現(xiàn)了基于文件系統(tǒng)的自動(dòng)掃描和讀取,因此架構(gòu)實(shí)現(xiàn)了基于 Flume Sink 的模塊。本架構(gòu)也可作為SDK 作為框架集成到現(xiàn)有數(shù)據(jù)處理方案中。

二、XML數(shù)據(jù)控制文件結(jié)構(gòu)和語(yǔ)法

<?xml version="1.0" encoding="UTF-8"?><schema><key>JD_TYPE_V1</key><type>textfile</type><delimiter>,</delimiter><fields><field type="int">exp_flag</field>    <field type="string">sender_id</field>        <field type="string">sender_num</field>  <field type="string" value="unknown">sender_address</field>  <field type="string">receiver_num</field>  <field type="date" pattern="yyyy-MM-dd HH:mm:ss">expect_time</field>  <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field>    <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>    <field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>  <field type="string" default="true" value="province_code(sender_province)">sender_province_code</field>  </fields></schema>

(可左右滑動(dòng)查看全部代碼)

如上 XML 描述了一種數(shù)據(jù)文件類型及該類型的切分方法,數(shù)據(jù)每行經(jīng)過(guò)切分后,產(chǎn)生的多個(gè)數(shù)據(jù)列的轉(zhuǎn)換方法。

理論上,每種數(shù)據(jù)類型應(yīng)該對(duì)應(yīng)一個(gè)控制文件,意味著控制文件來(lái)描述該種數(shù)據(jù)類型如何解析和轉(zhuǎn)換。

Key 主要標(biāo)注該控制文件處理的類型ID;

Delimiter 為文件列切割字符;

Fields 中包含每列的字段描述;

數(shù)據(jù)類型支持Java基本類型和date類型;

Skip為數(shù)據(jù)對(duì)齊語(yǔ)法,控制在列中忽略某列的值;

Default = true 屬性為數(shù)據(jù)對(duì)齊語(yǔ)法,給某列提供默認(rèn)值,提供默認(rèn)值的列在數(shù)據(jù)列中不移動(dòng)位移;

Value 提供了給該字段提供當(dāng)列中無(wú)值時(shí)提供默認(rèn)值;value=null則指定列值為null;

Date 類型需 pattern 屬性;

三、函數(shù)和多層嵌套函數(shù)傳參

默認(rèn)值

詞法分析時(shí)字段field 的value 屬性值沒(méi)有以英文小括號(hào)閉合的實(shí)體。如下示例中的primeton:

<field type="string" default="true" value="primeton">data_vendor</field>

(可左右滑動(dòng)查看全部代碼)

函數(shù)

函數(shù)是由一組字符串、數(shù)字、下劃線組成的合法函數(shù)名和0 到多個(gè)形式參數(shù)組成。在詞法分析時(shí)字段field 的 value 屬性值由英文小括號(hào)閉合的實(shí)體。如下示例中的:

location(),yn(),concat();<field type="string" default="true" value=" unix_timestamp ">curr_time</field><field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field>    <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>

(可左右滑動(dòng)查看全部代碼)

函數(shù)名

函數(shù)體小括號(hào)前面的部分。一般由字符串、數(shù)字、下劃線組成的一組特定的名稱。如location(receiver_tel),location 即為該函數(shù)的函數(shù)名稱。

函數(shù)的形式參數(shù):

1.無(wú)參數(shù)

詞法分析時(shí)value的值滿足函數(shù)條件且函數(shù)體內(nèi)無(wú)參數(shù)。如下示例中:unix_timestamp() 獲得當(dāng)前系統(tǒng)內(nèi)的 Unix 時(shí)間戳;

<field type="string" default="true" value=" unix_timestamp()">curr_time</field>

(可左右滑動(dòng)查看全部代碼)

2.常量型形參

詞法分析時(shí)函數(shù)體內(nèi)以英文單引號(hào)引用的值為函數(shù)體的常量型形參。如’100’,函數(shù)示例為:random_int(‘100’),生成 0-100 以內(nèi)的隨機(jī)整形數(shù)值;

<field type="string" default="true" value="random_int(‘100’)">rand_num</field>

(可左右滑動(dòng)查看全部代碼)

3.變量型形參

詞法分析時(shí)函數(shù)體內(nèi)參數(shù)沒(méi)有英文單引號(hào)引用并且不以英文小括號(hào)閉合的為函數(shù)體的變量型形參。如下示例中的receiver_tel;

<field type="string" default="true" value="location(receiver_tel)">r_num_loc</field>

(可左右滑動(dòng)查看全部代碼)

4.函數(shù)型形參

詞法分析時(shí)函數(shù)體內(nèi)沒(méi)有英文單引號(hào)并且以英文小括號(hào)閉合的參數(shù)類型參數(shù)為函數(shù)體的函數(shù)型參數(shù)。如下示例中的:none(sender_num)和none(receiver_num);

<field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>

(可左右滑動(dòng)查看全部代碼)

詞法分析獲得到函數(shù)體的同時(shí),使用函數(shù)名調(diào)用UdfRegistors.getUdf(udfName) 函數(shù),以檢驗(yàn)當(dāng)前系統(tǒng)必要存在該函數(shù),否則則拋出無(wú)法識(shí)別的函數(shù)異常。

5.類型校驗(yàn)

詞法分析階段獲得了字段 field 的取值是默認(rèn)值或者函數(shù),下一步需校驗(yàn)其默認(rèn)值或函數(shù)的返回值是否能和定義的字段類型相匹配。如果是函數(shù)同時(shí)校驗(yàn)函數(shù)的形參和實(shí)參類型是否相匹配。

<field type="string" default="true" value="primeton">data_vendor</field><field type="int"    default="true" value="2">call_flag</field>

(可左右滑動(dòng)查看全部代碼)

如上示例中的primeton 需能轉(zhuǎn)換為 string 類型,call_flag 需能轉(zhuǎn)換為 int 類型。如果類型不能轉(zhuǎn)換,則會(huì)拋出類型無(wú)法轉(zhuǎn)換異常。對(duì)于函數(shù),通過(guò) returnType 返回類型和字段類型進(jìn)行校驗(yàn),可匹配或者是該類型的子類型則類型驗(yàn)證通過(guò)。

四、UDF 函數(shù)編寫(xiě)方法

編寫(xiě)一個(gè)UDF函數(shù)的步驟:

繼承 UDF 類,實(shí)現(xiàn) eval 方法;

Eval 方法傳入的是一個(gè)數(shù)組參數(shù);

判斷參數(shù)長(zhǎng)度是否和預(yù)期的一致;

判斷位置參數(shù)類型是否和預(yù)期的一致;

實(shí)現(xiàn)函數(shù)體;

返回eval函數(shù)執(zhí)行的返回值,理論上該返回值的類型應(yīng)該一致,不應(yīng)該同一函數(shù)返回多種類型值;

函數(shù)編寫(xiě)者應(yīng)該保證函數(shù)體內(nèi)是線程安全的;

UDF 實(shí)現(xiàn)如下:

public abstract class UDF {   /**   * 是否支持該組參數(shù)類型,不支持拋出UnsupportedTypeException異常。默認(rèn)返回 true   */   public void support(Class<?>... paramsClass)throws UnsupportedTypeException;   /*** 該 UDF 返回值類型,用于校驗(yàn)嵌套函數(shù)類型是否匹配?煞祷睾(jiǎn)單類型,map,array,record 等類型.默認(rèn)返回 String 類型*/   public Class<?> returnType();/*** UDF 執(zhí)行函數(shù),當(dāng)輸入不符合預(yù)期時(shí),向外拋出異常* @param params 函數(shù)的輸入實(shí)參* @return 函數(shù)輸出結(jié)果,簡(jiǎn)單類型或者復(fù)雜類型,支持簡(jiǎn)單類型,map,array,record 類型*/public abstract Object eval(Object... params);}

(可左右滑動(dòng)查看全部代碼)

一個(gè)判斷是否包含子串的UDF 寫(xiě)法:

所有的UDF都通過(guò)一個(gè)核心注冊(cè)類(這點(diǎn)類似 Hive 的FunctionRegistry)

public final class UdfRegistors {   /**    * UDF 函數(shù)映射    */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>();    static {UDF_CACHED.put("copy", new CopyUDF());  // 復(fù)制一個(gè)變量的值      UDF_CACHED.put("eq", new EqUDF()); // 判斷兩個(gè)變量是否相等      UDF_CACHED.put("yn", new YnUDF()); // 根據(jù)輸入true,false 轉(zhuǎn)換為 Y、NUDF_CACHED.put("null", new NullUDF()); // 判斷變量是否為null// add udf methodUDF_CACHED.put("location", new LocationUDF());     // 獲得手機(jī)號(hào)碼的歸屬地   UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根據(jù)國(guó)家名稱獲取國(guó)家代碼    UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根據(jù)省名稱獲取省代碼    UDF_CACHED.put("city_code", new CityCodeUDF());    // 根據(jù)城市名稱獲取城市代碼    UDF_CACHED.put("phone_num", new PhoneNumUDF());  // 校驗(yàn)是否是手機(jī)號(hào)或者固話UDF_CACHED.put("number_format", new NumberFormatUDF()); //校驗(yàn)是否可以轉(zhuǎn)化成數(shù)字}/*** 添加一個(gè)UDF函數(shù)     * @param key UDF 函數(shù)     * @param value UDF 函數(shù) eval 應(yīng)線程安全    * @return     */    public static boolean addUdf(String key, UDF value) {        return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) 。 null;    }    /**     * 獲得內(nèi)置的 udf 函數(shù)     */    public static UDF getUdf(String udfName) {        return UDF_CACHED.get(udfName.toLowerCase());    }}

(可左右滑動(dòng)查看全部代碼)

UDF 函數(shù)注冊(cè)時(shí)期:

可在編譯期綁定內(nèi)置的 UDF 函數(shù);

可在系統(tǒng)啟動(dòng)時(shí)配置自加載的 UDF 函數(shù);

可在運(yùn)行期動(dòng)態(tài)注入U(xiǎn)DF 函數(shù);

五、數(shù)據(jù)測(cè)試工具

數(shù)據(jù)對(duì)接過(guò)程,面對(duì)數(shù)據(jù)是否能轉(zhuǎn)換為目標(biāo)結(jié)果常常無(wú)從所知;赬ML 控制文件的數(shù)據(jù)解析,可實(shí)現(xiàn)一個(gè)測(cè)試工具。該工具通過(guò)上傳數(shù)據(jù)文件和上傳 XML 控制文件,可對(duì)數(shù)據(jù)文件隨機(jī)的讀取行進(jìn)行匹配測(cè)試,只要數(shù)據(jù)列和目標(biāo) XML文件能通過(guò)列匹配測(cè)試,則數(shù)據(jù)可通過(guò) ETL 解析清洗。否則繼續(xù)修改 XML 控制文件,直到順利通過(guò)匹配。

六、FlumeOnYarn 架構(gòu)和分布式部署

本架構(gòu)適合以文件作為數(shù)據(jù)對(duì)接的方案,另一方面,通過(guò)擴(kuò)展 Flume 即可實(shí)現(xiàn)拿來(lái)主義。Flume 內(nèi)部實(shí)現(xiàn)對(duì) Channel 的 Transaction,對(duì)于每個(gè)以文件構(gòu)造的 Event 對(duì)象是原子操作,要么全部成功,要么失敗。flume依賴事務(wù)來(lái)保證event的可靠性。Flume 默認(rèn)沒(méi)有分布式實(shí)現(xiàn),因此開(kāi)發(fā)了 FlumeOnYarn 的架構(gòu),用于支持 Flume 的分布式部署。

FlumeOnYarn優(yōu)勢(shì):

無(wú)需每個(gè)節(jié)點(diǎn)安裝 Flume,可一鍵啟動(dòng)和停止;

配置文件在客戶端節(jié)點(diǎn)修改,自動(dòng)復(fù)制到 Yarn 上各實(shí)例,無(wú)需每個(gè)節(jié)點(diǎn)修改;

基于 CDH或HDP的發(fā)行版,即使實(shí)現(xiàn)了 Web 可視化化的配置和分布式部署,但是對(duì)于 Flume 只能實(shí)現(xiàn)單配置文件實(shí)例,無(wú)法實(shí)現(xiàn)多配置實(shí)例;

集群的規(guī)?梢愿鶕(jù)數(shù)據(jù)量大小進(jìn)行實(shí)時(shí)的調(diào)整(增減節(jié)點(diǎn)),實(shí)現(xiàn)彈性處理。通過(guò)命令或者 api 即可控制(CDH 等需要在頁(yè)面添加 host,繁瑣且不易動(dòng)態(tài)調(diào)整);

多個(gè)租戶或者同一租戶多個(gè)處理實(shí)例互不影響,且能隔離(Yarn Container);

FlumeOnYarn 架構(gòu)

上圖所示,提交FlumeOnYarn 需要客戶端,該客戶端沒(méi)有太多和Flume安裝包結(jié)構(gòu)特殊的地方,只是在 lib 下添加了 flume-yarn 的架構(gòu)支持和 bin 下 flume-on-yarn 的啟動(dòng)腳本。

Flume OnYarn 客戶端程序

通過(guò) bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群。如下的命令即可一次性申請(qǐng)多個(gè) Yarn 資源節(jié)點(diǎn),實(shí)現(xiàn)一鍵部署:

bin/flume-on-yarn yarn -s --name agent_name –conf  conf/flume-h(huán)dfs.conf  --num-instances 5

(可左右滑動(dòng)查看全部代碼)

總結(jié)

推薦閱讀

元數(shù)據(jù)新型存儲(chǔ)架構(gòu)的探索

基于 Spark 的數(shù)據(jù)分析實(shí)踐

本地讀寫(xiě)的多活數(shù)據(jù)存儲(chǔ)架構(gòu)設(shè)計(jì)要義

關(guān)于作者:震秦,普元資深開(kāi)發(fā)工程師,專注于大數(shù)據(jù)開(kāi)發(fā) 8 年,擅長(zhǎng) Hadoop 生態(tài)內(nèi)各工具的使用和優(yōu)化。參與某公關(guān)廣告(上市)公司DMP 建設(shè),負(fù)責(zé)數(shù)據(jù)分層設(shè)計(jì)和批處理,調(diào)度實(shí)現(xiàn),完成交付使用;參與國(guó)內(nèi)多省市公安社交網(wǎng)絡(luò)項(xiàng)目部署,負(fù)責(zé)產(chǎn)品開(kāi)發(fā)(Spark 分析應(yīng)用);參與數(shù)據(jù)清洗加工為我方主題庫(kù)并部署上層應(yīng)用。

關(guān)于EAWorld:微服務(wù),DevOps,數(shù)據(jù)治理,移動(dòng)架構(gòu)原創(chuàng)技術(shù)分享。關(guān)注微信公眾號(hào)EAWorld!

聲明: 本文由入駐維科號(hào)的作者撰寫(xiě),觀點(diǎn)僅代表作者本人,不代表OFweek立場(chǎng)。如有侵權(quán)或其他問(wèn)題,請(qǐng)聯(lián)系舉報(bào)。

發(fā)表評(píng)論

0條評(píng)論,0人參與

請(qǐng)輸入評(píng)論內(nèi)容...

請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字

您提交的評(píng)論過(guò)于頻繁,請(qǐng)輸入驗(yàn)證碼繼續(xù)

  • 看不清,點(diǎn)擊換一張  刷新

暫無(wú)評(píng)論

暫無(wú)評(píng)論

人工智能 獵頭職位 更多
掃碼關(guān)注公眾號(hào)
OFweek人工智能網(wǎng)
獲取更多精彩內(nèi)容
文章糾錯(cuò)
x
*文字標(biāo)題:
*糾錯(cuò)內(nèi)容:
聯(lián)系郵箱:
*驗(yàn) 證 碼:

粵公網(wǎng)安備 44030502002758號(hào)