總體接收簡單點,就是控制Qos的響應復雜點

總體圖
在connect的時候開啟了Task:
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { synchronized (conLock) { MqttConnect connect = new MqttConnect(client.getClientId(), conOptions.getMqttVersion(), conOptions.isCleanSession(), conOptions.getKeepAliveInterval(), conOptions.getUserName(), conOptions.getPassword(), conOptions.getWillMessage(), conOptions.getWillDestination()); this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval()); this.clientState.setCleanSession(conOptions.isCleanSession()); this.clientState.setMaxInflight(conOptions.getMaxInflight()); tokenStore.open(); ConnectBG conbg = new ConnectBG(this, token, connect); conbg.start(); } ....... } private class ConnectBG implements Runnable { ........ public void run() { try { NetworkModule networkModule = networkModules[networkModuleIndex]; networkModule.start(); receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream()); receiver.start("MQTT Rec: "+getClient().getClientId()); sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream()); sender.start("MQTT Snd: "+getClient().getClientId()); callback.start("MQTT Call: "+getClient().getClientId()); internalSend(conPacket, conToken); } ........ } }
然后啟動CommsSender:
public class CommsReceiverimplements Runnable { private boolean running = false; private Object lifecycle = new Object(); private ClientState clientState = null; private ClientComms clientComms = null; private MqttInputStream in; private CommsTokenStore tokenStore = null; private Thread recThread = null; private volatile boolean receiving; ........ /** * Run loop to receive messages from the server. */ public void run() { final String methodName = "run"; MqttToken token = null; while (running && (in != null)) { try { receiving = in.available() > 0; MqttWireMessage message = in.readMqttWireMessage(); // 【: 1】 receiving = false; if (message instanceof MqttAck) { token = tokenStore.getToken(message); // 【: 2】 if (token!=null) { synchronized (token) { // Ensure the notify processing is done under a lock on the token // This ensures that the send processing can complete before the // receive processing starts! ( request and ack and ack processing // can occur before request processing is complete if not! clientState.notifyReceivedAck((MqttAck)message); // 【: 3】 } } else { // It its an ack and there is no token then something is not right. // An ack should always have a token assoicated with it. throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); } } else { // A new message has arrived clientState.notifyReceivedMsg(message); // 【: 4】 } } catch (MqttException ex) { running = false; // Token maybe null but that is handled in shutdown clientComms.shutdownConnection(token, ex); } catch (IOException ioe) { running = false; // An EOFException could be raised if the broker processes the // DISCONNECT and ends the socket before we complete. As such, // only shutdown the connection if we're not already shutting down. if (!clientComms.isDisconnecting()) { clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); } } finally { receiving = false; } } } }
異常處理我就先不介紹了,細節要自己看下。
1.MqttInputStream來自前面介紹TcpNetworkModule,這個類就是socket底層代碼。
2.tokenStore.getToken(message),因為是確認ack的,他重連和ping的關鍵,特殊message,在自己發出去接受到服務器應答的時候已經被持久化了。然后同步下完成ACK確認就3。
3.看下ACK確認實現
/** * Called by the CommsReceiver when an ack has arrived. * * @param message * @throws MqttException */ protected void notifyReceivedAck(MqttAck ack) throws MqttException { this.lastInboundActivity = System.currentTimeMillis(); MqttToken token = tokenStore.getToken(ack); MqttException mex = null; if (token == null) { // @TRACE 662=no message found for ack id={0} log.fine(CLASS_NAME, methodName, "662", new Object[] { new Integer(ack.getMessageId())}); } else if (ack instanceof MqttPubRec) { // Complete the QoS 2 flow. Unlike all other // flows, QoS is a 2 phase flow. The second phase sends a // PUBREL - the operation is not complete until a PUBCOMP // is received MqttPubRel rel = new MqttPubRel((MqttPubRec) ack); this.send(rel, token); } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) { // QoS 1 & 2 notify users of result before removing from // persistence notifyResult(ack, token, mex); // Do not remove publish / delivery token at this stage // do this when the persistence is removed later } else if (ack instanceof MqttPingResp) { synchronized (pingOutstandingLock) { pingOutstanding = Math.max(0, pingOutstanding-1); notifyResult(ack, token, mex); if (pingOutstanding == 0) { tokenStore.removeToken(ack); } } } else if (ack instanceof MqttConnack) { int rc = ((MqttConnack) ack).getReturnCode(); if (rc == 0) { synchronized (queueLock) { if (cleanSession) { clearState(); // Add the connect token back in so that users can be // notified when connect completes. tokenStore.saveToken(token,ack); } inFlightPubRels = 0; actualInFlight = 0; restoreInflightMessages(); connected(); } } else { mex = ExceptionHelper.createMqttException(rc); throw mex; } clientComms.connectComplete((MqttConnack) ack, mex); notifyResult(ack, token, mex); tokenStore.removeToken(ack); // Notify the sender thread that there maybe work for it to do now synchronized (queueLock) { queueLock.notifyAll(); } } else { // Sub ack or unsuback notifyResult(ack, token, mex); releaseMessageId(ack.getMessageId()); tokenStore.removeToken(ack); } checkQuiesceLock(); // 這貨就是確認隊列是否空,否則就釋放鎖 }
根據不同的消息實現邏輯轉換,MqttPubComp和MqttPubAck,處理 QoS 1 & 2持久化重連的遺留問題,MqttPubRec表示發布失敗需要重發,MqttConnack自己處理鏈接重開,心跳包很重要
4.這個就是你自己實現和關注的消息類容。
/** * Called by the CommsReceiver when a message has been received. * Handles inbound messages and other flows such as PUBREL. * * @param message * @throws MqttException */ protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException { final String methodName = "notifyReceivedMsg"; this.lastInboundActivity = System.currentTimeMillis(); if (!quiescing) { if (message instanceof MqttPublish) { MqttPublish send = (MqttPublish) message; switch (send.getMessage().getQos()) { case 0: case 1: if (callback != null) { callback.messageArrived(send); // 【: 5】 } break; case 2: persistence.put(getReceivedPersistenceKey(message), (MqttPublish) message); inboundQoS2.put(new Integer(send.getMessageId()), send); this.send(new MqttPubRec(send), null); // 【: 6】 break; default: //should NOT reach here } } else if (message instanceof MqttPubRel) { MqttPublish sendMsg = (MqttPublish) inboundQoS2 .get(new Integer(message.getMessageId())); if (sendMsg != null) { if (callback != null) { callback.messageArrived(sendMsg); // 【: 7】 } } else { // Original publish has already been delivered. MqttPubComp pubComp = new MqttPubComp(message .getMessageId()); this.send(pubComp, null); // 【: 8】 } } } }
5.Qos1或者0 就直接通知外部callback消息盒子,然后單獨線程回調界面消息了
/** * This method is called when a message arrives on a topic. Messages are * only added to the queue for inbound messages if the client is not * quiescing. * * @param sendMessage * the MQTT SEND message. */ public void messageArrived(MqttPublish sendMessage) { final String methodName = "messageArrived"; if (mqttCallback != null || callbacks.size() > 0) { // If we already have enough messages queued up in memory, wait // until some more queue space becomes available. This helps // the client protect itself from getting flooded by messages // from the server. synchronized (spaceAvailable) { while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) { try { spaceAvailable.wait(200); } catch (InterruptedException ex) { } } } if (!quiescing) { messageQueue.addElement(sendMessage); // Notify the CommsCallback thread that there's work to do... synchronized (workAvailable) { workAvailable.notifyAll(); } } } }
全部加入消息盒子隊列,如果滿了或者正在處理,會wait一下
6.Qos為2 是精確只發一次需要通知服務器和客戶端全部都收到了,不然重傳
7.就是Qos為2的情況的到達確認。
8.合並消息,然后重發。
然后就是消息隊列輪詢,沒有就阻塞,有就通知界面了。相對於發送,接收的類簡單點。大家可以回顧下前面的發送過程。
轉載:https://www.jianshu.com/p/c8a51a334bac