動機:前幾天去面試,重點問了一些關於多線程網絡通信的問題。這塊也的確很重要。特總結一下自己對多線程這塊的理解和實現方式。
帶着一些問題。。。。。。。
怎么進行網絡通信?
網絡通信需要注意哪些?
網絡連接池怎么來進行實現?
1.怎么進行網絡通信?
網絡接口+多線程
2. 網絡通信需要注意哪些?
穩定+健壯+效率
3. 網絡連接池的實現
有自定義實現
也可以使用第三方框架
無論是使用自定義實現還是使用第三方框架都會遇到這些問題。就是網絡連接池的基本框架是什么樣子的。基本原理是這樣的,有一個線程的列表,還有一個報文的列表。報文的列表分為有界的和無界的。這個需要根據需求來定了。 我遇到最多的就是無界的了。 當添加一個報文任務的時候,取得一個空閑的線程來做,當再增加一個報文任務的時候,再取得一條空閑的線程來做。當線程都在忙的時候。在增加報文任務的時候,把這些報文任務添加到報文列表中,當線程有空閑的時候,再來取報文列表中的任務來做。
自定義實現(無界的報文列表):
- notify,wait 方法是用的時候,包含這個方法的方法,需要加上同步鎖。
例如:
/** * 等待操作 */ private synchronized void httpThreadWait() { try { if(!isRunning()){ wait(); } } catch (InterruptedException e) { System.out.println(e.toString()); } }
2.同步鎖的三種方式
對類進行加鎖
例如:
/** * 發送數據包 * @param httpPacket 數據包 */ private synchronized static void sendHttpPacket(HttpPacket httpPacket){ //增加數據包到報文列表 HttpThreadPool httpThreadPool = HttpThreadPool.instance(); httpThreadPool.addHttpPacket(httpPacket); //如果有空閑的線程就取得,然后去處理報文 HttpThread httpThread = httpThreadPool.getFreeHttpThread(); if(null != httpThread){ httpThread.setRunning(true); } }
對象鎖:
一種是全局對象鎖
例如:
/** * 得到最前面的報文 * @return HttpPacket 報文 */ public synchronized HttpPacket getFirstHttpPacket(){ if(isHaveHttpPacket()){ HttpPacket temp = httpPackets.get(0); httpPackets.remove(0); return temp; } else{ return null; } }
一種是局部函數對象鎖
例如:
/** * 按鍵事件同步鎖 */ private byte[] keyLock = new byte[0]; /** * 按鍵按下事件 * * @param keyCode * 鍵值 * @param event * 事件 * @return 是否響應 */ public boolean onKeyDown(int keyCode, KeyEvent event) { synchronized (keyLock) { if (keyEventList.size() > MAX_EVENT) { keyEventList.remove(0); } keyEventList.add(new KeyAction(KeyAction.KEY_DOWN, keyCode)); } return true; }
自定義框架代碼:

1 /* 2 * 文 件 名: HttpPacket.java 3 * 描 述: <描述> 4 * 修 改 人: 吳佳峻 5 * 修改時間: 2012-4-23 6 * 跟蹤單號: <跟蹤單號> 7 * 修改單號: <修改單號> 8 * 修改內容: <修改內容> 9 */ 10 package thread.pool; 11 /** 12 * http報文 13 * <一句話功能簡述> 14 * <功能詳細描述> 15 * 16 * @author 吳佳峻 17 * @version 2012-4-23 18 * @see [相關類/方法] 19 */ 20 public class HttpPacket 21 { 22 private int id; 23 private String url; 24 private IHttpHandler handler; 25 private boolean isCancel; 26 27 28 public boolean isCancel() 29 { 30 return isCancel; 31 } 32 public void setCancel(boolean isCancel) 33 { 34 this.isCancel = isCancel; 35 } 36 public IHttpHandler getHandler() 37 { 38 return handler; 39 } 40 public void setHandler(IHttpHandler handler) 41 { 42 this.handler = handler; 43 } 44 public int getId() 45 { 46 return id; 47 } 48 public void setId(int id) 49 { 50 this.id = id; 51 } 52 public String getUrl() 53 { 54 return url; 55 } 56 public void setUrl(String url) 57 { 58 this.url = url; 59 } 60 61 }

1 /* 2 * 文 件 名: IHttpHandler.java 3 * 描 述: <描述> 4 * 修 改 人: 吳佳峻 5 * 修改時間: 2012-4-23 6 * 跟蹤單號: <跟蹤單號> 7 * 修改單號: <修改單號> 8 * 修改內容: <修改內容> 9 */ 10 package thread.pool; 11 12 13 public interface IHttpHandler 14 { 15 public static final int SERVER_ERROR = 0; 16 public static final int USER_CANCEL = 1; 17 public static final int CONNECT_ERROR = 2; 18 public static final int OK = 3; 19 20 void response(int result,Object data,int id,String desc); 21 }

1 /* 2 * 文 件 名: ProductHandler.java 3 * 描 述: <描述> 4 * 修 改 人: 吳佳峻 5 * 修改時間: 2012-4-25 6 * 跟蹤單號: <跟蹤單號> 7 * 修改單號: <修改單號> 8 * 修改內容: <修改內容> 9 */ 10 package thread.pool; 11 12 public class ProductHandler implements IHttpHandler 13 { 14 15 @Override 16 public void response(int result, Object data, int id, String desc) 17 { 18 System.out.println("result:"+result+" data:"+data.toString()+" id:"+id+" desc:"+desc); 19 } 20 21 }

1 /* 2 * 文 件 名: HttpTask.java 3 * 描 述: <描述> 4 * 修 改 人: 吳佳峻 5 * 修改時間: 2012-4-23 6 * 跟蹤單號: <跟蹤單號> 7 * 修改單號: <修改單號> 8 * 修改內容: <修改內容> 9 */ 10 package thread.pool; 11 12 import java.io.ByteArrayOutputStream; 13 import java.io.IOException; 14 import java.io.InputStream; 15 import java.net.HttpURLConnection; 16 import java.net.URL; 17 18 19 20 /** 21 * http任務 22 * <一句話功能簡述> 23 * <功能詳細描述> 24 * 25 * @author 吳佳峻 26 * @version 2012-4-23 27 * @see [相關類/方法] 28 */ 29 public class HttpThread implements Runnable 30 { 31 32 private boolean isRunning; 33 34 private HttpPacket httpPacket; 35 36 // private IHttpHandler handler; 37 38 private HttpURLConnection httpUrlConn; 39 40 public boolean isRunning() 41 { 42 return isRunning; 43 } 44 45 /** 46 * 設置是否運行 47 * @param isRunning 是否運行 48 */ 49 public synchronized void setRunning(boolean isRunning){ 50 this.isRunning = isRunning; 51 if(isRunning){ 52 notify(); 53 } 54 } 55 56 @Override 57 public void run() 58 { 59 while(true){ 60 try 61 { 62 //等待 63 httpThreadWait(); 64 65 boolean isHavePacket = HttpThreadPool.instance().isHaveHttpPacket(); 66 if(isHavePacket){ 67 //取得連接 68 httpPacket = HttpThreadPool.instance().getFirstHttpPacket(); 69 httpUrlConn = httpConnect(httpPacket); 70 71 // 處理服務器返回數據 72 dealResponse(httpUrlConn,httpPacket.getHandler()); 73 } 74 75 76 } 77 catch (IOException e) 78 { 79 e.printStackTrace(); 80 } 81 finally 82 { 83 if (null != httpUrlConn) 84 { 85 //httpConn.close(); 86 httpUrlConn.disconnect(); 87 httpUrlConn = null; 88 } 89 // 結束操作 90 endRun(); 91 } 92 } 93 } 94 95 96 97 /** 98 * 結束操作 99 */ 100 private void endRun() 101 { 102 setRunning(false); 103 if(HttpThreadPool.instance().isHaveHttpPacket()){ 104 setRunning(true); 105 } 106 } 107 108 /** 109 * 等待操作 110 */ 111 private synchronized void httpThreadWait() 112 { 113 try 114 { 115 if(!isRunning()){ 116 wait(); 117 } 118 } 119 catch (InterruptedException e) 120 { 121 System.out.println(e.toString()); 122 } 123 } 124 125 /* 126 * 建立HTTP連接 127 * 128 * @param request 129 * 請求對象 130 * @return HttpURLConnection http連接對象 131 */ 132 private HttpURLConnection httpConnect(HttpPacket httpPacket) throws IOException 133 { 134 HttpURLConnection conn = null; 135 URL url = new URL(httpPacket.getUrl()); 136 conn = (HttpURLConnection) url.openConnection(); 137 conn.setRequestProperty("Accept", "*/*"); 138 conn.setRequestProperty("User-Agent", "android"); 139 conn.setRequestMethod("GET"); 140 return conn; 141 } 142 143 /* 144 * 處理服務器響應 向回調Handler傳遞數據 145 * 146 * @param setHttpConns HTTP連接 147 * @param handler 回調Handler 148 * @return 149 * @throws IOException 流異常 150 * 151 * @return boolean 服務器是否成功響應 152 */ 153 private void dealResponse(HttpURLConnection setHttpConn,IHttpHandler handler) 154 throws IOException 155 { 156 157 158 // 獲取服務器響應數據 159 byte[] data = getResponseData(setHttpConn); 160 if (data == null) 161 { 162 System.out.println("err,response inputstream data is null!"); 163 if (!httpPacket.isCancel()) 164 { 165 handler.response(IHttpHandler.SERVER_ERROR, null, httpPacket.getId(), "無響應數據"); 166 } 167 else 168 { 169 handler.response(IHttpHandler.USER_CANCEL, null, httpPacket.getId(), "無響應數據"); 170 } 171 } 172 else 173 { 174 if (!httpPacket.isCancel()) 175 { 176 177 String getData = new String(data, "UTF-8"); 178 179 // System.out.println("receive data:" + getData); 180 if (getData == null || getData.equals("")) 181 { 182 System.out.println("receive data string empty!!!!!!"); 183 handler.response(IHttpHandler.SERVER_ERROR, null, httpPacket.getId(), "無響應數據"); 184 return; 185 } 186 String metaBean = null; 187 try 188 { 189 metaBean = "abasdkjfasjdf;laksjd;flkajs;dfj;asdjf;"; 190 } 191 catch (Exception e) 192 { 193 System.out.println("meta parse error!"); 194 handler.response(IHttpHandler.CONNECT_ERROR, null, httpPacket.getId(), null); 195 return; 196 } 197 handler.response(IHttpHandler.OK, metaBean, httpPacket.getId(), "成功"); 198 } 199 } 200 201 //closeInputStream(is); 202 //is = null; 203 } 204 205 /* 206 * 獲取服務器響應數據 207 * 208 * @param setHttpConn 連接對象 209 * @param is 輸入流 210 * @return 響應的byte[] 211 * @throws IOException 異常 212 */ 213 private byte[] getResponseData(HttpURLConnection setHttpConn) 214 { 215 InputStream is = null; 216 ByteArrayOutputStream baos = null; 217 try 218 { 219 //is = setHttpConn.openInputStream(); 220 is = setHttpConn.getInputStream(); 221 if (is == null) 222 { 223 return null; 224 } 225 baos = new ByteArrayOutputStream(); 226 boolean stopReading = false; 227 int c = 0; 228 while ((c = is.read()) != -1) 229 { 230 if (httpPacket.isCancel()) 231 { 232 stopReading = true; 233 break; 234 } 235 baos.write(c); 236 } 237 if (stopReading) 238 { 239 //closeOutputStream(baos); 240 241 return null; 242 } 243 byte[] data = baos.toByteArray(); 244 //closeOutputStream(baos); 245 return data; 246 } 247 catch (IOException e) 248 { 249 System.out.println(e.toString()); 250 } 251 finally 252 { 253 try 254 { 255 if (baos != null) 256 { 257 baos.close(); 258 baos = null; 259 } 260 261 if (is != null) 262 { 263 is.close(); 264 is = null; 265 } 266 } 267 catch (IOException e) 268 { 269 System.out.println(e.toString()); 270 } 271 } 272 return null; 273 } 274 }

1 /* 2 * 文 件 名: HttpThreadPool.java 3 * 描 述: <描述> 4 * 修 改 人: 吳佳峻 5 * 修改時間: 2012-4-23 6 * 跟蹤單號: <跟蹤單號> 7 * 修改單號: <修改單號> 8 * 修改內容: <修改內容> 9 */ 10 package thread.pool; 11 12 import java.util.ArrayList; 13 14 /** 15 * http連接池 16 * <一句話功能簡述> 17 * <功能詳細描述> 18 * 19 * @author 吳佳峻 20 * @version 2012-4-23 21 * @see [相關類/方法] 22 */ 23 public class HttpThreadPool 24 { 25 //最小的線程數 26 public static final int COREPOOLSIZE = 2; 27 28 //最大的線程數 29 public static final int MAXPOOLSIZE = 5; 30 31 private static HttpThreadPool httpThreadPool; 32 33 //報文列表 34 private ArrayList<HttpPacket> httpPackets = new ArrayList<HttpPacket>(); 35 36 //線程列表 37 private ArrayList<HttpThread> httpThreads = new ArrayList<HttpThread>(); 38 39 private HttpThreadPool(){ 40 for(int i=0;i<COREPOOLSIZE;i++){ 41 HttpThread ht = new HttpThread(); 42 Thread t = new Thread(ht); 43 t.start(); 44 httpThreads.add(ht); 45 } 46 } 47 48 /** 49 * 取得一個實例 50 * 單例,同步鎖 51 * @return HttpThreadPool 線程池 52 */ 53 public synchronized static HttpThreadPool instance(){ 54 if(null == httpThreadPool){ 55 httpThreadPool = new HttpThreadPool(); 56 } 57 return httpThreadPool; 58 } 59 60 /** 61 * 增加一個報文 62 * @param e 報文 63 */ 64 public synchronized void addHttpPacket(HttpPacket e){ 65 httpPackets.add(e); 66 } 67 68 // /** 69 // * 刪除排在最前面的報文 70 // */ 71 // public synchronized void removeFirstHttpPacket(){ 72 // if(httpPackets.size()!=0){ 73 // httpPackets.remove(0); 74 // } 75 // } 76 77 /** 78 * 得到最前面的報文 79 * @return HttpPacket 報文 80 */ 81 public synchronized HttpPacket getFirstHttpPacket(){ 82 if(isHaveHttpPacket()){ 83 HttpPacket temp = httpPackets.get(0); 84 httpPackets.remove(0); 85 return temp; 86 } 87 else{ 88 return null; 89 } 90 } 91 /** 92 * 93 * 列表中是否有報文 94 * @return boolean 列表中是否有報文 95 */ 96 public synchronized boolean isHaveHttpPacket(){ 97 if(httpPackets.size()!=0){ 98 return true; 99 } 100 return false; 101 } 102 103 /** 104 * 取得空閑的線程 105 * 如果沒有到達最大值就生成一個返回,如果超過的話,就返回空 106 * @return HttpThread [返回類型說明] 107 */ 108 public synchronized HttpThread getFreeHttpThread(){ 109 //已經存在的列表中是否有空閑的線程 110 for(int i=0;i<httpThreads.size();i++){ 111 HttpThread t = httpThreads.get(i); 112 if(!t.isRunning()){ 113 return t; 114 } 115 } 116 //如果沒有的話,當線程數小於最大線程數的時候,創建一條線程 117 if(httpThreads.size()<MAXPOOLSIZE){ 118 HttpThread t = new HttpThread(); 119 httpThreads.add(t); 120 return t; 121 } 122 //大於最大線程數的時候,返回空 123 return null; 124 } 125 126 127 }

1 /* 2 * 文 件 名: HttpTest.java 3 * 描 述: <描述> 4 * 修 改 人: 吳佳峻 5 * 修改時間: 2012-4-24 6 * 跟蹤單號: <跟蹤單號> 7 * 修改單號: <修改單號> 8 * 修改內容: <修改內容> 9 */ 10 package thread.pool; 11 12 public class HttpTest 13 { 14 15 /** <一句話功能簡述> 16 * <功能詳細描述> 17 * @param args [參數說明] 18 * 19 * @return void [返回類型說明] 20 * @exception throws [違例類型] [違例說明] 21 * @see [類、類#方法、類#成員] 22 */ 23 public static void main(String[] args) 24 { 25 for(int i=0;i<100;i++){ 26 sendOnePacket(i); 27 } 28 } 29 30 private static void sendOnePacket(int i) 31 { 32 HttpPacket p1 = new HttpPacket(); 33 p1.setId(i); 34 p1.setUrl("http://www.baidu.com/"); 35 p1.setHandler(new ProductHandler()); 36 sendHttpPacket(p1); 37 } 38 39 40 /** 41 * 發送數據包 42 * @param httpPacket 數據包 43 */ 44 private synchronized static void sendHttpPacket(HttpPacket httpPacket){ 45 //增加數據包到報文列表 46 HttpThreadPool httpThreadPool = HttpThreadPool.instance(); 47 httpThreadPool.addHttpPacket(httpPacket); 48 //如果有空閑的線程就取得,然后去處理報文 49 HttpThread httpThread = httpThreadPool.getFreeHttpThread(); 50 if(null != httpThread){ 51 httpThread.setRunning(true); 52 } 53 } 54 }
使用第三方框架實現(無界的報文列表):
//待續。。。。