基于XML描述的可編程函數(shù)式ETL實(shí)現(xiàn)
轉(zhuǎn)載本文需注明出處:微信公眾號(hào)EAWorld,違者必究。
引言:
傳統(tǒng) ETL 主要以 SQL 為主要技術(shù)手段,把數(shù)據(jù)經(jīng)抽取、清洗轉(zhuǎn)換之后加載到數(shù)據(jù)倉(cāng)庫(kù)。但是在如今移動(dòng)互聯(lián)網(wǎng)大力發(fā)展的場(chǎng)景下,產(chǎn)生大量碎片化和不規(guī)則的數(shù)據(jù)。政府,公安等行業(yè),傳統(tǒng)數(shù)據(jù)庫(kù)已經(jīng)遠(yuǎn)遠(yuǎn)無(wú)法滿足需求。數(shù)據(jù)原始文件通過(guò)文件導(dǎo)入到基礎(chǔ)庫(kù),再通過(guò)大數(shù)據(jù) HQL等技術(shù)手段提取出二級(jí)庫(kù),這中間的數(shù)據(jù)導(dǎo)入和 SQL ETL 的提取的過(guò)程,大量消耗 IO 性能和計(jì)算資源,在很多場(chǎng)景下已經(jīng)是數(shù)據(jù)處理的瓶頸所在。
普元在實(shí)施公安項(xiàng)目過(guò)程中開(kāi)發(fā)了一種基于 XML 描述的可編程的函數(shù) ETL 轉(zhuǎn)換方法。主要用于大數(shù)據(jù)文件處理領(lǐng)域,能從原始數(shù)據(jù)文件直接、快速加載到專題庫(kù)的技術(shù)手段。技術(shù)方案主要解決了用 XML 的技術(shù)手段描述數(shù)據(jù)文件的格式,包含文件字段切分、字段類型、默認(rèn)值、異常值校驗(yàn)、時(shí)間格式校驗(yàn)。在處理時(shí)可添加自行開(kāi)發(fā)的 JAVA UDF 函數(shù),函數(shù)實(shí)參支持變量、常量、表達(dá)式、函數(shù)和運(yùn)算符重載。同時(shí)函數(shù)支持多層嵌套,即內(nèi)部函數(shù)的返回值最為外部函數(shù)的實(shí)參。該方案實(shí)現(xiàn)了 XML 內(nèi)函數(shù)體的語(yǔ)法解析并在運(yùn)行過(guò)程中直接編譯為 Java 字節(jié)碼的技術(shù)。有效的解決了政府、公安、電信行業(yè)巨量的數(shù)據(jù)處理需要的大量計(jì)算資源和 IO 性能瓶頸,有效的提高了數(shù)據(jù)處理效率和降低了數(shù)據(jù)處理開(kāi)發(fā)難度。
目錄:
一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹
二、XML 控制文件結(jié)構(gòu)和語(yǔ)法
三、函數(shù)和多層嵌套函數(shù)傳參
四、UDF 函數(shù)編寫(xiě)方法
五、數(shù)據(jù)測(cè)試工具
六、FlumeOnYarn 架構(gòu)和分布式部署
一、基于 XML 控制文件解析數(shù)據(jù)文件方案介紹
對(duì)于數(shù)據(jù)開(kāi)發(fā)項(xiàng)目,我們常常會(huì)面臨眾多的數(shù)據(jù)對(duì)接,部分場(chǎng)景不僅數(shù)據(jù)量大,且數(shù)據(jù)種類多,數(shù)據(jù)解析開(kāi)發(fā)工作量巨大。對(duì)于大量數(shù)據(jù)對(duì)接,一般設(shè)計(jì)的 RPC 接口和 WebService 一般都達(dá)不到數(shù)據(jù)性能要求的。并且他們都是點(diǎn)對(duì)點(diǎn)的服務(wù),一旦上下游系統(tǒng)故障,都會(huì)造成整個(gè)數(shù)據(jù)對(duì)接異常。因此大部分都會(huì)選擇使用文件的方式進(jìn)行數(shù)據(jù)對(duì)接。
對(duì)于非實(shí)時(shí)數(shù)據(jù)對(duì)接需求,這種方式的優(yōu)點(diǎn):
在數(shù)據(jù)量大的情況下,可以通過(guò)文件傳輸,上游只寫(xiě)入,無(wú)需關(guān)心數(shù)據(jù)業(yè)務(wù)和故障;
方案簡(jiǎn)單,避免了網(wǎng)絡(luò)協(xié)議相關(guān)的概念;
維護(hù)簡(jiǎn)單,只需保證磁盤(pán)寫(xiě)入穩(wěn)定性即可;
我們常常會(huì)面臨基于此架構(gòu)的數(shù)據(jù)對(duì)接。但基于此架構(gòu)數(shù)據(jù)處理工作都在下游(即數(shù)據(jù)使用方)。
面對(duì)大量數(shù)據(jù)對(duì)接和眾多的數(shù)據(jù)類型,我們對(duì)于每種數(shù)據(jù)文件解析、解碼、清洗消耗大量的人力,并且基于編碼的方式對(duì)于較多數(shù)據(jù)類型的場(chǎng)景代碼量大,且難以管理。因此經(jīng)過(guò)多次數(shù)據(jù)開(kāi)發(fā)實(shí)踐,我們開(kāi)發(fā)了一種基于 XML 描述的方式來(lái)解析和清洗數(shù)據(jù)文件的實(shí)現(xiàn)。
本架構(gòu)實(shí)現(xiàn)適合以下幾個(gè)方面:
基于文件的數(shù)據(jù)對(duì)接;
文件無(wú)法直接導(dǎo)入到目標(biāo)數(shù)據(jù)庫(kù),需要做轉(zhuǎn)換,清洗為目標(biāo)格式;
如上數(shù)據(jù)對(duì)接架構(gòu)圖,F(xiàn)lume 基本實(shí)現(xiàn)了基于文件系統(tǒng)的自動(dòng)掃描和讀取,因此架構(gòu)實(shí)現(xiàn)了基于 Flume Sink 的模塊。本架構(gòu)也可作為SDK 作為框架集成到現(xiàn)有數(shù)據(jù)處理方案中。
二、XML數(shù)據(jù)控制文件結(jié)構(gòu)和語(yǔ)法
<?xml version="1.0" encoding="UTF-8"?><schema><key>JD_TYPE_V1</key><type>textfile</type><delimiter>,</delimiter><fields><field type="int">exp_flag</field> <field type="string">sender_id</field> <field type="string">sender_num</field> <field type="string" value="unknown">sender_address</field> <field type="string">receiver_num</field> <field type="date" pattern="yyyy-MM-dd HH:mm:ss">expect_time</field> <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field> <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field> <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field> <field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field> <field type="string" default="true" value="province_code(sender_province)">sender_province_code</field> </fields></schema>
(可左右滑動(dòng)查看全部代碼)
如上 XML 描述了一種數(shù)據(jù)文件類型及該類型的切分方法,數(shù)據(jù)每行經(jīng)過(guò)切分后,產(chǎn)生的多個(gè)數(shù)據(jù)列的轉(zhuǎn)換方法。
理論上,每種數(shù)據(jù)類型應(yīng)該對(duì)應(yīng)一個(gè)控制文件,意味著控制文件來(lái)描述該種數(shù)據(jù)類型如何解析和轉(zhuǎn)換。
Key 主要標(biāo)注該控制文件處理的類型ID;
Delimiter 為文件列切割字符;
Fields 中包含每列的字段描述;
數(shù)據(jù)類型支持Java基本類型和date類型;
Skip為數(shù)據(jù)對(duì)齊語(yǔ)法,控制在列中忽略某列的值;
Default = true 屬性為數(shù)據(jù)對(duì)齊語(yǔ)法,給某列提供默認(rèn)值,提供默認(rèn)值的列在數(shù)據(jù)列中不移動(dòng)位移;
Value 提供了給該字段提供當(dāng)列中無(wú)值時(shí)提供默認(rèn)值;value=null則指定列值為null;
Date 類型需 pattern 屬性;
三、函數(shù)和多層嵌套函數(shù)傳參
默認(rèn)值
詞法分析時(shí)字段field 的value 屬性值沒(méi)有以英文小括號(hào)閉合的實(shí)體。如下示例中的primeton:
<field type="string" default="true" value="primeton">data_vendor</field>
(可左右滑動(dòng)查看全部代碼)
函數(shù)
函數(shù)是由一組字符串、數(shù)字、下劃線組成的合法函數(shù)名和0 到多個(gè)形式參數(shù)組成。在詞法分析時(shí)字段field 的 value 屬性值由英文小括號(hào)閉合的實(shí)體。如下示例中的:
location(),yn(),concat();<field type="string" default="true" value=" unix_timestamp ">curr_time</field><field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field> <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field> <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>
(可左右滑動(dòng)查看全部代碼)
函數(shù)名
函數(shù)體小括號(hào)前面的部分。一般由字符串、數(shù)字、下劃線組成的一組特定的名稱。如location(receiver_tel),location 即為該函數(shù)的函數(shù)名稱。
函數(shù)的形式參數(shù):
1.無(wú)參數(shù)
詞法分析時(shí)value的值滿足函數(shù)條件且函數(shù)體內(nèi)無(wú)參數(shù)。如下示例中:unix_timestamp() 獲得當(dāng)前系統(tǒng)內(nèi)的 Unix 時(shí)間戳;
<field type="string" default="true" value=" unix_timestamp()">curr_time</field>
(可左右滑動(dòng)查看全部代碼)
2.常量型形參
詞法分析時(shí)函數(shù)體內(nèi)以英文單引號(hào)引用的值為函數(shù)體的常量型形參。如’100’,函數(shù)示例為:random_int(‘100’),生成 0-100 以內(nèi)的隨機(jī)整形數(shù)值;
<field type="string" default="true" value="random_int(‘100’)">rand_num</field>
(可左右滑動(dòng)查看全部代碼)
3.變量型形參
詞法分析時(shí)函數(shù)體內(nèi)參數(shù)沒(méi)有英文單引號(hào)引用并且不以英文小括號(hào)閉合的為函數(shù)體的變量型形參。如下示例中的receiver_tel;
<field type="string" default="true" value="location(receiver_tel)">r_num_loc</field>
(可左右滑動(dòng)查看全部代碼)
4.函數(shù)型形參
詞法分析時(shí)函數(shù)體內(nèi)沒(méi)有英文單引號(hào)并且以英文小括號(hào)閉合的參數(shù)類型參數(shù)為函數(shù)體的函數(shù)型參數(shù)。如下示例中的:none(sender_num)和none(receiver_num);
<field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>
(可左右滑動(dòng)查看全部代碼)
詞法分析獲得到函數(shù)體的同時(shí),使用函數(shù)名調(diào)用UdfRegistors.getUdf(udfName) 函數(shù),以檢驗(yàn)當(dāng)前系統(tǒng)必要存在該函數(shù),否則則拋出無(wú)法識(shí)別的函數(shù)異常。
5.類型校驗(yàn)
詞法分析階段獲得了字段 field 的取值是默認(rèn)值或者函數(shù),下一步需校驗(yàn)其默認(rèn)值或函數(shù)的返回值是否能和定義的字段類型相匹配。如果是函數(shù)同時(shí)校驗(yàn)函數(shù)的形參和實(shí)參類型是否相匹配。
<field type="string" default="true" value="primeton">data_vendor</field><field type="int" default="true" value="2">call_flag</field>
(可左右滑動(dòng)查看全部代碼)
如上示例中的primeton 需能轉(zhuǎn)換為 string 類型,call_flag 需能轉(zhuǎn)換為 int 類型。如果類型不能轉(zhuǎn)換,則會(huì)拋出類型無(wú)法轉(zhuǎn)換異常。對(duì)于函數(shù),通過(guò) returnType 返回類型和字段類型進(jìn)行校驗(yàn),可匹配或者是該類型的子類型則類型驗(yàn)證通過(guò)。
四、UDF 函數(shù)編寫(xiě)方法
編寫(xiě)一個(gè)UDF函數(shù)的步驟:
繼承 UDF 類,實(shí)現(xiàn) eval 方法;
Eval 方法傳入的是一個(gè)數(shù)組參數(shù);
判斷參數(shù)長(zhǎng)度是否和預(yù)期的一致;
判斷位置參數(shù)類型是否和預(yù)期的一致;
實(shí)現(xiàn)函數(shù)體;
返回eval函數(shù)執(zhí)行的返回值,理論上該返回值的類型應(yīng)該一致,不應(yīng)該同一函數(shù)返回多種類型值;
函數(shù)編寫(xiě)者應(yīng)該保證函數(shù)體內(nèi)是線程安全的;
UDF 實(shí)現(xiàn)如下:
public abstract class UDF { /** * 是否支持該組參數(shù)類型,不支持拋出UnsupportedTypeException異常。默認(rèn)返回 true */ public void support(Class<?>... paramsClass)throws UnsupportedTypeException; /*** 該 UDF 返回值類型,用于校驗(yàn)嵌套函數(shù)類型是否匹配?煞祷睾(jiǎn)單類型,map,array,record 等類型.默認(rèn)返回 String 類型*/ public Class<?> returnType();/*** UDF 執(zhí)行函數(shù),當(dāng)輸入不符合預(yù)期時(shí),向外拋出異常* @param params 函數(shù)的輸入實(shí)參* @return 函數(shù)輸出結(jié)果,簡(jiǎn)單類型或者復(fù)雜類型,支持簡(jiǎn)單類型,map,array,record 類型*/public abstract Object eval(Object... params);}
(可左右滑動(dòng)查看全部代碼)
一個(gè)判斷是否包含子串的UDF 寫(xiě)法:
所有的UDF都通過(guò)一個(gè)核心注冊(cè)類(這點(diǎn)類似 Hive 的FunctionRegistry)
public final class UdfRegistors { /** * UDF 函數(shù)映射 */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>(); static {UDF_CACHED.put("copy", new CopyUDF()); // 復(fù)制一個(gè)變量的值 UDF_CACHED.put("eq", new EqUDF()); // 判斷兩個(gè)變量是否相等 UDF_CACHED.put("yn", new YnUDF()); // 根據(jù)輸入true,false 轉(zhuǎn)換為 Y、NUDF_CACHED.put("null", new NullUDF()); // 判斷變量是否為null// add udf methodUDF_CACHED.put("location", new LocationUDF()); // 獲得手機(jī)號(hào)碼的歸屬地 UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根據(jù)國(guó)家名稱獲取國(guó)家代碼 UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根據(jù)省名稱獲取省代碼 UDF_CACHED.put("city_code", new CityCodeUDF()); // 根據(jù)城市名稱獲取城市代碼 UDF_CACHED.put("phone_num", new PhoneNumUDF()); // 校驗(yàn)是否是手機(jī)號(hào)或者固話UDF_CACHED.put("number_format", new NumberFormatUDF()); //校驗(yàn)是否可以轉(zhuǎn)化成數(shù)字}/*** 添加一個(gè)UDF函數(shù) * @param key UDF 函數(shù) * @param value UDF 函數(shù) eval 應(yīng)線程安全 * @return */ public static boolean addUdf(String key, UDF value) { return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) 。 null; } /** * 獲得內(nèi)置的 udf 函數(shù) */ public static UDF getUdf(String udfName) { return UDF_CACHED.get(udfName.toLowerCase()); }}
(可左右滑動(dòng)查看全部代碼)
UDF 函數(shù)注冊(cè)時(shí)期:
可在編譯期綁定內(nèi)置的 UDF 函數(shù);
可在系統(tǒng)啟動(dòng)時(shí)配置自加載的 UDF 函數(shù);
可在運(yùn)行期動(dòng)態(tài)注入U(xiǎn)DF 函數(shù);
五、數(shù)據(jù)測(cè)試工具
數(shù)據(jù)對(duì)接過(guò)程,面對(duì)數(shù)據(jù)是否能轉(zhuǎn)換為目標(biāo)結(jié)果常常無(wú)從所知;赬ML 控制文件的數(shù)據(jù)解析,可實(shí)現(xiàn)一個(gè)測(cè)試工具。該工具通過(guò)上傳數(shù)據(jù)文件和上傳 XML 控制文件,可對(duì)數(shù)據(jù)文件隨機(jī)的讀取行進(jìn)行匹配測(cè)試,只要數(shù)據(jù)列和目標(biāo) XML文件能通過(guò)列匹配測(cè)試,則數(shù)據(jù)可通過(guò) ETL 解析清洗。否則繼續(xù)修改 XML 控制文件,直到順利通過(guò)匹配。
六、FlumeOnYarn 架構(gòu)和分布式部署
本架構(gòu)適合以文件作為數(shù)據(jù)對(duì)接的方案,另一方面,通過(guò)擴(kuò)展 Flume 即可實(shí)現(xiàn)拿來(lái)主義。Flume 內(nèi)部實(shí)現(xiàn)對(duì) Channel 的 Transaction,對(duì)于每個(gè)以文件構(gòu)造的 Event 對(duì)象是原子操作,要么全部成功,要么失敗。flume依賴事務(wù)來(lái)保證event的可靠性。Flume 默認(rèn)沒(méi)有分布式實(shí)現(xiàn),因此開(kāi)發(fā)了 FlumeOnYarn 的架構(gòu),用于支持 Flume 的分布式部署。
FlumeOnYarn優(yōu)勢(shì):
無(wú)需每個(gè)節(jié)點(diǎn)安裝 Flume,可一鍵啟動(dòng)和停止;
配置文件在客戶端節(jié)點(diǎn)修改,自動(dòng)復(fù)制到 Yarn 上各實(shí)例,無(wú)需每個(gè)節(jié)點(diǎn)修改;
基于 CDH或HDP的發(fā)行版,即使實(shí)現(xiàn)了 Web 可視化化的配置和分布式部署,但是對(duì)于 Flume 只能實(shí)現(xiàn)單配置文件實(shí)例,無(wú)法實(shí)現(xiàn)多配置實(shí)例;
集群的規(guī)?梢愿鶕(jù)數(shù)據(jù)量大小進(jìn)行實(shí)時(shí)的調(diào)整(增減節(jié)點(diǎn)),實(shí)現(xiàn)彈性處理。通過(guò)命令或者 api 即可控制(CDH 等需要在頁(yè)面添加 host,繁瑣且不易動(dòng)態(tài)調(diào)整);
多個(gè)租戶或者同一租戶多個(gè)處理實(shí)例互不影響,且能隔離(Yarn Container);
FlumeOnYarn 架構(gòu)
上圖所示,提交FlumeOnYarn 需要客戶端,該客戶端沒(méi)有太多和Flume安裝包結(jié)構(gòu)特殊的地方,只是在 lib 下添加了 flume-yarn 的架構(gòu)支持和 bin 下 flume-on-yarn 的啟動(dòng)腳本。
Flume OnYarn 客戶端程序
通過(guò) bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群。如下的命令即可一次性申請(qǐng)多個(gè) Yarn 資源節(jié)點(diǎn),實(shí)現(xiàn)一鍵部署:
bin/flume-on-yarn yarn -s --name agent_name –conf conf/flume-h(huán)dfs.conf --num-instances 5
(可左右滑動(dòng)查看全部代碼)
總結(jié)
推薦閱讀
元數(shù)據(jù)新型存儲(chǔ)架構(gòu)的探索
基于 Spark 的數(shù)據(jù)分析實(shí)踐
本地讀寫(xiě)的多活數(shù)據(jù)存儲(chǔ)架構(gòu)設(shè)計(jì)要義
關(guān)于作者:震秦,普元資深開(kāi)發(fā)工程師,專注于大數(shù)據(jù)開(kāi)發(fā) 8 年,擅長(zhǎng) Hadoop 生態(tài)內(nèi)各工具的使用和優(yōu)化。參與某公關(guān)廣告(上市)公司DMP 建設(shè),負(fù)責(zé)數(shù)據(jù)分層設(shè)計(jì)和批處理,調(diào)度實(shí)現(xiàn),完成交付使用;參與國(guó)內(nèi)多省市公安社交網(wǎng)絡(luò)項(xiàng)目部署,負(fù)責(zé)產(chǎn)品開(kāi)發(fā)(Spark 分析應(yīng)用);參與數(shù)據(jù)清洗加工為我方主題庫(kù)并部署上層應(yīng)用。
關(guān)于EAWorld:微服務(wù),DevOps,數(shù)據(jù)治理,移動(dòng)架構(gòu)原創(chuàng)技術(shù)分享。關(guān)注微信公眾號(hào)EAWorld!
發(fā)表評(píng)論
請(qǐng)輸入評(píng)論內(nèi)容...
請(qǐng)輸入評(píng)論/評(píng)論長(zhǎng)度6~500個(gè)字
最新活動(dòng)更多
-
11月20日火熱報(bào)名中>> 2024 智能家居出海論壇
-
11月28日立即報(bào)名>>> 2024工程師系列—工業(yè)電子技術(shù)在線會(huì)議
-
12月19日立即報(bào)名>> 【線下會(huì)議】OFweek 2024(第九屆)物聯(lián)網(wǎng)產(chǎn)業(yè)大會(huì)
-
即日-12.26火熱報(bào)名中>> OFweek2024中國(guó)智造CIO在線峰會(huì)
-
即日-2025.8.1立即下載>> 《2024智能制造產(chǎn)業(yè)高端化、智能化、綠色化發(fā)展藍(lán)皮書(shū)》
-
精彩回顧立即查看>> 【在線會(huì)議】多物理場(chǎng)仿真助跑新能源汽車(chē)
推薦專題
- 1 腦機(jī)接口芯片,華為出了新專利!
- 2 今年諾獎(jiǎng)對(duì)人工智能的重視,給我們的基礎(chǔ)教育提了個(gè)醒
- 3 銀行業(yè)AI大模型,從入局到求變
- 4 巨頭搶布局,VC狂撒錢(qián),為了能讓「AI讀心」這些公司卷瘋了
- 5 阿斯麥ASML:“骨折級(jí)”洋相,又成AI第一殺手?
- 6 蘋(píng)果市值創(chuàng)新高,iPhone 16能否助力突破4萬(wàn)億美元大關(guān)?
- 7 洞見(jiàn)AI風(fēng)潮 第二屆vivo藍(lán)河操作系統(tǒng)創(chuàng)新賽開(kāi)啟招募
- 8 地平線開(kāi)啟配售,阿里百度各砸5000萬(wàn)美金,市值最高超500億
- 9 小馬智行沖刺納斯達(dá)克:或成「全球Robotaxi第一股」,兩年半營(yíng)收約12億元
- 10 云從科技:營(yíng)收低迷與虧損加劇,2025年盈利目標(biāo)挑戰(zhàn)重重
- 高級(jí)軟件工程師 廣東省/深圳市
- 自動(dòng)化高級(jí)工程師 廣東省/深圳市
- 光器件研發(fā)工程師 福建省/福州市
- 銷售總監(jiān)(光器件) 北京市/海淀區(qū)
- 激光器高級(jí)銷售經(jīng)理 上海市/虹口區(qū)
- 光器件物理工程師 北京市/海淀區(qū)
- 激光研發(fā)工程師 北京市/昌平區(qū)
- 技術(shù)專家 廣東省/江門(mén)市
- 封裝工程師 北京市/海淀區(qū)
- 結(jié)構(gòu)工程師 廣東省/深圳市