Oracle和Elasticsearch數據同步


Python編寫Oracle和Elasticsearch數據同步腳本

標簽: elasticsearchoraclecx_Oraclepython數據同步

一、版本

Python版本 x64 2.7.12 

Oracle(x64 12.1.0.2.0)和Elasticsearch(2.2.0)

python編輯器 PyCharm
 
下載安裝請選擇適合自己機器的版本
 
二、下載模塊
通過官網下載和安裝cx_Oracle和pyes模塊,分別用於操作Oracle數據庫和ES。安裝fcntl模塊用於解決python腳本單例執行問題。

如果是遠程連接數據庫和ES,請一定注意安裝的模塊或包版本。務必選擇相應的版本,不然會遇到問題。

三、安裝過程中會遇到的問題

cx_Oracle在本地安裝過程中出現的一些問題:
1、安裝c++for python的環境
2、安裝Oracle數據庫(或者安裝API接口中需要的文件而不必下載配置整個oracle環境)
3、打開數據庫工具 oracle SQL developor 按要求創建連接,並新建用戶(創建數據庫用戶名時以c##開頭,不然會提示)
4、oracle連接不上遠程的服務器,檢查版本是否匹配
 
fcntl在windows上安裝時出現的問題:
1、用pip install fcntl 報錯:indentationerror: unexpected indent(模塊版本有問題)
 

四、源碼

[python]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. # -*- coding: utf-8 -*-  
  2. """ 
  3. 作者:陳龍 
  4. 日期:2016-7-22 
  5. 功能:oracle數據庫到ES的數據同步 
  6. """  
  7. import os  
  8. import sys  
  9. import datetime, time  
  10. # import fcntl  
  11. import threading  
  12. import pyes  # 引入pyes模塊,ES接口  
  13. import cx_Oracle  # 引入cx_Oracle模塊,Oracle接口  
  14.   
  15. os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # 中文編碼  
  16. reload(sys)  # 默認編碼設置為utf-8  
  17. sys.setdefaultencoding('utf-8')  
  18.   
  19. # 創建ES連接 並返回連接參數  
  20. def connect_ES(addr):  
  21.     try:  
  22.         global conn  
  23.         conn = pyes.ES(addr)  # 鏈接ES '127.0.0.1:9200'  
  24.         print 'ES連接成功'  
  25.         return conn  
  26.     except:  
  27.         print 'ES連接錯誤'  
  28.         pass  
  29.   
  30. # 創建ES映射mapping 注意各各個字段的類型  
  31. def create_ESmapping():  
  32.     global spiderInfo_mapping, involveVideo_mapping, involveCeefax_mapping,keyWord_mapping,sensitiveWord_mapping  
  33.     spiderInfo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},  
  34.                           'tableId': {'index': 'not_analyzed', 'type': 'integer'},  
  35.                           'title': {'index': 'analyzed', 'type': 'string'},  
  36.                           'author': {'index': 'not_analyzed', 'type': 'string'},  
  37.                           'content': {'index': 'analyzed', 'type': 'string'},  
  38.                           'publishTime': {'index': 'not_analyzed', 'type': 'string'},  
  39.                           'browseNum': {'index': 'not_analyzed', 'type': 'integer'},  
  40.                           'commentNum': {'index': 'not_analyzed', 'type': 'integer'},  
  41.                           'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 除去涉我部分內容的ES映射結構  
  42.     involveVideo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},  
  43.                             'tableId': {'index': 'not_analyzed', 'type': 'integer'},  
  44.                             'title': {'index': 'analyzed', 'type': 'string'},  
  45.                             'author': {'index': 'not_analyzed', 'type': 'string'},  
  46.                             'summary': {'index': 'analyzed', 'type': 'string'},  
  47.                             'publishTime': {'index': 'not_analyzed', 'type': 'string'},  
  48.                             'url': {'index': 'not_analyzed', 'type': 'string'},  
  49.                             'imgUrl': {'index': 'not_analyzed', 'type': 'string'},  
  50.                             'ranking': {'index': 'not_analyzed', 'type': 'integer'},  
  51.                             'playNum': {'index': 'not_analyzed', 'type': 'integer'},  
  52.                             'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我視音頻內容的ES映射結構  
  53.     involveCeefax_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},  
  54.                             'tableId': {'index': 'not_analyzed', 'type': 'integer'},  
  55.                             'title': {'index': 'analyzed', 'type': 'string'},  
  56.                             'author': {'index': 'not_analyzed', 'type': 'string'},  
  57.                             'content': {'index': 'analyzed', 'type': 'string'},  
  58.                             'publishTime': {'index': 'not_analyzed', 'type': 'string'},  
  59.                             'keyWords': {'index': 'not_analyzed', 'type': 'string'},  
  60.                             'popularity': {'index': 'not_analyzed', 'type': 'integer'},  
  61.                             'url': {'index': 'not_analyzed', 'type': 'string'},  
  62.                             'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # 涉我圖文資訊內容的ES映射結構  
  63.     keyWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},  
  64.                       'keywords':{'index': 'not_analyzed', 'type': 'string'}}  
  65.     sensitiveWord_mapping = {'id':{'index': 'not_analyzed', 'type': 'integer'},  
  66.                             'sensitiveType':{'index': 'not_analyzed', 'type': 'string'},  
  67.                             'sensitiveTopic': {'index': 'not_analyzed', 'type': 'string'},  
  68.                             'sensitiveWords': {'index': 'not_analyzed', 'type': 'string'}}  
  69.   
  70. # 創建ES相關索引和索引下的type  
  71. def create_ESindex(ES_index, index_type1,index_type2,index_type3,index_type4,index_type5):  
  72.   
  73.     if conn.indices.exists_index(ES_index):  
  74.         pass  
  75.     else:  
  76.         conn.indices.create_index(ES_index)  # 如果所有Str不存在,則創建Str索引  
  77.         create_ESmapping()  
  78.         conn.indices.put_mapping(index_type1, {'properties': spiderInfo_mapping},[ES_index])  # 在索引pom下創建spiderInfo的_type  "spiderInfo"  
  79.         conn.indices.put_mapping(index_type2, {'properties': involveVideo_mapping},[ES_index])  # 在索引pom下創建involveVideo的_type  "involveVideo"  
  80.         conn.indices.put_mapping(index_type3, {'properties': involveCeefax_mapping},[ES_index])  # 在索引pom下創建involveCeefax的_type  "involveCeefax"  
  81.         conn.indices.put_mapping(index_type4, {'properties': keyWord_mapping}, [ES_index])  
  82.         conn.indices.put_mapping(index_type5, {'properties': sensitiveWord_mapping}, [ES_index])  
  83.     # conn.ensure_index  
  84.   
  85. # 創建數據庫連接 並返回連接參數  
  86. def connect_Oracle(name, password, address):  
  87.     try:  
  88.         global conn1  
  89.         # conn1 = cx_Oracle.connect('c##chenlong','1234567890','localhost:1521/ORCL') #鏈接本地數據庫  
  90.         conn1 = cx_Oracle.connect(name, password, address)  # 鏈接遠程數據庫 "pom","Bohui@123","172.17.7.118:1521/ORCL"  
  91.         print 'Oracle連接成功'  
  92.         return conn1  
  93.     except:  
  94.         print 'ES數據同步腳本連接不上數據庫,請檢查connect參數是否正確,或者模塊版本是否匹配'  
  95.         pass  
  96.   
  97. def fetch_account(accountcode):  # 取兩個‘_’之間的賬號名稱  
  98.     end = accountcode.find('_')  
  99.     return accountcode[0:end].strip()  
  100. # 根據表的個數創建不同的對象  
  101. # 從記錄文檔中讀取各個表的記錄ID,判斷各個表的ID是否有變化  
  102. # 分別讀取各個表中的相關數據  
  103.   
  104. # 讀取各個表的ID與記錄的ID(記錄在文本或者數據庫中)並判斷  
  105. """def read_compare_ID(): 
  106.     global tuple_tableName_IdNum 
  107.     global cur 
  108.     tuple_tableName_IdNum = {} 
  109.     tablename = [] 
  110.     cur = conn1.cursor() 
  111.     result1 = cur.execute("select * from tabs")  ##執行數據庫操作 讀取各個表名 
  112.     row = result1.fetchall() 
  113.     for x in row: 
  114.         tablename.append(x[0])  # 將表名取出並賦值給tablename數組 
  115.         result2 = cur.execute('select {}_ID  from {}'.format(x[0], x[0])) 
  116.         ID_num = result2.fetchall() 
  117.         tuple_tableName_IdNum[x[0]] = ID_num"""  
  118.   
  119. def readOracle_writeES(tableName, ES_index, index_type):  
  120.     global cc  
  121.     cur = conn1.cursor()  
  122.     #result_AlltableNames = cur.execute("select * from tabs")  
  123.     result_latestId = cur.execute("select max({}_Id) from {} ".format(tableName,tableName))  
  124.     num1 = result_latestId.fetchone() #當前表中的最大ID  
  125.     print '當前表中的最大ID{}'.format(num1[0])  
  126.     result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName.upper())) #通過數據庫表拿到更新的ID tablename 都轉化成大寫  
  127.     num2 = result_rememberId.fetchone() #上次記錄的更新ID  
  128.     print '上次記錄的更新ID{}'.format(num2[0])  
  129.     if tableName.upper() == 'T_SOCIAL':  
  130.         while num2[0] < num1[0]:  
  131.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,likeNum,forwardNum,commentNum,accountCode from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
  132.             result_tuple1 = result_readOracle.fetchall()  #之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  133.             for i in result_tuple1:  #一條一條寫入ES,這個速度太慢,改進 通過bulk接口導入  
  134.                 aa= (i[5]+i[6])  
  135.                 bb=  (i[7]+i[8])  
  136.                 if conn.index(  
  137.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),  
  138.                     'content': unicode(i[3]), 'publishTime': str(i[4]), 'browseNum': aa,  
  139.                     'commentNum':bb, 'dataType':fetch_account(i[9])}, ES_index, index_type,bulk=True):  # 將數據寫入索引pom的spiderInfo  
  140.                     cc += 1  
  141.                     print 'bulk導入后的ID:{}'.format(i[0])  
  142.             rememberId = i[0] #如果寫入成功才賦值  
  143.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))  
  144.             conn1.commit()  
  145.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  146.             num2 = result_rememberId.fetchone()  
  147.         print "{}讀{}寫成功".format(tableName,index_type)  
  148.     if tableName.upper() == 'T_HOTSEARCH':  
  149.         while num2[0] < num1[0]:  
  150.             result_readOracle = cur.execute("select {}_ID,accountCode,title,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
  151.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  152.             for i in result_tuple1:  #一條一條寫入ES,這個速度太慢,改進 通過bulk接口導入  
  153.                 if conn.index(  
  154.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': '','content': '', 'publishTime': str(i[3]), 'browseNum': 0,  
  155.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 將數據寫入索引pom的spiderInfo  
  156.                     cc += 1  
  157.                     print 'bulk導入后的ID:{}'.format(i[0])  
  158.             rememberId = i[0]  
  159.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  160.             conn1.commit()  
  161.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  162.             num2 = result_rememberId.fetchone()  
  163.         print "{}讀{}寫成功".format(tableName, index_type)  
  164.     if tableName.upper() == 'T_VIDEO_HOT':  
  165.         while num2[0] < num1[0]:  
  166.             result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName,tableName,tableName,num2[0]))  
  167.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  168.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  169.                 if conn.index(  
  170.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),  
  171.                     'content': '', 'publishTime': str(i[4]), 'browseNum': 0,  
  172.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # 將數據寫入索引pom的spiderInfo  
  173.                     cc += 1  
  174.                     print 'bulk導入后的ID:{}'.format(i[0])  
  175.             rememberId = i[0]  
  176.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  177.             conn1.commit()  
  178.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  179.             num2 = result_rememberId.fetchone()  
  180.         print "{}讀寫成功".format(tableName)  
  181.     if tableName.upper() == 'T_PRESS':  
  182.         while num2[0] < num1[0]:  
  183.             result_readOracle = cur.execute(  
  184.                 "select {}_ID,accountCode,title,Author,PublishDate,Content from {} where {}_ID > {} and rownum<=40 ".format(  
  185.                     tableName, tableName, tableName, num2[0]))  
  186.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  187.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  188.                 if conn.index(  
  189.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),  
  190.                     'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': 0,  
  191.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 將數據寫入索引pom的spiderInfo  
  192.                     cc += 1  
  193.                     print 'bulk導入后的ID:{}'.format(i[0])  
  194.             rememberId = i[0]  
  195.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  196.             conn1.commit()  
  197.             result_rememberId = cur.execute(  
  198.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  199.             num2 = result_rememberId.fetchone()  
  200.         print "{}讀寫成功".format(tableName)  
  201.     if tableName.upper() == 'T_INDUSTRY':  
  202.         while num2[0] < num1[0]:  
  203.             result_readOracle = cur.execute(  
  204.                 "select {}_ID,accountCode,title,Author,PublishTime,Content,BrowseNum from {} where {}_ID > {} and rownum<=40 ".format(  
  205.                     tableName, tableName, tableName, num2[0]))  
  206.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  207.   
  208.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  209.                 if conn.index(  
  210.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]),'author': unicode(i[3]),  
  211.                     'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': i[6],  
  212.                     'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True) : # 將數據寫入索引pom的spiderInfo  
  213.                     cc += 1  
  214.                     print 'bulk導入后的ID:{}'.format(i[0])  
  215.             rememberId = i[0]  
  216.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  217.             conn1.commit()  
  218.             result_rememberId = cur.execute(  
  219.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  220.             num2 = result_rememberId.fetchone()  
  221.         print "{}讀寫成功".format(tableName)  
  222.     if tableName.upper() == 'T_SOCIAL_SITESEARCH':  
  223.         while num2[0] < num1[0]:  
  224.             result_readOracle = cur.execute('select {}_ID,title,author,content,publishTime,keyWords,browseNum,likeNum,forwardNum,commentNum,url,accountCode from {} where ({}_ID > {})'.format(tableName, tableName, tableName, num2[0]))  
  225.             result_tuple1 = result_readOracle.fetchmany(50)  #因為數據量太大,超過了變量的內存空間,所以一次性取40條  
  226.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  227.                 popularity = (i[6] + i[7] + i[8] * 2 + i[9] * 2)  
  228.                 if conn.index(  
  229.                     {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),  
  230.                     'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(i[5]),  
  231.                     'popularity':popularity,'url': i[10],  
  232.                     'dataType':fetch_account(i[11])}, ES_index, index_type, bulk=True):  # 將數據寫入索引pom的spiderInfo  
  233.                     cc += 1  
  234.                     print 'bulk導入后的ID:{}'.format(i[0])  
  235.             rememberId = i[0]  
  236.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))  
  237.             conn1.commit()  
  238.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  239.             num2 = result_rememberId.fetchone()  
  240.         print "{}讀寫成功".format(tableName)  
  241.     if tableName.upper() == 'T_REALTIME_NEWS':  
  242.         while num2[0] < num1[0]:  
  243.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
  244.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  245.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  246.                 popularity = (i[5] + i[6] * 2)  
  247.                 if conn.index(  
  248.                     {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),  
  249.                     'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),  
  250.                     'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 將數據寫入索引pom的spiderInfo  
  251.                     cc += 1  
  252.                     print 'bulk導入后的ID:{}'.format(i[0])  
  253.             rememberId = i[0]  
  254.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  255.             conn1.commit()  
  256.             result_rememberId = cur.execute(  
  257.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  258.             num2 = result_rememberId.fetchone()  
  259.         print "{}讀{}寫成功".format(tableName, index_type)  
  260.     if tableName.upper() == 'T_KEY_NEWS':  
  261.         while num2[0] < num1[0]:  
  262.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
  263.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  264.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  265.                 popularity = (i[5] + i[6] * 2)  
  266.                 if conn.index(  
  267.                     {'tableName': tableName,'tableId':i[0],'title': unicode(i[1]),'author':unicode(i[2]),  
  268.                     'content':unicode(i[3]),'publishTime':str(i[4]),'keyWords':unicode(''),  
  269.                     'popularity':popularity,'url': i[8],'dataType':fetch_account(i[7])}, ES_index, index_type, bulk=True):  # 將數據寫入索引pom的spiderInfo  
  270.                     cc += 1  
  271.                     print 'bulk導入后的ID:{}'.format(i[0])  
  272.             rememberId = i[0]  
  273.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  274.             conn1.commit()  
  275.             result_rememberId = cur.execute(  
  276.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  277.             num2 = result_rememberId.fetchone()  
  278.         print "{}讀{}寫成功".format(tableName, index_type)  
  279.     if tableName.upper() == 'T_LOCAL_NEWS':  
  280.         while num2[0] < num1[0]:  
  281.             result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
  282.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  283.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  284.                 popularity = (i[5] + i[6] * 2)  
  285.                 if conn.index(  
  286.                     {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),  
  287.                     'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),  
  288.                     'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type,bulk=True):  # 將數據寫入索引pom的spiderInfo  
  289.                     cc += 1  
  290.                     print 'bulk導入后的ID:{}'.format(i[0])  
  291.             rememberId = i[0]  
  292.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  293.             conn1.commit()  
  294.             result_rememberId = cur.execute(  
  295.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  296.             num2 = result_rememberId.fetchone()  
  297.         print "{}讀{}寫成功".format(tableName, index_type)  
  298.     if tableName.upper() == 'T_VIDEO_SITESEARCH':  
  299.         while num2[0] < num1[0]:  
  300.             result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime,url,imgUrl,playNum,keyWords from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))  
  301.             result_tuple1 = result_readOracle.fetchall()  # 之前是因為數據量太大,超過了變量的內存空間,所以用fetchmany取40條  后來大神建議數據庫中限制查詢數 然后fetchall,這樣查詢更有效率  
  302.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  303.                 if conn.index(  
  304.                     {  
  305.                     'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),  
  306.                     'summary': unicode('0'), 'publishTime': str(i[4]), 'browseNum': i[7],'url':i[5],'imgUrl':i[6],'ranking':0,  
  307.                     'playNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type,bulk=True):  # 將數據寫入索引pom的spiderInfo  
  308.                     cc += 1  
  309.                     print 'bulk導入后的ID:{}'.format(i[0])  
  310.             rememberId = i[0]  
  311.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  312.             conn1.commit()  
  313.             result_rememberId = cur.execute(  
  314.                 "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  315.             num2 = result_rememberId.fetchone()  
  316.         print "{}讀{}寫成功".format(tableName,index_type)  
  317.     if tableName.upper() == 'T_BASE_KEYWORDS':  
  318.         while num2[0] < num1[0]:  
  319.             result_readOracle = cur.execute('select {}_ID,keywords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName, num2[0]))  
  320.             result_tuple1 = result_readOracle.fetchall()  #因為數據量太大,超過了變量的內存空間,所以一次性取40條  
  321.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  322.                 if conn.index({'id': i[0], 'keywords': i[1]}, ES_index, index_type,bulk=True):  # 將數據寫入索引pom的spiderInfo  
  323.                     cc += 1  
  324.                     print 'bulk導入后的ID:{}'.format(i[0])  
  325.             rememberId = i[0]  
  326.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId,tableName))  
  327.             conn1.commit()  
  328.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  329.             num2 = result_rememberId.fetchone()  
  330.         print "{}讀寫成功".format(tableName)  
  331.     if tableName.upper() == 'T_BASE_SENSITIVEWORDS':  
  332.         while num2[0] < num1[0]:  
  333.             result_readOracle = cur.execute('select {}_ID,SensitiveType,SensitiveTopic,SensitiveWords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName,num2[0]))  
  334.             result_tuple1 = result_readOracle.fetchall()  # 因為數據量太大,超過了變量的內存空間,所以一次性取40條  
  335.             for i in result_tuple1:  # 一條一條寫入ES,這個速度太慢,強烈需要改進 通過bulk接口導入?  
  336.                 if conn.index({'id':i[0],  
  337.                             'sensitiveType':unicode(i[1]),  
  338.                             'sensitiveTopic': unicode(i[2]),  
  339.                             'sensitiveWords':unicode(i[3])}, ES_index, index_type, bulk=True):  # 將數據寫入索引pom的spiderInfo  
  340.                     cc +=1  
  341.             print 'bulk導入后的ID:{}'.format(i[0])  
  342.             rememberId = i[0]  
  343.             cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))  
  344.             conn1.commit()  
  345.             result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # 通過數據庫表拿到更新的ID  
  346.             num2 = result_rememberId.fetchone()  
  347.         print "{}讀寫成功".format(tableName)  
  348.     else:  
  349.         pass  
  350.   
  351. def ww(a):  
  352.     while True:  
  353.         print a  
  354.         time.sleep(0.5)  #用於多線程的一個實驗函數  
  355.   
  356. if __name__ == "__main__":  
  357.     cc = 0  
  358.     connect_ES('172.17.5.66:9200')  
  359.     # conn.indices.delete_index('_all')  # 清除所有索引  
  360.     create_ESindex("pom", "spiderInfo", "involveVideo", "involveCeefax","keyWord","sensitiveWord")  
  361.     connect_Oracle("pom", "Bohui@123", "172.17.7.118:1521/ORCL")  
  362.     # thread.start_new_thread(readOracle_writeES,("T_SOCIAL","pom","spiderInfo"),)#創建一個多線程  
  363.     # thread.start_new_thread(readOracle_writeES,("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"),)#創建一個多線程  
  364.     mm = time.clock()  
  365.     readOracle_writeES("T_SOCIAL", "pom", "spiderInfo") #表名雖然在程序中設置了轉化為大寫,但是還是全大寫比較好  
  366.     readOracle_writeES("T_HOTSEARCH", "pom", "spiderInfo")  
  367.     readOracle_writeES("T_VIDEO_HOT", "pom", "spiderInfo")  
  368.     readOracle_writeES("T_PRESS", "pom", "spiderInfo")  
  369.     readOracle_writeES("T_INDUSTRY", "pom", "spiderInfo")  
  370.     readOracle_writeES("T_VIDEO_SITESEARCH", "pom", "involveVideo")  
  371.     readOracle_writeES("T_REALTIME_NEWS", "pom", "involveCeefax")  
  372.     readOracle_writeES("T_KEY_NEWS", "pom", "involveCeefax")  
  373.     readOracle_writeES("T_LOCAL_NEWS", "pom", "involveCeefax")  
  374.     readOracle_writeES("T_SOCIAL_SITESEARCH", "pom", "involveCeefax")  
  375.     readOracle_writeES("T_BASE_KEYWORDS", "pom", "keyWord")  
  376.     readOracle_writeES("T_BASE_SENSITIVEWORDS", "pom", "sensitiveWord")  
  377.     nn = time.clock()  
  378.     # conn.indices.close_index('pom')  
  379.     conn1.close()  
  380.     print '數據寫入耗時:{}  成功寫入數據{}條'.format(nn-mm,cc)  
  381.   
  382. #實驗多線程  
  383.     """ 
  384.     while a < 100: 
  385.         conn.index( 
  386.             {'tableName': 'T_base_account', 'type': '1', 'tableId': '123', 'title': unicode('陳龍'), 'author': 'ABC', 
  387.             'content': 'ABC', 'publishTime': '12:00:00', 'browseNum': '12', 'commentNum': '12', 'dataType': '1'}, 
  388.             "pom", "spiderInfo", )  # 將數據寫入索引pom的spiderInfo 
  389.         a += 1 
  390.     print time.ctime() 
  391.     """  
  392. """ 
  393.     threads = [] 
  394.     t1 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL","pom","spiderInfo")) 
  395.     threads.append(t1) 
  396.     #t3 = threading.Thread(target=ww,args=(10,)) 
  397.     #threads.append(t3) 
  398.     #t2 = threading.Thread(target=readOracle_writeES,args=("T_SOCIAL_SITESEARCH", "pom", "spiderInfo")) 
  399.     #threads.append(t2) 
  400.     print time.ctime() 
  401.     for t in threads: 
  402.         t.setDaemon(True) 
  403.         t.start() 
  404.     t.join() 
  405. """  

五、編譯過程的問題
 
1、直接print游標cur.execute ( ) 將不能得到我們想要的結果
 
result2 = cur.execute('select T_SOCIAL_ID from T_SOCIAL')
print result2
返回:<__builtin__.OracleCursor on <cx_Oracle.Connection to pom@172.17.7.118:1521/ORCL>>
 
 
result2 = cur.execute('select T_SOCIAL_ID  from T_SOCIAL')
print result2
num = result2.fetchall()
print num
for i in num:
    print i[0]

 
返回:[(55,), (56,), (57,), (58,), (59,), (60,), (61,), (62,), (63,), (64,), (65,), (66,), (67,), (68,), (69,), (70,)]
     55
注意:用fetchall()得到的數據為:[(55,), (56,), (57,), (58,), (59,)] 元組而不是數字。
用 變量[num] 的方式取出具體的數值
 
2、cx_Oracle中文編碼亂碼問題
 
顯示中文亂碼:������DZ��� �����������
或者顯示未知的編碼:('\xce\xd2\xd5\xe6\xb5\xc4\xca\xc7\xb1\xea\xcc\xe2',)
需要注意一下幾個地方,將數據庫中的中文編碼轉化成utf-8編碼,並將中文寫入elasticsearch
 
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' #中文編碼
 
 
reload(sys) #默認編碼設置為utf-8 一定需要reload(sys)
sys.setdefaultencoding('utf-8')
 
'title':unicode('中文')
 
python傳遞給js的列表中文亂碼怎么解決?  
json.dumps(dictionary,ensure_ascii=False)
 
 
3、遠程連接不上Oracle數據庫的問題
 
第一:確保connect()中各個參數的值都正確。例如
 
conn1 = cx_Oracle.connect("username","password","172.17.7.118:1521/ORCL")  #連接遠程數據庫
conn1 = cx_Oracle.connect('username','password','localhost:1521/ORCL') #連接本地數據庫
conn2 = pyes.ES('127.0.0.1:9200')  #連接ES
 
第二:確保安裝的版本都符合要求,包括模塊的版本。
 
4、提示TypeError: 'NoneType' object is not callable
 
確保mapping中的各個字段類型都設置正確
檢查索引和映射是否都書寫正確
 
5、腳本同時讀取多個數據庫表
涉及到Python中多線程的問題,給每一個表起一個線程,同時給每一個線程加鎖
編譯時碰到問題:AssertionError: group argument must be None for now(檢查函數是否書寫正確,讀寫沖突)
AttributeError: 'builtin_function_or_method' object has no attribute 'setDaemon'
cx_Oracle.ProgrammingError: LOB variable no longer valid after subsequent fetch(fetchall數據量過大,溢出 設置一次取數據庫中 rownum數)
TypeError: 'NoneType' object has no attribute '__getitem__'  (注意數據庫查詢對應的大小寫)
No handlers could be found for logger "pyes"  可能是連接超時
AttributeError: 'tuple' object has no attribute 'append'   tuple不能直接用append
TypeError: 'tuple' object does not support item assignment  tuple不能賦值
數據庫批量讀取
就多線程問題咨詢了大神,大神建議用多進程來實現會比較簡單
 
6、腳本定時觸發問題
Linux crontab定時執行任務,crontab防止腳本周期內未執行完重復執行
 
 
7、單實例的問題。防止腳本沒有執行完再次觸發
剛開始設想在腳本中完成,后來知道這個可以在系統中設定
 
8、數據同步插件
網上有大量的關於同步關系型數據庫的有關插件 logstash-input-jdbc  不太好安裝,不知道如何使用。
MySQL和ES同步插件的介紹,例如elasticsearch-river-jdbc
在這兒啟用的是bulk接口,批量導入。數據同步的速度大大提高
 
9、判斷數據是否同步成功
這個是之前一直沒有注意的問題,但其實在數據傳輸的時候是非常重要的。
目前的判斷方法是看ES中的數據量到底有多少,然后對照統計量進行判斷分析,,這也是在后期發現有部分數據沒有同步過去的方法。
 
10、統計寫入了多少數據
UnboundLocalError: local variable 'cc' referenced before assignment 
定義了全局變量cc,但是在局部進行了修改,所以報錯 修改同名的全局變量,則認為是一個局部變量
 
五、源碼改進
因為數據寫入的速度太慢(40條數據 800Kb大小 寫入花費2S左右),所有在原來的基礎上,修改了讀取數據庫中未寫入內容的策略和ES寫入的策略。
 
插入完成的源碼
 
調試問題:
1、pip install elasticsearch  引入helpers函數模塊,使用bulk函數批量導入。
2、AttributeError: 'ES' object has no attribute 'transport'  因為原來使用的是pyes模塊 現在換成了elasticsearch,所以改成對應模塊
conn2 = Elasticsearch( "127.0.0.1:9200")
其他常見錯誤
    SerializationError:JSON數據序列化出錯,通常是因為不支持某個節點值的數據類型
    RequestError:提交數據格式不正確
    ConflictError:索引ID沖突
    TransportError:連接無法建立
 
最后通過了解其實是找到了數據同步的插件 logstash-input-jdbc 能夠實現數據的同步增刪改查,按照網上的教程能夠很輕松的實現,遇到的問題就是插件同步過去的字段都必須是小寫。
 
------------
Python中cx_Oracle的一些函數:
commit() 提交
rollback() 回滾
cursor用來執行命令的方法:
callproc(self, procname, args):用來執行存儲過程,接收的參數為存儲過程名和參數列表,返回值為受影響的行數
execute(self, query, args):執行單條sql語句,接收的參數為sql語句本身和使用的參數列表,返回值為受影響的行數
executemany(self, query, args):執行單挑sql語句,但是重復執行參數列表里的參數,返回值為受影響的行數
nextset(self):移動到下一個結果集

cursor用來接收返回值的方法:
fetchall(self):接收全部的返回結果行.
fetchmany(self, size=None):接收size條返回結果行.如果size的值大於返回的結果行的數量,則會返回cursor.arraysize條數據.
fetchone(self):返回一條結果行.
scroll(self, value, mode='relative'):移動指針到某一行.如果mode='relative',則表示從當前所在行移動value條,如果 mode='absolute',則表示從結果集的第一行移動value條.
MySQL中關於中文編碼的問題
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python') 中加一個屬性:
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python',charset='utf8') 
charset是要跟你數據庫的編碼一樣,如果是數據庫是gb2312 ,則寫charset='gb2312'。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM