1. Environment and versions
Python: 2.7.12 (x64)
Oracle (x64, 12.1.0.2.0) and Elasticsearch (2.2.0)
Python IDE: PyCharm
When downloading and installing, pick the builds that match your own machine.
2. Downloading the modules
Download and install the cx_Oracle and pyes modules from their official sites; they are used to talk to the Oracle database and to ES respectively. Install the fcntl module to keep the sync script from running more than one instance at a time.
If you connect to the database and to ES remotely, pay close attention to the versions of the modules and packages you install. Pick the matching versions, otherwise you will run into problems.
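A quick way to confirm which versions actually got installed before attempting a remote connection (a minimal sketch; cx_Oracle exposes version and clientversion(), while pyes may or may not ship a __version__ attribute depending on the release):
import cx_Oracle
print cx_Oracle.version            # version of the cx_Oracle module itself
print cx_Oracle.clientversion()    # version of the Oracle client libraries it was built against
import pyes
print getattr(pyes, '__version__', 'unknown')  # pyes does not always expose __version__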
3. Problems you may hit during installation
Problems that came up while installing cx_Oracle locally:
1. Install the C++-for-Python build environment.
2. Install the Oracle database (or only the files the client API needs, so you do not have to download and configure a full Oracle environment).
3. Open a database tool such as Oracle SQL Developer, create a connection as required, and create a new user (the database user name has to start with c##, otherwise you get an error).
4. If Oracle cannot connect to the remote server, check whether the versions match.
Problem when installing fcntl on Windows:
1. pip install fcntl fails with IndentationError: unexpected indent (the packaged module is broken; note that fcntl itself is a Unix-only standard-library module and is not available on Windows).
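Since fcntl only exists on Unix-like systems, one way to get the single-instance behaviour on both platforms is a non-blocking lock on a lock file. This is only a sketch, and the lock-file path is an arbitrary example:
import os
import sys

LOCKFILE = 'oracle_to_es.lock' if os.name == 'nt' else '/tmp/oracle_to_es.lock'

def ensure_single_instance():
    global _lock_fp
    _lock_fp = open(LOCKFILE, 'w')  # keep the handle open for the whole run
    try:
        if os.name == 'nt':
            import msvcrt
            msvcrt.locking(_lock_fp.fileno(), msvcrt.LK_NBLCK, 1)  # non-blocking lock on Windows
        else:
            import fcntl
            fcntl.flock(_lock_fp, fcntl.LOCK_EX | fcntl.LOCK_NB)   # non-blocking exclusive lock on Unix
    except (IOError, OSError):
        print 'another instance is already running, exiting'
        sys.exit(1)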
4. Source code
# -*- coding: utf-8 -*-
"""
Author: 陳龍
Date: 2016-7-22
Purpose: sync data from an Oracle database into Elasticsearch
"""
import os
import sys
import datetime, time
# import fcntl
import threading
import pyes  # pyes module: the Elasticsearch interface
import cx_Oracle  # cx_Oracle module: the Oracle interface

os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # Chinese character encoding
reload(sys)  # set the default encoding to utf-8
sys.setdefaultencoding('utf-8')

# Create the ES connection and return the connection object
def connect_ES(addr):
    try:
        global conn
        conn = pyes.ES(addr)  # connect to ES, e.g. '127.0.0.1:9200'
        print 'ES connection succeeded'
        return conn
    except:
        print 'ES connection error'
        pass
# Create the ES mappings; pay attention to the type of each field
def create_ESmapping():
    global spiderInfo_mapping, involveVideo_mapping, involveCeefax_mapping, keyWord_mapping, sensitiveWord_mapping
    spiderInfo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},
                          'tableId': {'index': 'not_analyzed', 'type': 'integer'},
                          'title': {'index': 'analyzed', 'type': 'string'},
                          'author': {'index': 'not_analyzed', 'type': 'string'},
                          'content': {'index': 'analyzed', 'type': 'string'},
                          'publishTime': {'index': 'not_analyzed', 'type': 'string'},
                          'browseNum': {'index': 'not_analyzed', 'type': 'integer'},
                          'commentNum': {'index': 'not_analyzed', 'type': 'integer'},
                          'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # ES mapping for everything except the "about-us" content
    involveVideo_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},
                            'tableId': {'index': 'not_analyzed', 'type': 'integer'},
                            'title': {'index': 'analyzed', 'type': 'string'},
                            'author': {'index': 'not_analyzed', 'type': 'string'},
                            'summary': {'index': 'analyzed', 'type': 'string'},
                            'publishTime': {'index': 'not_analyzed', 'type': 'string'},
                            'url': {'index': 'not_analyzed', 'type': 'string'},
                            'imgUrl': {'index': 'not_analyzed', 'type': 'string'},
                            'ranking': {'index': 'not_analyzed', 'type': 'integer'},
                            'playNum': {'index': 'not_analyzed', 'type': 'integer'},
                            'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # ES mapping for "about-us" video/audio content
    involveCeefax_mapping = {'tableName': {'index': 'not_analyzed', 'type': 'string'},
                             'tableId': {'index': 'not_analyzed', 'type': 'integer'},
                             'title': {'index': 'analyzed', 'type': 'string'},
                             'author': {'index': 'not_analyzed', 'type': 'string'},
                             'content': {'index': 'analyzed', 'type': 'string'},
                             'publishTime': {'index': 'not_analyzed', 'type': 'string'},
                             'keyWords': {'index': 'not_analyzed', 'type': 'string'},
                             'popularity': {'index': 'not_analyzed', 'type': 'integer'},
                             'url': {'index': 'not_analyzed', 'type': 'string'},
                             'dataType': {'index': 'not_analyzed', 'type': 'integer'}}  # ES mapping for "about-us" text/image news content
    keyWord_mapping = {'id': {'index': 'not_analyzed', 'type': 'integer'},
                       'keywords': {'index': 'not_analyzed', 'type': 'string'}}
    sensitiveWord_mapping = {'id': {'index': 'not_analyzed', 'type': 'integer'},
                             'sensitiveType': {'index': 'not_analyzed', 'type': 'string'},
                             'sensitiveTopic': {'index': 'not_analyzed', 'type': 'string'},
                             'sensitiveWords': {'index': 'not_analyzed', 'type': 'string'}}
# Create the ES index and the document types (mappings) under it
def create_ESindex(ES_index, index_type1, index_type2, index_type3, index_type4, index_type5):
    if conn.indices.exists_index(ES_index):
        pass
    else:
        conn.indices.create_index(ES_index)  # create the index only if it does not exist yet
    create_ESmapping()
    conn.indices.put_mapping(index_type1, {'properties': spiderInfo_mapping}, [ES_index])  # create the _type "spiderInfo" under the index pom
    conn.indices.put_mapping(index_type2, {'properties': involveVideo_mapping}, [ES_index])  # create the _type "involveVideo" under the index pom
    conn.indices.put_mapping(index_type3, {'properties': involveCeefax_mapping}, [ES_index])  # create the _type "involveCeefax" under the index pom
    conn.indices.put_mapping(index_type4, {'properties': keyWord_mapping}, [ES_index])
    conn.indices.put_mapping(index_type5, {'properties': sensitiveWord_mapping}, [ES_index])
    # conn.ensure_index
# Create the Oracle connection and return the connection object
def connect_Oracle(name, password, address):
    try:
        global conn1
        # conn1 = cx_Oracle.connect('c##chenlong','1234567890','localhost:1521/ORCL')  # connect to the local database
        conn1 = cx_Oracle.connect(name, password, address)  # connect to the remote database, e.g. "pom","Bohui@123","172.17.7.118:1521/ORCL"
        print 'Oracle connection succeeded'
        return conn1
    except:
        print 'The ES sync script cannot connect to the database; check the connect() parameters and whether the module versions match'
        pass

def fetch_account(accountcode):  # extract the account name in front of the first '_'
    end = accountcode.find('_')
    return accountcode[0:end].strip()

# Create a different handler for each table
# Read each table's recorded ID from the bookkeeping store and check whether it has changed
# Read the relevant data from each table
# Read each table's current ID and the recorded ID (kept in a text file or in the database) and compare them
"""def read_compare_ID():
    global tuple_tableName_IdNum
    global cur
    tuple_tableName_IdNum = {}
    tablename = []
    cur = conn1.cursor()
    result1 = cur.execute("select * from tabs")  # query the database for all table names
    row = result1.fetchall()
    for x in row:
        tablename.append(x[0])  # collect the table names into the tablename list
        result2 = cur.execute('select {}_ID from {}'.format(x[0], x[0]))
        ID_num = result2.fetchall()
        tuple_tableName_IdNum[x[0]] = ID_num"""
def readOracle_writeES(tableName, ES_index, index_type):
    global cc
    cur = conn1.cursor()
    # result_AlltableNames = cur.execute("select * from tabs")
    result_latestId = cur.execute("select max({}_Id) from {} ".format(tableName, tableName))
    num1 = result_latestId.fetchone()  # current maximum ID in the table
    print 'current maximum ID in the table: {}'.format(num1[0])
    result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName.upper()))  # last synced ID from the bookkeeping table; table names are stored in upper case
    num2 = result_rememberId.fetchone()  # the ID recorded by the previous run
    print 'ID recorded by the previous run: {}'.format(num2[0])
    if tableName.upper() == 'T_SOCIAL':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,likeNum,forwardNum,commentNum,accountCode from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # fetchmany(40) was used at first because the full result set overflowed memory; limiting the query with rownum and then calling fetchall is more efficient
            for i in result_tuple1:  # writing to ES one record at a time is too slow; improved by going through the bulk interface
                aa = (i[5] + i[6])
                bb = (i[7] + i[8])
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
                         'content': unicode(i[3]), 'publishTime': str(i[4]), 'browseNum': aa,
                         'commentNum': bb, 'dataType': fetch_account(i[9])}, ES_index, index_type, bulk=True):  # write the document into the spiderInfo type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]  # only move the bookmark if the write succeeded
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID from the bookkeeping table
            num2 = result_rememberId.fetchone()
        print "read from {} and wrote to {} successfully".format(tableName, index_type)
    if tableName.upper() == 'T_HOTSEARCH':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,accountCode,title,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': '', 'content': '', 'publishTime': str(i[3]), 'browseNum': 0,
                         'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # write the document into the spiderInfo type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read from {} and wrote to {} successfully".format(tableName, index_type)
    if tableName.upper() == 'T_VIDEO_HOT':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),
                         'content': '', 'publishTime': str(i[4]), 'browseNum': 0,
                         'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # write the document into the spiderInfo type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read and wrote {} successfully".format(tableName)
    if tableName.upper() == 'T_PRESS':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute(
                "select {}_ID,accountCode,title,Author,PublishDate,Content from {} where {}_ID > {} and rownum<=40 ".format(
                    tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),
                         'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': 0,
                         'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # write the document into the spiderInfo type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute(
                "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read and wrote {} successfully".format(tableName)
    if tableName.upper() == 'T_INDUSTRY':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute(
                "select {}_ID,accountCode,title,Author,PublishTime,Content,BrowseNum from {} where {}_ID > {} and rownum<=40 ".format(
                    tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),
                         'content': unicode(i[5]), 'publishTime': str(i[4]), 'browseNum': i[6],
                         'commentNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # write the document into the spiderInfo type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute(
                "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read and wrote {} successfully".format(tableName)
    if tableName.upper() == 'T_SOCIAL_SITESEARCH':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute('select {}_ID,title,author,content,publishTime,keyWords,browseNum,likeNum,forwardNum,commentNum,url,accountCode from {} where ({}_ID > {})'.format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchmany(50)  # the full result set is too large to hold in memory, so fetch 50 rows at a time
            for i in result_tuple1:  # write the records through the bulk interface
                popularity = (i[6] + i[7] + i[8] * 2 + i[9] * 2)
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
                         'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(i[5]),
                         'popularity': popularity, 'url': i[10],
                         'dataType': fetch_account(i[11])}, ES_index, index_type, bulk=True):  # write the document into the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read and wrote {} successfully".format(tableName)
    if tableName.upper() == 'T_REALTIME_NEWS':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                popularity = (i[5] + i[6] * 2)
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
                         'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),
                         'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type, bulk=True):  # write the document into the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute(
                "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read from {} and wrote to {} successfully".format(tableName, index_type)
    if tableName.upper() == 'T_KEY_NEWS':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                popularity = (i[5] + i[6] * 2)
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
                         'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),
                         'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type, bulk=True):  # write the document into the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute(
                "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read from {} and wrote to {} successfully".format(tableName, index_type)
    if tableName.upper() == 'T_LOCAL_NEWS':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,title,author,content,publishTime,browseNum,commentNum,accountCode,url from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                popularity = (i[5] + i[6] * 2)
                if conn.index(
                        {'tableName': tableName, 'tableId': i[0], 'title': unicode(i[1]), 'author': unicode(i[2]),
                         'content': unicode(i[3]), 'publishTime': str(i[4]), 'keyWords': unicode(''),
                         'popularity': popularity, 'url': i[8], 'dataType': fetch_account(i[7])}, ES_index, index_type, bulk=True):  # write the document into the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute(
                "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read from {} and wrote to {} successfully".format(tableName, index_type)
    if tableName.upper() == 'T_VIDEO_SITESEARCH':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute("select {}_ID,accountCode,title,Author,publishTime,url,imgUrl,playNum,keyWords from {} where {}_ID > {} and rownum<=40 ".format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index(
                        {
                            'tableName': tableName, 'tableId': i[0], 'title': unicode(i[2]), 'author': unicode(i[3]),
                            'summary': unicode('0'), 'publishTime': str(i[4]), 'browseNum': i[7], 'url': i[5], 'imgUrl': i[6], 'ranking': 0,
                            'playNum': 0, 'dataType': fetch_account(i[1])}, ES_index, index_type, bulk=True):  # write the document into the involveVideo type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute(
                "select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read from {} and wrote to {} successfully".format(tableName, index_type)
    if tableName.upper() == 'T_BASE_KEYWORDS':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute('select {}_ID,keywords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index({'id': i[0], 'keywords': i[1]}, ES_index, index_type, bulk=True):  # write the document into the keyWord type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read and wrote {} successfully".format(tableName)
    if tableName.upper() == 'T_BASE_SENSITIVEWORDS':
        while num2[0] < num1[0]:
            result_readOracle = cur.execute('select {}_ID,SensitiveType,SensitiveTopic,SensitiveWords from {} where {}_ID > {} and rownum<=50'.format(tableName, tableName, tableName, num2[0]))
            result_tuple1 = result_readOracle.fetchall()  # rownum limits the batch size, so fetchall stays within memory
            for i in result_tuple1:  # write the records through the bulk interface
                if conn.index({'id': i[0],
                               'sensitiveType': unicode(i[1]),
                               'sensitiveTopic': unicode(i[2]),
                               'sensitiveWords': unicode(i[3])}, ES_index, index_type, bulk=True):  # write the document into the sensitiveWord type of the pom index
                    cc += 1
                    print 'ID after bulk import: {}'.format(i[0])
                    rememberId = i[0]
            cur.execute("update T_REMEMBERID set tableId = {} where tableName = '{}'".format(rememberId, tableName))
            conn1.commit()
            result_rememberId = cur.execute("select tableId from T_REMEMBERID where tableName='{}'".format(tableName))  # re-read the last synced ID
            num2 = result_rememberId.fetchone()
        print "read and wrote {} successfully".format(tableName)
    else:
        pass
def ww(a):
    while True:
        print a
        time.sleep(0.5)  # a toy function used for the multithreading experiment

if __name__ == "__main__":
    cc = 0
    connect_ES('172.17.5.66:9200')
    # conn.indices.delete_index('_all')  # delete all indices
    create_ESindex("pom", "spiderInfo", "involveVideo", "involveCeefax", "keyWord", "sensitiveWord")
    connect_Oracle("pom", "Bohui@123", "172.17.7.118:1521/ORCL")
    # thread.start_new_thread(readOracle_writeES,("T_SOCIAL","pom","spiderInfo"),)  # start a worker thread
    # thread.start_new_thread(readOracle_writeES,("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"),)  # start a worker thread
    mm = time.clock()
    readOracle_writeES("T_SOCIAL", "pom", "spiderInfo")  # table names are upper-cased inside the function, but passing them in upper case is still the safer habit
    readOracle_writeES("T_HOTSEARCH", "pom", "spiderInfo")
    readOracle_writeES("T_VIDEO_HOT", "pom", "spiderInfo")
    readOracle_writeES("T_PRESS", "pom", "spiderInfo")
    readOracle_writeES("T_INDUSTRY", "pom", "spiderInfo")
    readOracle_writeES("T_VIDEO_SITESEARCH", "pom", "involveVideo")
    readOracle_writeES("T_REALTIME_NEWS", "pom", "involveCeefax")
    readOracle_writeES("T_KEY_NEWS", "pom", "involveCeefax")
    readOracle_writeES("T_LOCAL_NEWS", "pom", "involveCeefax")
    readOracle_writeES("T_SOCIAL_SITESEARCH", "pom", "involveCeefax")
    readOracle_writeES("T_BASE_KEYWORDS", "pom", "keyWord")
    readOracle_writeES("T_BASE_SENSITIVEWORDS", "pom", "sensitiveWord")
    nn = time.clock()
    # conn.indices.close_index('pom')
    conn1.close()
    print 'write took: {} s, successfully wrote {} records'.format(nn - mm, cc)
    # multithreading experiments
    """
    while a < 100:
        conn.index(
            {'tableName': 'T_base_account', 'type': '1', 'tableId': '123', 'title': unicode('陳龍'), 'author': 'ABC',
             'content': 'ABC', 'publishTime': '12:00:00', 'browseNum': '12', 'commentNum': '12', 'dataType': '1'},
            "pom", "spiderInfo", )  # write the document into the spiderInfo type of the pom index
        a += 1
        print time.ctime()
    """
    """
    threads = []
    t1 = threading.Thread(target=readOracle_writeES, args=("T_SOCIAL", "pom", "spiderInfo"))
    threads.append(t1)
    # t3 = threading.Thread(target=ww, args=(10,))
    # threads.append(t3)
    # t2 = threading.Thread(target=readOracle_writeES, args=("T_SOCIAL_SITESEARCH", "pom", "spiderInfo"))
    # threads.append(t2)
    print time.ctime()
    for t in threads:
        t.setDaemon(True)
        t.start()
        t.join()
    """
5. Problems encountered while debugging
1. Printing the cursor returned by cur.execute() directly does not give the result you want:
result2 = cur.execute('select T_SOCIAL_ID from T_SOCIAL')
print result2
Returns: <__builtin__.OracleCursor on <cx_Oracle.Connection to pom@172.17.7.118:1521/ORCL>>
result2 = cur.execute('select T_SOCIAL_ID from T_SOCIAL')
print result2
num = result2.fetchall()
print num
for i in num:
    print i[0]
Returns: [(55,), (56,), (57,), (58,), (59,), (60,), (61,), (62,), (63,), (64,), (65,), (66,), (67,), (68,), (69,), (70,)]
55
Note: fetchall() returns rows such as [(55,), (56,), (57,), (58,), (59,)], i.e. tuples rather than plain numbers.
Use indexing, e.g. row[0], to pull out the actual value.
2. Garbled Chinese characters with cx_Oracle
Chinese text shows up as mojibake: ������DZ��� �����������
or as raw bytes in an unfamiliar encoding: ('\xce\xd2\xd5\xe6\xb5\xc4\xca\xc7\xb1\xea\xcc\xe2',)
Pay attention to the following places so that the Chinese text read from the database is converted to UTF-8 and written into Elasticsearch correctly:
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  # Chinese character encoding
reload(sys)  # reload(sys) is required before setting the default encoding to utf-8
sys.setdefaultencoding('utf-8')
'title': unicode('中文')
How do you fix garbled Chinese in a list passed from Python to JavaScript?
json.dumps(dictionary, ensure_ascii=False)
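For example, with the standard json module:
import json
print json.dumps({'title': u'中文'})                      # {"title": "\u4e2d\u6587"}
print json.dumps({'title': u'中文'}, ensure_ascii=False)  # {"title": "中文"}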
3. Unable to connect to the remote Oracle database
First: make sure every argument passed to connect() is correct, for example
conn1 = cx_Oracle.connect("username", "password", "172.17.7.118:1521/ORCL")  # connect to a remote database
conn1 = cx_Oracle.connect('username', 'password', 'localhost:1521/ORCL')  # connect to a local database
conn2 = pyes.ES('127.0.0.1:9200')  # connect to ES
Second: make sure everything installed, including the module versions, meets the requirements.
4. TypeError: 'NoneType' object is not callable
Make sure every field type in the mapping is set correctly,
and check that the index and mapping names are written correctly.
5. Reading several database tables with one script
This brings in Python multithreading: give each table its own thread and protect shared state with a lock (see the sketch after this list).
Errors hit while getting this to work:
AssertionError: group argument must be None for now (check that the Thread call is written correctly; read/write conflict)
AttributeError: 'builtin_function_or_method' object has no attribute 'setDaemon'
cx_Oracle.ProgrammingError: LOB variable no longer valid after subsequent fetch (fetchall on too large a result set overflows; limit the rows per query with rownum)
TypeError: 'NoneType' object has no attribute '__getitem__' (watch the upper/lower case of names in the database query)
No handlers could be found for logger "pyes" (probably a connection timeout)
AttributeError: 'tuple' object has no attribute 'append' (tuples have no append method)
TypeError: 'tuple' object does not support item assignment (tuples cannot be modified in place)
On batch reads from the database and the threading question, I consulted a more experienced engineer, whose advice was that multiprocessing would be the simpler way to implement it.
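A minimal sketch of the thread-per-table idea. This is only a sketch: readOracle_writeES is the function from the source above, and under real concurrency the shared counter cc would need a threading.Lock and the shared cx_Oracle connection would need to be opened with threaded=True.
import threading

tables = [("T_SOCIAL", "pom", "spiderInfo"),
          ("T_HOTSEARCH", "pom", "spiderInfo"),
          ("T_VIDEO_HOT", "pom", "spiderInfo")]

threads = []
for args in tables:
    t = threading.Thread(target=readOracle_writeES, args=args)  # one worker thread per table
    t.setDaemon(True)
    threads.append(t)
    t.start()
for t in threads:
    t.join()  # join in a second loop; joining inside the start loop would run the tables one after another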
6. Triggering the script on a schedule
Use Linux crontab to run the task periodically, and make sure crontab does not start a new run while the previous one has not finished within its period.
7. Single instance: prevent the script from being triggered again before the previous run has finished.
At first I planned to handle this inside the script; later I learned it can be handled at the operating-system level.
8. Data-synchronization plugins
There are plenty of plugins online for syncing relational databases. logstash-input-jdbc was not easy to install and I did not know how to use it at the time;
there are also write-ups of MySQL-to-ES sync plugins such as elasticsearch-river-jdbc.
What is enabled here is the bulk interface for batch imports, which speeds up the synchronization considerably.
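For reference, this is roughly how the bulk path is driven through pyes. The bulk_size constructor argument and the flush_bulk() call are my assumptions about the pyes release used here, and docs stands for the list of dictionaries built from the Oracle rows; check both against your installed version:
conn = pyes.ES('172.17.5.66:9200', bulk_size=400)    # assumed: documents queue up until bulk_size is reached
for doc in docs:                                     # docs: the dictionaries built from the Oracle rows
    conn.index(doc, 'pom', 'spiderInfo', bulk=True)  # queued for the next bulk request instead of sent one by one
conn.flush_bulk(forced=True)                         # assumed helper that pushes whatever is still buffered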
9. Verifying that the data synced successfully
I had not paid attention to this before, but it is actually very important when moving data around.
The current check is to look at how many documents ended up in ES and compare that against the source statistics; this is also how I later noticed that part of the data had not made it across.
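One way to make that check scriptable is to compare the row count in Oracle with the document count reported by ES. A rough sketch built on the connections already opened above (the _count endpoint is standard in ES 2.x):
cur = conn1.cursor()
cur.execute("select count(*) from T_SOCIAL")
oracle_total = cur.fetchone()[0]
cur.execute("select tableId from T_REMEMBERID where tableName='T_SOCIAL'")
synced_up_to = cur.fetchone()[0]
print 'rows in Oracle: {}, last synced ID: {}'.format(oracle_total, synced_up_to)
# compare oracle_total with the document count from ES, e.g.
# curl http://172.17.5.66:9200/pom/spiderInfo/_count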
10. Counting how many records were written
UnboundLocalError: local variable 'cc' referenced before assignment
A global variable cc was defined but then assigned to inside a function; as soon as you assign to a name that shadows a global, Python treats it as a local variable, hence the error.
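The fix is to declare the name global inside the function before assigning to it, for example:
cc = 0                    # module-level counter

def count_one():          # illustrative helper, not part of the original script
    global cc             # without this line, cc += 1 raises UnboundLocalError
    cc += 1

count_one()
print cc                  # 1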
6. Improving the source code
Because writing the data was too slow (40 records, about 800 KB, took roughly 2 s to write), both the strategy for reading not-yet-synced rows from the database and the ES write strategy were changed on top of the original version.
The finished source code after these changes:
Debugging issues:
1. pip install elasticsearch, import the helpers module, and use its bulk function for batch imports.
2. AttributeError: 'ES' object has no attribute 'transport': the code originally used the pyes module and was switched to elasticsearch, so the call has to use the new module's API, e.g.
from elasticsearch import Elasticsearch
conn2 = Elasticsearch("127.0.0.1:9200")
Other common errors:
SerializationError: JSON serialization failed, usually because some field value has an unsupported data type
RequestError: the submitted data is in the wrong format
ConflictError: index ID conflict
TransportError: the connection could not be established
In the end I found that the logstash-input-jdbc plugin really can keep the data in sync, including inserts, updates and deletes, and it is easy to set up by following the online tutorials; the one catch I ran into is that the field names the plugin syncs over all have to be lower-case.
------------
Some cx_Oracle functions in Python:
commit(): commit the transaction
rollback(): roll back the transaction
Cursor methods for executing commands:
callproc(self, procname, args): call a stored procedure; takes the procedure name and a parameter list, returns the number of affected rows
execute(self, query, args): execute a single SQL statement; takes the statement and a parameter list, returns the number of affected rows
executemany(self, query, args): execute the same SQL statement once for every parameter set in the list; returns the number of affected rows
nextset(self): move to the next result set
Cursor methods for fetching results:
fetchall(self): fetch all remaining result rows.
fetchmany(self, size=None): fetch the next size result rows; if size is not given, cursor.arraysize rows are fetched.
fetchone(self): fetch a single result row.
scroll(self, value, mode='relative'): move the cursor to a given row; with mode='relative' it moves value rows from the current position, with mode='absolute' it moves to row value of the result set.
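Putting a few of these together, a small sketch that reuses the connection parameters from earlier in the article:
import cx_Oracle

conn1 = cx_Oracle.connect("pom", "Bohui@123", "172.17.7.118:1521/ORCL")
cur = conn1.cursor()
cur.execute("select T_SOCIAL_ID, title from T_SOCIAL where rownum <= 10")
first_five = cur.fetchmany(5)   # a list of up to five tuples
next_row = cur.fetchone()       # the next single tuple, or None when the rows run out
the_rest = cur.fetchall()       # whatever is left of the result set
cur.close()
conn1.close()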
Chinese encoding with MySQL
Add a charset attribute to conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python'):
conn = MySQLdb.Connect(host='localhost', user='root', passwd='root', db='python', charset='utf8')
charset must match the database's encoding; if the database uses gb2312, write charset='gb2312'.