一個java 8的stream是由三部分組成的。數據源,零個或一個或多個中間操作,一個或零個終止操作。
中間操作是對數據的加工,注意,中間操作是lazy操作,並不會立馬啟動,需要等待終止操作才會執行。
終止操作是stream的啟動操作,只有加上終止操作,stream才會真正的開始執行。
lambda實現惰性求值
什么是惰性求值(惰性計算)
在編程語言理論中,惰性求值(英語:Lazy Evaluation),又譯為惰性計算、懶惰求值,也稱為傳需求調用(call-by-need),是一個計算機編程中的一個概念,它的目的是要最小化計算機要做的工作。它有兩個相關而又有區別的含意,可以表示為“延遲求值”和“最小化求值”,本條目專注前者,后者請參見最小化計算條目。除可以得到性能的提升外,惰性計算的最重要的好處是它可以構造一個無限的數據類型。
惰性求值的相反是及早求值(熱情求值),這是在大多數編程語言中隨處可見的一種計算方式,例如:
int x = 1; String name = getUserName();
上面的表達式在綁定了變量后就立即求值,得到計算的結果。
Java中的惰性求值
以下Java代碼就是惰性求值的范例。這段代碼在定義 nameStream 這個流的時候,System.out.println 語句不會被立即執行。
public static void main(String[] args) { // 定義流 Stream<String> nameStream = Stream.of("Zebe", "July", "Yaha").filter(name -> { if (!name.isEmpty()) { System.out.println("過濾流,當前名稱:" + name); return true; } return false; }); // 取出流的值,這時候才會調用計算 List<String> names1 = nameStream.collect(Collectors.toList()); // 流只能被使用一次,下面這行代碼會報錯,提示流已經被操作或者關閉了 List<String> names2 = nameStream.collect(Collectors.toList()); }
在jdk8的stream流編程里面,沒有調用最終操作的時候,中間操作的方法都不會執行,這也是惰性求值。
stream流編程
stream編程主要是學習API的使用,但前提是學好lambda,基礎好了,看這些方法定義非常簡單,要是沒有打好基礎,你會有很多東西需要記憶。
內部迭代和外部迭代
一般來說,我們之前的編碼方法,叫外部迭代,stream的寫法叫內部迭代。內部迭代代碼更加可讀更加優雅,關注點是做什么(外部迭代關注是怎么樣做),也很容易讓我們養成編程小函數的好習慣!這點在編程習慣里面非常重要!看例子:
package com.dxz.stream; import java.util.stream.IntStream; public class StreamDemo1 { public static void main(String[] args) { int[] nums = { 1, 2, 3 }; // 外部迭代 int sum = 0; for (int i : nums) { sum += i; } System.out.println("結果1為:" + sum); // 使用stream的內部迭代 // map就是中間操作(返回stream的操作) // sum就是終止操作 int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum(); System.out.println("結果2為:" + sum2); System.out.println("惰性求值就是終止沒有調用的情況下,中間操作不會執行,下面的不會打印了"); IntStream.of(nums).map(StreamDemo1::doubleNum); } public static int doubleNum(int i) { System.out.println("執行了乘以2"); return i * 2; } }
結果:
結果1為:6 執行了乘以2 執行了乘以2 執行了乘以2 結果2為:12 惰性求值就是終止沒有調用的情況下,中間操作不會執行,下面的不會打印了
操作類型
操作類型概念要理清楚。有幾個維度。
首先分為 中間操作 和 最終操作,在最終操作沒有調用的情況下,所有的中級操作都不會執行。那么那些是中間操作那些是最終操作呢? 簡單來說,返回stream流的就是中間操作,可以繼續鏈式調用下去,不是返回stream的就是最終操作。這點很好理解。
最終操作里面分為短路操作和非短路操作,短路操作就是limit/findxxx/xxxMatch這種,就是找了符合條件的就終止,其他的就是非短路操作。在無限流里面需要調用短路操作,否則像炫邁口香糖一樣根本停不下來!
中間操作又分為 有狀態操作 和 無狀態操作,怎么樣區分呢? 一開始很多同學需要死記硬背,其實你主要掌握了狀態這個關鍵字就不需要死記硬背。狀態就是和其他數據有關系。我們可以看方法的參數,如果是一個參數的,就是無狀態操作,因為只和自己有關,其他的就是有狀態操作。如map/filter方法,只有一個參數就是自己,就是無狀態操作;而distinct/sorted就是有狀態操作,因為去重和排序都需要和其他數據比較,理解了這點,就不需要死記硬背了!
為什么要知道有狀態和無狀態操作呢?在多個操作的時候,我們需要把無狀態操作寫在一起,有狀態操作放到最后,這樣效率會更加高。
運行機制
我們可以通過下面的代碼來理解stream的運行機制
package com.dxz.stream; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; /** * 驗證stream運行機制 * * 1. 所有操作是鏈式調用, 一個元素只迭代一次 * 2. 每一個中間操作返回一個新的流. 流里面有一個屬性sourceStage指向同一個 地方,就是Head * 3. Head->nextStage->nextStage->... -> null * 4. 有狀態操作會把無狀態操作階段,單獨處理 * 5. 並行環境下, 有狀態的中間操作不一定能並行操作. * * 6. parallel/ sequetial 這2個操作也是中間操作(也是返回stream) * 但是他們不創建流, 他們只修改 Head的並行標志 * */ public class RunStream { public static void main(String[] args) { Random random = new Random(); // 隨機產生數據 Stream<Integer> stream = Stream.generate(() -> random.nextInt()) // 產生500個 ( 無限流需要短路操作. ) .limit(50) // 第1個無狀態操作 .peek(s -> print("peek相當於debug操作: " + s)) // 第2個無狀態操作,大於1000000的值留下 .filter(s -> { print("filter: " + s); return s > 1000000; }) // 有狀態操作 .sorted((i1, i2) -> { print("排序: " + i1 + ", " + i2); return i1.compareTo(i2); }) // 又一個無狀態操作 .peek(s -> { print("peek相當於debug操作2: " + s); }).parallel(); // 終止操作 long count = stream.count(); System.out.println("end=" + count); } /** * 打印日志並sleep 5 毫秒 * * @param s */ public static void print(String s) { // System.out.println(s); // 帶線程名(測試並行情況) System.out.println(Thread.currentThread().getName() + " > " + s); /*try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { }*/ } }
結果:
ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 467408314 ForkJoinPool.commonPool-worker-1 > filter: 467408314 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -694002895 ForkJoinPool.commonPool-worker-1 > filter: -694002895 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1018018271 ForkJoinPool.commonPool-worker-1 > filter: 1018018271 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -2055711792 ForkJoinPool.commonPool-worker-1 > filter: -2055711792 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 362753392 ForkJoinPool.commonPool-worker-1 > filter: 362753392 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1420256006 ForkJoinPool.commonPool-worker-1 > filter: 1420256006 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1759751716 ForkJoinPool.commonPool-worker-1 > filter: 1759751716 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1451030142 ForkJoinPool.commonPool-worker-1 > filter: -1451030142 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1091423344 ForkJoinPool.commonPool-worker-1 > filter: -1091423344 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1377817193 ForkJoinPool.commonPool-worker-1 > filter: 1377817193 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1159545806 ForkJoinPool.commonPool-worker-1 > filter: -1159545806 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -173182413 ForkJoinPool.commonPool-worker-1 > filter: -173182413 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 600209961 ForkJoinPool.commonPool-worker-1 > filter: 600209961 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1200504990 ForkJoinPool.commonPool-worker-1 > filter: 1200504990 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 260857935 ForkJoinPool.commonPool-worker-1 > filter: 260857935 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -2078810651 ForkJoinPool.commonPool-worker-1 > filter: -2078810651 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 402189452 ForkJoinPool.commonPool-worker-1 > filter: 402189452 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -946047183 ForkJoinPool.commonPool-worker-1 > filter: -946047183 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1516256407 ForkJoinPool.commonPool-worker-1 > filter: -1516256407 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -161522096 ForkJoinPool.commonPool-worker-1 > filter: -161522096 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 346222819 ForkJoinPool.commonPool-worker-1 > filter: 346222819 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1492195164 ForkJoinPool.commonPool-worker-1 > filter: -1492195164 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -823860607 ForkJoinPool.commonPool-worker-1 > filter: -823860607 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1400321699 ForkJoinPool.commonPool-worker-1 > filter: -1400321699 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -594055512 ForkJoinPool.commonPool-worker-1 > filter: -594055512 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -968008982 ForkJoinPool.commonPool-worker-1 > filter: -968008982 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -538739410 ForkJoinPool.commonPool-worker-1 > filter: -538739410 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1403417684 ForkJoinPool.commonPool-worker-1 > filter: -1403417684 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -938859220 ForkJoinPool.commonPool-worker-1 > filter: -938859220 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1962390769 ForkJoinPool.commonPool-worker-1 > filter: -1962390769 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1952141841 ForkJoinPool.commonPool-worker-1 > filter: 1952141841 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 68659719 ForkJoinPool.commonPool-worker-1 > filter: 68659719 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -397822084 ForkJoinPool.commonPool-worker-1 > filter: -397822084 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1047484685 ForkJoinPool.commonPool-worker-1 > filter: 1047484685 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 2051123152 ForkJoinPool.commonPool-worker-1 > filter: 2051123152 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1789203084 ForkJoinPool.commonPool-worker-1 > filter: 1789203084 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1447008850 ForkJoinPool.commonPool-worker-1 > filter: -1447008850 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1982526816 ForkJoinPool.commonPool-worker-1 > filter: 1982526816 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 432837685 ForkJoinPool.commonPool-worker-1 > filter: 432837685 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 2139658303 ForkJoinPool.commonPool-worker-1 > filter: 2139658303 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1386330515 ForkJoinPool.commonPool-worker-1 > filter: 1386330515 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 5070967 ForkJoinPool.commonPool-worker-1 > filter: 5070967 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -2131175714 ForkJoinPool.commonPool-worker-1 > filter: -2131175714 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 390402009 ForkJoinPool.commonPool-worker-1 > filter: 390402009 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1096668893 ForkJoinPool.commonPool-worker-1 > filter: -1096668893 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -929907211 ForkJoinPool.commonPool-worker-1 > filter: -929907211 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1859217343 ForkJoinPool.commonPool-worker-1 > filter: 1859217343 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -1526881589 ForkJoinPool.commonPool-worker-1 > filter: -1526881589 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: 1122349191 ForkJoinPool.commonPool-worker-1 > filter: 1122349191 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作: -882024070 ForkJoinPool.commonPool-worker-1 > filter: -882024070 main > 排序: 1018018271, 467408314 main > 排序: 362753392, 1018018271 main > 排序: 362753392, 1018018271 main > 排序: 362753392, 467408314 main > 排序: 1420256006, 467408314 main > 排序: 1420256006, 1018018271 main > 排序: 1759751716, 1018018271 main > 排序: 1759751716, 1420256006 main > 排序: 1377817193, 1018018271 main > 排序: 1377817193, 1759751716 main > 排序: 1377817193, 1420256006 main > 排序: 600209961, 1377817193 main > 排序: 600209961, 467408314 main > 排序: 600209961, 1018018271 main > 排序: 1200504990, 1018018271 main > 排序: 1200504990, 1420256006 main > 排序: 1200504990, 1377817193 main > 排序: 260857935, 1200504990 main > 排序: 260857935, 600209961 main > 排序: 260857935, 467408314 main > 排序: 260857935, 362753392 main > 排序: 402189452, 1018018271 main > 排序: 402189452, 467408314 main > 排序: 402189452, 362753392 main > 排序: 346222819, 1018018271 main > 排序: 346222819, 402189452 main > 排序: 346222819, 362753392 main > 排序: 346222819, 260857935 main > 排序: 1952141841, 600209961 main > 排序: 1952141841, 1377817193 main > 排序: 1952141841, 1759751716 main > 排序: 68659719, 1018018271 main > 排序: 68659719, 402189452 main > 排序: 68659719, 346222819 main > 排序: 68659719, 260857935 main > 排序: 1047484685, 600209961 main > 排序: 1047484685, 1420256006 main > 排序: 1047484685, 1200504990 main > 排序: 1047484685, 1018018271 main > 排序: 2051123152, 1018018271 main > 排序: 2051123152, 1420256006 main > 排序: 2051123152, 1952141841 main > 排序: 1789203084, 1018018271 main > 排序: 1789203084, 1420256006 main > 排序: 1789203084, 1952141841 main > 排序: 1789203084, 1759751716 main > 排序: 1982526816, 1047484685 main > 排序: 1982526816, 1759751716 main > 排序: 1982526816, 1952141841 main > 排序: 1982526816, 2051123152 main > 排序: 432837685, 1047484685 main > 排序: 432837685, 402189452 main > 排序: 432837685, 600209961 main > 排序: 432837685, 467408314 main > 排序: 2139658303, 1047484685 main > 排序: 2139658303, 1789203084 main > 排序: 2139658303, 1982526816 main > 排序: 2139658303, 2051123152 main > 排序: 1386330515, 1047484685 main > 排序: 1386330515, 1789203084 main > 排序: 1386330515, 1420256006 main > 排序: 1386330515, 1377817193 main > 排序: 5070967, 1200504990 main > 排序: 5070967, 432837685 main > 排序: 5070967, 346222819 main > 排序: 5070967, 260857935 main > 排序: 5070967, 68659719 main > 排序: 390402009, 1047484685 main > 排序: 390402009, 402189452 main > 排序: 390402009, 260857935 main > 排序: 390402009, 362753392 main > 排序: 1859217343, 1047484685 main > 排序: 1859217343, 1789203084 main > 排序: 1859217343, 2051123152 main > 排序: 1859217343, 1982526816 main > 排序: 1859217343, 1952141841 main > 排序: 1122349191, 1047484685 main > 排序: 1122349191, 1789203084 main > 排序: 1122349191, 1386330515 main > 排序: 1122349191, 1377817193 main > 排序: 1122349191, 1200504990 main > peek相當於debug操作2: 1386330515 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1200504990 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1377817193 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 1982526816 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 2051123152 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 2139658303 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1122349191 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 1789203084 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 432837685 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 467408314 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 402189452 main > peek相當於debug操作2: 1420256006 main > peek相當於debug操作2: 1759751716 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 1018018271 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 1047484685 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 346222819 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 362753392 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 390402009 ForkJoinPool.commonPool-worker-3 > peek相當於debug操作2: 5070967 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 1859217343 ForkJoinPool.commonPool-worker-1 > peek相當於debug操作2: 1952141841 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 68659719 ForkJoinPool.commonPool-worker-2 > peek相當於debug操作2: 260857935 main > peek相當於debug操作2: 600209961 end=24
大家自己測試一下代碼,能發現stream的調用方法,就像現實中的流水線一樣,一個元素只會迭代一次,但如果中間有無狀態操作,前后的操作會單獨處理(元素就會被多次迭代)。
jdk9的響應式流
就是reactive stream,也就是flow。其實和jdk8的stream沒有一點關系。說白了就一個發布-訂閱模式,一共只有4個接口,3個對象,非常簡單清晰。寫一個入門例子就可以掌握。
package jdk9; import java.util.concurrent.Flow.Processor; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.SubmissionPublisher; /** * 帶 process 的 flow demo */ /** * Processor, 需要繼承SubmissionPublisher並實現Processor接口 * * 輸入源數據 integer, 過濾掉小於0的, 然后轉換成字符串發布出去 */ class MyProcessor extends SubmissionPublisher<String> implements Processor<Integer, String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { // 保存訂閱關系, 需要用它來給發布者響應 this.subscription = subscription; // 請求一個數據 this.subscription.request(1); } @Override public void onNext(Integer item) { // 接受到一個數據, 處理 System.out.println("處理器接受到數據: " + item); // 過濾掉小於0的, 然后發布出去 if (item > 0) { this.submit("轉換后的數據:" + item); } // 處理完調用request再請求一個數據 this.subscription.request(1); // 或者 已經達到了目標, 調用cancel告訴發布者不再接受數據了 // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { // 出現了異常(例如處理數據的時候產生了異常) throwable.printStackTrace(); // 我們可以告訴發布者, 后面不接受數據了 this.subscription.cancel(); } @Override public void onComplete() { // 全部數據處理完了(發布者關閉了) System.out.println("處理器處理完了!"); // 關閉發布者 this.close(); } 