如何用java實現一個p2p種子搜索(2)-路由表實現


路由表實現

回顧一下上一篇講的內容,上一篇提到從dht網絡中獲取infohash,那么加入dht網絡后的最重要的第一步就是怎么去建立路由表。
路由表里面保存的是dht中其他node的信息,所以node可以這么設計

public class Node implements Comparable<Node>{

    private String nodeId;//16進制字符串

    private String ip; //node的ip

    private Integer port; //node的端口

    private Date updateTime;//最后更新時間

    private byte[] nodeIdBytes;//20字節

    private Integer k=0;//k桶應該有的位置

    private Integer currentK=0;//當前的位置

    private Integer rank=0; //node的rank分值 ,路由表滿的時候,優先移除分值低的
    .....
}

因為路由表的每個bucket最多只有存8個,所以當路由表的bucket滿的時候,需要不斷的刪除rank分最低的node,為了高效比較和刪除bucket我們可以用PriorityQueue,每個路由表最多有160個bucket,所以可以用map來存儲路由表

private Map<Integer,PriorityQueue<Node>> tableMap=new ConcurrentHashMap<>();

因為路由表一開始只有一個bucket,當節點數量超過8個就會分裂成兩個bucket,為了確定新節點應該插入到哪個bucket中,所以把每個bucket設計成鏈表

public static class Bucket{
    private int k; //當前是第幾個k桶
    private Bucket next;//下一個k桶
}

好了我們再來看怎么添加一個node

public void put(Node node) {
    int bucketIndex = getBucketIndex(node);
    if(bucketIndex==0){//是自己就不用加入了
        return;
    }
    PriorityQueue<Node> pq = tableMap.get(bucketIndex);
    if(CollectionUtils.isEmpty(pq)){
        //如果是空 那么找最近的那個節點加入
        boolean isAdd=false;
        while(bucket.next != null){
            if(bucketIndex > bucket.getK()
                    && bucketIndex < bucket.next.getK()){
                 //先往小的里面放
                node.setCurrentK(bucket.getK());
                isAdd=putAccurate(tableMap.get(bucket.getK()),node,false,bucket,tableMap);
                if(!isAdd){
                    node.setCurrentK(bucket.next.getK());
                    isAdd=putAccurate(tableMap.get(bucket.next.getK()),node,true,bucket,tableMap);
                }
            }
            bucket=bucket.next;

        }
        if(!isAdd){
            //沒有添加成功 那么往最后一個節點添加
            node.setCurrentK(bucket.getK());
            putAccurate(tableMap.get(bucket.getK()),node,true,bucket,tableMap);
        }

    }else{//如果不空 那么直接加 簡單點來吧
        if(pq.size()<8){
            if(!pq.contains(node)){
                node.setCurrentK(node.getK());
                pq.add(node);
            }else{
                reAdd(pq,node);
            }
        }else{
            pq.add(node);
            pq.poll();
        }
    }
}

其中比較重要的是方法是putAccurate

/**
 * @param pq 當前bucket
 * @param node 需要插入的node
 * @param isSplit 是否需要分裂
 * @param bucket 需要插入的bucket的位置
 * @param tableMap 路由表
 * @return 返回是否添加成功
 */
@SneakyThrows
public boolean putAccurate(PriorityQueue<Node> pq,Node node,boolean isSplit,Bucket bucket,Map<Integer,PriorityQueue<Node>> tableMap){
    boolean isAdd=false;
    if(pq.contains(node)){
        return reAdd(pq,node);
    }
    if(pq.size()<8){
        pq.add(node);
        isAdd=true;
    }
    if(isSplit && !isAdd){
        PriorityQueue<Node> priorityQueue=new PriorityQueue<Node>((x,y)->x.getRank()-y.getRank());
        priorityQueue.add(node);
        tableMap.putIfAbsent(node.getK(),priorityQueue);
        //創建新的k桶后需要把兩邊的bucket距離比較近的都放到自己的k桶里面 如果超過8個就丟了 最好是可以ping一下
        //先從小的開始放
        PriorityQueue<Node> collect1 = new PriorityQueue<>();
        collect1.addAll(tableMap.get(bucket.getK()).stream().filter(n -> {
            if (priorityQueue.size() < 8 &&
                    Math.abs(n.getK() - n.getCurrentK()) > Math.abs(n.getK() - node.getK())) {
                n.setCurrentK(node.getK());
                priorityQueue.add(n);
                return false;
            }
            return true;
        }).collect(Collectors.toSet()));
        tableMap.put(bucket.getK(),CollectionUtils.isNotEmpty(collect1)?collect1:new PriorityQueue<Node>());
        if(bucket.next!=null && CollectionUtils.isNotEmpty(tableMap.get(bucket.next.getK()))){
            PriorityQueue<Node> collect = new PriorityQueue<>();
            collect.addAll(tableMap.get(bucket.next.getK()).stream().filter(n -> {
                if (priorityQueue.size() < 8 &&
                        Math.abs(n.getK() - n.getCurrentK()) > Math.abs(n.getK() - node.getK())) {
                    n.setCurrentK(node.getK());
                    priorityQueue.add(n);
                    return false;
                }
                return true;
            }).collect(Collectors.toSet()));
            tableMap.put(bucket.next.getK(),CollectionUtils.isNotEmpty(collect)?collect:new PriorityQueue<Node>());
        }
        Bucket b=new Bucket(node.getK(),bucket.next);
        bucket.next=b;
        isAdd=true;
        node.setCurrentK(node.getK());
    }
    return isAdd;
}

上一篇我們知道路由表主要通過find_node來建立,那我們自己也會收到別人發起的find_node請求,所以我們還要實現根據nodeid來查找最近的8個node

/**
* 根據nodeid 查找最近的8個node
* @param trargetBytes 需要查找目標id
 * @return
 */
public List<Node> getForTop8(byte[] trargetBytes){
    int bucketIndex = getBucketIndex(trargetBytes);
    List<Node> l=new ArrayList<>();
    PriorityQueue<Node> pq = tableMap.get(bucketIndex);
    if(CollectionUtils.isEmpty(pq)){
        while(bucket.next != null){
            if(bucketIndex > bucket.getK()
                    && bucketIndex < bucket.next.getK()){

                tableMap.get(bucket.next.getK()).stream().forEach(x->{
                    if(l.size()<8){
                        l.add(x);
                    }
                });
            }
            bucket=bucket.next;
        }
        if(CollectionUtils.isEmpty(l)){
            tableMap.get(bucket.getK()).stream().forEach(x->{
                if(l.size()<8){
                    l.add(x);
                }
            });
        }

    }else{//如果不空 那么直接加 簡單點來吧
        l.addAll(pq.stream().collect(Collectors.toList()));
    }
    return l;
}

好了,到了這里路由表大致就實現啦。已經成功完成了第一步,現在呢路由表還沒有初始化剛開始什么數據都沒有,而且我們還是不能從dht中獲取infohash,下一篇再來講dht 協議,里面還會講怎么初始化路由表,實現了dht協議也就完成了一大半了。
本章路由表部分還可以參考源碼里面的RoutingTable,應該都能看得懂,地址:https://github.com/mistletoe9527/dht-spider


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM