來!自己動手實現一個loghub(或kafka)分片消費負載均衡器


  一般地,像kafka之類的消息中間件,作為一個可以保持歷史消息的組件,其消費模型一般是主動拉取方式。這是為了給消費者足夠的自由,回滾或者前進。

  然而,也正是由於將消費消息的權力交給了消費者,所以,消費者往往需要承擔更多的責任。比如:需要自行保存消費偏移量,以便后續可以知道從哪里繼續。而當這一點處理不好時,則可能帶來一些麻煩。

  不管怎么樣,解決方案也都是現成的,咱們也不用擔心。

 

  今天我們要談論的是一個場景: 如何讓n個機器消費m個分片數據?(帶狀態的,即不能任意機器消費任意shard)

 

  這在消息中間件的解決方案里,明白地寫着,使用消費者群組就可以實現了。具體來說就是,每個分片至多會被一機器消費,每個機器則可以消費多個分片數據。即機器數據小於分片數時,分片會被均衡地分配到消費者中。當機器數大於分片數時,多余的機器將不做任何事情。

  好吧,既然官方已經說明白了,那咱們應該就不再需要自己搞一個輪子了吧。

  但是,我還有個場景:如果我要求在機器做負載重平衡時,需要保證被抽取出去的機器分片,至少保留一段時間,不允許任何機器消費該分片,因為可能還有數據需要備份。

  針對這種場景,我想官方也許是有提供回調函數之類的解決方案的吧。不管了,反正我沒找到,只能自己先造個輪子了。

 

本文場景前提:

  1. 使用loghub作為消息中間件(原理同kafka);
  2. 整個數據有m個分片shard;
  3. 整個消費者集群有n台機器;
  4. 每個分片的數據需要集中到一機器上做有狀態處理;
  5. 可以借助redis保存有狀態數據,以便消費者機器做優雅停機;

  最簡單的方案是,使 n=m, 每台機器消費一個shard, 這樣狀態永遠不會錯亂。

  但是這樣明顯可擴展能力太差了!

    比如有時數據量小了,雖然分片還在,但是完全不用那么多機器的時候,如何縮減機器?
    比如由於數據壓力大了,我想增加下分片數,以提高發送者性能,但是消費者我還不想理他,消費慢點無所謂?

  其實,我們可以使用官方的消費者群組方法,可以動態縮減機器。

  但是這個有狀態就比較難做到了。

  以上痛點,總結下來就是,可擴展性問題。

 

想象中的輪子是怎么樣的?

  1. 需要有個注冊中心,管理機器的上下線監控;
  2. 需要有負載均衡器,負載將shard的負載均衡的分布到在線機器中;
  3. 需要有每個機器自己消費的分片記錄,以使機器自身有據可查;
  4. 需要有每個分片的消費情況,以判定出哪些分片已分配給哪些機器;

 

我們來細看下實現:

【1】均衡協調器主框架:

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.test.common.config.LogHubProperties;
import com.test.utils.RedisPoolUtil;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.test.dispatcher.work.RedisKeyConstants.MAX_CONSUMER_SHARD_LOAD;


/**
 * loghub動態消費者 shard分配shard 協調器
 *
 */
public class LoghubConsumerShardCoWorker implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(LoghubConsumerShardCoWorker.class);

    private LogHubProperties logHubProperties;

    private RedisPoolUtil redisPoolUtil;

    private Client mClient;

    private ShardAssignMaster shardAssignMaster;

    private String HOST_NAME;

    public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties) {
        this(redisPoolUtil, logHubProperties, null);
    }

    public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties, String hostName) {
        this.redisPoolUtil = redisPoolUtil;
        this.logHubProperties = logHubProperties;
        this.HOST_NAME = hostName;

        initSharedVars();
        initConsumerClient();
        initShardAssigner();
        getAllShardList();
        registerSelfConsumer();
        startHeartBeatThread();
    }

    /**
     * 開啟心跳線程,保活
     */
    private void startHeartBeatThread() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> {
            String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME;
            redisPoolUtil.expire(serverConsumeCacheKey, 30);
            shardAssignMaster.sendHeartbeat(HOST_NAME);
        }, 30, 25, TimeUnit.SECONDS);
    }

    /**
     * 初始化客戶端實例
     */
    private void initConsumerClient() {
        this.mClient = new Client(logHubProperties.getEndpoint(),
                logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());
    }

    /**
     * 初始化分片分配控制器
     */
    private void initShardAssigner() {
        shardAssignMaster = new ShardAssignMaster(redisPoolUtil);
    }

    /**
     * 初始化公共變量
     */
    private void initSharedVars() {
        try {
            if(HOST_NAME != null) {
                return;
            }
            HOST_NAME = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            logger.error("init error : 獲取服務器主機名失敗", e);
            throw new RuntimeException("init error : 獲取服務器主機名失敗");
        }
    }

    /**
     * 將自己作為消費者注冊到消費者列表中,以判定后續可以進行消費
     */
    private void registerSelfConsumer() {
        shardAssignMaster.registerConsumer(HOST_NAME);
        shardAssignMaster.sendHeartbeat(HOST_NAME);
    }

    @Override
    public void run() {
        try {
            checkConsumerSharding();
        }
        catch (Exception e) {
            logger.error("動態分配shard 發生異常", e);
        }
    }

    /**
     * job 只做一件事,即檢查 shard 的消費情況,不平衡則處理
     */
    private void checkConsumerSharding() {
        try {
            if (tryCoWorkerLock()) {
                // step1. 檢查是否需要進行shard分配
                // 集群消費loghub數據動態伸縮策略
                // 1. 啟動時先去獲取部分片數,備用;
                // 2. 應用啟動后,把自己注冊到注冊中心或redis中;
                // 3. 根據注冊上來的機器列表,按平均分配策略分配shard(只能由一個機器來分配,其他機器處理分布式鎖競爭失敗,等待狀態);
                // 4. 分配好后,釋放鎖,各機器開始消費,如機器A消費shard 0/3,則機器1以輪詢的方式依次從shard 0/3 摘取數據消費;
                // 5. 分配好的數據結構為:prefix+ip保存具體數據,另外將自己的key添加到另一個zset中,標識自己存活;自己的key有效期為30秒;使用另一維度 shard,保存每個shard被占用情況,使用hash保存,key為shard,value為當有占用時為機器ip或主機名,當無占用時為null或空串;
                // 6. 以上數據刷入,將在機器搶占到shard更新數據;shard總數信息暫時不允許在運行期間進行變更;(即如果變理shard必須重啟服務器)
                // 7. 機器下線時,占用的key將自動過期;(考慮是否主動刪除)
                // 8. 各機器上啟動一個后台掃描線程,每隔30秒掃描一次。掃描zset,取出所有值后查看是否存在相應的key,如果不存在說明機器已下線,需要重新分配其占用的shard;
                // 9. 重新分配策略,使用一致性hash算法實現;
                // 10. 機器上線時,使用一致性hash算法重新平衡shard;
                // 11. 使用分布式鎖保證分配進程只有一個;
                CheckConsumerShardingResultContainer resultContainer = checkShardConsumerReBalanceStatus();
                if(resultContainer.getStatusResultType() != ReBalanceStatusResultEnum.OK) {
                    reBalanceConsumerShard(resultContainer);
                }
            }
        }
        finally {
            releaseCoWorkerLock();
        }
    }

    /**
     * 確認機器和shard是否需要再平衡
     *
     * @return 結果狀態集
     */
    private CheckConsumerShardingResultContainer checkShardConsumerReBalanceStatus() {
        // step1. 檢查自身是否存在shard, 不存在則立即進行一次重分配(消費者機器數大於分片數時,重平衡動作將是無效動作)
        // step2. 檢查所有shard列表,是否有未被分配的shard,如有,立即觸發一次重分配
        // step3. 檢查是否有負荷比較高的機器,如有觸發平衡(功能預留,此功能需要基於統計信息)
        CheckConsumerShardingResultContainer resultContainer = new CheckConsumerShardingResultContainer();

        final List<String> activeServersList = shardAssignMaster.getAllOnlineServerList();
        final List<String> allShardList = getAllShardList();

        // 計算空閑機器
        Map<String, Integer> hostConsumeLoadCountMap = new HashMap<>();
        List<String> idleServerList = filterIdleServerList(activeServersList, hostConsumeLoadCountMap);

        // 計算未被分配的shard
        List<String> unAssignedShardList = filterUnAssignedShardList(allShardList);

        // 根據資源信息,得出目前的負載狀態
        ReBalanceStatusResultEnum statusResult = computeReBalanceStatusOnResources(
                                            unAssignedShardList, idleServerList, hostConsumeLoadCountMap);

        resultContainer.setAllServerList(activeServersList);
        resultContainer.setAllShardList(allShardList);
        resultContainer.setIdleServerList(idleServerList);
        resultContainer.setUnAssignedShardList(unAssignedShardList);
        resultContainer.setServerConsumeShardLoad(hostConsumeLoadCountMap);
        resultContainer.setStatusResultType(statusResult);
        return resultContainer;
    }

    /**
     * 根據給定資源信息,計算出目前的負載狀態
     *
     * @param unAssignedShardList 未分配的shard列表
     * @param idleServerList 空閑機器列表
     * @param hostConsumeLoadMap 機器消費計數容器(負載情況)
     * @return 狀態值
     */
    private ReBalanceStatusResultEnum computeReBalanceStatusOnResources(
                                            List<String> unAssignedShardList,
                                            List<String> idleServerList,
                                            Map<String, Integer> hostConsumeLoadMap) {
        // 沒有未分配的shard,檢測是否平衡即可
        // 0. 有空閑機器,則直接分配給空閑機器即可
        // 1. 最大消費shard-最小消費shard數 >= 2, 則說明有機器消費過多shard,需重分配
        // 2. 機器負載平衡,無須調整
        if(unAssignedShardList.isEmpty()) {
            int minConsume = MAX_CONSUMER_SHARD_LOAD;
            int maxConsume = 0;
            for (Map.Entry<String, Integer> entry : hostConsumeLoadMap.entrySet()) {
                int gotCount = entry.getValue();
                if(gotCount > maxConsume) {
                    maxConsume = gotCount;
                }
                if(gotCount < minConsume) {
                    minConsume = gotCount;
                }
            }

            // 因有未分配的機器,假如現有的機器消費都是2,則需要重分配的大壓力的機器 shard 給空閑機器
            if(!idleServerList.isEmpty()) {
                if (maxConsume > 1) {
                    return ReBalanceStatusResultEnum.HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED;
                }
            }

            // 有消費相差2的機器,重新分配,從大數上借調到小數上
            if(maxConsume > minConsume + 1) {
                return ReBalanceStatusResultEnum.HEAVY_LOAD_BALANCE_NEEDED;
            }
            return ReBalanceStatusResultEnum.OK;
        }

        // 有可用shard
        // 3. 有空閑機器,直接讓空閑shard分配給這些空閑機器就ok了
        // 4. 沒有空閑機器,須將空閑shard 分配給負載小的機器
        if(idleServerList.isEmpty()) {
            return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS;
        }
        return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS;
    }

    /**
     * 過濾出空閑的機器列表
     *
     * @param activeServersList 所有機器列表
     * @return 空閑機器集, 且將各自消費數放入計數容器
     */
    private List<String> filterIdleServerList(List<String> activeServersList, Map<String, Integer> hostConsumeCountMap) {
        List<String> idleServerList = new ArrayList<>();
        for (String hostname1 : activeServersList) {
            if(!shardAssignMaster.isConsumerServerAlive(hostname1)) {
                shardAssignMaster.invalidateOfflineServer(hostname1);
                continue;
            }
            int consumeCount;
            Set<String> consumeShardSet = shardAssignMaster.getServerDutyConsumeShardSet(hostname1);
            if(consumeShardSet == null || consumeShardSet.isEmpty()) {
                idleServerList.add(hostname1);
                consumeCount = 0;
            }
            else {
                consumeCount = consumeShardSet.size();
            }
            hostConsumeCountMap.put(hostname1, consumeCount);
        }
        return idleServerList;
    }


    /**
     * 過濾出未分配的shard列表
     *
     * @param allShardList 所有shard
     * @return 未分配的shard
     */
    private List<String> filterUnAssignedShardList(List<String> allShardList) {
        List<String> unAssignedShardList = new ArrayList<>();
        for (String shardId1 : allShardList) {
            String consumeHostname = shardAssignMaster.getShardAssignedServer(shardId1);
            // 如果不為空,則之前分配過,檢查機器是否下線
            // 如果為空,則是第一次分配
            if(!StringUtils.isBlank(consumeHostname)) {
                if(!shardAssignMaster.isConsumerServerAlive(consumeHostname)) {
                    // 清除下線機器信息,將當前shard置為空閑
                    shardAssignMaster.invalidateOfflineServer(consumeHostname);
                    shardAssignMaster.invalidateShardAssignInfo(shardId1);
                    unAssignedShardList.add(shardId1);
                }
            }
            else {
                unAssignedShardList.add(shardId1);
            }
        }
        return unAssignedShardList;
    }

    /**
     * 嘗試獲取協調者協調鎖
     *
     *         在集群環境中,只允許有一個協調器在運行
     *
     * @return true:成功, false:失敗,不得進行協調分配工作
     */
    private boolean tryCoWorkerLock() {
        return redisPoolUtil.getDistributedLock("distributedLock", HOST_NAME, 30);
    }

    /**
     * 釋放協調鎖,以便下次再競爭
     */
    private void releaseCoWorkerLock() {
        redisPoolUtil.releaseDistributedLock("distributedLock", HOST_NAME);
    }

    /**
     * 重新平衡消費者和shard的關系
     *
     * @param resultContainer 待重平衡狀態
     */
    private void reBalanceConsumerShard(CheckConsumerShardingResultContainer resultContainer) {

        // 集群消費loghub數據動態伸縮策略,根據負載狀態,調用相應策略進行重平衡
        StatusReBalanceStrategy strategy = StatusReBalanceStrategyFactory.createStatusReBalanceAlgorithm(resultContainer, shardAssignMaster);
        strategy.loadBalance();
    }


    /**
     * 獲取分片列表
     *
     * @return 分片列表,如: 0,1,2,3
     */
    private List<String> getAllShardList() {
        // 實時讀取列表
        List<String> shardList = Lists.newArrayList();
        try {
            ListShardResponse listShardResponse = mClient.ListShard(logHubProperties.getProjectName(),
                    logHubProperties.getEventlogStore());
            ArrayList<Shard> getShards = listShardResponse.GetShards();
            for (Shard shard : getShards) {
                shardList.add(shard.GetShardId() + "");
            }
        }
        catch (LogException e) {
            logger.error("loghub 獲取shard列表 error :", e);
        }
        return shardList;
    }

}

  如上,就是協調均衡主框架。主要邏輯如下:

    1. 啟動時初始化各種端,分配器,注冊自己到控制中心等等;
    2. 以線程的形式,被外部以定時任務執行的方式調用;
    3. 檢查任務前,須獲得檢查鎖,否則直接返回;
    4. 先獲得目前機器的所有消費情況和shard的分配情況,得出資源負載數據;
    5. 根據得到的數據信息,推算出目前的平衡狀態;
    6. 根據平衡狀態,調用相應的平衡策略進行重平衡;
    7. 等待下一次調度;

檢查結果將作為后續選擇均衡策略的依據,所以需要相應的狀態容器保存。如下:

/**
 * 集群狀態預檢查 結果容器
 */
class CheckConsumerShardingResultContainer {

    /**
     * 所有shard列表
     */
    private List<String> allShardList;

    /**
     * 未被分配的shard列表
     */
    private List<String> unAssignedShardList;

    /**
     * 所有機器列表
     */
    private List<String> allServerList;

    /**
     * 空閑的機器列表(未被分配shard)
     */
    private List<String> idleServerList;

    /**
     * 機器消費shard的負載計數容器
     */
    private Map<String, Integer> serverConsumeShardLoad;

    /**
     * 狀態檢查結果類型
     */
    private ReBalanceStatusResultEnum statusResultType;

    public Map<String, Integer> getServerConsumeShardLoad() {
        return serverConsumeShardLoad;
    }

    public void setServerConsumeShardLoad(Map<String, Integer> serverConsumeShardLoad) {
        this.serverConsumeShardLoad = serverConsumeShardLoad;
    }

    public List<String> getAllShardList() {
        return allShardList;
    }

    public void setAllShardList(List<String> allShardList) {
        this.allShardList = allShardList;
    }

    public List<String> getUnAssignedShardList() {
        return unAssignedShardList;
    }

    public void setUnAssignedShardList(List<String> unAssignedShardList) {
        this.unAssignedShardList = unAssignedShardList;
    }

    public List<String> getAllServerList() {
        return allServerList;
    }

    public void setAllServerList(List<String> allServerList) {
        this.allServerList = allServerList;
    }

    public List<String> getIdleServerList() {
        return idleServerList;
    }

    public void setIdleServerList(List<String> idleServerList) {
        this.idleServerList = idleServerList;
    }

    public ReBalanceStatusResultEnum getStatusResultType() {
        return statusResultType;
    }

    public void setStatusResultType(ReBalanceStatusResultEnum statusResultType) {
        this.statusResultType = statusResultType;
    }
}

 

  針對多個平衡策略算法,使用一個工廠類來生產各種策略實例。如下:

/**
 * 再平衡算法工廠類
 */
class StatusReBalanceStrategyFactory {

    /**
     * 無需做平衡的控制器
     */
    private static final StatusReBalanceStrategy EMPTY_BALANCER = new EmptyReBalancer();

    /**
     * 根據當前的負載狀態,創建對應的負載均衡算法
     *
     * @param resultContainer 負載狀態集
     * @param shardAssignMaster 分片分配管理者實例
     * @return 算法實例
     */
    public static StatusReBalanceStrategy createStatusReBalanceAlgorithm(CheckConsumerShardingResultContainer resultContainer, ShardAssignMaster shardAssignMaster) {
        ReBalanceStatusResultEnum balanceStatus = resultContainer.getStatusResultType();
        switch (balanceStatus) {
            case OK:
                return EMPTY_BALANCER;
            case UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS:
                return new UnAssignedShardWithConsumerIdleReBalancer(shardAssignMaster,
                                    resultContainer.getUnAssignedShardList(), resultContainer.getIdleServerList());
            case UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS:
                return new UnassignedShardWithoutConsumerIdleReBalancer(shardAssignMaster,
                                    resultContainer.getUnAssignedShardList(), resultContainer.getServerConsumeShardLoad());
            case HEAVY_LOAD_BALANCE_NEEDED:
                return new HeavyLoadReBalancer(shardAssignMaster, resultContainer.getServerConsumeShardLoad());
            case HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED:
                return new HeavyLoadWithConsumerIdleReBalancer(shardAssignMaster,
                                    resultContainer.getServerConsumeShardLoad(), resultContainer.getIdleServerList());
            default:
                break;
        }
        return EMPTY_BALANCER;
    }
}

/**
 * 負載均衡策略統一接口
 */
interface StatusReBalanceStrategy {

    /**
     * 執行負載均衡方法
     */
    public void loadBalance();
}

 

  針對各種場景的負載均衡,各自實現如下。其中,無需操作時,將返回一個空操作實例!

1. 空操作實例

/**
 * 無需做平衡的控制器
 *
 * @see ReBalanceStatusResultEnum#OK 狀態枚舉
 */
class EmptyReBalancer implements StatusReBalanceStrategy {
    @Override
    public void loadBalance() {
        // ignore ...
    }
}

 

2. 分配剩余shard給空閑的機器控制器

/**
 * 為所有空閑的其他空閑機器分配可用 shard 的控制器
 *
 * @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態枚舉
 */
class UnAssignedShardWithConsumerIdleReBalancer implements StatusReBalanceStrategy {

    /**
     * 未被分配的分片列表
     */
    private List<String> unAssignedShardList;

    /**
     * 分片分配管理者實例
     */
    private ShardAssignMaster shardAssignMaster;

    /**
     * 空閑的機器列表
     */
    private List<String> idleServerList;

    public UnAssignedShardWithConsumerIdleReBalancer(
                    ShardAssignMaster shardAssignMaster,
                    List<String> unAssignedShardList,
                    List<String> idleServerList) {
        this.shardAssignMaster = shardAssignMaster;
        this.unAssignedShardList = unAssignedShardList;
        this.idleServerList = idleServerList;
    }

    @Override
    public void loadBalance() {
        // 1. 找出還未被消費的shard
        // 2. 依次分配給各空閑機器,每個空閑機器只至多分配一個shard
        int serverIndex = 0;
        for (String shard1 : unAssignedShardList) {
            // 輪詢分配shard, 先只給一個機器分配一個shard
            if(serverIndex >= idleServerList.size()) {
                break;
            }
            String serverHostname = idleServerList.get(serverIndex++);
            shardAssignMaster.assignShardToServer(shard1, serverHostname);
        }
    }
}

 

3. 分配剩余shard給負載低的機器的控制器

/**
 * 有空閑shard場景 的控制器 , 須找出負載最低的機器塞入shard到現有的機器中(可能是有機器下線導致)
 *
 * @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態枚舉
 */
class UnassignedShardWithoutConsumerIdleReBalancer implements StatusReBalanceStrategy {

    /**
     * 未被分配分片列表
     */
    private List<String> unAssignedShardList;

    /**
     * 分片管理者實例
     */
    private ShardAssignMaster shardAssignMaster;

    /**
     * 消費者負載情況
     */
    private Map<String, Integer> consumerLoadCount;

    public UnassignedShardWithoutConsumerIdleReBalancer(
                        ShardAssignMaster shardAssignMaster,
                        List<String> unAssignedShardList,
                        Map<String, Integer> consumerLoadCount) {
        this.shardAssignMaster = shardAssignMaster;
        this.unAssignedShardList = unAssignedShardList;
        this.consumerLoadCount = consumerLoadCount;
    }

    @Override
    public void loadBalance() {
        // 1. 找出負載最低的機器
        // 2. 依次分配shard給該機器
        // 3. 分配的后負載數+1, 循環分配
        // 先根據空閑數,計算出一個可以接受新shard的機器的shard負載最低值,然后依次分配給這些機器
        for (String shard1 : unAssignedShardList) {
            // 按負載最小分配原則 分配shard
            Map.Entry<String, Integer> minLoadServer = getMinLoadServer(consumerLoadCount);
            String serverHostname = minLoadServer.getKey();

            // 分配shard給機器
            shardAssignMaster.assignShardToServer(shard1, serverHostname);

            // 負載數 +1
            minLoadServer.setValue(minLoadServer.getValue() + 1);
        }
    }

    /**
     * 獲取負載最小的機器名備用
     *
     * @param loadCount 負載數據
     * @return 最小負載機器
     */
    private Map.Entry<String, Integer> getMinLoadServer(Map<String, Integer> loadCount) {
        int minCount = MAX_CONSUMER_SHARD_LOAD;
        Map.Entry<String, Integer> minLoadServer = null;
        for(Map.Entry<String, Integer> server1 : loadCount.entrySet()) {
            if(server1.getValue() < minCount) {
                minCount = server1.getValue();
                minLoadServer = server1;
            }
        }
        return minLoadServer;
    }
}

 

4. 將現有機器消費情況做重分配,從而使各自負載相近控制器

/**
 * 負載不均衡導致的 重新均衡控制器,將消費shard多的機器的 shard 拆解部分到 消費少的機器上 (須上鎖)
 *
 * @see ReBalanceStatusResultEnum#HEAVY_LOAD_BALANCE_NEEDED 狀態枚舉
 */
class HeavyLoadReBalancer implements StatusReBalanceStrategy {

    /**
     * 分片分配管理者實例
     */
    private ShardAssignMaster shardAssignMaster;

    /**
     * 機器消費負載情況
      */
    private Map<String, Integer> consumerLoadCount;

    public HeavyLoadReBalancer(ShardAssignMaster shardAssignMaster, Map<String, Integer> consumerLoadCount) {
        this.shardAssignMaster = shardAssignMaster;
        this.consumerLoadCount = consumerLoadCount;
    }

    @Override
    public void loadBalance() {
        // 1. 找出所有機器的消費數的平均線值
        // 2. 負載數大於均線1的,直接抽出多余的shard, 放到待分配容器中
        // 3. 從大到小排序負載機器
        // 4. 從大的負載上減少shard到最后的機器上,直到小的機器達到平均負載線最貼近的地方,或者小的機器到達平均負載線最貼近的地方
        // 5. ++大負載機器 或者 --小負載機器,下一次循環
        double avgLoadCount = computeAliveServersAvgLoadCount(consumerLoadCount);
        List<Map.Entry<String, Integer>> sortedLoadCountList = sortLoadCountByLoadWithSmallEndian(consumerLoadCount);
        int bigLoadIndex = 0;
        int smallLoadIndex = sortedLoadCountList.size() - 1;
        for (;;) {
            // 首先檢測是否已遍歷完成,完成后不再進行分配
            if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) {
                break;
            }
            Map.Entry<String, Integer> bigLoadServerEntry = sortedLoadCountList.get(bigLoadIndex);
            double canTakeCountFromBigLoad = bigLoadServerEntry.getValue() - avgLoadCount;
            if(canTakeCountFromBigLoad < 1) {
                bigLoadIndex += 1;
                continue;
            }
            for (int reAssignShardIndex = 0;
                     reAssignShardIndex < canTakeCountFromBigLoad; reAssignShardIndex++) {
                if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) {
                    break;
                }
                Map.Entry<String, Integer> smallLoadServerEntry = sortedLoadCountList.get(smallLoadIndex);
                double canPutIntoSmallLoad = avgLoadCount - smallLoadServerEntry.getValue();
                if(canPutIntoSmallLoad < 1) {
                    smallLoadIndex -= 1;
                    continue;
                }
                // 此處可以使用管道操作,更流暢, 或者更准確的說,使用事務操作
                // 從 bigLoad 中移除shard 0
                // 將移除的 shard 上鎖,以防后續新機器立即消費,導致數據異常
                // 添加新shard到 smallLoad 中
                String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(bigLoadServerEntry.getKey());
                bigLoadServerEntry.setValue(bigLoadServerEntry.getValue() - 1);

                // 上鎖分片,禁用消費
                shardAssignMaster.lockShardId(firstLoadSHardId);

                // 添加shard到 smallLoad 中
                shardAssignMaster.assignShardToServer(firstLoadSHardId, smallLoadServerEntry.getKey());
                smallLoadServerEntry.setValue(smallLoadServerEntry.getValue() + 1);
            }
        }
    }

    /**
     * 判定輪詢是否完成
     *
     * @param startIndex 開始下標
     * @param endIndex 結束下標
     * @return true: 輪詢完成, false: 未完成
     */
    private boolean isRoundRobinComplete(int startIndex, int endIndex) {
        return startIndex == endIndex;
    }

    /**
     * 從大到小排序 負載機器
     *
     * @param consumerLoadCount 總負載情況
     * @return 排序后的機器列表
     */
    private List<Map.Entry<String, Integer>> sortLoadCountByLoadWithSmallEndian(Map<String, Integer> consumerLoadCount) {
        List<Map.Entry<String, Integer>> sortedList = new ArrayList<>(consumerLoadCount.entrySet());
        sortedList.sort(new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                return o2.getValue() - o1.getValue();
            }
        });
        return sortedList;
    }

    /**
     * 計算平均每台機器的消費shard負載
     *
     * @param loadCount 總負載指標容器
     * @return 負載均線
     */
    private double computeAliveServersAvgLoadCount(Map<String, Integer> loadCount) {
        int totalServerCount = loadCount.size();
        int totalShardCount = 0;
        for(Integer consumeShardCount : loadCount.values()) {
            totalShardCount += consumeShardCount;
        }
        return (double) totalShardCount / totalServerCount;
    }
}

 

5. 從負載重的機器上剝奪shard,分配給空閑的機器 控制器

/**
 *  負載不均衡,且存在空閑的機器, 此時應是 均值與最大值之間相差較小值,但是至少有一個 消費2 的機器,可以剝奪其1個shard給空閑機器 的控制器
 *
 * @see ReBalanceStatusResultEnum#HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED 狀態枚舉
 */
class HeavyLoadWithConsumerIdleReBalancer implements StatusReBalanceStrategy {

    /**
     * 分片分配管理者實例
     */
    private ShardAssignMaster shardAssignMaster;

    /**
     * 空閑的機器列表
     */
    private List<String> idleServerList;

    /**
     * 機器消費負載情況
     */
    private Map<String, Integer> consumerLoadCount;

    public HeavyLoadWithConsumerIdleReBalancer(
            ShardAssignMaster shardAssignMaster,
            Map<String, Integer> consumerLoadCount,
            List<String> idleServerList) {
        this.shardAssignMaster = shardAssignMaster;
        this.consumerLoadCount = consumerLoadCount;
        this.idleServerList = idleServerList;
    }

    @Override
    public void loadBalance() {
        // 1. 找出還未被消費的shard
        // 2. 分配一個給自己
        // 3. 如果還有其他機器也未分配,則同樣進行分配
        for (String idleHostname1 : idleServerList) {
            Map.Entry<String, Integer> maxLoadEntry = getMaxLoadConsumerEntry(consumerLoadCount);
            // 本身只有一個則不再分配負擔了
            if(maxLoadEntry.getValue() <= 1) {
                break;
            }
            String maxLoadServerHostname = maxLoadEntry.getKey();

            // 此處可以使用管道操作,更流暢, 或者更准確的說,使用事務操作
            // 從 bigLoad 中移除shard 0
            // 將移除的 shard 上鎖,以防后續新機器立即消費,導致數據異常
            // 添加新shard到 smallLoad 中
            String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(maxLoadServerHostname);
            maxLoadEntry.setValue(maxLoadEntry.getValue() - 1);

            // 上鎖卸載下來的shard,鎖定50s
            shardAssignMaster.lockShardId(firstLoadSHardId);

            // 添加shard到 smallLoad 中
            shardAssignMaster.assignShardToServer(firstLoadSHardId, idleHostname1);
            consumerLoadCount.put(idleHostname1, 1);
        }
    }

    /**
     * 獲取負載最大的機器實例作
     *
     * @param consumerLoadCount 所有機器的負載情況
     * @return 最大負載機器實例
     */
    private Map.Entry<String, Integer> getMaxLoadConsumerEntry(Map<String, Integer> consumerLoadCount) {
        Integer maxConsumeCount = 0;
        Map.Entry<String, Integer> maxEntry = null;
        for (Map.Entry<String, Integer> server1 : consumerLoadCount.entrySet()) {
            if(server1.getValue() > maxConsumeCount) {
                maxConsumeCount = server1.getValue();
                maxEntry = server1;
            }
        }
        return maxEntry;
    }
}

  如上,各個平衡策略,實現各自的功能,就能掌控整個集群的消費控制了!

除了上面的主料,還有一些附帶的東西!

【2】均衡狀態枚舉值如下:

/**
 * 再平衡檢測結果類型枚舉
 *
 */
public enum ReBalanceStatusResultEnum {

    /**
     * 一切正常,無須操作
     */
    OK("一切正常,無須操作"),

    /**
     * 有新下線機器,可以將其分片分配給其他機器
     */
    UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS("有未分配的分片,可以分配給其他機器"),

    /**
     * 有未分配的分片,且有空閑機器,直接將空閑shard分配給空閑機器即可(最好只分配1個,以便其他機器啟動后可用)
     */
    UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS("有未分配的分片,且有空閑機器"),

    /**
     * 負載不均衡,須生平衡
     */
    HEAVY_LOAD_BALANCE_NEEDED("負載不均衡,須生平衡"),

    /**
     * 負載不均衡,且存在空閑的機器, 此時應是 均值與最大值之間相差較小值,但是至少有一個 消費2 的機器,可以剝奪其1個shard給空閑機器
     */
    HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED("負載不均衡,且存在空閑的機器"),

    ;

    private ReBalanceStatusResultEnum(String remark) {
        // ignore
    }
}

 

【3】RedisKeyConstants 常量定義

/**
 * redis 相關常量定義
 */
public class RedisKeyConstants {

    /**
     * 在線機器緩存key.與心跳同時作用
     *
     * @see #SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX
     */
    public static final String ALL_ONLINE_SERVER_CACHE_KEY = "prefix:active.servers";

    /**
     * 機器消費shard情況 緩存key前綴
     */
    public static final String SERVER_CONSUME_CACHE_KEY_PREFIX = "prefix:log.consumer:server:";

    /**
     * 分片被分配情況 緩存key前綴
     */
    public static final String SHARD_ASSIGNED_CACHE_KEY_PREFIX = "prefix:shard.assigned:id:";

    /**
     * 分片鎖 緩存key前綴, 當上鎖時,任何機器不得再消費
     */
    public static final String SHARD_LOCK_CONSUME_CACHE_PREFIX = "prefix:consume.lock.shard:id:";

    /**
     * 存活機器心跳,與上面的機器形成呼應
     *
     * @see #ALL_ONLINE_SERVER_CACHE_KEY
     */
    public static final String SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX = "prefix:log.consumer:server.heartbeat:";

    /**
     * 單個消費者最大消費負載數 (一個不可能達到的值)
     */
    public static final Integer MAX_CONSUMER_SHARD_LOAD = 9999;
}

 

【4】shard分配控制器負責所有shard分配

import com.test.utils.RedisPoolUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * shard分配管理者 (盡量使用接口表達)
 *
 */
public class ShardAssignMaster {

    private RedisPoolUtil redisPoolUtil;

    public ShardAssignMaster(RedisPoolUtil redisPoolUtil) {
        this.redisPoolUtil = redisPoolUtil;
    }

    /**
     * 注冊消費者到 控制中心(注冊中心)
     */
    public void registerConsumer(String serverHostname) {
        // 注冊server到 redis zset 中,如有條件,可以使用 zk 進行操作,也許更好
        redisPoolUtil.zadd(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, (double)System.currentTimeMillis(), serverHostname);
    }

    /**
     * 心跳發送數據
     */
    public void sendHeartbeat(String serverHostname) {
        String heartbeatCacheKey = RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + serverHostname;
        redisPoolUtil.set(heartbeatCacheKey, "1", 30);
    }

    /**
     * 檢測指定消費者服務器還存活與否
     *
     * @param consumeHostname 機器名
     * @return true: 存活, false: 宕機
     */
    public boolean isConsumerServerAlive(String consumeHostname) {
        String aliveValue = redisPoolUtil.get(RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + consumeHostname);
        return aliveValue != null
                && "1".equals(aliveValue);
    }
    /**
     * 獲取並刪除指定server的所屬消費的第一個 shardId
     *
     * @param serverHostname 機器名
     * @return 第一個shardId
     */
    public String popServerFirstConsumeShardId(String serverHostname) {
        String bigLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname;
        Set<String> firstLoadShardSet = redisPoolUtil.zrange(bigLoadConsumerServerCacheKey, 0, 0);
        String firstLoadSHardId = firstLoadShardSet.iterator().next();
        redisPoolUtil.zrem(bigLoadConsumerServerCacheKey, firstLoadSHardId);
        redisPoolUtil.expire(bigLoadConsumerServerCacheKey, 60);
        return firstLoadSHardId;
    }

    /**
     * 對shard進行上鎖,禁止所有消費行為
     *
     * @param shardId 分片id
     */
    public void lockShardId(String shardId) {
        String shardLockCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId;
        redisPoolUtil.set(shardLockCacheKey, "1", 50);
    }

    /**
     * 分配shard分片數據給 指定server
     *
     * @param shardId 分片id
     * @param serverHostname 分配給的消費者機器名
     */
    public void assignShardToServer(String shardId, String serverHostname) {
        String smallLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname;
        redisPoolUtil.zadd(smallLoadConsumerServerCacheKey, (double)System.currentTimeMillis(), shardId);
        redisPoolUtil.expire(smallLoadConsumerServerCacheKey, 60);

        // 更新新的shard消費者標識
        String shardIdAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId;
        redisPoolUtil.set(shardIdAssignCacheKey, serverHostname);
    }

    /**
     * 獲取被分配了shardId的server信息
     *
     * @param shardId 要檢查的分片id
     * @return 被分配了shardId 的機器名
     */
    public String getShardAssignedServer(String shardId) {
        String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId;
        return redisPoolUtil.get(shardAssignCacheKey);
    }

    /**
     * 刪除shard的分配信息,使無效化
     *
     * @param shardId 要刪除的分片id
     */
    public void invalidateShardAssignInfo(String shardId) {
        String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId;
        redisPoolUtil.del(shardAssignCacheKey);
    }

    /**
     * 清理下線機器
     *
     * @param hostname 下線機器名
     */
    public void invalidateOfflineServer(String hostname) {
        redisPoolUtil.zrem(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, hostname);
    }

    /**
     * 獲取機器消費的shard列表
     *
     * @param serverHostname 機器主機名
     * @return shard列表 或者 null
     */
    public Set<String> getServerDutyConsumeShardSet(String serverHostname) {
        String serverDutyConsumeShardCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname;
        return redisPoolUtil.zrange(serverDutyConsumeShardCacheKey, 0, -1);
    }

    /**
     * 獲取所有在線機器列表
     *
     * @return 在線機器列表
     */
    public List<String> getAllOnlineServerList() {
        Set<String> hostnameSet = redisPoolUtil.zrange(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, 0, -1);
        return new ArrayList<>(hostnameSet);
    }

}

  以上是協同負載均衡器代碼實現。

 

【5】當然你還需要一個消費者

  接下來我們還要看下消費者如何實現消費。

import com.test.utils.RedisPoolUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 消費者業務線程
 *
 */
public class LoghubConsumeWorker implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(LoghubConsumeWorker.class);

    private RedisPoolUtil redisPoolUtil;

    private String HOST_NAME;

    /**
     * 因消費者數目不一定,所以使用 CachedThreadPool
     */
    private ExecutorService consumeExecutorService = Executors.newCachedThreadPool();

    public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil) {
        this(redisPoolUtil, null);
    }

    public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil, String hostName) {
        this.redisPoolUtil = redisPoolUtil;
        // 為測試需要添加的 hostName
        HOST_NAME = hostName;
        initSharedVars();
    }

    /**
     * 初始化公共變量
     */
    private void initSharedVars() {
        try {
            if(HOST_NAME != null) {
                return;
            }
            HOST_NAME = InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            throw new RuntimeException("init error : 獲取服務器主機名失敗");
        }
    }


    @Override
    public void run() {
        while (!Thread.interrupted()) {
            // 先獲取所有分配給的shard列表,為空則進入下一次循環(注意此時阻塞鎖不能起作用)
            Set<String> shardsSet = blockingTakeAvailableConsumeShardList();
            try {
                // 消費所有給定shard數據
                consumeLogHubShards(shardsSet);
            } catch (Exception e) {
                logger.error("消費loghub, error", e);
            }

        }

    }

    /**
     * 獲取可用的分片列表(沒有則阻塞等待)
     *
     * @return 分片列表
     */
    private Set<String> blockingTakeAvailableConsumeShardList() {
        while (!Thread.interrupted()) {
            String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME;
            Set<String> shardsSet = redisPoolUtil.zrange(serverConsumeCacheKey, 0, -1);
            if (shardsSet != null && !shardsSet.isEmpty()) {
                return shardsSet;
            }
            logger.warn(" =========== 當前主機[hostname:{}]未查詢到任何shard =========", HOST_NAME);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                logger.error("LogHubClientWork run 未獲取到該主機的shard時,每隔1秒鍾獲取 ,error : {}", e);
            }
        }
        return null;
    }

    /**
     * 消費loghub 分片數據
     *
     * @param shardsSet 被分配的分片列表
     */
    public void consumeLogHubShards(Set<String> shardsSet) throws InterruptedException {
        if(shardsSet == null || shardsSet.isEmpty()) {
            return;
        }
        // 此處使用 CountdownLatch, 保證至少有一個任務完成時,才開始下一次任務的調入
//        Semaphore semaphoreLock = new Semaphore(shardsSet.size());
        CountDownLatch openDoorLatch = new CountDownLatch(1);
        boolean startNewJobAtLeastOnce = false;
        for (String shard : shardsSet) {
            // 檢測當前shard是否處於鎖定狀態,如果鎖定則不能消費, 注意鎖情況
            if(isShardLocked(shard)) {
                logger.info("=============== shard:{} is locked, continue... ======", shard);
                continue;
            }
            int shardId = Integer.parseInt(shard);
            LoghubConsumerTaskExecutor consumer = getConsumerExecutor(shardId);
            // consumer 應保證有所消費,如果沒有消費,則自行等待一個長周期,外部應只管調入請求
            // consumer 應保證所有消費,在上一個任務未完成時,不得再開啟下一輪提交消費
            boolean startNewJob = consumer.startNewConsumeJob(openDoorLatch);
            if(startNewJob) {
                // start failed, prev job is running maybe
                // ignore job, no blocking
                startNewJobAtLeastOnce = true;
            }
        }
        // 任意一個任務完成,都將打開新的分配周期,且后續 countDown 將無效,此處可能導致死鎖
        if(startNewJobAtLeastOnce) {
            openDoorLatch.await();
        }
        else {
            // 當本次分配調度一個任務都未提交時,則睡眠等待
            // (一般此情況為 消費者被分配了上了鎖的shard時,即搶占另的機器的shard, 需要給別的機器備份數據時間鎖)
            Thread.sleep(200);
        }
    }

    /**
     * 檢測分片是否被鎖定消費了
     *
     * @param shardId 分片id
     * @return true:鎖定, false:未鎖定可用
     */
    private boolean isShardLocked(String shardId) {
        String shardCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId;
        String lockValue = redisPoolUtil.get(shardCacheKey);
        return !StringUtils.isBlank(lockValue)
                    && "1".equals(lockValue);
    }

    /**
     * 獲取消費者實例,針對一個shard, 只創建一個實例
     */
    private Map<Integer, LoghubConsumerTaskExecutor> mShardConsumerMap = new ConcurrentHashMap<>();
    private LoghubConsumerTaskExecutor getConsumerExecutor(final int shardId) {
        LoghubConsumerTaskExecutor consumer = mShardConsumerMap.get(shardId);
        if (consumer != null) {
            return consumer;
        }
        consumer = new LoghubConsumerTaskExecutor(new SingleShardConsumerJob(shardId));
        mShardConsumerMap.put(shardId, consumer);
        logger.info(" ======================= create new consumer executor shard:{}", shardId);
        return consumer;
    }

    /**
     * 消費者調度器
     *
     *      統一控制消費者的運行狀態管控
     */
    class LoghubConsumerTaskExecutor {

        private Future<?> future;

        private ConsumerJob consumerJob;

        public LoghubConsumerTaskExecutor(ConsumerJob consumerJob) {
            this.consumerJob = consumerJob;
        }

        /**
         * 啟動一個新消費任務
         *
         * @return true: 啟動成功, false: 啟動失敗有未完成任務在前
         */
        public boolean startNewConsumeJob(CountDownLatch latch) {
            if(future == null
                    || future.isCancelled() || future.isDone()) {
                //沒有任務或者任務已取消或已完成 提交任務
                future = consumeExecutorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            consumerJob.consumeShardData();
                        }
                        finally {
                            latch.countDown();
                        }
                    }
                });
                return true;
            }
            return false;
        }

    }

}

/**
 * 消費者任務接口定義
 */
interface ConsumerJob {

    /**
     * 消費數據具體邏輯實現
     */
    public void consumeShardData();
}

/**
 * 單個shard消費的任務實現
 */
class SingleShardConsumerJob implements ConsumerJob {

    /**
     * 當前任務的消費 shardId
     */
    private int shardId;

    public SingleShardConsumerJob(int shardId) {
        this.shardId = shardId;
    }

    @Override
    public void consumeShardData() {
        System.out.println(LocalDateTime.now() + " - host -> consume shard: " + shardId);
        try {
            // do complex biz
            // 此處如果發現shard 不存在異常,則應回調協調器,進行shard的移除
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}

 

【6】當然你還需要一個demo

  看不到效果,我就是不信!
  所以來看個 demo 吧!
  我們使用單機開多個 單元測試用例,直接測試就好!

測試代碼:.

import com.test.common.config.LogHubProperties;
import com.test.utils.RedisPoolUtil;
import org.junit.Test;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 臨時測試 負載均衡
 *
 */
public class ShardConsumerLoadBalanceTest {

    public static void main(String[] args) throws IOException {
        startAConsumer();
        System.in.read();
    }

    // 啟動一個單元測試,就相當於啟動一個消費者應用
    @Test
    public void mainMock() throws IOException {
        startAConsumer();
        System.in.read();
    }

    // 啟動一個單元測試,就相當於啟動一個消費者應用
    @Test
    public void startNewConsumer() throws IOException {
        startAConsumer();
        System.in.read();
    }

    // 啟動一個單元測試,就相當於啟動一個消費者應用
    @Test
    public void startNewConsumer2() throws IOException {
        startAConsumer();
        System.in.read();
    }


    private static void startAConsumer() {
        RedisPoolUtil redisPoolUtil = new RedisPoolUtil();
        redisPoolUtil.setIp("127.0.0.1");
        redisPoolUtil.setMaxActive(111);
        redisPoolUtil.setMaxIdle(1000);
        redisPoolUtil.setPort(6379);
        redisPoolUtil.setMaxWait(100000);
        redisPoolUtil.setTimeout(100000);
        redisPoolUtil.setPassWord("123");
        redisPoolUtil.setDatabase(0);
        redisPoolUtil.initPool();

        LogHubProperties logHubProperties = new LogHubProperties();
        logHubProperties.setProjectName("test");
        logHubProperties.setEndpoint("cn-shanghai-finance-1.log.aliyuncs.com");
        logHubProperties.setAccessKey("xxxx");
        logHubProperties.setAccessKeyId("11111");

        // 使用隨機 hostname 模擬多台機器調用
        Random random = new Random();
        String myHostname = "my-host-" + random.nextInt(10);

        // 啟動管理線程
        LoghubConsumerShardCoWorker shardCoWorker = new LoghubConsumerShardCoWorker(redisPoolUtil, logHubProperties, myHostname);
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        scheduledExecutorService.scheduleAtFixedRate(shardCoWorker, 5, 30, TimeUnit.SECONDS);

        // 啟動業務線程
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        LoghubConsumeWorker worker = new LoghubConsumeWorker(redisPoolUtil, myHostname);

        executorService.submit(worker);

    }
}

 

  如上,就可以實現自己的負載均衡消費了。

  比如: 總分片數為4。

    1. 最開始啟動1個機器時,將會被分配 0,1,2,3。
    2. 啟動兩個后,將分為 0,1; 2,3;
    3. 啟動3個后,將分為 0; 1; 2,3;
    4. 反之,關閉一個機器后,將把壓力分擔到原機器上。
    當做負載重分配時,將有50秒的鎖定時間備份。

 

【7】待完善的點

  本文是基於loghub實現的分片拉取,其實在這方面loghub與kafka是如出一轍的,只是loghub更商業產品化。

  當shard縮減時,應能夠自動發現,從而去除原有的機器消費分配。而不是讓消費者報錯。

  注意進行再均衡時,消費者偏移量問題,尤其是你為了性能使用了jvm本地變量保存偏移時,注意刷新該變量偏移。本文沒有實現類似zookeeper強大的watch監聽功能,但是有一個上鎖等待的過程,你可以基於這個鎖做一些力所能及的事!

 

老話: 可以適當造輪子!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM