實時Android語音對講系統架構


本文屬於Android局域網內的語音對講項目系列,《通過UDP廣播實現Android局域網Peer Discovering》實現了局域網內的廣播及多播通信,本文將重點說明系統架構,音頻信號的實時錄制、播放及編解碼相關技術。

本文主要包含以下內容:

  1. AudioRecord、AudioTrack
  2. Speex編解碼
  3. Android語音對講系統架構

一、AudioRecord、AudioTrack

AudioRecorder和AudioTracker是Android中獲取實時音頻數據的接口。在網絡電話、語音對講等場景中,由於實時性的要求,不能采用文件傳輸,因此,MediaRecorder和MediaPlayer就無法使用。

AudioRecorder和AudioTracker是Android在Java層對libmedia庫的封裝,所以效率較高,適合於實時語音相關處理的應用。在使用時,AudioRecorder和AudioTracker的構造器方法入參較多,這里對其進行詳細的解釋。

AudioRecord

public AudioRecord(int audioSource, int sampleRateInHz, int channelConfig, int audioFormat, int bufferSizeInBytes)

其中,audioSource表示錄音來源,在AudioSource中列舉了不同的音頻來源,包括:

AudioSource.DEFAULT:默認音頻來源
AudioSource.MIC:麥克風(常用)
AudioSource.VOICE_UPLINK:電話上行
AudioSource.VOICE_DOWNLINK:電話下行
AudioSource.VOICE_CALL:電話、含上下行
AudioSource.CAMCORDER:攝像頭旁的麥克風
AudioSource.VOICE_RECOGNITION:語音識別
AudioSource.VOICE_COMMUNICATION:語音通信

這里比較常用的有MICVOICE_COMMUNICATIONVOICE_CALL

sampleRateInHz表示采樣頻率。音頻的采集過程要經過抽樣量化編碼三步。抽樣需要關注抽樣率。聲音是機械波,其特征主要包括頻率和振幅(即音調和音量),頻率對應時間軸線,振幅對應電平軸線。采樣是指間隔固定的時間對波形進行一次記錄,采樣率就是在1秒內采集樣本的次數。量化過程就是用數字表示振幅的過程。編碼是一個減少信息量的過程,任何數字音頻編碼方案都是有損的。PCM編碼(脈沖編碼調制)是一種保真水平較高的編碼方式。在Android平台,44100Hz是唯一目前所有設備都保證支持的采樣頻率。但比如22050、16000、11025也在大多數設備上得到支持。8000是針對某些低質量的音頻通信使用的。

channelConfig表示音頻通道,即選擇單聲道、雙聲道等參數。系統提供的選擇如下:

public static final int CHANNEL_IN_DEFAULT = 1;
// These directly match native
public static final int CHANNEL_IN_LEFT = 0x4;
public static final int CHANNEL_IN_RIGHT = 0x8;
public static final int CHANNEL_IN_FRONT = 0x10;
public static final int CHANNEL_IN_BACK = 0x20;
public static final int CHANNEL_IN_LEFT_PROCESSED = 0x40;
public static final int CHANNEL_IN_RIGHT_PROCESSED = 0x80;
public static final int CHANNEL_IN_FRONT_PROCESSED = 0x100;
public static final int CHANNEL_IN_BACK_PROCESSED = 0x200;
public static final int CHANNEL_IN_PRESSURE = 0x400;
public static final int CHANNEL_IN_X_AXIS = 0x800;
public static final int CHANNEL_IN_Y_AXIS = 0x1000;
public static final int CHANNEL_IN_Z_AXIS = 0x2000;
public static final int CHANNEL_IN_VOICE_UPLINK = 0x4000;
public static final int CHANNEL_IN_VOICE_DNLINK = 0x8000;
public static final int CHANNEL_IN_MONO = CHANNEL_IN_FRONT;
public static final int CHANNEL_IN_STEREO = (CHANNEL_IN_LEFT | CHANNEL_IN_RIGHT);

常用的是CHANNEL_IN_MONOCHANNEL_IN_STEREO分別表示單通道輸入和左右兩通道輸入。

audioFormat指定返回音頻數據的格式,常見的選擇包括ENCODING_PCM_16BITENCODING_PCM_8BITENCODING_PCM_FLOATENCODING_PCM_16BIT表示PCM 16bits每個樣本,所有設備保證支持。ENCODING_PCM_8BIT自然表示PCM 8bits每個樣本。ENCODING_PCM_FLOAT表示一個單精度浮點數表示一個樣本。

bufferSizeInBytes表示錄音時音頻數據寫入的buffer的大小。這個數值是通過另一個方法來獲取的:getMinBufferSizegetMinBufferSize是AudioRecord類的靜態方法,返回值就是bufferSizeInBytes。這里我們來看下它的入參:

static public int getMinBufferSize(int sampleRateInHz, int channelConfig, int audioFormat)

sampleRateInHz, channelConfig, audioFormat三個參數與上面的含義完全一樣,代表錄音的采樣率、通道以及數據輸出的格式。綜上,AudioRecord的初始化方法如下:

// 獲取音頻數據緩沖段大小
inAudioBufferSize = AudioRecord.getMinBufferSize(
        Constants.sampleRateInHz, Constants.inputChannelConfig, Constants.audioFormat);
// 初始化音頻錄制
audioRecord = new AudioRecord(Constants.audioSource,
        Constants.sampleRateInHz, Constants.inputChannelConfig, Constants.audioFormat, inAudioBufferSize);

其中,參數設置如下:

// 采樣頻率,44100保證兼容性
public static final int sampleRateInHz = 44100;
// 音頻數據格式:PCM 16位每個樣本,保證設備支持。
public static final int audioFormat = AudioFormat.ENCODING_PCM_16BIT;

// 音頻獲取源
public static final int audioSource = MediaRecorder.AudioSource.MIC;
// 輸入單聲道
public static final int inputChannelConfig = AudioFormat.CHANNEL_IN_MONO;

AudioTrack

public AudioTrack(int streamType, int sampleRateInHz, int channelConfig, int audioFormat,
        int bufferSizeInBytes, int mode) throws IllegalArgumentException {
    this(streamType, sampleRateInHz, channelConfig, audioFormat,
            bufferSizeInBytes, mode, AudioManager.AUDIO_SESSION_ID_GENERATE);
}

與AudioRecord類似,AudioTrack的構造器方法依然有很多需要選擇的參數。其中,streamType表示音頻流播放類型,AudioManager中列出了可選的類型如下:

/** The audio stream for phone calls */
public static final int STREAM_VOICE_CALL = AudioSystem.STREAM_VOICE_CALL;
/** The audio stream for system sounds */
public static final int STREAM_SYSTEM = AudioSystem.STREAM_SYSTEM;
/** The audio stream for the phone ring */
public static final int STREAM_RING = AudioSystem.STREAM_RING;
/** The audio stream for music playback */
public static final int STREAM_MUSIC = AudioSystem.STREAM_MUSIC;
/** The audio stream for alarms */
public static final int STREAM_ALARM = AudioSystem.STREAM_ALARM;
/** The audio stream for notifications */
public static final int STREAM_NOTIFICATION = AudioSystem.STREAM_NOTIFICATION;
/** @hide The audio stream for phone calls when connected to bluetooth */
public static final int STREAM_BLUETOOTH_SCO = AudioSystem.STREAM_BLUETOOTH_SCO;
/** @hide The audio stream for enforced system sounds in certain countries (e.g camera in Japan) */
public static final int STREAM_SYSTEM_ENFORCED = AudioSystem.STREAM_SYSTEM_ENFORCED;
/** The audio stream for DTMF Tones */
public static final int STREAM_DTMF = AudioSystem.STREAM_DTMF;
/** @hide The audio stream for text to speech (TTS) */
public static final int STREAM_TTS = AudioSystem.STREAM_TTS;

常用的有STREAM_VOICE_CALLSTREAM_MUSIC等,需要根據應用特點進行選擇。

sampleRateInHzaudioFormat需與AudioRecord中的參數保持一致,這里不再介紹。

channelConfig與AudioRecord中的參數保持對應,比如AudioRecord選擇了AudioFormat.CHANNEL_IN_MONO(單通道音頻輸入),這里需要選擇AudioFormat.CHANNEL_OUT_MONO(單通道音頻輸出)。

bufferSizeInBytes表述音頻播放緩沖區大小,同樣,也需要根據AudioTrack的靜態方法getMinBufferSize來獲取。

static public int getMinBufferSize(int sampleRateInHz, int channelConfig, int audioFormat) 

sampleRateInHz, channelConfig, audioFormat三個參數與上面的含義完全一樣,代表輸出音頻的采樣率、通道以及數據輸出的格式。

最后說明下modeAudioManager.AUDIO_SESSION_ID_GENERATEmode代表音頻輸出的模式:MODE_STATICMODE_STREAM,分別表示靜態模式和流模式。AudioManager.AUDIO_SESSION_ID_GENERATE表示AudioSessionId,即AudioTrack依附到哪個音頻會話。

比如,要給AudioRecord添加回聲消除AcousticEchoCancelerAcousticEchoCanceler的構建方法create的入參就是sessionId,通過AudioRecord實例的getAudioSessionId()方法獲取。

綜上,AudioTrack的初始化方法如下:

public Tracker() {
    // 獲取音頻數據緩沖段大小
    outAudioBufferSize = AudioTrack.getMinBufferSize(
            Constants.sampleRateInHz, Constants.outputChannelConfig, Constants.audioFormat);
    // 初始化音頻播放
    audioTrack = new AudioTrack(Constants.streamType,
            Constants.sampleRateInHz, Constants.outputChannelConfig, Constants.audioFormat,
            outAudioBufferSize, Constants.trackMode);
}

其中,參數設置如下:

// 音頻播放端
public static final int streamType = AudioManager.STREAM_VOICE_CALL;
// 輸出單聲道
public static final int outputChannelConfig = AudioFormat.CHANNEL_OUT_MONO;
// 音頻輸出模式
public static final int trackMode = AudioTrack.MODE_STREAM;

二、Speex編解碼

Speex是一個聲音編碼格式,目標是用於網絡電話、線上廣播使用的語音編碼,基於CELP(一種語音編碼算法)開發,Speex宣稱可以免費使用,以BSD授權條款開放源代碼。

Speex是由C語言開發的音頻處理庫,在Android中使用,需要通過JNI來調用。因此,對NDK開發不熟悉的朋友,可以先了解下文檔:向您的項目添加 C 和 C++ 代碼

在Android Studio中使用C/C++庫有兩種方式:cmake和ndk-build。cmake是最新支持的方法,通過配置CMakeLists.txt文件來實現;ndk-build是傳統的方式,通過配置Android.mk文件來實現。具體語法參考相關文檔,這里不做深入介紹。配置完上述文件之后,需要將Gradle關聯到原生庫,通過AS的Link C++ Project with Gradle功能實現。

完成上述配置之后,正式開始在Android中使用Speex進行音頻編解碼。主要包括以下步驟:

  1. 下載Speex源碼。推薦使用Speex 1.2.0穩定版,由於目前Speex 已不再繼續維護,官方建議使用Opus。但在某些場合,使用Speex已然足夠滿足需求。
    Speex源碼
  2. src/main下創建jni文件夾,將上述Speex源碼中includelibspeex文件夾拷貝到jni文件夾下。
  3. 編寫Android.mk文件和Application.mk文件。
    Android.mk
LOCAL_PATH := $(call my-dir)
include $(CLEAR_VARS)
LOCAL_LDLIBS :=-llog
LOCAL_MODULE    := libspeex
LOCAL_CFLAGS = -DFIXED_POINT -DUSE_KISS_FFT -DEXPORT="" -UHAVE_CONFIG_H
LOCAL_C_INCLUDES := $(LOCAL_PATH)/include
LOCAL_SRC_FILES := speex_jni.cpp \
        ./libspeex/bits.c \
        ./libspeex/cb_search.c \
        ./libspeex/exc_10_16_table.c \
        ./libspeex/exc_10_32_table.c \
        ./libspeex/exc_20_32_table.c \
        ./libspeex/exc_5_256_table.c \
        ./libspeex/exc_5_64_table.c \
        ./libspeex/exc_8_128_table.c \
        ./libspeex/filters.c \
        ./libspeex/gain_table_lbr.c \
        ./libspeex/gain_table.c \
        ./libspeex/hexc_10_32_table.c \
        ./libspeex/hexc_table.c \
        ./libspeex/high_lsp_tables.c \
        ./libspeex/kiss_fft.c \
        ./libspeex/kiss_fftr.c \
        ./libspeex/lpc.c \
        ./libspeex/lsp_tables_nb.c \
        ./libspeex/lsp.c \
        ./libspeex/ltp.c \
        ./libspeex/modes_wb.c \
        ./libspeex/modes.c \
        ./libspeex/nb_celp.c \
        ./libspeex/quant_lsp.c \
        ./libspeex/sb_celp.c \
        ./libspeex/smallft.c \
        ./libspeex/speex_callbacks.c \
        ./libspeex/speex_header.c \
        ./libspeex/speex.c \
        ./libspeex/stereo.c \
        ./libspeex/vbr.c \
        ./libspeex/vorbis_psy.c \
        ./libspeex/vq.c \
        ./libspeex/window.c \
include $(BUILD_SHARED_LIBRARY)

Application.mk

APP_ABI := armeabi armeabi-v7a
  1. 新建speex_config_types.h文件。在jnispeex源碼目錄下的include/speex文件夾下,有一個speex_config_types.h.in文件,在include/speex目錄下創建speex_config_types.h,把speex_config_types.h.in的內容拷貝過來,然后把@SIZE16@改成short,把@SIZE32@改成int,對應標准C/C++數據類型。這個文件的內容如下:
#ifndef __SPEEX_TYPES_H__
#define __SPEEX_TYPES_H__
typedef short spx_int16_t;
typedef unsigned short spx_uint16_t;
typedef int spx_int32_t;
typedef unsigned int spx_uint32_t;
#endif
  1. 在Java層定義編解碼需要的接口。
public class Speex {
    static {
        try {
            System.loadLibrary("speex");
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
    public native int open(int compression);
    public native int getFrameSize();
    public native int decode(byte encoded[], short lin[], int size);
    public native int encode(short lin[], int offset, byte encoded[], int size);
    public native void close();
}
  1. 在C層實現上述方法(以encode為例)。
extern "C"
JNIEXPORT jint JNICALL Java_com_jd_wly_intercom_audio_Speex_encode
    (JNIEnv *env, jobject obj, jshortArray lin, jint offset, jbyteArray encoded, jint size) {

    jshort buffer[enc_frame_size];
    jbyte output_buffer[enc_frame_size];
    int nsamples = (size-1)/enc_frame_size + 1;
    int i, tot_bytes = 0;

    if (!codec_open)
        return 0;

    speex_bits_reset(&ebits);

    for (i = 0; i < nsamples; i++) {
        env->GetShortArrayRegion(lin, offset + i*enc_frame_size, enc_frame_size, buffer);
        speex_encode_int(enc_state, buffer, &ebits);
    }

    tot_bytes = speex_bits_write(&ebits, (char *)output_buffer, enc_frame_size);
    env->SetByteArrayRegion(encoded, 0, tot_bytes, output_buffer);

    return (jint)tot_bytes;
}
  1. 命令行到Android.mk文件夾下,執行命令ndk-build
D:\dev\study\intercom\WlyIntercom\app\src\main\jni>ndk-build
[armeabi] Compile++ thumb: speex <= speex_jni.cpp
[armeabi] Compile thumb  : speex <= bits.c
[armeabi] Compile thumb  : speex <= cb_search.c
[armeabi] Compile thumb  : speex <= exc_10_16_table.c
[armeabi] Compile thumb  : speex <= exc_10_32_table.c
[armeabi] Compile thumb  : speex <= exc_20_32_table.c
[armeabi] Compile thumb  : speex <= exc_5_256_table.c
[armeabi] Compile thumb  : speex <= exc_5_64_table.c
[armeabi] Compile thumb  : speex <= exc_8_128_table.c
[armeabi] Compile thumb  : speex <= filters.c
[armeabi] Compile thumb  : speex <= gain_table_lbr.c
[armeabi] Compile thumb  : speex <= gain_table.c
[armeabi] Compile thumb  : speex <= hexc_10_32_table.c
[armeabi] Compile thumb  : speex <= hexc_table.c
[armeabi] Compile thumb  : speex <= high_lsp_tables.c
[armeabi] Compile thumb  : speex <= kiss_fft.c
[armeabi] Compile thumb  : speex <= kiss_fftr.c
[armeabi] Compile thumb  : speex <= lpc.c
[armeabi] Compile thumb  : speex <= lsp_tables_nb.c
[armeabi] Compile thumb  : speex <= lsp.c
[armeabi] Compile thumb  : speex <= ltp.c
[armeabi] Compile thumb  : speex <= modes_wb.c
[armeabi] Compile thumb  : speex <= modes.c
[armeabi] Compile thumb  : speex <= nb_celp.c
[armeabi] Compile thumb  : speex <= quant_lsp.c
[armeabi] Compile thumb  : speex <= sb_celp.c
[armeabi] Compile thumb  : speex <= smallft.c
[armeabi] Compile thumb  : speex <= speex_callbacks.c
[armeabi] Compile thumb  : speex <= speex_header.c
[armeabi] Compile thumb  : speex <= speex.c
[armeabi] Compile thumb  : speex <= stereo.c
[armeabi] Compile thumb  : speex <= vbr.c
[armeabi] Compile thumb  : speex <= vorbis_psy.c
[armeabi] Compile thumb  : speex <= vq.c
[armeabi] Compile thumb  : speex <= window.c
[armeabi] StaticLibrary  : libstdc++.a
[armeabi] SharedLibrary  : libspeex.so
[armeabi] Install        : libspeex.so => libs/armeabi/libspeex.so

生成libs/armeabi/libspeex.so和對應的obj文件,如需單獨使用,將上述過程生成的*.so包拷貝至jniLibs文件夾中。
8. 最后,在Android中通過Java去調用encode方法進行音頻數據的編碼。

/**
 * 將raw原始音頻文件編碼為Speex格式
 *
 * @param audioData 原始音頻數據
 * @return 編碼后的數據
 */
public static byte[] raw2spx(short[] audioData) {
    // 原始數據中包含的整數個encFrameSize
    int nSamples = audioData.length / encFrameSize;
    byte[] encodedData = new byte[((audioData.length - 1) / encFrameSize + 1) * encodedFrameSize];
    short[] rawByte;
    // 將原數據轉換成spx壓縮的文件
    byte[] encodingData = new byte[encFrameSize];
    int readTotal = 0;
    for (int i = 0; i < nSamples; i++) {
        rawByte = new short[encFrameSize];
        System.arraycopy(audioData, i * encFrameSize, rawByte, 0, encFrameSize);
        int encodeSize = Speex.getInstance().encode(rawByte, 0, encodingData, rawByte.length);
        System.arraycopy(encodingData, 0, encodedData, readTotal, encodeSize);
        readTotal += encodeSize;
    }
    rawByte = new short[encFrameSize];
    System.arraycopy(audioData, nSamples * encFrameSize, rawByte, 0, audioData.length - nSamples * encFrameSize);
    int encodeSize = Speex.getInstance().encode(rawByte, 0, encodingData, rawByte.length);
    System.arraycopy(encodingData, 0, encodedData, readTotal, encodeSize);
    return encodedData;
}

這里設置了每幀處理160個short型數據,壓縮比為5,每幀輸出為28個byte型數據。Speex壓縮模式特征如下:
Speex壓縮模式特征

原文綜合考慮音頻質量、壓縮比和算法復雜度,最后選擇了Mode 5。

private static final int DEFAULT_COMPRESSION = 5;

三、Android語音對講項目系統架構

再次說明,本文實現參考了論文:Android real-time audio communications over local wireless,因此系統架構如下圖所示:

Android對講機系統架構

數據包要經過Record、Encoder、Transmission、Decoder、Play這一鏈條的處理,這種數據流轉就是對講機核心抽象。鑒於這種場景,本文的實現采用了責任鏈設計模式。責任鏈模式屬於行為型模式,表征對對象的某種行為。

創建型模式,共五種:工廠方法模式、抽象工廠模式、單例模式、建造者模式、原型模式。
結構型模式,共七種:適配器模式、裝飾器模式、代理模式、外觀模式、橋接模式、組合模式、享元模式。
行為型模式,共十一種:策略模式、模板方法模式、觀察者模式、迭代子模式、責任鏈模式、命令模式、備忘錄模式、狀態模式、訪問者模式、中介者模式、解釋器模式。

責任鏈設計模式的使用場景:在責任鏈模式里,很多對象里由每一個對象對其下家的引用而連接起來形成一條鏈。請求在這個鏈上傳遞,直到鏈上的某一個對象決定處理此請求。發出這個請求的客戶端並不知道鏈上的哪一個對象最終處理這個請求,這使得系統可以在不影響客戶端的情況下動態地重新組織和分配責任。下面來看下具體的代碼:

首先定義一個JobHandler,代表每個對象,其中包含抽象方法handleRequest():

/**
 * 數據處理節點
 *
 * @param <I> 輸入數據類型
 * @param <O> 輸出數據類型
 * @author yanghao1
 */
public abstract class JobHandler<I, O> {

    private JobHandler<O, ?> nextJobHandler;

    public JobHandler<O, ?> getNextJobHandler() {
        return nextJobHandler;
    }

    public void setNextJobHandler(JobHandler<O, ?> nextJobHandler) {
        this.nextJobHandler = nextJobHandler;
    }

    public abstract void handleRequest(I audioData);

    /**
     * 釋放資源
     */
    public void free() {

    }
}

JobHandler<I, O>表示輸入數據類型為I,輸出類型為OnextJobHandler表示下一個處理請求的節點,其類型為JobHandler<O, ?>,即輸入數據類型必須為上一個處理節點的輸出數據類型。

繼承類必須實現抽象方法handleRequest(),參數類型為I,實現對數據包的處理。free()方法實現資源的釋放,繼承類可根據情況重寫該方法。這里分別定義RecorderEncoderSenderReceiverDecoderTracker,均繼承自JobHandler

RecorderEncoderSender為例說明輸入側數據的處理(這里僅列出部分代碼,具體代碼參考github地址):

/**
 * 音頻錄制數據格式ENCODING_PCM_16BIT,返回數據類型為short[]
 *
 * @author yanghao1
 */
public class Recorder extends JobHandler<short[], short[]> {

    @Override
    public void handleRequest(short[] audioData) {
        if (audioRecord.getRecordingState() == AudioRecord.RECORDSTATE_STOPPED) {
            audioRecord.startRecording();
        }
        // 實例化音頻數據緩沖
        audioData = new short[inAudioBufferSize];
        audioRecord.read(audioData, 0, inAudioBufferSize);
        getNextJobHandler().handleRequest(audioData);
    }
}

Recorder完成音頻采集之后,通過getNextJobHandler()方法獲取對下一個處理節點的引用,然后調用其方法handleRequest(),並且入參類型為short[]Recorder的下一個處理節點是Encoder,在EncoderhandleRequest()方法中,實現音頻數據的編碼,其輸入類型為short[],輸出為byte[]

/**
 * 音頻編碼,輸入類型為short[],輸出為byte[]
 *
 * @author yanghao1
 */
public class Encoder extends JobHandler<short[], byte[]> {

    @Override
    public void handleRequest(short[] audioData) {
        byte[] encodedData = AudioDataUtil.raw2spx(audioData);
        getNextJobHandler().handleRequest(encodedData);
    }
}

Encoder的下一個處理節點是Sender,在SenderhandleRequest()方法中,通過多播(組播),將音頻編碼數據發送給局域網內的其它設備。

/**
 * UDP多播發送
 *
 * @author yanghao1
 */
public class Sender extends JobHandler<byte[], byte[]> {

    @Override
    public void handleRequest(byte[] audioData) {
        DatagramPacket datagramPacket = new DatagramPacket(
                audioData, audioData.length, inetAddress, Constants.MULTI_BROADCAST_PORT);
        try {
            multicastSocket.send(datagramPacket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

最后,在AudioInput類的構造函數中執行對象之間的關系:

/**
 * 音頻錄制、編碼、發送線程
 *
 * @author yanghao1
 */
public class AudioInput implements Runnable {

    private Recorder recorder;
    private Encoder encoder;
    private Sender sender;
    private Handler handler;

    // 錄制狀態
    private boolean recording = false;

    public AudioInput(Handler handler) {
        this.handler = handler;
        initJobHandler();
    }

    /**
     * 初始化錄制、編碼、發送,並指定關聯
     */
    private void initJobHandler() {
        recorder = new Recorder();
        encoder = new Encoder();
        sender = new Sender(handler);
        recorder.setNextJobHandler(encoder);
        encoder.setNextJobHandler(sender);
    }
}

即:在界面初始化AudioInput對應的線程的時候,就完成這些類的實例化,並指定Recorder的下一個處理者是Encoder,Encoder的下一個處理者是Sender。這樣使得整個處理流程非常靈活,比如,如果暫時沒有開發編解碼的過程,在Encoder的handleRequest()方法中直接指定下一個處理者:

public class Encoder extends JobHandler {

    @Override
    public void handleRequest(byte[] audioData) {
        getNextJobHandler().handleRequest(audioData);
    }
}

同樣的,在初始化AudioOutput對應的線程時,完成ReceiverDecoderTracker的實例化,並且指定Receiver的下一個處理者是DecoderDecoder的下一個處理者是Tracker

在Activity中,分別申明輸入、輸出Runable、線程池對象、界面更新Handler:

// 界面更新Handler
private AudioHandler audioHandler = new AudioHandler(this);

// 音頻輸入、輸出Runable
private AudioInput audioInput;
private AudioOutput audioOutput;

// 創建緩沖線程池用於錄音和接收用戶上線消息(錄音線程可能長時間不用,應該讓其超時回收)
private ExecutorService inputService = Executors.newCachedThreadPool();

// 創建循環任務線程用於間隔的發送上線消息,獲取局域網內其他的用戶
private ScheduledExecutorService discoverService = Executors.newScheduledThreadPool(1);

// 設置音頻播放線程為守護線程
private ExecutorService outputService = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(@NonNull Runnable r) {
        Thread thread = Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);
        return thread;
    }
});

可能有的同學會覺得這里的責任鏈設計模式用法並非真正的責任鏈,真正的責任鏈模式要求一個具體的處理者對象只能在兩個行為中選擇一個:一是承擔責任,而是把責任推給下家。不允許出現某一個具體處理者對象在承擔了一部分責任后又把責任向下傳的情況。
本文中責任鏈設計模式的用法確實不是嚴格的責任鏈模式,但學習的目的不就是活學活用嗎?

Android線程池

上述代碼涉及Android中的線程池,與Android線程池相關的類包括:ExecutorExecutorsExecutorServiceFutureCallableThreadPoolExecutor等,為了理清它們之間的關系,首先從Executor開始:

  • Executor接口中定義了一個方法 execute(Runnable command),該方法接收一個 Runable 實例,它用來執行一個任務,任務即一個實現了Runnable 接口的類。
  • ExecutorService接口繼承自Executor 接口,它提供了更豐富的實現多線程的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個異步任務執行狀況而生成Future 的方法。 可以調用 ExecutorService shutdown()方法來平滑地關閉 ExecutorService,調用該方法后,將導致 ExecutorService 停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢后將會關閉 ExecutorService。因此我們一般用該接口來實現和管理多線程。
  • Executors 提供了一系列工廠方法用於創建線程池,返回的線程池都實現了 ExecutorService 接口。包括:
    1. newCachedThreadPool()
      創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程;
    2. newFixedThreadPool(int)
      創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
    3. newScheduledThreadPool(int)
      創建一個定長線程池,支持定時及周期性任務執行。
    4. newSingleThreadExecutor()
      創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
  • Callable接口與Runnable接口類似,ExecutorService <T> Future<T> submit(Callable<T> task)方法接受Callable作為入參,在 Java 5 之后,任務分兩類:一類是實現了 Runnable 接口的類,一類是實現了 Callable 接口的類。兩者都可以被 ExecutorService 執行,但是 Runnable 任務沒有返回值,而 Callable 任務有返回值。並且 Callable call()方法只能通過 ExecutorServicesubmit(Callable task) 方法來執行,並且返回一個 Future,是表示任務等待完成的 Future
  • ThreadPoolExecutor繼承自AbstractExecutorServiceAbstractExecutorService實現了ExecutorService 接口。ThreadPoolExecutor的構造器由於參數較多,不宜直接暴露給使用者。所以,Executors 中定義 ExecutorService 實例的工廠方法,其實是通過定義ThreadPoolExecutor不同入參來實現的。

下面來看下ThreadPoolExecutor的構造器方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
       BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {

    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();

    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

其中,corePoolSize表示線程池中所保存的核心線程數,包括空閑線程;maximumPoolSize表示池中允許的最大線程數;keepAliveTime表示線程池中的空閑線程所能持續的最長時間;unit表示時間的單位;workQueue表示任務執行前保存任務的隊列,僅保存由execute 方法提交的Runnable任務;threadFactory表示線程創建的工廠,指定線程的特性,比如前面代碼中設置音頻播放線程為守護線程;handler表示隊列容量滿之后的處理方法。

ThreadPoolExecutor對於傳入的任務Runnable有如下處理流程:

  1. 如果線程池中的線程數量少於corePoolSize,即使線程池中有空閑線程,也會創建一個新的線程來執行新添加的任務;
  2. 如果線程池中的線程數量大於等於corePoolSize,但緩沖隊列 workQueue 未滿,則將新添加的任務放到 workQueue 中,按照 FIFO 的原則依次等待執行(線程池中有線程空閑出來后依次將緩沖隊列中的任務交付給空閑的線程執行);
  3. 如果線程池中的線程數量大於等於 corePoolSize,且緩沖隊列 workQueue 已滿,但線程池中的線程數量小於 maximumPoolSize,則會創建新的線程來處理被添加的任務;
  4. 如果線程池中的線程數量等於了 maximumPoolSize,交由RejectedExecutionHandler handler處理。

ThreadPoolExecutor主要用於某些特定場合,即上述工廠方法無法滿足的時候,自定義線程池使用。本文使用了三種特性的線程池工廠方法:newCachedThreadPool()newScheduledThreadPool(int)newSingleThreadExecutor

首先,對於錄音線程,由於對講機用戶大部分時間可能是在聽,而不是說。錄音線程可能長時間不用,應該讓其超時回收,所以錄音線程宜使用CachedThreadPool
其次,對於發現局域網內的其它用戶的功能,該功能需要不斷循環執行,相當於循環的向局域網內發送心跳信號,因此宜使用ScheduledThreadPool
最后,對於音頻播放線程,該線程需要一直在后台執行,且播放需要串行執行,因此使用SingleThreadExecutor,並設置為守護線程,在UI線程(主線程是最后一個用戶線程)結束之后結束。

// 設置音頻播放線程為守護線程
private ExecutorService outputService = Executors.newSingleThreadExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(@NonNull Runnable r) {
        Thread thread = Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);
        return thread;
    }
});

以上。詳細代碼請移步github:intercom


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM