【Java學習筆記】管道流


作者:gnuhpc
出處:http://www.cnblogs.com/gnuhpc/

1.引言

Java I/O系統是建立在數據流概念之上的,而在UNIX/Linux中有一個類似的概念,就是管道,它具有將一個程序的輸出當作另一個程序的輸入的能力。在Java中,可以使用管道流進行線程之間的通信,輸入流和輸出流必須相連接,這樣的通信有別於一般的Shared Data通信,其不需要一個共享的數據空間。

 

2.相關類及其關系

1)字節流:

分為管道輸出流(PipedOutputStream)和管道輸入流(PipedInputStream),利用 java.io.PipedOutputStream和java.io.PipedInputStream可以實現線程之間的二進制信息傳輸。如果要進行管道輸出,則必須把輸出流連在輸入流上。 java.io.PipedOutputStream是java.io.OutputStream的直接子類,而java.io. PipedInputStream是java.io.InputStream的直接子類。PipedOutputStream和 PipedInputStream往往成對出現、配合使用。舉例說明:

 

TestPipe.Java

import java.io.IOException;

public class TestPipe {

    public static void main(String[] args) {

        Send s = new Send();
        Receive r = new Receive();
        try {
            s.getPos().connect(r.getPis()); // 連接管道
        } catch (IOException e) {
            e.printStackTrace();
        }
        new Thread(s).start(); // 啟動線程
        new Thread(r).start(); // 啟動線程
    }
}

Receive.java

import java.io.IOException;
import java.io.PipedInputStream;

class Receive implements Runnable { // 實現Runnable接口
    private PipedInputStream pis = null;

    public Receive() {
        this.pis = new PipedInputStream(); // 實例化輸入流
    }

    public void run() {
        byte b[] = new byte[1024];
        int len = 0;
        try {
            len = this.pis.read(b); // 接收數據
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            this.pis.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("接收的內容為:" + new String(b, 0, len));
    }

    public PipedInputStream getPis() {
        return pis;
    }
}

Send.java

import java.io.IOException;
import java.io.PipedOutputStream;
class Send implements Runnable {
    // 實現Runnable接口
    private PipedOutputStream pos = null; // 管道輸出流

    public Send() {
        this.pos = new PipedOutputStream();// 實例化輸出流
    }

    public void run() {
        String str = "Hello World!!!";
        try {
            this.pos.write(str.getBytes()); // 輸出信息
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            this.pos.close(); // 關閉輸出流
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public PipedOutputStream getPos() { // 通過線程類得到輸出流
        return pos;
    }
}

我們可以看到使用管道流,通過connect方法進行連接,實現了Send線程和Receive線程之間的通信。

注意:

PipedInputStream中實際是用了一個1024字節固定大小的循環緩沖區。寫入PipedOutputStream的數據實際上保存到對應的 PipedInputStream的內部緩沖區。從PipedInputStream執行讀操作時,讀取的數據實際上來自這個內部緩沖區。如果對應的 PipedInputStream輸入緩沖區已滿,任何企圖寫入PipedOutputStream的線程都將被阻塞。而且這個寫操作線程將一直阻塞,直至出現讀取PipedInputStream的操作從緩沖區刪除數據。這也就是說往PipedOutputStream寫數據的線程Send若是和從PipedInputStream讀數據的線程Receive是同一個線程的話,那么一旦Send線程發送數據過多(大於1024字節),它就會被阻塞,這就直接導致接受數據的線程阻塞而無法工作(因為是同一個線程嘛),那么這就是一個典型的死鎖現象,這也就是為什么javadoc中關於這兩個類的使用時告訴大家要在多線程環境下使用的原因了。

JavaConsoleOutput_2

 

應用:過濾器模式

image

使用這個模式的典型例子是Unix的shell命令。這個模式的好處在於過濾器無需知道它與何種東西進行連接,並且這可以實現並行,而且系統的可擴展性可以根據添加刪除或者改變Filter進行增強。

在這舉一個不斷計算平均值的例子,producer作為前端的數據源,不斷產生隨機數,通過pipe進入filter進行數據處理,然后通過第二個pipe就行后端處理。

import java.util.*;
import java.io.*;

public class PipeTest
/* 建立3個線程(Producer、Filter、Consumer)類和兩組通信管道,通過多線程將管道1的數據傳送到管道2中,實現管道的通信。
* Producer => pout1->pin1 =>  Filter(pin1->pout2) => pout2->pin2 =>Consumer
*/
{
    public static void main(String args[]) {
        try {
            PipedOutputStream pout1 = new PipedOutputStream();
            PipedInputStream pin1 = new PipedInputStream(pout1);

            PipedOutputStream pout2 = new PipedOutputStream();
            PipedInputStream pin2 = new PipedInputStream(pout2);

            /* construct threads */

            Producer prod = new Producer(pout1);
            Filter filt = new Filter(pin1, pout2);
            Consumer cons = new Consumer(pin2);

            /* start threads */

            prod.start();
            filt.start();
            cons.start();
        } catch (IOException e) {
        }
    }
}

// 前端:該類的作用是產生隨機數,並將其放到管道1的輸出流中
class Producer extends Thread {
    private DataOutputStream out;// DataOutputStream是用於寫入一些基本類型數據的類,此類的實例用於生成偽隨機數流
    private Random rand = new Random();

    public Producer(OutputStream os) {
        out = new DataOutputStream(os);
    }

    public void run() {
        while (true) {
            try {
                double num = rand.nextDouble();
                // 將double值直接寫入流
                out.writeDouble(num);
                System.out.println("寫入流中的值是 :" + num);
                out.flush();
                sleep(Math.abs(rand.nextInt()%10));//隨機休眠一段時間
            } catch (Exception e) {
                System.out.println("Error:   " + e);
            }
        }
    }
}

// 過濾器,起數據處理作用,讀取管道1中輸入流的內容,並將其放到管道2的輸出流中
class Filter extends Thread {
    private DataInputStream in;
    private DataOutputStream out;
    private double total = 0;
    private int count = 0;

    public Filter(InputStream is, OutputStream os) {
        in = new DataInputStream(is);
        out = new DataOutputStream(os);
    }

    public void run() {
        for (;;) {
            try {
                double x = in.readDouble(); // 讀取流中的數據
                total += x;
                count++;
                if (count != 0) {
                    double d = total / count;
                    out.writeDouble(d); // 將得到的數據平均值寫入流
                }
            } catch (IOException e) {
                System.out.println("Error:   " + e);
            }
        }
    }
}

// 后端:讀取管道2輸入流的內容
class Consumer extends Thread {
    private double old_avg = 0;
    private DataInputStream in;

    public Consumer(InputStream is) {
        in = new DataInputStream(is);
    }

    public void run() {
        for (;;) {
            try {
                double avg = in.readDouble();
                if (Math.abs(avg - old_avg) > 0.01) {
                    System.out.println("現在的平均值是:   " + avg);
                    System.out.println();
                    old_avg = avg;
                }
            } catch (IOException e) {
                System.out.println("Error:   " + e);
            }
        }
    }

}

 

2)字符流

Java利用 java.io.PipedWriter和java.io.PipedReader在線程之間傳輸字符信息。與 java.io.PipedOutputStream和java.io.PipedInputStream類似,java.io.PipedWriter 是java.io.Writer的直接子類,java.io.PipedReader是java.io.Reader的直接子類。PipedWriter擁有一個允許指定輸入管道字符流的構造方法,而PipedReader擁有一個允許指定輸出管道字符流的構造方法。從而使得PipedWriter和PipedReader往往成對出現、配合使用。

 

以典型KWIC系統為例,下邊的代碼演示了如何使用字符流並且使用了過濾器模式:ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.StringTokenizer;

public class KwicPipe {
    public static void main(String[] args) {
        try {
            //get the input and output path
            String src = args[0];
            String dest = args[1];


            //(writeToShiftThread => readFromShiftThread) = Pipe1
            PipedReader readFromShiftThread = new PipedReader();
            PipedWriter writeToShiftThread = new PipedWriter(readFromShiftThread);


            //(writeToSortLinesThread => readFromSortLinesThread) = Pipe2
            PipedReader readFromSortLinesThread = new PipedReader();
            PipedWriter writeToSortLinesThread = new PipedWriter(readFromSortLinesThread);


            //ReadLineThread --Pipe1 --> ShiftThread -- Pipe2 --> SortLinesThread
            ReadLineThread R1 = new ReadLineThread(writeToShiftThread,src);
            ShiftThread R2 = new ShiftThread(readFromShiftThread,writeToSortLinesThread);
            SortLinesThread R3 = new SortLinesThread(readFromSortLinesThread,dest);


            //Start the three processing thread
            R1.start();
            R2.start();
            R3.start();
        }
        catch (IOException e) {
            System.out.println("NO I/O");
        }
    }
}

// read the content of kwici.dat and send the lines to another thread
class ReadLineThread extends Thread {
    PipedWriter PipeIn;
    String InputFilename= null;
    ReadLineThread(PipedWriter PlaceInPipe, String InputFilename) {
        PipeIn = PlaceInPipe;
        this.InputFilename = InputFilename;
    }
    private BufferedReader fileopen(String InputFilename) {
        BufferedReader input_file = null;
        try {
            input_file = new BufferedReader(new FileReader(InputFilename));
        } catch (IOException e) {
            System.err.println(("File not open" + e.toString()));
            System.exit(1);
        }
        return input_file;
    }
    public void run() {
        try {
            String Input;
            BufferedReader TheInput = fileopen(InputFilename);
            while ( (Input = TheInput.readLine()) != null) {
                System.out.println(Input);
                PipeIn.write(Input + "/n"); // Read from the file and then write to the pipe1
            }
        }
        catch (FileNotFoundException e) {
            System.out.println("NO FILE ");
        }
        catch (IOException e) {
            System.out.println("NO I/O");
        }
    }
}

// read the lines from ReadLineThread and shift them. Send all the shifted lines to SortLinesThread
class ShiftThread extends Thread {
    PipedReader PipeOut;
    PipedWriter PipeIn;
    ShiftThread(PipedReader ReadFromPipe, PipedWriter WriteToPipe) {
        PipeOut = ReadFromPipe;
        PipeIn = WriteToPipe;
    }
    public void run() {
        char[] cbuf = new char[80];
        int i, j;
        StringBuffer linebuff = new StringBuffer();
        try {
            // read from ReadLineThread
            i = PipeOut.read(cbuf, 0, 80);
            while (i != -1) {               
                for (j = 0; j < i; j++) {
                    //if new line
                    if (cbuf[j]=='/n'){
                        // When reach the end of line,shift it
                        shiftline(linebuff.toString());
                        // empty the buffer
                        linebuff.delete(0, linebuff.length());
                    }
                    else {
                        linebuff.append(cbuf[j]);
                    }
                }
                i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
            }
        }
        catch (FileNotFoundException e) {
            System.out.println("NO FILE ");
        }
        catch (IOException e) {
            System.out.println("NO I/O or end of stream (ShiftThread terminated)");
        }
        /* BECAUSE
         * If a thread was providing data characters to the connected piped output,
         * but the thread is no longer alive, then an IOException is thrown. (javadoc)
         */
    }
    private void shiftline( String line )
    {
        String onetoken = new String ();
        StringTokenizer tokens =
            new StringTokenizer( line );
        ArrayList<String> Tokens = new ArrayList<String> ();
        int count = tokens.countTokens();
        for ( int i = 0; i < count; i++)
        {
            onetoken = tokens.nextToken();
            if (!((onetoken.compareTo( "a" ) == 0) && (onetoken.compareTo( "an" ) == 0) && (onetoken.compareTo( "and" ) == 0) && (onetoken.compareTo( "the" ) == 0)))
            {
                Tokens.add(onetoken);
            }
        }
        for ( int tokencount = 0; tokencount < count; tokencount++ )
        {
            StringBuffer linebuffer = new StringBuffer ();
            int index = tokencount;
            for ( int i = 0; i< count; i++ )
            {
                if (index >= count)
                    index = 0;
                linebuffer.append ( Tokens.get(index)  );
                linebuffer.append (" ");
                index++;
            }  //for i
            line = linebuffer.toString();
            // send the line to the SortLinesThread
            try {
                PipeIn.write(line+ "/n");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }  // for token count
        return;
    }
}

class SortLinesThread extends Thread {
    PipedReader PipeOut;
    String OutputFilename;
    ArrayList<String>  KwicList = new ArrayList<String>();
    SortLinesThread(PipedReader ReadFromPipe, String OutputFilename) {
        PipeOut = ReadFromPipe;
        this.OutputFilename = OutputFilename;
    }
    public void run() {
        char[] cbuf = new char[80];
        int i, j;
        StringBuffer linebuff = new StringBuffer();
        try {
            // read from ShiftLineThread
            i = PipeOut.read(cbuf, 0, 80);
            while (i != -1) { // I don't know we're using that (The method Read blocks until at least one character of input is available.)
                for (j = 0; j < i; j++) {
                    //if new line
                    if (cbuf[j]=='/n'){
                        // add it to the ArrayList
                        KwicList.add(linebuff.toString());
                        // adn empty the buffer
                        linebuff.delete(0, linebuff.length());
                    }
                    else {
                        //append the character to the line
                        linebuff.append(cbuf[j]);
                    }
                }           
                i = PipeOut.read(cbuf, 0, 80); //get next buffer's worth
            }
        }
        catch (FileNotFoundException e) {
            System.out.println("NO FILE ");
        }
        catch (IOException e) {
            System.out.println("NO I/O or end of stream (SortLinesThread terminated)");
        }
        /* BECAUSE
         * If a thread was providing data characters to the connected piped output,
         * but the thread is no longer alive, then an IOException is thrown. (javadoc)
         */
        // when the reading is finished, sort the ArrayList and diplay
        Collections.sort(KwicList);//sort when added
        displaylist ( KwicList );//Standard Output
        //Export to file
        try {
            export(KwicList, OutputFilename);
        } catch (Exception e) {
            System.out.println("Error Output File ");
        }
    }
    private void displaylist (ArrayList<String> KwicList )
    {
        System.out.println ("/nList : " );
        for ( int count = 0; count < KwicList.size(); count++ )
            System.out.println (KwicList.get (count) );
    }
    private void export(ArrayList<String> List, String oufFilename) throws Exception{
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter(oufFilename));
        } catch (FileNotFoundException e) {
            System.err.println(("File not open" + e.toString()));
            System.exit(1);
        }

        for (int count = 0; count < List.size(); count++) {
              writer.write(List.get(count));
              writer.write("/r/n");
        }
        writer.flush();
        writer.close();
        System.out.println("Processed Finished");
    }
}

 

作者:gnuhpc
出處:http://www.cnblogs.com/gnuhpc/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM