文章目錄
一、使用同一個共享變量控制
二、PipedInputStream、PipedOutputStream
三、利用BlockingQueue
四、利用LockSupport
一、使用同一個共享變量控制
Synchronized、wait、notify
public class Demo1 {
private final List<Integer> list =new ArrayList<>();
public static void main(String[] args) {
Demo1 demo =new Demo1();
new Thread(()->{
for (int i=0;i<10;i++){
synchronized (demo.list){
if(demo.list.size()%2==1){
try {
demo.list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
demo.list.notify();
}
}
}).start();
new Thread(()->{
for (int i=0;i<10;i++){
synchronized (demo.list){
if(demo.list.size()%2==0){
try {
demo.list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
demo.list.notify();
}
}
}).start();
}
}
Lock、Condition
public class Task {
private final Lock lock = new ReentrantLock();
private final Condition addConditon = lock.newCondition();
private final Condition subConditon = lock.newCondition();
private volatile int num = 0;
private List<String> list = new ArrayList<>();
public void add() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
if (list.size() == 10) {
addConditon.await();
}
num++;
Thread.sleep(100);
list.add("add " + num);
System.out.println("The list size is " + list.size());
System.out.println("The add thread is " + Thread.currentThread().getName());
System.out.println("-------------");
subConditon.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public void sub() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
if (list.size() == 0) {
subConditon.await();
}
num--;
Thread.sleep(100);
list.remove(0);
System.out.println("The list size is " + list.size());
System.out.println("The sub thread is " + Thread.currentThread().getName());
System.out.println("-------------");
addConditon.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
Task task = new Task();
new Thread(task::add).start();
new Thread(task::sub).start();
}
}
利用volatile
volatile修飾的變量值直接存在主內存里面,子線程對該變量的讀寫直接寫住內存,而不是像其它變量一樣在local thread里面產生一份copy。volatile能保證所修飾的變量對於多個線程可見性,即只要被修改,其它線程讀到的一定是最新的值。
public class Demo2 {
private volatile List<Integer> list =new ArrayList<>();
public static void main(String[] args) {
Demo2 demo =new Demo2();
new Thread(()->{
for (int i=0;i<10;i++){
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
}
}).start();
new Thread(()->{
for (int i=0;i<10;i++){
demo.list.add(i);
System.out.print(Thread.currentThread().getName());
System.out.println(demo.list);
}
}).start();
}
}
利用AtomicInteger
和volatile類似, 只是原子操作達到預估值非A即B

二、PipedInputStream、PipedOutputStream
這里用流在兩個線程間通信,但是Java中的Stream是單向的,所以在兩個線程中分別建了一個input和output
public class PipedDemo {
private final PipedInputStream inputStream1;
private final PipedOutputStream outputStream1;
private final PipedInputStream inputStream2;
private final PipedOutputStream outputStream2;
public PipedDemo(){
inputStream1 = new PipedInputStream();
outputStream1 = new PipedOutputStream();
inputStream2 = new PipedInputStream();
outputStream2 = new PipedOutputStream();
try {
inputStream1.connect(outputStream2);
inputStream2.connect(outputStream1);
} catch (IOException e) {
e.printStackTrace();
}
}
/**程序退出時,需要關閉stream*/
public void shutdown() throws IOException {
inputStream1.close();
inputStream2.close();
outputStream1.close();
outputStream2.close();
}
public static void main(String[] args) throws IOException {
PipedDemo demo =new PipedDemo();
new Thread(()->{
PipedInputStream in = demo.inputStream2;
PipedOutputStream out = demo.outputStream2;
for (int i = 0; i < 10; i++) {
try {
byte[] inArr = new byte[2];
in.read(inArr);
System.out.print(Thread.currentThread().getName()+": "+i+" ");
System.out.println(new String(inArr));
while(true){
if("go".equals(new String(inArr)))
break;
}
out.write("ok".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
PipedInputStream in = demo.inputStream1;
PipedOutputStream out = demo.outputStream1;
for (int i = 0; i < 10; i++) {
try {
out.write("go".getBytes());
byte[] inArr = new byte[2];
in.read(inArr);
System.out.print(Thread.currentThread().getName()+": "+i+" ");
System.out.println(new String(inArr));
while(true){
if("ok".equals(new String(inArr)))
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// demo.shutdown();
}
}
輸出:
Thread-0: 0 go Thread-1: 0 ok Thread-0: 1 go Thread-1: 1 ok Thread-0: 2 go Thread-1: 2 ok Thread-0: 3 go Thread-1: 3 ok Thread-0: 4 go Thread-1: 4 ok Thread-0: 5 go Thread-1: 5 ok Thread-0: 6 go Thread-1: 6 ok Thread-0: 7 go Thread-1: 7 ok Thread-0: 8 go Thread-1: 8 ok Thread-0: 9 go Thread-1: 9 ok
三、利用BlockingQueue
BlockingQueue定義的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常。
- offer(Object):表示如果可能的話,將Object加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue里有空間再繼續。
- poll(time):獲取並刪除BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null。當不傳入time值時,立刻返回。
- peek():立刻獲取BlockingQueue里排在首位的對象,但不從隊列里刪除,如果隊列為空,則返回null。
- take():獲取並刪除BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的對象被加入為止。
BlockingQueue有四個具體的實現類:
- ArrayBlockingQueue:數組阻塞隊列,規定大小,其構造函數必須帶一個int參數來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
- LinkedBlockingQueue:鏈阻塞隊列,大小不定,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO順序排序的。
- PriorityBlockingQueue:類似於LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。
- SynchronousQueue:特殊的BlockingQueue,它的內部同時只能夠容納單個元素,對其的操作必須是放和取交替完成的。
- DelayQueue:延遲隊列,注入其中的元素必須實現 java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式類似,以下例子一個線程寫入,一個線程讀取,操作的是同一個Queue:
public class BlockingQueueDemo {
public static void main(String[] args) {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
//讀線程
new Thread(() -> {
int i =0;
while (true) {
try {
String item = queue.take();
System.out.print(Thread.currentThread().getName() + ": " + i + " ");
System.out.println(item);
i++;
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
//寫線程
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String item = "go"+i;
System.out.print(Thread.currentThread().getName() + ": " + i + " ");
System.out.println(item);
queue.put(item);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
四、利用LockSupport
用LockSupport的unpark()和park()方法,實現線程間通信。

五、利用ThreadLocal
ThreadLocal,即線程變量,是一個以 ThreadLocal 對象為鍵、任意對象為值的存儲結構。這個結構被依附在線程上,也就是說一個線程可以根據一個 ThreadLocal 對象查詢到綁定在這個線程上的一個值。
可以通過 set(T) 方法來設置一個值,在當前線程下再通過 get() 方法獲取到原先設置的值。
public class ThreadLocalDemo {
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<>() {
@Override
protected Long initialValue() {
return System.currentTimeMillis();
}
};
public static final void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end() {
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) throws InterruptedException {
ThreadLocalDemo.begin();
Thread.sleep(2000);
System.out.println(ThreadLocalDemo.end());
}
}
//輸出 2003
