JAVA高並發集合詳解


 

 

  •  Queue(隊列)
    主要是為了高並發准備的容器
    Deque:雙端隊列,可以反方向裝或者取
  • 最開始jdk1.0只有Vector和hashtable 默認所有方法都實現了synchronized鎖,線程安全但性能比較差,因此后續SUN意識到這個問題之后加了完全沒加鎖的hashmap,但是由於Hashmap完全沒鎖,SUN又想到能不能讓Hashmap在有鎖的時候用呢,此時添加了Collection,里面有一個Collection.synchronizedMap(new HashMap()),將Hashmap變成了加鎖的版本,里面鎖的粒度變小了,能部分提高性能,在JUC之后,新出來了ConcurrentHashtable由於Hashtable和Vectoy是古老的版本帶過來的,所以現在基本不用,知道就行。
    1、HashTable性能測試
    public class Constants {
        public static final int COUNT = 1000000;
        public static final int THREAD_COUNT = 100;
    }
    public class T01_TestHashtable {
        static Hashtable<UUID, UUID> m = new Hashtable<>();
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
            public MyThread(int start) {
                this.start = start;
            }
            @Override
            public void run() {
                for(int i=start; i<start+gap; i++) {
                    m.put(keys[i], values[i]);
                }
            }
        }
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            Thread[] threads = new Thread[THREAD_COUNT];
            for(int i=0; i<threads.length; i++) {
                threads[i] =
                new MyThread(i * (count/THREAD_COUNT));
            }
            for(Thread t : threads) {
                t.start();
            }
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            System.out.println(m.size());
            //-----------------------------------
            start = System.currentTimeMillis();
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(()->{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
            for(Thread t : threads) {
                t.start();
            }
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }

    相對來說,插入的時候ConcurrentHashMap相對於HashMap效率沒有明顯提高,但是讀取的時候效率高很多。



  • ConcurrentSkipListMap 跳表結構,由於CAS在Tree這種結構上操作太復雜,然后冒出了跳表這種結構,底層時鏈表,為了查找效率,提取關鍵元素,制作新的鏈表,找的時候先找11->78,發現元素比78小,則與11比較,如果大,則在11-78中找,明顯效率提高了。
    跳表:
  • 多線程盡量使用Queue,少用List、Set
  • CopyOnWrite,寫時復制,讀的時候不加鎖,寫的時候加鎖,原理是:會在原來基礎上Copy一個出來數組,在新的數組上寫,寫完后將引用指向新數組,過程中讀的時候讀的是原來數組,當寫的過程結束之后讀得是新數組,寫時復制
  • CopyOnWriteArrayList,寫(添加)的時候是加了鎖的,里面先獲取原數組的長度,創建新數組長度為原數組+1,讀(get)的時候是加了鎖的
    public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }
        public E get(int index) {
            return get(getArray(), index);
        }

     

  • BlockingQueue  為線程池做准備,重點在blocking上,阻塞隊列,有很多對多線程友好的方法。
    1、常用的一共三種,分別是無界的LinkedBlockingQueue(表示用Linked實現是,無界,除非內存溢出,添加方法put的區別是,put方法非要添加,如果滿了就阻塞住,取方法    take(),非要取,如果沒有了就阻塞,阻塞的原理就是實現了park(),線程阻塞住,進入wait狀態)、
    2、有界的ArrayBlockingQueue(表示用Array實現是,有界,除非內存溢出,滿了之后程序會阻塞住,等消費者)、
    3、DelayQueue(實現時間上的排序)、
    4、synchronusQueue(實現線程中間傳輸內容、任務)、
    5、TransferQueue(是前面幾種的組合,也可以傳輸任務,並且不是傳遞一個,可以傳遞多個)
  • ConcurrentLinkedQueue
    里面一些友好的方法添加 offer()(返回值是Boolean類型,true表示成功,)、peak()(取)
  • Queue 與 List的區別,主要在於Queue添加了很多對線程友好的API,比如offer peek poll
    這其中一個子類BlockingQueue在上面(offer peek poll)的基礎上添加了put take ->主要是實現了阻塞,可以自然而然的實現任務隊列,也就是生產者、消費者模型,這是多線程中最重要的模型,是MQ的基礎(必問內容)
  • DelayQueue,可以實現等待時間,類實現Delayed接口,同時設置運行時間runningtime,時間等待越短的優先運行,實現Compare接口,重寫方法來確定任務執行隊列
    按時間進行任務調度。本質是priorityQueue。
        static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    
        static Random r = new Random();
        
        static class MyTask implements Delayed {
            String name;
            long runningTime;
            
            MyTask(String name, long rt) {
                this.name = name;
                this.runningTime = rt;
            }
    
            @Override
            public int compareTo(Delayed o) {
                if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                    return -1;
                else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                    return 1;
                else 
                    return 0;
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
            
            
            @Override
            public String toString() {
                return name + " " + runningTime;
            }
        }

     main方法:

    public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask("t1", now + 1000);
            MyTask t2 = new MyTask("t2", now + 2000);
            MyTask t3 = new MyTask("t3", now + 1500);
            MyTask t4 = new MyTask("t4", now + 2500);
            MyTask t5 = new MyTask("t5", now + 500);
            
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
            
            System.out.println(tasks);
            
            for(int i=0; i<5; i++) {
                System.out.println(tasks.take());
            }
        }

    結果:

    t5 1587472415617
    t1 1587472416117
    t3 1587472416617
    t2 1587472417117
    t4 1587472417617
  • PriorityQueue,會默認對任務排序,最小最優先,里面是小頂堆
    Demo:
    public class T07_01_PriorityQueque {
        public static void main(String[] args) {
            PriorityQueue<String> q = new PriorityQueue<>();
            q.add("c");
            q.add("e");
            q.add("a");
            q.add("d");
            q.add("z");
            for (int i = 0; i < 5; i++) {
                System.out.println(q.poll());
            }
        }
    }
    //打印結果:a、c、d、e、z
  • synchronusQueue,容量為0,不是用來裝東西,主要是用來一個線程給另一個線程下達任務,與之前的exchanger容器類似,不可以用add方法,里面容器為0,不可以裝東西。應用場景是:做某件事需要等待結果完成才繼續執行接下來的任務。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM