背景
Elasticsearch 版本前兩天升級到了 7.x,之後每次打印日誌都會提示 [[types removal] Specifying types in bulk requests is deprecated.] 警告。上網查了一通,發現 7.x 版本之後,類型(type)已經棄用。在 CMRESHandler 的 Issues 中看到了遇到同樣問題的朋友,並向作者提交了一個 PR,目前還是未合併狀態,所以自己先在本地進行重寫,後續等作者合併代碼並發布最新版本後,再使用原生的 CMRESHandler。
Issues鏈接
https://github.com/cmanaha/python-elasticsearch-logger/issues/76
PR鏈接
https://github.com/cmanaha/python-elasticsearch-logger/pull/79
本地重寫方案
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection

# NOTE(review): CMRESHandler (the base class used below) is not imported in
# this snippet; presumably it comes from `cmreslogging.handlers` -- confirm
# and import it before using this code.

# Kerberos support is optional: record whether the package is importable so
# the handler can raise a clear error only when Kerberos auth is requested.
try:
    from requests_kerberos import HTTPKerberosAuth, DISABLED
    CMR_KERBEROS_SUPPORTED = True
except ImportError:
    CMR_KERBEROS_SUPPORTED = False

# AWS request signing is optional as well; same pattern as above.
try:
    from requests_aws4auth import AWS4Auth
    AWS4AUTH_SUPPORTED = True
except ImportError:
    AWS4AUTH_SUPPORTED = False
class PrivateCMRESHandler(CMRESHandler):
    """CMRESHandler variant that sends bulk requests without a document type.

    Overrides ``__get_es_client`` and ``flush`` so that the bulk actions no
    longer carry a ``_type`` entry, which removes the Elasticsearch 7.x warning
    ``[types removal] Specifying types in bulk requests is deprecated.``

    Both methods are overridden together because a double-underscore method
    name is mangled per class: ``self.__get_es_client()`` written inside this
    class resolves to this class's own copy, not to the parent's.
    """

    def __get_es_client(self):
        """Return an Elasticsearch client matching ``self.auth_type``.

        Clients for NO_AUTH, BASIC_AUTH and AWS_SIGNED_AUTH are created once
        and cached in ``self._client``; a Kerberos client is rebuilt on every
        call.

        :return: an ``Elasticsearch`` client instance
        :raises EnvironmentError: if the optional package required by the
            selected authentication type is not installed
        :raises ValueError: if ``self.auth_type`` is not a supported type
        """
        if self.auth_type == PrivateCMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == PrivateCMRESHandler.AuthType.BASIC_AUTH:
            if self._client is None:
                # Cache the client; previously a brand-new client was built
                # and returned on every call because it was never stored.
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == PrivateCMRESHandler.AuthType.KERBEROS_AUTH:
            if not CMR_KERBEROS_SUPPORTED:
                raise EnvironmentError("Kerberos module not available. Please install \"requests-kerberos\"")
            # For kerberos we return a new client each time to make sure the tokens are up to date
            return Elasticsearch(hosts=self.hosts,
                                 use_ssl=self.use_ssl,
                                 verify_certs=self.verify_certs,
                                 connection_class=RequestsHttpConnection,
                                 http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
                                 serializer=self.serializer)

        if self.auth_type == PrivateCMRESHandler.AuthType.AWS_SIGNED_AUTH:
            if not AWS4AUTH_SUPPORTED:
                raise EnvironmentError("AWS4Auth not available. Please install \"requests-aws4auth\"")
            if self._client is None:
                awsauth = AWS4Auth(self.aws_access_key, self.aws_secret_key, self.aws_region, 'es')
                # Certificate verification is always on for this branch,
                # regardless of self.verify_certs.
                self._client = Elasticsearch(
                    hosts=self.hosts,
                    http_auth=awsauth,
                    use_ssl=self.use_ssl,
                    verify_certs=True,
                    connection_class=RequestsHttpConnection,
                    serializer=self.serializer
                )
            return self._client

        raise ValueError("Authentication method not supported")

    def flush(self):
        """Flush the buffered log records into Elasticsearch.

        Cancels a still-running flush timer, swaps the buffer for an empty
        list under the buffer lock, and sends the drained records with one
        bulk request. The actions carry only ``_index`` and ``_source``; no
        document type is specified.

        Exceptions raised while flushing are re-raised only when
        ``self.raise_on_indexing_exceptions`` is truthy; otherwise they are
        swallowed.

        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        if self._buffer:
            try:
                # Swap the buffer under the lock so records appended while the
                # bulk request is in flight land in the fresh list.
                with self._buffer_lock:
                    logs_buffer = self._buffer
                    self._buffer = []
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception:
                if self.raise_on_indexing_exceptions:
                    # Bare raise keeps the original traceback intact.
                    raise
調用
# Attach the Elasticsearch handler to the logger.
handler_options = {
    'hosts': [{'host': self.ELASTIC_SEARCH_HOST, 'port': self.ELASTIC_SEARCH_PORT}],
    # Authentication can be configured here as needed.
    'auth_type': PrivateCMRESHandler.AuthType.BASIC_AUTH,
    'auth_details': self.AUTH_DETAILS,
    'es_index_name': self.ELASTIC_SEARCH_INDEX,
    # One index per month.
    'index_name_frequency': PrivateCMRESHandler.IndexNameFrequency.MONTHLY,
    # Extra field identifying the environment.
    'es_additional_fields': {'environment': self.APP_ENVIRONMENT},
}
es_handler = PrivateCMRESHandler(**handler_options)
es_handler.setLevel(level=self.es_output_level)
formatter = self.formatter
es_handler.setFormatter(formatter)
self.logger.addHandler(es_handler)
另外還找到一個已修復該問題的第三方庫
鏈接
https://github.com/IMInterne/python-elasticsearch-ecs-logger
安裝
pip install ElasticECSHandler