Java進程間通信學習


轉自:https://www.iteye.com/blog/polim-1278435

進程間通信的主要方法有:
(1)管道(Pipe):管道可用於具有親緣關系進程間的通信,允許一個進程和另一個與它有共同祖先的進程之間進行通信。
(2)命名管道(named pipe):命名管道克服了管道沒有名字的限制,因此,除具有管道所具有的功能外,它還允許無親緣關系進程間的通信。命名管道在文件系統中有對應的文件名。命名管道通過命令mkfifo或系統調用mkfifo來創建。
(3)信號(Signal):信號是比較復雜的通信方式,用於通知接受進程有某種事件發生,除了用於進程間通信外,進程還可以發送信號給進程本身;linux除了支持Unix早期信號語義函數sigal外,還支持語義符合Posix.1標准的信號函數sigaction(實際上,該函數是基於BSD的,BSD為了實現可靠信號機制,又能夠統一對外接口,用sigaction函數重新實現了signal函數)。Linux中可以使用kill -12 進程號,像當前進程發送信號,但前提是發送信號的進程要注冊該信號。
example:
OperateSignal operateSignalHandler = new OperateSignal();
Signal sig = new Signal("USR2");
Signal.handle(sig, operateSignalHandler);
(4)消息(Message)隊列:消息隊列是消息的鏈接表,包括Posix消息隊列system V消息隊列。有足夠權限的進程可以向隊列中添加消息,被賦予讀權限的進程則可以讀走隊列中的消息。消息隊列克服了信號承載信息量少,管道只能承載無格式字節流以及緩沖區大小受限等缺限。
(5)共享內存:使得多個進程可以訪問同一塊內存空間,是最快的可用IPC形式。是針對其他通信機制運行效率較低而設計的。往往與其它通信機制,如信號量結合使用,來達到進程間的同步及互斥。
(6)內存映射(mapped memory):內存映射允許任何多個進程間通信,每一個使用該機制的進程通過把一個共享的文件映射到自己的進程地址空間來實現它。
Java 中有類 MappedByteBuffer實現內存映射
(7)信號量(semaphore):主要作為進程間以及同一進程不同線程之間的同步手段。
(8)套接口(Socket):更為一般的進程間通信機制,可用於不同機器之間的進程間通信。起初是由Unix系統的BSD分支開發出來的,但現在一般可以移植到其它類Unix系統上:Linux和System V的變種都支持套接字。

管道方式
一、Java 啟動子進程方式

1 12 Runtime rt = Runtime.getRuntime();
3 Process process = rt.exec("java com.test.process.T3");
4 25 ProcessBuilder pb = new ProcessBuilder("java", "com.test.process.T3");
6 Process p = pb.start();


二、Java父、子進程通信方式(管道方式)
父進程獲取子進程輸出流方式

1 BufferedInputStream in = new BufferedInputStream(p.getInputStream());
2 BufferedReader br = new BufferedReader(new InputStreamReader(in));
3 String s;
4 while ((s = br.readLine()) != null) {
5   System.out.println(s);
6 }


子進程獲取父進程輸入流方式

 1 package com.test.process;
 2 import java.io.BufferedReader;
 3 import java.io.IOException;
 4 import java.io.InputStreamReader;
 5 
 6 public class T3 {
 7 
 8     public static void main(String[] args) throws IOException {
 9         System.out.println("子進程被調用成功!");
10 
11         BufferedReader bfr = new BufferedReader(new InputStreamReader(System.in));
12 
13         while (true) {
14             String strLine = bfr.readLine();
15             if (strLine != null) {
16                 System.out.println("hi:" + strLine);
17             }
18         }
19     }
20 
21 }


三、詳細測試類
父進程測試類:

 1 package com.test.process.pipe;
 2 import java.io.IOException;
 3 
 4 public class ProcessTest {
 5 
 6     public static void main(String[] args) throws IOException, InterruptedException {
 7         Process p = Runtime.getRuntime().exec("java com.test.process.pipe.MyTest");
 8 
 9         StringBuilder sbuilder = new StringBuilder();
10         for(int k=0;k<1;k++){
11             sbuilder.append("hello");
12         }
13 
14         int outSize = 1;
15         TestOut out[] = new TestOut[outSize];
16         for(int i=0;i<outSize;i++){
17             out[i] = new TestOut(p,sbuilder.toString().getBytes());
18             new Thread(out[i]).start();
19         }
20 
21         int inSize = 1;
22         TestIn in[] = new TestIn[inSize];
23         for(int j=0;j<inSize;j++){
24             in[j] = new TestIn(p);
25             new Thread(in[j]).start();
26         }
27     }
28 }


子進程類

 1 package com.test.process.pipe;
 2 import java.io.BufferedReader;
 3 import java.io.InputStreamReader;
 4 
 5 public class MyTest {
 6     public static void main(String[] args) throws Exception {
 7 //讀取父進程輸入流
 8         BufferedReader bfr = new BufferedReader(new InputStreamReader(System.in));
 9         while (true) {
10             String strLine = bfr.readLine();
11             if (strLine != null) {
12                 System.out.println(strLine);//這個地方的輸出在子進程控制台是無法輸出的,只可以在父進程獲取子進程的輸出
13             }else {
14                 return;
15             }
16         }
17     }
18 }


TestIn類

 1 package com.test.process.pipe;
 2 import java.io.BufferedReader;
 3 import java.io.InputStream;
 4 import java.io.InputStreamReader;
 5 
 6 public class TestIn implements Runnable{
 7 
 8     private Process p = null;
 9     public TestIn(Process process){
10         p = process;
11     }
12 
13     @Override
14     public void run() {
15         try {
16             InputStream in = p.getInputStream();
17             BufferedReader bfr = new BufferedReader(new InputStreamReader(in));
18             String rd = bfr.readLine();
19             if(rd != null){
20                 System.out.println(rd);//輸出子進程返回信息(即子進程中的System.out.println()內容)
21             }else{
22                 return;
23             }
24 //注意這個地方,如果關閉流則子進程的返回信息無法獲取,如果不關閉只有當子進程返回字節為8192時才返回,為什么是8192下面說明.
25 //bfr.close();
26 //in.close();
27         } catch (Exception e) {
28             e.printStackTrace();
29         }
30     }
31 }


TestOut類

 1 package com.test.process.pipe;
 2 import java.io.IOException;
 3 import java.io.OutputStream;
 4 
 5 public class TestOut implements Runnable {
 6 
 7     private Process p = null;
 8     private byte []b = null;
 9 
10     public TestOut(Process process,byte byt[]){
11         p = process;
12         b = byt;
13     }
14 
15     @Override
16     public void run() {
17         try {
18             OutputStream ops = p.getOutputStream();
19 //System.out.println("out--"+b.length);
20             ops.write(b);
21 //注意這個地方如果關閉,則父進程只可以給子進程發送一次信息,如果這個地方開啟close()則父進程給子進程不管發送大小多大的數據,子進程都可以返回
22 //如果這個地方close()不開啟,則父進程給子進程發送數據累加到8192子進程才返回。
23 //ops.close();
24         } catch (IOException e) {
25             e.printStackTrace();
26         }
27     }
28 }

備注:
1、子進程的輸出內容是無法在控制台輸出的,只能再父類中獲取並輸出。
2、父進程往子進程寫內容時如果關閉字節流,則子進程的輸入流同時關閉。
3、如果父進程中輸入、輸出流都不關閉,子進程獲取的字節流在達到8129byte時才返回。
4、關閉子進程一定要在父進程中關閉 p.destroy()

實例1:

 1 /**
 2  *如下另一種情況說明
 3  *如果像如下情況執行會出現說明情況呢
 4  *前提說明:TestOut類中開啟ops.close();
 5  */
 6 package com.test.process.pipe;
 7 import java.io.IOException;
 8 
 9 public class ProcessTest {
10 
11     public static void main(String[] args) throws IOException, InterruptedException {
12         Process p = Runtime.getRuntime().exec("java com.test.process.pipe.MyTest");
13 
14         TestOut out = new TestOut(p,"Hello everyone".getBytes());
15         new Thread(out).start();
16 
17         TestIn ti = new TestIn(p);
18         new Thread(ti).start();
19 
20         Thread.sleep(3000);
21 
22         TestOut out2 = new TestOut(p,"-Hello-everyone".getBytes());
23         new Thread(out2).start();
24 
25         TestIn ti2 = new TestIn(p);
26         new Thread(ti2).start();
27     }
28 }


執行后輸出結果為:

Hello everyone
java.io.IOException: Stream closed
        at java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:145
)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:308)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
        at java.io.InputStreamReader.read(InputStreamReader.java:167)
        at java.io.BufferedReader.fill(BufferedReader.java:136)
        at java.io.BufferedReader.readLine(BufferedReader.java:299)
        at java.io.BufferedReader.readLine(BufferedReader.java:362)
        at com.test.process.pipe.TestIn.run(TestIn.java:20)
        at java.lang.Thread.run(Thread.java:662)

由此可見當創建一個子進程后,p.getOutputStream();p.getInputStream();通過兩種方式使父進程與子進程建立管道連接,而當close()連接時管道關閉,在通過調用
p.getOutputStream();p.getInputStream();時直接出現IOException,結論為當父子進程建立連接后,通過管道長連接的方式進程信息傳輸,當close時在通過獲取子進程的輸入輸出流
都會出現IOException

實例2:
在實例1的基礎上進行修改,會出現什么結果呢,如下

 1 package com.test.process.pipe;
 2 import java.io.IOException;
 3 
 4 public class ProcessTest {
 5 
 6     public static void main(String[] args) throws IOException, InterruptedException {
 7         Process p = Runtime.getRuntime().exec("java com.test.process.pipe.MyTest");
 8 
 9         TestOut out = new TestOut(p,"Hello everyone".getBytes());
10         new Thread(out).start();
11 
12         TestIn ti = new TestIn(p);
13         new Thread(ti).start();
14 
15         Process p2 = Runtime.getRuntime().exec("java com.test.process.pipe.MyTest");
16         TestOut out2 = new TestOut(p2,"-Hello-everyone".getBytes());
17         new Thread(out2).start();
18 
19         TestIn ti2 = new TestIn(p2);
20         new Thread(ti2).start();
21     }
22 }


輸出結果:

Hello everyone
-Hello-everyone

綜上可見每個父進程創建一個子進程后,通過p.getOutputStream();p.getInputStream();建立管道連接后,無法關閉流,如果關閉了則需要重新建立進程才可以達到通信的效果。
如果不關閉流,則傳輸的字符內容累加到8192byte時才可以返回。

為什么是8192byte呢?


JDK 源碼分析

 1 class TestLambda {
 2     @FunctionalInterface
 3     interface A {
 4         int use();
 5     }
 6 
 7     public static int getValue(int value) {
 8         return value;
 9     }
10 
11     public void useValue(int value) {
12         A a = () -> { return getValue(value); };
13     }
14 }
15 
16     Process p = Runtime.getRuntime().exec("java com.test.process.pipe.MyTest");
17 
18     public Process exec(String command) throws IOException {
19         return exec(command, null, null);
20     }
21 
22     public Process exec(String command, String[] envp, File dir)
23             throws IOException {
24         if (command.length() == 0)
25             throw new IllegalArgumentException("Empty command");
26 
27         StringTokenizer st = new StringTokenizer(command);
28         String[] cmdarray = new String[st.countTokens()];
29         for (int i = 0; st.hasMoreTokens(); i++)
30             cmdarray[i] = st.nextToken();
31         return exec(cmdarray, envp, dir);
32     }
33 
34     public Process exec(String[] cmdarray, String[] envp, File dir)
35             throws IOException {
36         return new ProcessBuilder(cmdarray)
37                 .environment(envp)
38                 .directory(dir)
39                 .start();
40     }


接下來會執行 ProcessBuilder.start   

1 return ProcessImpl.start(cmdarray,environment,dir,redirectErrorStream);

執行ProcessImpl.start(final class ProcessImpl extends Process )
OutputStream
InputStream 是在這里聲明的
如下:

 1 //關鍵這個地方 創建的為FileDescriptor 管理的方式底層也是通過文件的方式實現的,原理跟linux的管道相同
 2 stdin_fd  = new FileDescriptor();
 3 stdout_fd = new FileDescriptor();
 4 stderr_fd = new FileDescriptor();
 5 
 6 handle = create(cmdstr, envblock, path, redirectErrorStream,
 7 stdin_fd, stdout_fd, stderr_fd);
 8 
 9 java.security.AccessController.doPrivileged(
10     new java.security.PrivilegedAction() {
11     public Object run() {
12 stdin_stream =
13     new BufferedOutputStream(new FileOutputStream(stdin_fd));
14 stdout_stream =
15     new BufferedInputStream(new FileInputStream(stdout_fd));
16 stderr_stream =
17     new FileInputStream(stderr_fd);
18 return null;
19     }
20 });
21 }


Process類中的說明

 1 public abstract class Process
 2 {
 3     /**
 4      * Gets the output stream of the subprocess.
 5      * Output to the stream is piped into the standard input stream of
 6      * the process represented by this <code>Process</code> object.
 7      * <p> //該處說明OutputStream 是通過管道的方式進行的處理
 8      * Implementation note: It is a good idea for the output stream to
 9      * be buffered.
10      *
11      * @return  the output stream connected to the normal input of the
12      *          subprocess.
13      */
14     abstract public OutputStream getOutputStream()
15 }   

BufferedReader類中

1 private static int defaultCharBufferSize = 8192;//默認字符數組長度

另外Java中還提供了PipedInputStream、PipedOutputStream類,但這2個類用在多進程間交互是無法實現的。

總結:
1、如果Java中要涉及到多進程之間交互,子進程只是簡單的做一些功能處理的話建議使用
Process p = Runtime.getRuntime().exec("java ****類名");
p.getOutputStream()
p.getInputStream() 的方式進行輸入、輸出流的方式進行通信
如果涉及到大量的數據需要在父子進程之間交互不建議使用該方式,該方式子類中所有的System都會返回到父類中,另該方式不太適合大並發多線程
2、內存共享(MappedByteBuffer)
該方法可以使用父子進程之間通信,但在高並發往內存內寫數據、讀數據時需要對文件內存進行鎖機制,不然會出現讀寫內容混亂和不一致性,Java里面提供了文件鎖FileLock,但這個在父/子進程中鎖定后另一進程會一直等待,效率確實不夠高。
RandomAccessFile raf = new RandomAccessFile("D:/a.txt", "rw");
FileChannel fc = raf.getChannel(); 
MappedByteBuffer mbb = fc.map(MapMode.READ_WRITE, 0, 1024);
FileLock fl = fc.lock();//文件鎖
3、Socket 這個方式可以實現,需要在父子進程間進行socket通信
4、隊列機制 這種方式也可以實現,需要父/子進程往隊列里面寫數據,子/父進程進行讀取。不太好的地方是需要在父子進程之間加一層隊列實現,隊列實現有ActiveMQ,FQueue等
5、通過JNI方式,父/子進程通過JNI對共享進程讀寫
6、基於信號方式,但該方式只能在執行kill -12或者在cmd下執行ctrl+c 才會觸發信息發生。

 1 OperateSignal operateSignalHandler = new OperateSignal();
 2 Signal sig = new Signal("SEGV");//SEGV 這個linux和window不同
 3 Signal.handle(sig, operateSignalHandler);
 4 
 5 public class OperateSignal implements SignalHandler{
 6 @Override
 7 public void handle(Signal arg0) {
 8 System.out.println("信號接收");
 9 }
10 }

7、要是在線程間也可以使用Semaphore
8、說明一下Java中沒有命名管道

參考:

Java常用消息隊列原理介紹及性能對比

JMS(Java消息服務)入門教程

Java 消息隊列之 RabbitMQ 使用

FileLock——Java文件鎖

Java中處理Linux信號量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM