Java編程的邏輯 (92) - 函數式數據處理 (上)


本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》,由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買,京東自營鏈接http://item.jd.com/12299018.html


上節我們介紹了Lambda表達式和函數式接口,本節探討它們的應用,函數式數據處理,針對常見的集合數據處理,Java 8引入了一套新的類庫,位於包java.util.stream下,稱之為Stream API,這套API操作數據的思路,不同於我們在38節55節介紹的容器類API,它們是函數式的,非常簡潔、靈活、易讀,具體有什么不同呢?由於內容較多,我們分為兩節來介紹,本節先介紹一些基本的API,下節討論一些高級功能。

基本概念

接口Stream類似於一個迭代器,但提供了更為豐富的操作,Stream API的主要操作就定義在該接口中。 Java 8給Collection接口增加了兩個默認方法,它們可以返回一個Stream,如下所示:

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

stream()返回的是一個順序流,parallelStream()返回的是一個並行。順序流就是由一個線程執行操作。而並行流背后可能有多個線程並行執行,與之前介紹的並發技術不同,使用並行流不需要顯式管理線程,使用方法與順序流是一樣的。

下面,我們主要針對順序流,學習Stream接口,包括其用法和基本原理,隨后我們再介紹下並行流。先來看一些簡單的示例。

基本示例

上節演示時使用了學生類Student和學生列表List<Student> lists,本節繼續使用它們。

基本過濾

返回學生列表中90分以上的,傳統上的代碼一般是這樣的:

List<Student> above90List = new ArrayList<>();
for (Student t : students) {
    if (t.getScore() > 90) {
        above90List.add(t);
    }
}

使用Stream API,代碼可以這樣:

List<Student> above90List = students.stream()
        .filter(t->t.getScore()>90)
        .collect(Collectors.toList());

先通過stream()得到一個Stream對象,然后調用Stream上的方法,filter()過濾得到90分以上的,它的返回值依然是一個Stream,為了轉換為List,調用了collect方法並傳遞了一個Collectors.toList(),表示將結果收集到一個List中。

代碼更為簡潔易讀了,這種數據處理方式被稱為函數式數據處理,與傳統代碼相比,它的特點是:

  • 沒有顯式的循環迭代,循環過程被Stream的方法隱藏了
  • 提供了聲明式的處理函數,比如filter,它封裝了數據過濾的功能,而傳統代碼是命令式的,需要一步步的操作指令
  • 流暢式接口,方法調用鏈接在一起,清晰易讀

基本轉換

根據學生列表返回名稱列表,傳統上的代碼一般是這樣:

List<String> nameList = new ArrayList<>(students.size());
for (Student t : students) {
    nameList.add(t.getName());
}

使用Stream API,代碼可以這樣:

List<String> nameList = students.stream()
        .map(Student::getName)
        .collect(Collectors.toList());

這里使用了Stream的map函數,它的參數是一個Function函數式接口,這里傳遞了方法引用。       

基本的過濾和轉換組合

返回90分以上的學生名稱列表,傳統上的代碼一般是這樣:

List<String> nameList = new ArrayList<>();
for (Student t : students) {
    if (t.getScore() > 90) {
        nameList.add(t.getName());
    }
}

使用函數式數據處理的思路,可以將這個問題分解為由兩個基本函數實現:

  1. 過濾:得到90分以上的學生列表
  2. 轉換:將學生列表轉換為名稱列表

使用Stream API,可以將基本函數filter()和map()結合起來,代碼可以這樣:

List<String> above90Names = students.stream()
        .filter(t->t.getScore()>90)
        .map(Student::getName)
        .collect(Collectors.toList());

這種組合利用基本函數、聲明式實現集合數據處理功能的編程風格,就是函數式數據處理。

代碼更為直觀易讀了,但你可能會擔心它的性能有問題。filter()和map()都需要對流中的每個元素操作一次,一起使用會不會就需要遍歷兩次呢?答案是否定的,只需要一次。實際上,調用filter()和map()都不會執行任何實際的操作,它們只是在構建操作的流水線,調用collect才會觸發實際的遍歷執行,在一次遍歷中完成過濾、轉換以及收集結果的任務。

像filter和map這種不實際觸發執行、用於構建流水線、返回Stream的操作被稱為中間操作(intermediate operation),而像collect這種觸發實際執行、返回具體結果的操作被稱為終端操作(terminal operation)。Stream API中還有更多的中間和終端操作,下面我們具體來看下。

中間操作

除了filter和map,Stream API的中間操作還有distinct, sorted, skip, limit, peek, mapToLong, mapToInt, mapToDouble, flatMap等,我們逐個來看下。

distinct

distinct返回一個新的Stream,過濾重復的元素,只留下唯一的元素,是否重復是根據equals方法來比較的,distinct可以與其他函數如filter, map結合使用。

比如,返回字符串列表中長度小於3的字符串、轉換為小寫、只保留唯一的,代碼可以為:

List<String> list = Arrays.asList(new String[]{"abc","def","hello","Abc"});
List<String> retList = list.stream()
        .filter(s->s.length()<=3)
        .map(String::toLowerCase)
        .distinct()
        .collect(Collectors.toList());
System.out.println(retList);

輸出為:

[abc, def]

雖然都是中間操作,但distinct與filter和map是不同的,filter和map都是無狀態的,對於流中的每一個元素,它的處理都是獨立的,處理后即交給流水線中的下一個操作,但distinct不同,它是有狀態的,在處理過程中,它需要在內部記錄之前出現過的元素,如果已經出現過,即重復元素,它就會過濾掉,不傳遞給流水線中的下一個操作。

對於順序流,內部實現時,distinct操作會使用HashSet記錄出現過的元素,如果流是有順序的,需要保留順序,會使用LinkedHashSet

sorted

有兩個sorted方法:

Stream<T> sorted()
Stream<T> sorted(Comparator<? super T> comparator)

它們都對流中的元素排序,都返回一個排序后的Stream,第一個方法假定元素實現了Comparable接口,第二個方法接受一個自定義的Comparator。

比如,過濾得到90分以上的學生,然后按分數從高到低排序,分數一樣的,按名稱排序,代碼可以為:

List<Student> list = students.stream()
        .filter(t->t.getScore()>90)
        .sorted(Comparator.comparing(Student::getScore)
                .reversed()
                .thenComparing(Student::getName))
        .collect(Collectors.toList());

這里,使用了Comparator的comparing, reversed和thenComparing構建了Comparator。

與distinct一樣,sorted也是一個有狀態的中間操作,在處理過程中,需要在內部記錄出現過的元素,與distinct不同的是,每碰到流中的一個元素,distinct都能立即做出處理,要么過濾,要么馬上傳遞給下一個操作,但sorted不能,它需要先排序,為了排序,它需要先在內部數組中保存碰到的每一個元素,到流結尾時,再對數組排序,然后再將排序后的元素逐個傳遞給流水線中的下一個操作。

skip/limit

它們的定義為:

Stream<T> skip(long n)
Stream<T> limit(long maxSize)

skip跳過流中的n個元素,如果流中元素不足n個,返回一個空流,limit限制流的長度為maxSize。

比如,將學生列表按照分數排序,返回第3名到第5名,代碼可以為:

List<Student> list = students.stream()
        .sorted(Comparator.comparing(
            Student::getScore).reversed())
        .skip(2)
        .limit(3)
        .collect(Collectors.toList());

skip和limit都是有狀態的中間操作。對前n個元素,skip的操作就是過濾,對后面的元素,skip就是傳遞給流水線中的下一個操作。limit的一個特點是,它不需要處理流中的所有元素,只要處理的元素個數達到maxSize,后面的元素就不需要處理了,這種可以提前結束的操作被稱為短路操作。

peek

peek的定義為:

Stream<T> peek(Consumer<? super T> action)

它返回的流與之前的流是一樣的,沒有變化,但它提供了一個Consumer,會將流中的每一個元素傳給該Consumer。這個方法的主要目的是支持調試,可以使用該方法觀察在流水線中流轉的元素,比如:

List<String> above90Names = students.stream()
        .filter(t->t.getScore()>90)
        .peek(System.out::println)
        .map(Student::getName)
        .collect(Collectors.toList());

mapToLong/mapToInt/mapToDouble

map函數接受的參數是一個Function<T, R>,為避免裝箱/拆箱,提高性能,Stream還有如下返回基本類型特定流的方法:

DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper)
LongStream mapToLong(ToLongFunction<? super T> mapper)

DoubleStream/IntStream/LongStream是基本類型特定的流,有一些專門的更為高效的方法。比如,求學生列表的分數總和,代碼可以為:

double sum = students.stream()
        .mapToDouble(Student::getScore)
        .sum();

flatMap                

flatMap的定義為:

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)

它接受一個函數mapper,對流中的每一個元素,mapper會將該元素轉換為一個流Stream,然后把新生成流的每一個元素傳遞給下一個操作。比如:

List<String> lines = Arrays.asList(new String[]{
        "hello abc",
        "老馬  編程"
    });
List<String> words = lines.stream()
        .flatMap(line -> Arrays.stream(line.split("\\s+")))
        .collect(Collectors.toList());
System.out.println(words);

這里的mapper將一行字符串按空白符分隔為了一個單詞流,Arrays.stream可以將一個數組轉換為一個流,輸出為:

[hello, abc, 老馬, 編程]

可以看出,實際上,flatMap完成了一個1到n的映射。

針對基本類型,flatMap還有如下類似方法:

DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper)
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper)

終端操作

中間操作不觸發實際的執行,返回值是Stream,而終端操作觸發執行,返回一個具體的值,除了collect,Stream API的終端操作還有max, min, count, allMatch, anyMatch, noneMatch, findFirst, findAny, forEach, toArray, reduce等,我們逐個來看下。

max/min

max/min的定義為:

Optional<T> max(Comparator<? super T> comparator)
Optional<T> min(Comparator<? super T> comparator)

它們返回流中的最大值/最小值,值的注意的是,它的返回值類型是Optional<T>,而不是T。

java.util.Optional是Java 8引入的一個新類,它是一個泛型容器類,內部只有一個類型為T的單一變量value,可能為null,也可能不為null。Optional有什么用呢?它用於准確地傳遞程序的語義,它清楚地表明,其代表的值可能為null,程序員應該進行適當的處理。

Optional定義了一些方法,比如:

// value不為null時返回true
public boolean isPresent()
// 返回實際的值,如果為null,拋出異常NoSuchElementException
public T get()
// 如果value不為null,返回value,否則返回other
public T orElse(T other)
// 構建一個空的Optional,value為null
public static<T> Optional<T> empty()
// 構建一個非空的Optional, 參數value不能為null
public static <T> Optional<T> of(T value)
// 構建一個Optional,參數value可以為null,也可以不為null
public static <T> Optional<T> ofNullable(T value)

在max/min的例子中,通過聲明返回值為Optional,我們就知道,具體的返回值不一定存在,這發生在流中不含任何元素的情況下。

看個簡單的例子,返回分數最高的學生,代碼可以為:

Student student = students.stream()
        .max(Comparator.comparing(Student::getScore).reversed())
        .get();

這里,假定students不為空。

count

count很簡單,就是返回流中元素的個數。比如,統計大於90分的學生個數,代碼可以為:

long above90Count = students.stream()
        .filter(t->t.getScore()>90)
        .count();

allMatch/anyMatch/noneMatch

這幾個函數都接受一個謂詞Predicate,返回一個boolean值,用於判定流中的元素是否滿足一定的條件,它們的區別是:

  • allMatch: 只有在流中所有元素都滿足條件的情況下才返回true
  • anyMatch: 只要流中有一個元素滿足條件就返回true
  • noneMatch: 只有流中所有元素都不滿足條件才返回true

如果流為空,這幾個函數的返回值都是true。

比如,判斷是不是所有學生都及格了(不小於60分),代碼可以為:

boolean allPass = students.stream()
        .allMatch(t->t.getScore()>=60);

這幾個操作都是短路操作,都不一定需要處理所有元素就能得出結果,比如,對於allMatch,只要有一個元素不滿足條件,就能返回false。

findFirst/findAny

它們的定義為:

Optional<T> findFirst()
Optional<T> findAny()

它們的返回類型都是Optional,如果流為空,返回Optional.empty()。findFirst返回第一個元素,而findAny返回任一元素,它們都是短路操作。

隨便找一個不及格的學生,代碼可以為:

Optional<Student> student = students.stream()
        .filter(t->t.getScore()<60)
        .findAny();
if(student.isPresent()){
    // 不及格的學生....
}

forEach

有兩個foreach方法:

void forEach(Consumer<? super T> action)
void forEachOrdered(Consumer<? super T> action)

它們都接受一個Consumer,對流中的每一個元素,傳遞元素給Consumer,區別在於,在並行流中,forEach不保證處理的順序,而forEachOrdered會保證按照流中元素的出現順序進行處理。

比如,逐行打印大於90分的學生,代碼可以為:

students.stream()
        .filter(t->t.getScore()>90)
        .forEach(System.out::println);

toArray

toArray將流轉換為數組,有兩個方法:

Object[] toArray()
<A> A[] toArray(IntFunction<A[]> generator)

不帶參數的toArray返回的數組類型為Object[],這經常不是期望的結果,如果希望得到正確類型的數組,需要傳遞一個類型為IntFunction的generator,IntFunction的定義為:

public interface IntFunction<R> {
    R apply(int value);
}

generator接受的參數是流的元素個數,它應該返回對應大小的正確類型的數組。

比如,獲取90分以上的學生數組,代碼可以為:

Student[] above90Arr = students.stream()
        .filter(t->t.getScore()>90)
        .toArray(Student[]::new);

Student[]::new就是一個類型為IntFunction<Student[]>的generator。

reduce

reduce代表歸約或者叫折疊,它是max/min/count的更為通用的函數,將流中的元素歸約為一個值,有三個reduce函數: 

Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity,
    BiFunction<U, ? super T, U> accumulator,
    BinaryOperator<U> combiner); 

第一個基本等同於調用:

boolean foundAny = false;
T result = null;
for (T element : this stream) {
    if (!foundAny) {
        foundAny = true;
        result = element;
    }
    else
        result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();

比如,使用reduce求分數最高的學生,代碼可以為:

Student topStudent = students.stream().reduce((accu, t) -> {
    if (accu.getScore() >= t.getScore()) {
        return accu;
    } else {
        return t;
    }
}).get();

第二個reduce函數多了一個identity參數,表示初始值,它基本等同於調用:

T result = identity;
for (T element : this stream)
    result = accumulator.apply(result, element)
return result;

第一個和第二個reduce的返回類型只能是流中元素的類型,而第三個更為通用,它的歸約類型可以自定義,另外,它多了一個combiner參數,combiner用在並行流中,用於合並子線程的結果,對於順序流,它基本等同於調用:

U result = identity;
for (T element : this stream)
    result = accumulator.apply(result, element)
return result;

注意與第二個reduce函數相區分,它的結果類型不是T,而是U。比如,使用reduce函數計算學生分數的和,代碼可以為:

double sumScore = students.stream().reduce(0d,
        (sum, t) -> sum += t.getScore(),
        (sum1, sum2) -> sum1 += sum2
    );

以上,可以看出,reduce雖然更為通用,但比較費解,難以使用,一般情況,應該優先使用其他函數。

collect函數比reduce更為通用、強大和易用,關於它,我們下節再詳細介紹。

構建流

前面我們提到,可以通過Collection接口的stream/parallelStream獲取流,還有一些其他的方式可以獲取流。

Arrays有一些stream方法,可以將數組或子數組轉換為流,比如: 

public static IntStream stream(int[] array)
public static DoubleStream stream(double[] array, int startInclusive, int endExclusive)
public static <T> Stream<T> stream(T[] array)

比如,輸出當前目錄下所有普通文件的名字,代碼可以為:

File[] files = new File(".").listFiles();
Arrays.stream(files)
    .filter(File::isFile)    
    .map(File::getName)
    .forEach(System.out::println);

Stream也有一些靜態方法,可以構建流:

//返回一個空流
public static<T> Stream<T> empty()
//返回只包含一個元素t的流
public static<T> Stream<T> of(T t)
//返回包含多個元素values的流
public static<T> Stream<T> of(T... values)
//通過Supplier生成流,流的元素個數是無限的
public static<T> Stream<T> generate(Supplier<T> s)
//同樣生成無限流,第一個元素為seed,第二個為f(seed),第三個為f(f(seed)),依次類推
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)

比如,輸出10個隨機數,代碼可以為:

Stream.generate(()->Math.random())
    .limit(10)
    .forEach(System.out::println);

輸出100個遞增的奇數,代碼可以為:

Stream.iterate(1, t->t+2)
    .limit(100)
    .forEach(System.out::println);

並行流

前面我們主要使用的是Collection的stream()方法,換做parallelStream()方法,就會使用並行流,接口方法都是通用的。但並行流內部會使用多線程,線程個數一般與系統的CPU核數一樣,以充分利用CPU的計算能力。

進一步來說,並行流內部會使用Java 7引入的fork/join框架,簡單來說,處理由fork和join兩個階段組成,fork就是將要處理的數據拆分為小塊,多線程按小塊進行並行計算,join就是將小塊的計算結果進行合並,具體我們就不探討了。使用並行流,不需要任何線程管理的代碼,就能實現並行。

函數式數據處理思維

看的出來,使用Stream API處理數據集合,與直接使用容器類API處理數據的思路是完全不一樣的。

流定義了很多數據處理的基本函數,對於一個具體的數據處理問題,解決的主要思路就是組合利用這些基本函數,實現期望的功能,這種思路就是函數式數據處理思維,相比直接利用容器類API的命令式思維,思考的層次更高。

Stream API的這種思路也不是新發明,它與數據庫查詢語言SQL是很像的,都是聲明式地操作集合數據,很多函數都能在SQL中找到對應,比如filter對應SQL的where,sorted對應order by等。SQL一般都支持分組(group by)功能,Stream API也支持,但關於分組,我們下節再介紹。

Stream API也與各種基於Unix系統的管道命令類似,熟悉Unix系統的都知道,Unix有很多命令,大部分命令只是專注於完成一件事情,但可以通過管道的方式將多個命令鏈接起來,完成一些復雜的功能,比如:

cat nginx_access.log | awk '{print $1}' | sort | uniq -c | sort -rnk 1 | head -n 20

以上命令可以分析nginx訪問日志,統計出訪問次數最多的前20個IP地址及其訪問次數。具體來說,cat命令輸出nginx訪問日志到流,一行為一個元素,awk輸出行的第一列,這里為IP地址,sort按IP進行排序,"uniq -c"按IP統計計數,"sort -rnk 1"按計數從高到低排序,"head -n 20"輸出前20行。

小結

本節初步介紹了Java 8引入的函數式數據處理類庫,Stream API,它類似於Unix的管道命令,也類似於數據庫查詢語言SQL,通過組合利用基本函數,可以在更高的層次上思考問題,以聲明式的方式簡潔地實現期望的功能。

對於collect方法,本節只是演示了最基本的應用,它還有很多高級功能,比如實現類似SQL的group by功能,具體怎么實現?實現的原理是什么呢?

(與其他章節一樣,本節所有代碼位於 https://github.com/swiftma/program-logic,位於包shuo.laoma.java8.c92下)

----------------

未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM