I. Symptoms and errors
Test cluster up1:
The create database operation takes a little over 200s.
# sudo -u hive hive --hiveconf hive.metastore.uris=thrift://10.197.1.141:9084
hive> create database wgtestdb_region3_1;
OK
Time taken: 200.826 seconds
Test cluster up2:
# sudo -u hive hive --hiveconf hive.metastore.uris=thrift://10.197.1.160:9084
hive> create database wgtestdb_region2_2;
OK
Time taken: 0.112 seconds
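As shown, while the HMS canary on test cluster up1 is failing, create/drop operations on up1 consistently take a little over 200s.
At the same time, both the HMS and the Sentry server logs show exceptions.
HMS exception log: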
2021-02-08 16:54:17,910 ERROR org.apache.sentry.core.common.transport.RetryClientInvocationHandler: [pool-5-thread-670]: failed to execute syncNotifications
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.sentry.core.common.transport.RetryClientInvocationHandler.invokeImpl(RetryClientInvocationHandler.java:95)
at org.apache.sentry.core.common.transport.SentryClientInvocationHandler.invoke(SentryClientInvocationHandler.java:41)
at com.sun.proxy.$Proxy26.syncNotifications(Unknown Source)
at org.apache.sentry.binding.metastore.SentrySyncHMSNotificationsPostEventListener.syncNotificationEvents(SentrySyncHMSNotificationsPostEventListener.java:153)
at org.apache.sentry.binding.metastore.SentrySyncHMSNotificationsPostEventListener.onDropDatabase(SentrySyncHMSNotificationsPostEventListener.java:113)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier$13.notify(MetaStoreListenerNotifier.java:69)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:167)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:197)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:235)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.drop_database_core(HiveMetaStore.java:1193)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.drop_database(HiveMetaStore.java:1229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:140)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:99)
at com.sun.proxy.$Proxy11.drop_database(Unknown Source)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$drop_database.getResult(ThriftHiveMetastore.java:9624)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$drop_database.getResult(ThriftHiveMetastore.java:9608)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:110)
at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:106)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.hadoop.hive.metastore.TUGIBasedProcessor.process(TUGIBasedProcessor.java:118)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: timed out wait request for id 184474148. Server Stacktrace: java.util.concurrent.TimeoutException
at org.apache.sentry.service.thrift.CounterWait$ValueEvent.waitFor(CounterWait.java:299)
at org.apache.sentry.service.thrift.CounterWait.waitFor(CounterWait.java:212)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor.sentry_sync_notifications(SentryPolicyStoreProcessor.java:934)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1217)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1202)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper.process(SentryProcessorWrapper.java:36)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at org.apache.sentry.service.thrift.Status.throwIfNotOk(Status.java:109)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl.syncNotifications(SentryPolicyServiceClientDefaultImpl.java:824)
... 35 more
Sentry exception log:
2021-02-08 15:29:09,028 WARN org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor: timed out wait request for id 184444094
java.util.concurrent.TimeoutException
at org.apache.sentry.service.thrift.CounterWait$ValueEvent.waitFor(CounterWait.java:299)
at org.apache.sentry.service.thrift.CounterWait.waitFor(CounterWait.java:212)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor.sentry_sync_notifications(SentryPolicyStoreProcessor.java:934)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1217)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1202)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper.process(SentryProcessorWrapper.java:36)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
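The logs point to the likely cause: notification sync from HMS to Sentry is delayed, the Sentry service cannot keep up, and the sync requests time out.
II. Root cause analysis
HMS synchronizes Notifications with Sentry in real time. When a large batch of notifications has to be processed and Sentry's background thread cannot keep up, the notifications back up and this exception appears. The exception does not affect normal use of the cluster; it only makes create, drop and similar operations slow, because each one has to wait 200s. The purpose of the wait is to give Sentry a chance to catch up to the latest id.
Below is the notification-sync metadata on the Sentry side and the Hive side. The SENTRY_HMS_NOTIFICATION_ID table in the Sentry metadata had stopped advancing, while the NOTIFICATION_SEQUENCE table in the Hive metadata kept advancing. In other words, Sentry was lagging behind in consuming HMS events. In this state the Hive HMS canary fails and the create database operation above takes roughly 200.xx s. The length of this wait is controlled by sentry.notification.sync.timeout.ms (200s), which explains why create/drop operations all take a little over 200s.
# Sentry metadata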
mysql -uroot -p123456
use sentry;
mysql> select * from SENTRY_HMS_NOTIFICATION_ID order by NOTIFICATION_ID desc limit 5; -- not updated for a while; the HMS canary is failing at this point
+-----------------+
| NOTIFICATION_ID |
+-----------------+
| 184485024 |
| 184485023 |
| 184485023 |
| 184485022 |
| 184485021 |
+-----------------+
5 rows in set (0.00 sec)
# Hive metadata
use hive_warehouse;
MySQL [hive_warehouse]> select * from NOTIFICATION_SEQUENCE limit 10; -- updated continuously
+--------+---------------+
| NNI_ID | NEXT_EVENT_ID |
+--------+---------------+
| 1 | 184486656 |
+--------+---------------+
1 row in set (0.00 sec)
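The gap between the two tables gives a rough measure of how far Sentry is behind. The following is a minimal sketch using the values from the two queries above. The class and method names are hypothetical, and treating NEXT_EVENT_ID minus the newest NOTIFICATION_ID as the backlog is an approximation, not something defined by Hive or Sentry.

```java
// Hypothetical helper, not part of Hive or Sentry.
final class NotificationLag {
    /** Approximate number of HMS events that Sentry has not processed yet. */
    static long lag(long hmsNextEventId, long sentryLatestId) {
        // Never report a negative backlog when Sentry has already caught up.
        return Math.max(0L, hmsNextEventId - sentryLatestId);
    }

    public static void main(String[] args) {
        // NEXT_EVENT_ID and the newest NOTIFICATION_ID from the queries above.
        System.out.println(lag(184486656L, 184485024L));
    }
}
```

With these sample values the difference is 1632, so Sentry was roughly 1,600 events behind at the time the queries were run.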
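III. Source code analysis
In Hive, create/drop/alter and similar operations all send a Notification request to Sentry Server. Take the create_table operation as an example.
The source shows that create_table sends the event through the static method notifyEvent() of the MetaStoreListenerNotifier class.
// Location: org/apache/hadoop/hive/metastore/HiveMetaStore.java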
private void create_table_core(final RawStore ms, final Table tbl,
final EnvironmentContext envContext)
throws AlreadyExistsException, MetaException,
InvalidObjectException, NoSuchObjectException {
...
if (!transactionalListeners.isEmpty()) {
transactionalListenerResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.CREATE_TABLE,
new CreateTableEvent(tbl, true, this),
envContext);
}
...
}
@Override
public void create_table(final Table tbl) throws AlreadyExistsException,
MetaException, InvalidObjectException {
create_table_with_environment_context(tbl, null);
}
The event is dispatched as follows. Every listener in the list, each of which extends MetaStoreEventListener, is notified in turn.
// Location: org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
/**
* Notify a list of listeners about a specific metastore event. Each listener notified might update
* the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
* be returned to the caller.
*
* @param listeners List of MetaStoreEventListener listeners.
* @param eventType Type of the notification event.
* @param event The ListenerEvent with information about the event.
* @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
* map if no parameters were updated or if no listeners were notified.
* @throws MetaException If an error occurred while calling the listeners.
*/
public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
EventType eventType,
ListenerEvent event) throws MetaException {
Preconditions.checkNotNull(listeners, "Listeners must not be null.");
Preconditions.checkNotNull(event, "The event must not be null.");
for (MetaStoreEventListener listener : listeners) {
notificationEvents.get(eventType).notify(listener, event);
}
// Each listener called above might set a different parameter on the event.
// This write permission is allowed on the listener side to avoid breaking compatibility if we change the API
// method calls.
return event.getParameters();
}
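Because notifyEvent() calls the listeners one after another on the calling thread, a listener that blocks delays the DDL call by the same amount. The following is a simplified model of that behaviour. The Listener interface and the class are hypothetical stand-ins, not Hive's MetaStoreEventListener API, and the 200 ms sleep merely stands in for a listener waiting on a remote sync.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of synchronous listener notification (hypothetical types).
final class SyncNotifyDemo {
    interface Listener {
        void onEvent(String event) throws InterruptedException;
    }

    /** Calls every listener in order on the caller's thread; returns elapsed milliseconds. */
    static long notifyListeners(List<Listener> listeners, String event)
            throws InterruptedException {
        long start = System.nanoTime();
        for (Listener listener : listeners) {
            listener.onEvent(event); // a blocking listener stalls the whole call
        }
        return (System.nanoTime() - start) / 1_000_000L;
    }

    public static void main(String[] args) throws InterruptedException {
        Listener fast = e -> { };
        Listener slow = e -> Thread.sleep(200); // stand-in for a blocking sync call
        long elapsed = notifyListeners(Arrays.asList(fast, slow), "CREATE_TABLE");
        System.out.println("elapsed ms: " + elapsed);
    }
}
```

The elapsed time is dominated by the slowest listener, which is the same effect that makes create/drop wait for the Sentry sync timeout.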
How does Sentry Server receive the event after HMS sends it? Sentry's SentrySyncHMSNotificationsPostEventListener class uses its syncNotificationEvents() method to synchronize every DDL event and hand it to Sentry Server for processing.
// Location: org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
/**
* This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
* whenever a DDL event happens on the Hive metastore.
*/
public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
/**
* It requests the Sentry server the synchronization of recent notification events.
*
* After the sync call, the latest processed ID will be stored for future reference to avoid
* syncing an ID that was already processed.
*
* @param event An event that contains a DB_NOTIFICATION_EVENT_ID_KEY_NAME value to request.
*/
private void syncNotificationEvents(ListenerEvent event, String eventName) {
// Do not sync notifications if the event has failed.
if (failedEvent(event, eventName)) {
return;
}
Map<String, String> eventParameters = event.getParameters();
if (!eventParameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
return;
}
/* If the HMS is running in an active transaction, then we do not want to sync with Sentry
* because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
* forever or until a read time-out happens. */
if (isMetastoreTransactionActive(eventParameters)) {
return;
}
long eventId =
Long.parseLong(eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
// This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
// already processed the requested eventId.
if (eventId <= latestProcessedId.get()) {
return;
}
try(SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
LOGGER.debug("Finished Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
updateProcessedId(sentryLatestProcessedId);
} catch (Exception e) {
// This error is only logged. There is no need to throw an error to Hive because HMS sync is called
// after the notification is already generated by Hive (as post-event).
LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
}
}
}
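The listener skips the Thrift call whenever the requested event id is not newer than the last id Sentry reported as processed. The following is a minimal sketch of that short-circuit. The class and method names are hypothetical, and the monotonic max update is an assumption about how such a tracker could behave; the body of updateProcessedId() is not shown in the excerpt above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker for the highest event id known to be processed.
final class ProcessedIdTracker {
    private final AtomicLong latestProcessedId = new AtomicLong(0L);

    /** True only when the event is newer than anything already processed. */
    boolean needsSync(long eventId) {
        return eventId > latestProcessedId.get();
    }

    /** Records a processed id; the stored value never moves backwards (assumed behaviour). */
    void recordProcessed(long id) {
        latestProcessedId.accumulateAndGet(id, Math::max);
    }

    long latest() {
        return latestProcessedId.get();
    }

    public static void main(String[] args) {
        ProcessedIdTracker tracker = new ProcessedIdTracker();
        tracker.recordProcessed(184485024L);
        System.out.println(tracker.needsSync(184485020L)); // older id, no sync needed
        System.out.println(tracker.needsSync(184485030L)); // newer id, sync needed
    }
}
```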
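Sentry Server handles the HMS event requests it receives in the SentryPolicyStoreProcessor class. The waitFor() call here is the key: it waits until Sentry Server has processed the corresponding HMS notification. If processing is too slow, it throws a timeout exception, which is the exception seen in the HMS log in section I.
// Location: org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java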
@Override
public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
throws TException {
TSentrySyncIDResponse response = new TSentrySyncIDResponse();
try (Timer.Context timerContext = hmsWaitTimer.time()) {
// Wait until Sentry Server processes specified HMS Notification ID.
response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
response.setStatus(Status.OK());
} catch (InterruptedException e) {
String msg = String.format("wait request for id %d is interrupted",
request.getId());
LOGGER.error(msg, e);
response.setId(0);
response.setStatus(Status.RuntimeError(msg, e));
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
String msg = String.format("timed out wait request for id %d", request.getId());
LOGGER.warn(msg, e);
response.setId(0);
response.setStatus(Status.RuntimeError(msg, e));
}
return response;
}
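The logic of waitFor() is simple: it compares the requested HMS event id with the latest id processed by Sentry Server (that is, the newest value in the SENTRY_HMS_NOTIFICATION_ID table of the Sentry metadata). If the counter has already reached the requested id, it returns immediately; otherwise it blocks until the counter catches up or the wait times out.
// Location: org/apache/sentry/service/thrift/CounterWait.java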
/**
* Wait for specified counter value.
* Returns immediately if the value is reached or blocks until the value
* is reached.
* Multiple threads can call the method concurrently.
*
* @param value requested counter value
* @return current counter value that should be no smaller then the requested
* value
* @throws InterruptedException if the wait was interrupted, TimeoutException if
* wait was not successfull within the timeout value specified at the construction time.
*/
public long waitFor(long value) throws InterruptedException, TimeoutException {
// Fast path - counter value already reached, no need to block
if (value <= currentId.get()) {
return currentId.get();
}
// Enqueue the waiter for this value
ValueEvent eid = new ValueEvent(value);
waiters.put(eid);
// It is possible that between the fast path check and the time the
// value event is enqueued, the counter value already reached the requested
// value. In this case we return immediately.
if (value <= currentId.get()) {
return currentId.get();
}
// At this point we may be sure that by the time the event was enqueued,
// the counter was below the requested value. This means that update()
// is guaranteed to wake us up when the counter reaches the requested value.
// The wake up may actually happen before we start waiting, in this case
// the event's blocking queue will be non-empty and the waitFor() below
// will not block, so it is safe to wake up before the wait.
// So sit tight and wait patiently.
eid.waitFor();
LOGGER.debug("CounterWait added new value to waitFor: value = {}, currentId = {}", value, currentId.get());
return currentId.get();
}
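The behaviour described above, return at once if the counter is high enough and otherwise block until it catches up or the timeout expires, can be modelled in a few lines. This is a simplified sketch built on a plain monitor; it is not Sentry's CounterWait implementation, and the class name and the 50 ms timeout in main() are hypothetical.

```java
import java.util.concurrent.TimeoutException;

// Simplified model of "wait until a counter reaches a value, or time out".
final class SimpleCounterWait {
    private final long timeoutMillis;
    private long currentId; // guarded by "this"

    SimpleCounterWait(long initialId, long timeoutMillis) {
        this.currentId = initialId;
        this.timeoutMillis = timeoutMillis;
    }

    /** Called by the consumer after it has processed events up to newId. */
    synchronized void update(long newId) {
        if (newId > currentId) {
            currentId = newId;
            notifyAll(); // wake every waiter so each can re-check its target
        }
    }

    /** Returns the current id once it is >= value, or throws after the timeout. */
    synchronized long waitFor(long value) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (currentId < value) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("timed out waiting for id " + value);
            }
            // wait(0) would block forever, so always wait at least 1 ms.
            wait(remainingNanos / 1_000_000L + 1);
        }
        return currentId;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleCounterWait counter = new SimpleCounterWait(184485024L, 50L);
        try {
            counter.waitFor(184486656L); // the consumer never catches up here
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

When the consumer stalls, every caller pays the full timeout before giving up, which matches the 200s delay seen on create/drop.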
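At this point Sentry Server has handled one event request from HMS.
IV. Solutions and recommendations
1. Reduce sentry.notification.sync.timeout.ms as appropriate
The default is 200s. Lowering it shortens the wait for create/drop/alter and similar operations. When the backlog is small, this approach lets Sentry work through it on its own.
Changing the Sentry service configuration in Cloudera: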

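After changing the parameter and restarting the Sentry service, the wait while the HMS canary is failing dropped to a little over 50s, which shows that the new value took effect.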

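2. Monitor SENTRY_HMS_NOTIFICATION_ID in the Sentry metadata
Query the latest record of the SENTRY_HMS_NOTIFICATION_ID table in the Sentry metadata directly. If it is not advancing while HMS keeps generating events, notifications are lagging and the HMS canary will fail.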
mysql> select * from SENTRY_HMS_NOTIFICATION_ID order by NOTIFICATION_ID desc limit 1;
+-----------------+
| NOTIFICATION_ID |
+-----------------+
| 184490926 |
+-----------------+
1 row in set (0.00 sec)
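A monitoring job can turn this check into a simple rule: sample both ids twice and raise an alert when the HMS id has advanced but the Sentry id has not. The following is a minimal sketch of that rule only. The class names are hypothetical, and reading the ids from MySQL is left out; the values would come from the queries shown in this article.

```java
// Hypothetical monitoring helper: compares two consecutive samples of the ids
// read from SENTRY_HMS_NOTIFICATION_ID and NOTIFICATION_SEQUENCE.
final class SyncStallDetector {
    /** One observation of both counters. */
    static final class Sample {
        final long sentryLatestId;
        final long hmsNextEventId;

        Sample(long sentryLatestId, long hmsNextEventId) {
            this.sentryLatestId = sentryLatestId;
            this.hmsNextEventId = hmsNextEventId;
        }
    }

    /** Stalled means HMS produced new events but Sentry's id did not advance. */
    static boolean isStalled(Sample previous, Sample current) {
        boolean hmsAdvanced = current.hmsNextEventId > previous.hmsNextEventId;
        boolean sentryAdvanced = current.sentryLatestId > previous.sentryLatestId;
        return hmsAdvanced && !sentryAdvanced;
    }

    public static void main(String[] args) {
        Sample before = new Sample(184485024L, 184486000L);
        Sample after = new Sample(184485024L, 184486656L);
        System.out.println("stalled: " + isStalled(before, after));
    }
}
```

The sampling interval is a design choice: it should be long enough that a healthy Sentry would normally have advanced between two samples.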
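3. Update the Sentry notification sync record
If the backlog is too large and Sentry would take too long to work through it, it may never catch up with the latest HMS id. In that case you can choose to discard the backlog: insert a row with the maximum value into the SENTRY_HMS_NOTIFICATION_ID table in the Sentry metadata (the value equals the current notification id, taken from the NOTIFICATION_SEQUENCE table in the Hive metadata), then restart the Sentry service.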
use sentry;
insert into SENTRY_HMS_NOTIFICATION_ID values(184472866);
After the update, create operations complete in normal time again.
