[Original] Talend ETL Development: Unified Email Sending Based on a Joblet


Update

2019/1/16: Added fields that allow up to 5 attachments per email, plus fields for SMTP and related configuration.


1. Background

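ETL data integration inevitably involves email notifications. Typical examples are reports on ETL execution status, run times and updates to key data. This information has to reach the relevant operations staff or BI data consumers promptly.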

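However, if the basics of email delivery are not planned from the start, sending can become chaotic and hard to manage later. For example, if everyone writes their own ETL jobs, everyone also builds their own email notifications. Over time, deciding which emails to retire or which recipients to mute becomes difficult. You may have to open every ETL job to check and modify it, which is time-consuming and makes management very hard.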

2. Implementation

When designing this solution, I mainly considered the following points:

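1. Anyone who needs to send an email should not have to drag in and wire up the whole set of components again. Dragging in one shared component should be enough, so I chose to implement this as a joblet.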

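2. The shared basic email information must be maintained in one place. This includes the sender and recipient lists and the send history. I therefore designed database tables to hold it, so that a single database update makes every job use the same information.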

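3. How messages are generated and sent, and their status, should be flexible to control. I designed a table to store this information, and the actual email content is generated by stored procedures. This also makes the send history traceable.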

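4. Because Talend joblets support variables, I moved as many of the send-mail component's settings as possible into the database tables, which makes them easier to maintain and change.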

2.1 Database table design

The design uses two tables: mail_send_group and mail_send_list_rec.

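mail_send_group: records the sender and recipient information. Keeping it here simplifies later maintenance, because a change in the database takes effect globally.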

IF (OBJECT_ID(N'[chk].[mail_send_group]', N'U') IS NOT NULL)

BEGIN

PRINT N'Dropping table: [chk].[mail_send_group]';

DROP TABLE [chk].[mail_send_group];

END

GO

CREATE TABLE [chk].[mail_send_group]

(

[group_id] NVARCHAR(50) NOT NULL,--primary key

[mail_to] NVARCHAR(1000) NOT NULL,--recipient addresses; separate multiple addresses with ;

[mail_from] NVARCHAR(100) NOT NULL,--sender address

[sender_name] NVARCHAR(100) NOT NULL,--sender display name

[mail_cc] NVARCHAR(1000) NULL,--CC addresses; separate multiple addresses with ;

[mail_bcc] NVARCHAR(1000) NULL,--BCC addresses; separate multiple addresses with ; (same size as mail_cc)

[smtp_host] NVARCHAR(100) NOT NULL,--SMTP host address

[smtp_port] INT NOT NULL,--SMTP host port

[user_name] NVARCHAR(50) NOT NULL,--mailbox user name

[user_pwd] NVARCHAR(50) NOT NULL,--mailbox password

[create_date] DATETIME NOT NULL,--creation date

[status] SMALLINT NULL--status (0 = disabled, 1 = enabled)

)

GO

SELECT * FROM [chk].[mail_send_group]
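The columns `mail_to`, `mail_cc` and `mail_bcc` each hold several addresses separated by `;`, so whatever reads the row has to split that string. Below is a minimal sketch in Java, the language Talend jobs compile to. It assumes that surrounding spaces and empty entries (for example a trailing `;`) should be ignored. The class and method names are hypothetical and are not part of Talend.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: turns a ';'-separated address column into a clean list. */
final class RecipientList {
    private RecipientList() {}

    static List<String> parse(String column) {
        // mail_cc and mail_bcc are nullable, so NULL simply means "no recipients"
        if (column == null) {
            return Collections.emptyList();
        }
        List<String> addresses = new ArrayList<>();
        for (String part : column.split(";")) {
            String trimmed = part.trim();
            // skip empty entries produced by "a@x.com;;b@x.com" or a lone ';'
            if (!trimmed.isEmpty()) {
                addresses.add(trimmed);
            }
        }
        return addresses;
    }
}
```

For example, `RecipientList.parse("ops@example.com; bi@example.com;")` yields the two addresses without the stray space or the trailing empty entry.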


mail_send_list_rec: records each generated email and its send status. Each row links to the table above through group_id, so you can tell who sent each message to whom, and when.

IF (OBJECT_ID(N'[chk].[mail_send_list_rec]', N'U') IS NOT NULL)

BEGIN

PRINT N'Dropping table: [chk].[mail_send_list_rec]';

DROP TABLE [chk].[mail_send_list_rec];

END

GO

CREATE TABLE [chk].[mail_send_list_rec]

(

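[mail_id] NVARCHAR(50) NOT NULL,--primary key

[group_id] NVARCHAR(50) NOT NULL,--group id this mail belongs to; determines sender, recipients, etc.

[scope] NVARCHAR(100) NOT NULL,--business scope; a category that distinguishes mails generated by different businesses

[subject] NVARCHAR(100) NOT NULL,--subject

[message] NVARCHAR(4000) NOT NULL,--body; HTML is supported and recommended

[create_date] DATETIME NOT NULL,--creation date

[send_date] DATETIME NULL,--send date

[send_status] SMALLINT NULL,--send status (0 = created but not sent, 1 = sent)

[atta01_path] NVARCHAR(200) NULL,--absolute path of the 1st attachment, including the file name, e.g. /data/mailatts/ids/checkret.csv

[atta02_path] NVARCHAR(200) NULL,--absolute path of the 2nd attachment, including the file name

[atta03_path] NVARCHAR(200) NULL,--absolute path of the 3rd attachment, including the file name

[atta04_path] NVARCHAR(200) NULL,--absolute path of the 4th attachment, including the file name

[atta05_path] NVARCHAR(200) NULL--absolute path of the 5th attachment, including the file name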

)

GO
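There are five nullable `attaNN_path` columns, so the sender has to gather whichever ones are filled in. Below is a minimal Java sketch. It assumes a blank string should be treated like NULL, because the generating procedure inserts its parameters unchanged and an empty string could arrive instead of NULL. The helper name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: collects the non-empty atta01_path..atta05_path values in order. */
final class AttachmentPaths {
    private AttachmentPaths() {}

    static List<String> collect(String... columns) {
        List<String> paths = new ArrayList<>();
        for (String path : columns) {
            // NULL or blank means the attachment slot is unused
            if (path != null && !path.trim().isEmpty()) {
                paths.add(path.trim());
            }
        }
        return paths;
    }
}
```

Calling `AttachmentPaths.collect("/data/mailatts/ids/checkret.csv", null, "", null, null)` returns a one-element list.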



2.2 Joblet development


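1. The joblet receives its data through an input component. The input parameter is mail_id, the ID of the email, which the calling job must pass in when it uses the joblet.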


2. tFlowToIterate turns mail_id into a global variable (curr_mail_id) that the tMSSqlInput component in step 3 reads.


3. This component uses mail_id to query the database for the full email details that the send step needs. Only groups with status = 1 are returned:

"SELECT

[a].[mail_id]

,[a].[subject]

,[a].[message]

,[b].[mail_from]

,[b].[mail_to]

,[b].[sender_name]

,[b].[mail_cc]

,[b].[mail_bcc]

,[b].[status]

,[a].[atta01_path]

,[a].[atta02_path]

,[a].[atta03_path]

,[a].[atta04_path]

,[a].[atta05_path]

,[b].[smtp_host]

,[b].[smtp_port]

,[b].[user_name]

,[b].[user_pwd]

FROM [chk].[mail_send_list_rec] AS a

INNER JOIN [chk].[mail_send_group] AS b

ON ([a].[group_id] = [b].[group_id])

WHERE [a].[mail_id] = '" + ((String)globalMap.get("curr_mail_id")) + "'

AND ISNULL([b].[status], 0) = 1"

4. The send-mail component receives the values returned by the query as variables and sends the email.


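5. Mark the record for this mail_id as sent and store the send time. A tFixedFlowInput first builds the stored-procedure parameters, then a tMSSqlSP component calls the stored procedure that performs the update.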
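Step 3 builds its SQL by concatenating `curr_mail_id` directly into the query string. `mail_id` is generated with `NEWID()`, so one cheap safeguard is to reject any value that is not a GUID before concatenating it. The Java sketch below illustrates that check under this assumption. It is not Talend-generated code, and the class name is hypothetical. Where a component supports bound parameters, a parameterized query is the more robust option.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: builds the step-3 lookup query for one mail_id. */
final class MailDetailQuery {
    // NEWID() produces values such as 8D42D25D-59C7-4A5E-AE9C-4A5F24D910B0
    private static final Pattern GUID = Pattern.compile(
            "^[0-9A-Fa-f]{8}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{4}-[0-9A-Fa-f]{12}$");

    private MailDetailQuery() {}

    static String build(String mailId) {
        if (mailId == null || !GUID.matcher(mailId).matches()) {
            // refuse to concatenate anything that is not a GUID
            throw new IllegalArgumentException("invalid mail_id: " + mailId);
        }
        return "SELECT [a].[mail_id], [a].[subject], [a].[message]"
                + ", [b].[mail_from], [b].[mail_to], [b].[sender_name], [b].[mail_cc], [b].[mail_bcc], [b].[status]"
                + ", [a].[atta01_path], [a].[atta02_path], [a].[atta03_path], [a].[atta04_path], [a].[atta05_path]"
                + ", [b].[smtp_host], [b].[smtp_port], [b].[user_name], [b].[user_pwd]"
                + " FROM [chk].[mail_send_list_rec] AS a"
                + " INNER JOIN [chk].[mail_send_group] AS b ON ([a].[group_id] = [b].[group_id])"
                + " WHERE [a].[mail_id] = '" + mailId + "'"
                + " AND ISNULL([b].[status], 0) = 1";
    }
}
```

A value such as `x' OR '1'='1` is rejected with an exception instead of being spliced into the WHERE clause.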


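To make the five joblet steps concrete outside Talend, here is a small plain-Java simulation. It makes these substitutions:

- An in-memory map stands in for the two joined tables.
- A callback stands in for the send-mail component.
- All class names are hypothetical.

It also assumes that nothing is sent or updated when the lookup returns no row. The point is the control flow: look up by `mail_id`, send only when the group is enabled, then mark the row as sent.

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for one joined row of mail_send_list_rec + mail_send_group. */
final class MailRow {
    final String mailId;
    final String subject;
    final int groupStatus;      // mail_send_group.status: 1 = enabled
    int sendStatus;             // mail_send_list_rec.send_status: 0 = created, 1 = sent
    LocalDateTime sendDate;     // mail_send_list_rec.send_date

    MailRow(String mailId, String subject, int groupStatus) {
        this.mailId = mailId;
        this.subject = subject;
        this.groupStatus = groupStatus;
        this.sendStatus = 0;
    }
}

/** Simulates the joblet: input mail_id -> lookup -> send -> update. */
final class MailJobletSimulation {
    private final Map<String, MailRow> table = new HashMap<>();
    private final Consumer<MailRow> sender;

    MailJobletSimulation(Consumer<MailRow> sender) {
        this.sender = sender;
    }

    void insert(MailRow row) {
        table.put(row.mailId, row);
    }

    /** Returns true if a mail was sent for this mail_id. */
    boolean run(String mailId) {
        MailRow row = table.get(mailId);          // step 3: lookup by mail_id
        if (row == null || row.groupStatus != 1) {
            return false;                         // like the INNER JOIN + status filter returning no row
        }
        sender.accept(row);                       // step 4: send
        row.sendStatus = 1;                       // step 5: mark as sent
        row.sendDate = LocalDateTime.now();
        return true;
    }

    MailRow get(String mailId) {
        return table.get(mailId);
    }
}
```

Setting a group's status to 0 therefore mutes every mail in that group without touching any job, which is the point of keeping that information in the database.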


2.3 Stored procedures for generating and updating email content

Generating the email: the procedure builds a message with whatever content you want to send and inserts it into the table.

IF (OBJECT_ID(N'[chk].[usp_insert_ids_mail_send_list_rec]', N'P') IS NOT NULL)

BEGIN

PRINT N'Dropping stored procedure: [chk].[usp_insert_ids_mail_send_list_rec]';

DROP PROC [chk].[usp_insert_ids_mail_send_list_rec];

END

GO

CREATE PROC [chk].[usp_insert_ids_mail_send_list_rec]

(

@curr_date NVARCHAR(20),

@atta01_path NVARCHAR(200),

@atta02_path NVARCHAR(200),

@atta03_path NVARCHAR(200),

@atta04_path NVARCHAR(200),

@atta05_path NVARCHAR(200)

)

AS

--====================================================================================================================================

-- ProcedureName : chk.usp_insert_ids_mail_send_list_rec

-- Author : john.xiong

-- CreateDate : 2019-01-02

-- Description : generates the detailed daily mail content

/*************************************Parameters*******************************************************************************

-- @curr_date : data run date, YYYYMMDD

-- @atta01_path to @atta05_path : absolute paths of up to 5 attachments (NULL when unused)

**************************************Modification history*****************************************************************************

-- Modified Date Modified User Version Modified Reason

**************************************************************************************************************************************

-- 2019-01-02 john.xiong V01.00.00 Initial version

**************************************************************************************************************************************/

--====================================================================================================================================

BEGIN

BEGIN TRY

DECLARE

@begin_time DATETIME

,@end_time DATETIME

,@cost_time INT;

SET @begin_time = DATEADD(HOUR, 8, GETDATE());

INSERT INTO [chk].[tb_proc_cost_log]

(

[proc_name]

,[Object_name]

,[execute_time]

,[action]

,[remark]

,[cost_time]

)

SELECT

N'chk.usp_insert_ids_mail_send_list_rec' AS [proc_name]

,N'chk.mail_send_list_rec' AS [Object_name]

,@begin_time AS [execute_time]

,N'start' AS [action]

,'' AS [remark]

,0 AS [cost_time]

DECLARE

@mail_id UNIQUEIDENTIFIER,

@scope NVARCHAR(100),

@group_id UNIQUEIDENTIFIER,

@subject NVARCHAR(100),

@create_date DATETIME,

@message NVARCHAR(4000),

@temp_message NVARCHAR(4000),

@count INT,

@count1 INT,

@count2 INT,

@error_count INT

SET @mail_id = NEWID();

SET @scope = N'IDS';

SET @group_id = N'8D42D25D-59C7-4A5E-AE9C-4A5F24D910B0'

SET @subject = N'IDS daily - job status';

SET @create_date = DATEADD(HOUR, 8, GETDATE());

SET @count1 = 0;

SET @count2 = 0;

SET @error_count = 0;

SET @message = '<span style="color:#000; line-height:30px"><ol>';

SET @temp_message = '';

SET @count = 0;

SELECT

@count = COUNT(*)

FROM [chk].[log_move_blob_rec] AS a

WHERE LEFT([a].[rec_load_time], 8) = @curr_date

AND ([a].[scope] IN ('ids_regular_data', 'ids_regular_rtm') OR [a].[blobFileName] LIKE '%LCH%')

SET @message = @message + N'<li>Total blob files moved from landing: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0));

SET @temp_message = '';

SET @count = 0;

SELECT

@count = COUNT(*)

FROM [chk].[log_move_blob_rec] AS a

WHERE LEFT([a].[rec_load_time], 8) = @curr_date

AND [a].[scope] = 'ids_regular_data'

SET @message = @message + N'<br>Distributor regular data files: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0));

SET @temp_message = '';

SET @count = 0;

SELECT

@count = COUNT(*)

FROM [chk].[log_move_blob_rec] AS a

WHERE LEFT([a].[rec_load_time], 8) = @curr_date

AND [a].[scope] IN ('ids_regular_rtm')

SET @message = @message + N'<br>Restatement data files: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0));

SET @temp_message = '';

SET @count = 0;

SELECT

@count = COUNT(*)

FROM [chk].[log_move_blob_rec] AS a

WHERE LEFT([a].[rec_load_time], 8) = @curr_date

AND [a].[blobFileName] LIKE '%LCH%'

SET @message = @message + N'<br>Local customer hierarchy daily files: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>';

SET @temp_message = '';

SET @count = 0;

SELECT

@count = SUM([a].[file_count])

FROM [chk].[log_blob_file_deal] AS a

WHERE LOWER([a].[data_scope]) = 'ids'

AND LOWER([a].[deal_level]) = 'ext'

AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_Ongoing_Loop_New_1_3')

AND [a].[remark] LIKE '%tFileList Count%'

AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date

SET @message = @message + N'<li>Distributor regular data files actually processed: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>';

SET @temp_message = '';

SET @count = 0;

SELECT

@count = SUM([a].[file_count])

FROM [chk].[log_blob_file_deal] AS a

WHERE LOWER([a].[data_scope]) = 'ids'

AND LOWER([a].[deal_level]) = 'ext'

AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_SalesDaily_Rtm_New_1_4')

AND [a].[remark] LIKE '%tFileList Count%'

AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date

SET @message = @message + N'<li>Restatement data files actually processed: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>';

SET @temp_message = '';

SET @count = 0;

SELECT

@count = COUNT([a].[file_name])

FROM [chk].[log_file_deal_error_rec] AS a

WHERE LOWER([a].[data_scope]) = 'ids'

AND LOWER([a].[deal_level]) = 'ext'

AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_Ongoing_Loop_New_1_3')

AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date

SET @message = @message + N'<li>Distributor regular data files that could not be unzipped: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>';

SET @temp_message = '';

SET @count = 0;

SELECT

@count = COUNT([a].[file_name])

FROM [chk].[log_file_deal_error_rec] AS a

WHERE LOWER([a].[data_scope]) = 'ids'

AND LOWER([a].[deal_level]) = 'ext'

AND LOWER([a].[job_name]) = LOWER('IDS_Data_Blob_To_Stg_SalesDaily_Rtm_New_1_4')

AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date

SET @message = @message + N'<li>Restatement data files that could not be unzipped: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0)) + '</li>';

SET @temp_message = '';

SET @count = 0;

SELECT

@count = SUM([a].[file_count])

FROM [chk].[log_blob_file_deal] AS a

WHERE LOWER([a].[data_scope]) = 'ids'

AND LOWER([a].[deal_level]) = 'ext'

AND LOWER([a].[job_name]) = LOWER('IDS_RCS_Local_Master_Data_Daily_1_2')

AND [a].[remark] LIKE '%tFileList Count lch%'

AND CONVERT(NVARCHAR(8), [a].[deal_date], 112) = @curr_date

SET @message = @message + N'<li>Local customer hierarchy daily files processed: ' + CONVERT(NVARCHAR(20), ISNULL(@count, 0));

SET @temp_message = '';

SET @count = 0;

SET @count1 = 0;

SELECT TOP (1)

@count1 = [a].[row_count]

FROM [chk].[log_table_data_rec] AS a

WHERE [a].[data_scope] = 'rcs dim'

AND [a].[table_name] = 'stg.cust_ids_rcs_local_customer_hierarchy_daily'

AND CONVERT(NVARCHAR(8), [a].[action_time], 112) = @curr_date

ORDER BY [a].[action_time] DESC

SET @message = @message + N'<br>Rows in file: ' + CONVERT(NVARCHAR(20), ISNULL(@count1, 0));

SET @temp_message = '';

SET @count = 0;

SET @count2 = 0;

SELECT

@count2 = COUNT(*)

FROM [stg].[cust_ids_rcs_local_customer_hierarchy_daily] AS a

WHERE LEFT([a].[rec_load_time], 8) = @curr_date

SET @message = @message + N'<br>Rows loaded: ' + CONVERT(NVARCHAR(20), ISNULL(@count2, 0)) + '</li>';

IF (@count1 <> @count2)

BEGIN

SET @error_count = @error_count + 1;

END

IF (OBJECT_ID(N'[chk].[temp_mail_send_proc_error_list_ids_daily]', N'U') IS NOT NULL)

BEGIN

DROP TABLE [chk].[temp_mail_send_proc_error_list_ids_daily];

END

/* Collect today's error records of IDS procedures (excluding the mail procedures) */

CREATE TABLE [chk].[temp_mail_send_proc_error_list_ids_daily]

WITH

(

DISTRIBUTION = ROUND_ROBIN,

CLUSTERED COLUMNSTORE INDEX

)

AS

SELECT

[a].[proc_name]

,ROW_NUMBER() OVER(ORDER BY [a].[error_time] ASC) AS [Num]

FROM [chk].[log_proc_error_rec] AS a

WHERE [a].[proc_name] LIKE '%ids%'

AND [a].[proc_name] NOT LIKE '%mail%'

AND CONVERT(NVARCHAR(8), [a].[error_time], 112) = @curr_date

SET @count = 0;

SELECT @count = COUNT(*) FROM [chk].[temp_mail_send_proc_error_list_ids_daily];

IF (@count > 0)

BEGIN

SET @message = @message + N'<li style="color:red">Procedures with errors: ' + CONVERT(NVARCHAR(20), @count);

SET @error_count = @error_count + @count;

/* list the failing procedures, walking [Num] from the highest value down */

WHILE (@count > 0)

BEGIN

SELECT @temp_message = [proc_name] FROM [chk].[temp_mail_send_proc_error_list_ids_daily] WHERE [Num] = @count;

SET @message = @message + N'<br />' + @temp_message + ';&nbsp;';

SET @count = @count - 1;

END

/* close the red <li> only when it was opened, so the HTML stays balanced */

SET @message = @message + N'</li>';

END

IF (@error_count <> 0)

BEGIN

SET @subject = @subject + N': ' + CONVERT(NVARCHAR(20), @error_count) + N' error(s)';

END

SET @subject = @curr_date + N' ' + @subject;

SET @message = @message + '</ol></span>'

PRINT @message

INSERT INTO [chk].[mail_send_list_rec]

(

[mail_id]

,[group_id]

,[scope]

,[subject]

,[message]

,[create_date]

,[send_date]

,[send_status]

,[atta01_path]

,[atta02_path]

,[atta03_path]

,[atta04_path]

,[atta05_path]

)

SELECT

@mail_id,

@group_id,

@scope,

@subject,

@message,

@create_date,

NULL,

0,

@atta01_path,

@atta02_path,

@atta03_path,

@atta04_path,

@atta05_path

SET @end_time = DATEADD(HOUR, 8, GETDATE());

SET @cost_time = DATEDIFF(SECOND, @begin_time, @end_time);

INSERT INTO [chk].[tb_proc_cost_log]

(

[proc_name]

,[Object_name]

,[execute_time]

,[action]

,[remark]

,[cost_time]

)

SELECT

N'chk.usp_insert_ids_mail_send_list_rec' AS [proc_name]

,N'chk.mail_send_list_rec' AS [Object_name]

,@end_time AS [execute_time]

,N'end' AS [action]

,CONVERT(NVARCHAR(50), @mail_id) AS [remark]

,@cost_time AS [cost_time]

PRINT N'Exec success';

SELECT @mail_id AS [curr_mail_id]

END TRY

BEGIN CATCH

INSERT INTO [chk].[log_proc_error_rec]

(

[proc_name]

,[error_source]

,[error_time]

,[error_severity]

,[error_state]

,[error_msg]

,[log_user]

)

SELECT

N'chk.usp_insert_ids_mail_send_list_rec' AS [proc_name]

,ERROR_PROCEDURE() AS [error_source]

,DATEADD(HOUR, 8, GETDATE()) AS [error_time]

,ERROR_SEVERITY() AS [error_severity]

,ERROR_STATE() AS [error_state]

,ERROR_MESSAGE() AS [error_msg]

,SUSER_SNAME() AS [log_user]

PRINT N'Exec failed';

END CATCH

END
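Stripped of the queries, the procedure's string handling comes down to four rules:

- Open an `<ol>`.
- Append list items with the metrics.
- Add a red item listing failed procedures only when there are any.
- Append the total error count to the subject.

The Java sketch below mirrors those rules for illustration, for instance to unit-test the layout. The names are hypothetical, and the counts are passed in rather than queried. To keep it short, it uses one item per metric, where the procedure groups several values per `<li>` with `<br>`. It also lists procedures in insertion order, whereas the procedure's WHILE loop walks `[Num]` downward.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of the message/subject assembly in usp_insert_ids_mail_send_list_rec. */
final class DailyMailBody {
    private final StringBuilder html = new StringBuilder("<span style=\"color:#000; line-height:30px\"><ol>");
    private final List<String> failedProcs = new ArrayList<>();
    private int errorCount = 0;

    /** One list item per metric. */
    DailyMailBody metric(String label, long value) {
        html.append("<li>").append(label).append(value).append("</li>");
        return this;
    }

    /** Counts that should match, like the @count1 <> @count2 check. */
    DailyMailBody expectEqual(long fileRows, long loadedRows) {
        if (fileRows != loadedRows) {
            errorCount++;
        }
        return this;
    }

    DailyMailBody failedProc(String procName) {
        failedProcs.add(procName);
        return this;
    }

    String body() {
        StringBuilder out = new StringBuilder(html);
        if (!failedProcs.isEmpty()) {
            // the red item is opened and closed only when there is something to list
            out.append("<li style=\"color:red\">Failed procedures: ").append(failedProcs.size());
            for (String proc : failedProcs) {
                out.append("<br />").append(proc).append(";&nbsp;");
            }
            out.append("</li>");
        }
        return out.append("</ol></span>").toString();
    }

    /** Row mismatches plus failed procedures, as @error_count accumulates them. */
    String subject(String currDate, String baseSubject) {
        int total = errorCount + failedProcs.size();
        String subject = total == 0 ? baseSubject : baseSubject + ": " + total + " error(s)";
        return currDate + " " + subject;
    }
}
```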

Updating the email by mail_id: sets the send status and send time of the record identified by mail_id.

IF (OBJECT_ID(N'[chk].[usp_update_mail_send_list_rec_by_mail_id]', N'P') IS NOT NULL)

BEGIN

PRINT N'Dropping stored procedure: [chk].[usp_update_mail_send_list_rec_by_mail_id]';

DROP PROC [chk].[usp_update_mail_send_list_rec_by_mail_id];

END

GO

CREATE PROC [chk].[usp_update_mail_send_list_rec_by_mail_id]

(

@mail_id NVARCHAR(50)

,@send_date DATETIME

,@send_status SMALLINT

)

AS

--====================================================================================================================================

-- ProcedureName : [chk].[usp_update_mail_send_list_rec_by_mail_id]

-- Author : john.xiong

-- CreateDate : 2018-12-24

-- Description : updates the send record of a mail by mail_id

/*************************************Parameters*******************************************************************************

-- @mail_id : mail id (generated with NEWID())

-- @send_date : send time; defaults to DATEADD(HOUR, 8, GETDATE()) when NULL

-- @send_status : send status (0 = not sent, 1 = sent)

**************************************Modification history*****************************************************************************

-- Modified Date Modified User Version Modified Reason

**************************************************************************************************************************************

-- 2018-12-24 john.xiong V01.00.00 Initial version

**************************************************************************************************************************************/

--====================================================================================================================================

BEGIN

BEGIN TRY

DECLARE

@begin_time DATETIME

,@end_time DATETIME

,@cost_time INT

SET @begin_time = DATEADD(HOUR, 8, GETDATE());

INSERT INTO [chk].[tb_proc_cost_log]

(

[proc_name]

,[Object_name]

,[execute_time]

,[action]

,[remark]

,[cost_time]

)

SELECT

N'chk.usp_update_mail_send_list_rec_by_mail_id' AS [proc_name]

,N'chk.mail_send_list_rec' AS [Object_name]

,@begin_time AS [execute_time]

,N'start' AS [action]

,N'' AS [remark]

,0 AS [cost_time]

IF (@mail_id IS NULL)

BEGIN

RAISERROR (N'Invalid mail id! Aborting.', 16, 1);

END

IF (@send_date IS NULL)

BEGIN

SET @send_date = DATEADD(HOUR, 8, GETDATE());

END

UPDATE [chk].[mail_send_list_rec]

SET [send_date] = @send_date, [send_status] = @send_status

WHERE [mail_id] = @mail_id;

SET @end_time = DATEADD(HOUR, 8, GETDATE());

SET @cost_time = DATEDIFF(SECOND, @begin_time, @end_time);

INSERT INTO [chk].[tb_proc_cost_log]

(

[proc_name]

,[Object_name]

,[execute_time]

,[action]

,[remark]

,[cost_time]

)

SELECT

N'chk.usp_update_mail_send_list_rec_by_mail_id' AS [proc_name]

,N'chk.mail_send_list_rec' AS [Object_name]

,@end_time AS [execute_time]

,N'end' AS [action]

,N'' AS [remark]

,@cost_time AS [cost_time]

PRINT N'exec succeeded'

END TRY

BEGIN CATCH

INSERT INTO [chk].[log_proc_error_rec]

(

[proc_name]

,[error_source]

,[error_time]

,[error_severity]

,[error_state]

,[error_msg]

,[log_user]

)

SELECT

N'chk.usp_update_mail_send_list_rec_by_mail_id' AS [proc_name]

,ERROR_PROCEDURE() AS [error_source]

,DATEADD(HOUR, 8, GETDATE()) AS [error_time]

,ERROR_SEVERITY() AS [error_severity]

,ERROR_STATE() AS [error_state]

,ERROR_MESSAGE() AS [error_msg]

,SUSER_SNAME() AS [log_user]

PRINT N'exec failed'

END CATCH

END
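The update procedure applies three rules:

- Refuse a NULL `mail_id`.
- When `send_date` is NULL, default it to `DATEADD(HOUR, 8, GETDATE())`.
- Write both values.

Reading that default as "the server clock is UTC, shift it to UTC+8" is an assumption, not something the post states. The Java sketch below restates the rules with a hypothetical class name and an injectable `java.time.Clock`, which makes the default easy to test.

```java
import java.time.Clock;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical mirror of usp_update_mail_send_list_rec_by_mail_id's parameter handling. */
final class SendStatusUpdate {
    final String mailId;
    final LocalDateTime sendDate;
    final short sendStatus;

    private SendStatusUpdate(String mailId, LocalDateTime sendDate, short sendStatus) {
        this.mailId = mailId;
        this.sendDate = sendDate;
        this.sendStatus = sendStatus;
    }

    static SendStatusUpdate of(String mailId, LocalDateTime sendDate, short sendStatus, Clock clock) {
        if (mailId == null) {
            // the procedure raises a severity-16 error here, which lands in its CATCH block
            throw new IllegalArgumentException("mail_id must not be null");
        }
        if (sendDate == null) {
            // assumed equivalent of DATEADD(HOUR, 8, GETDATE()) on a server whose clock runs in UTC
            sendDate = LocalDateTime.now(clock.withZone(ZoneOffset.UTC)).plusHours(8);
        }
        return new SendStatusUpdate(mailId, sendDate, sendStatus);
    }
}
```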

3. Calling the joblet from a job

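In any job that needs to send an email, drag the joblet in. Then generate the mail_id of the email you want to send and pass it through an input component into the joblet's input. That is all it takes to integrate the joblet into the job.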
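One way to obtain the mail_id is to run the generating procedure, which ends with `SELECT @mail_id AS [curr_mail_id]`. The calling job can read that column and feed it into the joblet's input. The Java sketch below builds the `EXEC` text for `usp_insert_ids_mail_send_list_rec` from a run date and up to five attachment paths. The helper is hypothetical. It always passes all five attachment parameters because the procedure declares no defaults for them. Doubling single quotes is the minimum a string-built statement needs; a component that supports bound parameters would avoid string building altogether.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: builds the EXEC statement a calling job could send to SQL Server. */
final class MailGenerateCall {
    private MailGenerateCall() {}

    private static String literal(String value) {
        // NULL stays NULL; otherwise quote as N'...' and double any embedded single quotes
        return value == null ? "NULL" : "N'" + value.replace("'", "''") + "'";
    }

    static String build(LocalDate runDate, String... attachments) {
        if (attachments.length > 5) {
            throw new IllegalArgumentException("the procedure accepts at most 5 attachments");
        }
        StringBuilder sql = new StringBuilder("EXEC [chk].[usp_insert_ids_mail_send_list_rec] ");
        // @curr_date is expected as YYYYMMDD, which BASIC_ISO_DATE produces for a LocalDate
        sql.append("@curr_date = ").append(literal(runDate.format(DateTimeFormatter.BASIC_ISO_DATE)));
        for (int i = 0; i < 5; i++) {
            String path = i < attachments.length ? attachments[i] : null;
            sql.append(String.format(", @atta%02d_path = ", i + 1)).append(literal(path));
        }
        return sql.toString();
    }
}
```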



