硬件設備
首先是硬件設備,這類短信模塊,modem pool大都是基於Q2406A, Q2406B之類的串口設備,只支持GSM和GPRS,不支持電信CDMA,早先的設備只有COM口,如果是一個pool,對應每一個模塊都會引出一個COM口,后來出的設備改成了USB2.0接口,其芯片主要是PL2303系列的USB2 Serial Comm方案。在連接到主機后,每一個模塊都會顯示為一個單獨的COM口,COM口編號根據系統當前的計數自動增長。
硬件驅動
Linux
絕大多數發行版都自帶PL2303的驅動,插上即可識別,可以通過dmesg看到硬件變化
[ 6096.417435] usb 2-2.1.1: New USB device found, idVendor=067b, idProduct=2303, bcdDevice= 3.00 [ 6096.417437] usb 2-2.1.1: New USB device strings: Mfr=1, Product=2, SerialNumber=0 [ 6096.417439] usb 2-2.1.1: Product: USB-Serial Controller [ 6096.417440] usb 2-2.1.1: Manufacturer: Prolific Technology Inc. [ 6096.424963] pl2303 2-2.1.1:1.0: pl2303 converter detected [ 6096.426240] usb 2-2.1.1: pl2303 converter now attached to ttyUSB0
通過 lsusb, 也可以看到具體的vendor, idproduct等信息
us 002 Device 045: ID 067b:2303 Prolific Technology, Inc. PL2303 Serial Port Bus 002 Device 044: ID 067b:2303 Prolific Technology, Inc. PL2303 Serial Port Bus 002 Device 043: ID 067b:2303 Prolific Technology, Inc. PL2303 Serial Port Bus 002 Device 042: ID 067b:2303 Prolific Technology, Inc. PL2303 Serial Port
配置udev規則
這一步很重要, 否則執行java代碼查找設備時會無法看到comm設備, 即執行 CommPortIdentifier.getPortIdentifiers() 代碼時返回的結果數為0. 參考的解決方案在這里.
Create a file /etc/udev/rules.d/51-my_usb_device (for instance) 這個文件名可以自己定義, 但是最好以 51- 開頭以確保執行順序. And put the following line:
SUBSYSTEMS=="usb", ATTRS{idVendor}=="067b", ATTRS{idProduct}=="2303", GROUP="users", MODE="0666"
然后關閉COMM設備電源后重新加電, 就能看到變化了, 對應的mod變成了666, 用戶組變成了users
$ ll /dev/ttyU* crw-rw-rw- 1 root users 188, 0 Aug 23 15:20 /dev/ttyUSB0 crw-rw-rw- 1 root users 188, 1 Aug 23 15:20 /dev/ttyUSB1 crw-rw-rw- 1 root users 188, 2 Aug 23 15:20 /dev/ttyUSB2 crw-rw-rw- 1 root users 188, 3 Aug 23 15:56 /dev/ttyUSB3
Windows
Win32未測試,主要是在Win64。對於PL2303系列芯片,Windows會自動安裝驅動,如果未自動安裝的可以手動安裝。
遇到的問題:在Windows 10下PL2303設備會顯示為“pl2303hxa自2012已停產,請聯系供貨商”,或者"pl2303hxa phased out since 2012",無法使用,此時需要安裝舊版驅動,並手動選擇使用舊驅動。搜索 PL2303_Prolific_DriverInstaller_3.3.3.zip 或者 PL2303_Prolific_GPS_1013_20090319.zip,推薦使用前者。
下載后執行安裝程序,待安裝結束后,在Device Manager -> Ports 下對應的串口上右鍵,點擊Update driver,點擊 Browse my computer for drivers,在下一步點擊“Let me pick from a list of available drivers from my computer",在里面選擇日期為2009(對應3.3.3),或2008(對應3.3.2)的驅動,下一步確認后,Ports列表里的串口設備都會一致變更為Prolific USB-to-Serial Comm Port,並展示COM口編號。
Windows 10重啟后, 再次接入硬件時, 依然會使用日期較新的驅動, 需要手動再操作 Update driver.
JAVA配置(方案一)
現在無論是開發環境,還是生產環境,已經全面使用64位操作系統,推薦直接使用RXTX的"rxtx 2.2pre2 (prerelease)"版本,下載地址 里面包含了Windows和Linux 64位版本的jar, dll 和 so. 在jLog的頁面上也有可用的dll和jar推薦,經過CRC SHA-256比對,jLog提供的jar和dll和2.2pre2里的文件是一樣的。
如果沒有將文件放入正確目錄,運行時會出現 java.lang.NoClassDefFoundError: gnu/io/CommPortIdentifier 錯誤。
Linux
Linux下一般只用JDK, 需要在JDK目錄下放置so和jar文件, 確認自己的java環境對應的目錄, 然后
1. 將RXTXcomm.jar 復制到 [JAVA_HOME]/jre/lib/ext/, 例如我使用的是 /opt/jdk/jdk1.8.0_251/jre/lib/ext/
2. 將librxtxSerial.so 復制到[JAVA_HOME]/jre/lib/amd64/, 例如我使用的是 /opt/jdk/jdk1.8.0_251/jre/lib/amd64/
如果沒有正確放置so文件, 運行時會報錯 java.lang.UnsatisfiedLinkError: no rxtxSerial in java.library.path
Windows
需要在JDK和JRE目錄下都放置dll和jar文件,首先確認自己環境下java對應的安裝目錄,然后
1. 將RXTXcomm.jar復制到
C:\Program Files\Java\jre1.8.0_251\lib\ext
C:\Program Files\Java\jdk1.8.0_251\jre\lib\ext
2. 將rxtxSerial.dll復制到
C:\Program Files\Java\jre1.8.0_251\bin
C:\Program Files\Java\jdk1.8.0_251\jre\bin
JAVA配置(方案二)
使用 nrjavaserial , 這個jar中包含了各個操作系統的native文件, 不需要在JDK中添加dll, so和jar. 經過測試完全可以替代方案一. 在dependency中加入jar依賴就可以了.
這個方案的另一個優點在於, 提供了arm和arm64 linux 的支持, 並且支持串口設備掉電通知.
JAVA配置(方案三)
使用jSerialComm https://fazecast.github.io/jSerialComm/
JAVA程序
Java程序都是基於smslib v3的庫方法. 使用的版本為3.5.4
Serial COMM端口和波特率檢測
通過這段代碼,可以列出機器上可用的COMM口及其對應的波特率,以及上面的設備信息
public class CommUtil { private static final Logger logger = LoggerFactory.getLogger(CommUtil.class); private static final int[] baudRates = {9600, 19200, 57600, 115200}; public static List<CommPortDevice> getCommPortDevices() { //System.setProperty("gnu.io.rxtx.SerialPorts", "/dev/ttyUSB0"); List<CommPortDevice> devices = new ArrayList<>(); logger.info("Detecting serial comm devices..."); Enumeration<CommPortIdentifier> portList = CommPortIdentifier.getPortIdentifiers(); while (portList.hasMoreElements()) { CommPortIdentifier portId = portList.nextElement(); if (portId.getPortType() == CommPortIdentifier.PORT_SERIAL) { logger.info("Found serial comm device: {}", portId.getName()); for (int baudRate : baudRates) { logger.info(" Trying at {} ...", baudRate); SerialPort serialPort = null; try { serialPort = portId.open("SMSLibCommTester", 1971); serialPort.setFlowControlMode(SerialPort.FLOWCONTROL_RTSCTS_IN); serialPort.setSerialPortParams(baudRate, SerialPort.DATABITS_8, SerialPort.STOPBITS_1, SerialPort.PARITY_NONE); InputStream inStream = serialPort.getInputStream(); OutputStream outStream = serialPort.getOutputStream(); serialPort.enableReceiveTimeout(1000); int c; while ((c = inStream.read()) != -1) { // do nothing; } outStream.write("AT\r".getBytes()); StringBuilder sb = new StringBuilder(); while ((c = inStream.read()) != -1) { sb.append((char)c); } if (!sb.toString().contains("OK")) { logger.info(" No device detected, response: {}", sb); } else { logger.info(" Device found,"); sb.setLength(0); // clear the StringBuilder outStream.write("AT+CGMM\r".getBytes()); while ((c = inStream.read()) != -1) { sb.append((char)c); } String model = sb.toString() .replaceAll("(\n|\r|AT\\+CGMM|OK)", "") .replaceAll("\\s+", " ").trim(); logger.info(" Device model: {}", model); CommPortDevice device = new CommPortDevice(); device.setSerialPort(portId.getName()); device.setBaudRate(baudRate); device.setModel(model); devices.add(device); break; } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { if (serialPort != null) { serialPort.close(); } } } } } return devices; } public static void main(String[] args) { List<CommPortDevice> devices = getCommPortDevices(); logger.info("size: {}", devices.size()); } }
使用service.sendMessage()方法發送短信
sendMessage()使用的是同步的發送方式, 當服務添加了多個gateway時, sendMessage()使用默認的Round Robin輪詢發送, 依次循環使用每一個gateway, 一次發送耗時大約3~5秒. 如果自己管理gateway路由也許可以多個gateway並行發送. 因為現在運營商對短信發送的頻率有限制, 上限為200/hour, 1000/day(節假日500/hour, 2000/day), 均分到每分鍾的可發短信其實很少, 如果gateway數量不超過4個, 並行意義不大.
這里的代碼加了兩個列表, 分別用於記錄inbound和outbound的歷史, 如果需要在重啟后依然保持, 可以換成mysql或者redis.
public class ModemService { private static final Logger logger = LoggerFactory.getLogger(ModemService.class); private static final int HISTORY_SIZE = 5000; private final Service service = Service.getInstance(); private final LinkedList<OutboundMessage> outboundMessages = new LinkedList<>(); private final LinkedList<InboundMessage> inboundMessages = new LinkedList<>(); private boolean ready = false; synchronized public void init() { service.setOutboundMessageNotification(new OutboundNotification()); service.setInboundMessageNotification(new InboundNotification(inboundMessages)); service.setOrphanedMessageNotification(new OrphanedNotification()); service.setCallNotification(new CallNotification()); service.setGatewayStatusNotification(new GatewayStatusNotification()); List<CommPortDevice> devices = CommUtil.getCommPortDevices(); List<SerialModemGateway> modemGateways = new ArrayList<>(); for (CommPortDevice device : devices) { logger.info("Add gateway: {},{}", device.getSerialPort(), device.getBaudRate()); SerialModemGateway gateway = new SerialModemGateway( "modem." + device.getSerialPort(), device.getSerialPort(), device.getBaudRate(), "WAVECOM", ""); gateway.setInbound(true); gateway.setOutbound(true); try { service.addGateway(gateway); modemGateways.add(gateway); } catch (GatewayException e) { logger.error(e.getMessage(), e); } } // 啟用輪循模式 service.S.SERIAL_POLLING = true; // 關閉運營商選擇 service.S.DISABLE_COPS = true; try { logger.info("ModemService starting"); service.startService(); ready = true; logger.info("ModemService started"); } catch (SMSLibException |IOException|InterruptedException e) { logger.error(e.getMessage(), e); } for (SerialModemGateway gateway : modemGateways) { try{ logger.debug("Gateway: {}", gateway.getGatewayId()); logger.debug(" Manufacturer: {}", gateway.getManufacturer()); logger.debug(" Model: {}", gateway.getModel()); logger.debug(" Serial No: {}", gateway.getSerialNo()); logger.debug(" SIM IMSI: {}", gateway.getImsi()); logger.debug(" Signal Level: {}", gateway.getSignalLevel() + " dBm"); logger.debug(" Battery Level: {}", gateway.getBatteryLevel() + "%"); } catch (Exception e){ logger.error(e.getMessage(), e); } } } synchronized public void stop() { try { ready = false; service.stopService(); } catch (SMSLibException|IOException|InterruptedException e) { logger.error(e.getMessage(), e); } } synchronized public boolean send(OutboundMessage msg) { msg.setEncoding(Message.MessageEncodings.ENCUCS2); // 中文 msg.setStatusReport(true); // 發送狀態報告 try { outboundMessages.addFirst(msg); logger.info("Send message: {}, {}", msg.getRefNo(), msg.getText()); boolean result = service.sendMessage(msg); logger.info("Send message done"); return result; } catch (TimeoutException |GatewayException|IOException|InterruptedException e) { logger.error(e.getMessage(), e); return false; } } synchronized public boolean queue(OutboundMessage msg) { AbstractQueueManager queueManager = service.getQueueManager(); for (AGateway gateway : service.getGateways()) { int queueSize = queueManager.pendingQueueSize(gateway.getGatewayId()); logger.info("Queue size: {} {}", gateway.getGatewayId(), queueSize); } msg.setEncoding(Message.MessageEncodings.ENCUCS2); // 中文 msg.setStatusReport(true); // 發送狀態報告 return service.queueMessage(msg); } public int countInboundMessages() { return inboundMessages.size(); } public List<InboundMessage> listInboundMessages(int offset, int limit) { if (offset > inboundMessages.size() - 1) offset = inboundMessages.size() - 1; if (offset < 0) offset = 0; int to = offset + limit; if (to < offset) to = offset; if (to > inboundMessages.size()) to = inboundMessages.size(); return inboundMessages.subList(offset, to); } public boolean isReady() { return ready; } public int countOutboundMessages() { return outboundMessages.size(); } public List<OutboundMessage> listOutboundMessages(int offset, int limit) { if (offset > outboundMessages.size() - 1) offset = outboundMessages.size() - 1; if (offset < 0) offset = 0; int to = offset + limit; if (to < offset) to = offset; if (to > outboundMessages.size()) to = outboundMessages.size(); return outboundMessages.subList(offset, to); } synchronized public void purge() { while (inboundMessages.size() > HISTORY_SIZE) { inboundMessages.removeLast(); } while (outboundMessages.size() > HISTORY_SIZE) { outboundMessages.removeLast(); } } public static void main(String[] args) { ModemService modemService = new ModemService(); modemService.init(); /*OutboundMessage msg = new OutboundMessage("13800138000", "English 中文短信內容, 中文短信內容,中文短信內容"); boolean result = modemService.send(msg); logger.info("result {}", result);*/ logger.info("Now Sleeping - Hit <enter> to terminate."); try { System.in.read(); } catch (IOException e) { logger.error(e.getMessage(), e); } logger.info("Now stopping"); modemService.stop(); } }
對於inbound消息的處理, 要區分普通短信和到達通知消息. 為防止寫滿SIM卡, 在接收到消息后都會立即刪除
public class InboundNotification implements IInboundMessageNotification { private static final Logger logger = LoggerFactory.getLogger(InboundNotification.class); private final List<InboundMessage> inboundMessages; public InboundNotification(List<InboundMessage> inboundMessages) { this.inboundMessages = inboundMessages; } @Override public void process(AGateway gateway, MessageTypes msgType, InboundMessage msg) { logger.info("Inbound notification from: {}", gateway.getGatewayId()); logger.info("messageType: {}", msgType); logger.info("message: {}", msg); inboundMessages.add(0, msg); if (msg.getMemLocation().equals("SR")) { try { if (gateway.deleteMessage(msg)) { logger.info("Deleted status report: {}", msg.getId()); } else { logger.error("Failed to delete status report: {}", msg.getId()); } } catch (TimeoutException|GatewayException|IOException|InterruptedException e) { logger.error(e.getMessage(), e); } } else { try { if (gateway.deleteMessage(msg)) { logger.info("Deleted message: {}", msg.getId()); } else { logger.error("Failed to delete message: {}", msg.getId()); } } catch (TimeoutException|GatewayException|IOException|InterruptedException e) { logger.error(e.getMessage(), e); } } } }
使用中的幾點觀察:
1. 代碼中如果不調用 service.stopService() 方法, main進程將一直堵塞. 在Windows 10下, 若未調用stopService()即退出Java應用, 可能導致再次啟動失敗.
2. 如果SIM卡未注冊到移動網絡(接觸不良, 信號不好, 或欠費), 都會導致service.startService() 方法執行失敗
3. startService() 和 stopService() 都會需要幾秒的執行時間.
4. sendMessage()是實時發送, 但是接收短信在服務啟動后有數分鍾的延遲, 數條短信會在延遲七八分鍾后一起到達, 在服務運行一段時間后, 接收短信延遲會變小.
本文使用的硬件
從節電考慮, 未使用普通的x86服務器或主機, 而是使用了一塊Amlogic S905L, 1G RAM的R3300-L, 在上面用8G TF card運行了64位的Armbian Linux, 經實際測試, 運行穩定.
進一步了解Smslib的內部機制
對於單張卡每小時200條每日1000條的限制, 但是物理發送的頻率為每分鍾12次. 實際使用中, 群發任務會集中在每日的特定時段, 目標是在任務啟動后不超出運營商限制的前提下盡快完成, 因此可能會要求單張卡用滿每小時200條的頻率另外在發送滿1000條后標記為不可用. 在這個前提下, 假定群發的短信為10K條, 那么不同數量SIM卡並發的情況下最快執行時間為:
1: 9天5小時 2: 4天5小時 4: 2天3小時 9: 1天0.5小時 10: 4小時20分鍾 16: 3小時3分鍾 20: 2小時10分鍾 32: 1小時12分鍾 40: 1小時5分鍾 100: 10分鍾 128: 7分鍾
在SIM卡數量較多時, 如何提高發送效率? 使用同步鎖是不行的, 需要改進. 研究smslib的代碼得到的信息:
1. AGateway有多個實現, ModemGateway只是其中的一種, 還有HTTPGateway, SMPPGateway. 所以使用smslib實際上是可以將http通道一塊合並進來作為發送出口的.
Smslib內部實現了eztexting.com, bulksms.vsms.net, Kannel, Skype, clickatell.com這幾家的短信接口, 如果使用自己的HTTP通道, 可以擴展HTTPGateway實現, 需要實現的方法為 startGateway(), stopGateway(), sendMessage(OutboundMessage msg), queryMessage(String refNo), queryCoverage(OutboundMessage msg), queryBalance()
2. Smslib的Service實現了群發機制
通過一個Collection<Group> groups的成員變量, 管理可用的手機號組, 可以通過接口addToGroup(), removeFromGroup()管理各個組的手機號, 在發送時, 如果OutboundMessage的recipient命中group名稱, 則會將這個OutboundMessage展開為一個列表, 再通過gateway進行發送.
這個群發機制是支持多層遞歸的, 即一個group為aaa, 可以將aaa加入到bbb, 那么發送到bbb的消息, 會進一步將aaa里的手機號也加入進來.
3. 使用ModemGateway發送短信時, 其sendMessage方法內部是加了同步鎖的
加鎖代碼為 synchronized(this.getDriver().getSYNCCommander()) { ... } 內部再區分是Protocols.PDU 還是 Protocols.TEXT 進行發送.
進一步查看ModemGateway這個類的代碼, 其中大量使用了上面提到的同步鎖, 因此外部代碼加鎖是不必要的, 可以去掉.
4. Service只有在STOPPED的狀態下, 才能addGateway()和removeGateway(), 因此動態管理gateway是不行的. 在運行中, gateway可以start(), stop(), 但是默認的 RoundRobinLoadBalancer 在取gateway時並不會判斷gateway的狀態, 所以如果要動態分配gateway, 需要自己控制.
5. 短信發送后, 其到達狀態的關聯依據為refNo, 這個值在發送成功后會寫回OutboundMessage對象.
可改進的方向
假定運行設備為32口的moden pool, pool根據當前的時間和歷史發送記錄, 定時計算當前可發的短信數量
對於每一個提交的發送任務, 先分解為最終的短信列表,
判斷是否小於pool的當前可發短信數量, 若超出則返回失敗, 否則繼續
維護一個任務隊列, 一個結果隊列
每一個gateway關聯一個executor, 實時判斷當前是否可發短信(超限或狀態為stopped), 如果不可則睡眠一個隨機時間后再次判斷, 如果可以則從短信隊列中取出最早一條進行發送
如果短信隊列為空, 則睡眠隨機時間后再次執行gateway步驟
gateway發送一條短信完成后, 根據結果將短信放入結果列, 同時再次執行gateway步驟
根據配置是否失敗重試, 以及重試次數上限, 將結果隊列中狀態為失敗且可重試的短信再放入任務隊列, 直至結果隊列中再無需要再重試的短信
根據Inbound SR, 更新結果隊列中的短信記錄
定時清理結果隊列中時間較早的記錄
對任務提供接口回調, 以及查詢接口