fast協議解讀


背景

股票行情一般傳輸的數據類型為: int / long / float /double / string 來表示行情價格成交量之類的數據。
正常傳輸過程中,都是使用tag=value的方式。 如1=date(標號1代表日期) 2=openPrice(2表示開盤價格) 等等, 在解析每個字段之前需要先解析這個字段標號,然后通過這個標號能夠從提前約定的字段(一般編碼端和解碼端都有一個xml模板類似的約定配置文件)對應類型來解析這個字段。

前提約定:
tag : 1->日期 2->時間 3->開盤價 4->最高價 5->最低價 6->當前價。 其中tag為short類型,即2個字節
日期、時間為int; 開高低收為float ,保留3位小數

樣例數據:
1=20190310 , 2=142900 , 3=13.4 , 4=15.0 ,5=13.0 ,6=13.5

不使用fast協議來傳輸需要的字節數: tag占用字節(6*2) + value(4 + 4 + 4 +4 + 4 + 4)=36字節

同樣的數據,如果使用fast協議傳輸需要字節數: tag(6*1) +value(4 + 3 + 3+ 3+ 3)=29字節

fast協議特征

基本特征:

  1. 每個字段中所有的byte的最高位用0表示當前字節屬於該字段,用1表示這是該字段的最后一個字節(停止位特征),byte流和unicode字符串流數據部分不使用停止位特征
  2. fast協議傳輸過程中不會傳輸float/double類型的數據,而是將其根據小數位【具體每個字段小數位數在模板配置文件中約定】擴展成數字類型。
  3. 數字類型在傳輸過程中,可以為1,2,3,4,5,6,7,8,9,10個字節,具體要根據是否為有符號、無符號、以及數字的范圍來具體確定占用幾個字節
  4. 在傳輸數字時,如果涉及到有符號數的時候,第一個字節的第2位用來表示符號,0表示正數,1表示負數。
  5. 在傳輸ascii編碼類型的時候,占用1個字節。ascii編碼本身第一位為0,,所以一個字節是符合fast協議規定的。
  6. 在傳輸unicode編碼類型的時候,使用(size,真正數據)來傳輸,size代表數據真正占用的字節數。
  7. 在傳輸byte流的時候,和unicode編碼一樣,也使用(size,真正數據)來傳輸。
  8. 在傳輸unicode 和byte流的時候不使用停止位特征,即每個字節的最高位為真實數據。數據長度字段仍然使用停止位特征

下面代碼出自:openfast-1.1.1

fast協議解讀

停止位

org.openfast.template.type.codec

    /**
     * 數據編碼成功后,將最后一個字節的首位設置成1,即為停止位
     * 
     * */
    public byte[] encode(ScalarValue value) {
        byte[] encoding = encodeValue(value);
        encoding[encoding.length - 1] |= 0x80;
        return encoding;
    }

有符號數編碼類

類:org.openfast.template.type.codec.SignedInteger

    /**
     * 編碼方法
     * */
    public byte[] encodeValue(ScalarValue value) {
        long longValue = ((NumericValue) value).toLong();
        int size = getSignedIntegerSize(longValue);
        byte[] encoding = new byte[size];
        //組裝數據,即每個字節第一位不表示數據;組裝完成后仍然是大端序列(低字節位為值得高有效位)
        for (int factor = 0; factor < size; factor++) {
            //0x3f = 0011 1111
            //0x7f = 0111 1111
            int bitMask = (factor == (size - 1)) ? 0x3f : 0x7f;
            encoding[size - factor - 1] = (byte) ((longValue >> (factor * 7)) & bitMask);
        }
        // Get the sign bit from the long value and set it on the first byte
        // 01000000 00000000 ... 00000000
        // ^----SIGN BIT
        //將第一個字節的第二位設置為符號位, 0表示正數;1表示負數
        encoding[0] |= (0x40 & (longValue >> 57));
        return encoding;
    }

    /**
     * 解碼方法
     * */
    public ScalarValue decode(InputStream in) {
        long value = 0;
        try {
            // IO read方法如果返回小於-1的時候,表示結束;正常范圍0-255
            int byt = in.read();
            if (byt < 0) {
                Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                return null; // short circuit if global error handler does not throw exception
            }
            //通過首字節的第二位與運算,確認該數據的符號
            if ((byt & 0x40) > 0) { 
                value = -1;
            }
            //到此,value的符號已經確定,
            //value=0 則該數為負數, value= -1該數為正數
            // int value = -1   16進制為 0xFF FF FF FF
            // int value = 0    16進制為 0x00 00 00 00
            //下面的只是通過位操作來復原真實的數據
            value = (value << 7) | (byt & 0x7f);  //(value << 7)確保最后7位為0;     (byt & 0x7f) 還是byt
            while ((byt & 0x80) == 0) {  //根據第一位來判斷當前byte是否屬於這個字段
                byt = in.read();
                if (byt < 0) {
                    Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                    return null; // short circuit if global error handler does not throw exception
                }
                value = (value << 7) | (byt & 0x7f); //先把有效位往左移7位,然后再處理當前的七位
            }
        } catch (IOException e) {
            Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
            return null; // short circuit if global error handler does not throw exception
        }
        return createValue(value);
    }

    /**
     * 判斷無符號數所要占用的字節數
     * */
    public static int getUnsignedIntegerSize(long value) {
        if (value < 128) {
            return 1; // 2 ^ 7
        }
        if (value <= 16384) {
            return 2; // 2 ^ 14
        }
        if (value <= 2097152) {
            return 3; // 2 ^ 21
        }
        if (value <= 268435456) {
            return 4; // 2 ^ 28
        }
        if (value <= 34359738368L) {
            return 5; // 2 ^ 35
        }
        if (value <= 4398046511104L) {
            return 6; // 2 ^ 42
        }
        if (value <= 562949953421312L) {
            return 7; // 2 ^ 49
        }
        if (value <= 72057594037927936L) {
            return 8; // 2 ^ 56
        }
        return 9;
    }

    /**
     * 判斷有符號數需要占用的字節
     * */
    public static int getSignedIntegerSize(long value) {
        if ((value >= -64) && (value <= 63)) {
            return 1; // - 2 ^ 6 ... 2 ^ 6 -1
        }
        if ((value >= -8192) && (value <= 8191)) {
            return 2; // - 2 ^ 13 ... 2 ^ 13 -1
        }
        if ((value >= -1048576) && (value <= 1048575)) {
            return 3; // - 2 ^ 20 ... 2 ^ 20 -1
        }
        if ((value >= -134217728) && (value <= 134217727)) {
            return 4; // - 2 ^ 27 ... 2 ^ 27 -1
        }
        if ((value >= -17179869184L) && (value <= 17179869183L)) {
            return 5; // - 2 ^ 34 ... 2 ^ 34 -1
        }
        if ((value >= -2199023255552L) && (value <= 2199023255551L)) {
            return 6; // - 2 ^ 41 ... 2 ^ 41 -1
        }
        if ((value >= -281474976710656L) && (value <= 281474976710655L)) {
            return 7; // - 2 ^ 48 ... 2 ^ 48 -1
        }
        if ((value >= -36028797018963968L) && (value <= 36028797018963967L)) {
            return 8; // - 2 ^ 55 ... 2 ^ 55 -1
        }
        if ((value >= -4611686018427387904L && value <= 4611686018427387903L)) {
            return 9;
        }
        return 10;
    }



無符號數編碼類

org.openfast.template.type.codec.UnsignedInteger


    /**
     * 編碼方法
     * */
    public byte[] encodeValue(ScalarValue scalarValue) {
        long value = scalarValue.toLong();
        int size = getUnsignedIntegerSize(value);
        byte[] encoded = new byte[size];
        for (int factor = 0; factor < size; factor++) {
            encoded[size - factor - 1] = (byte) ((value >> (factor * 7)) & 0x7f);
        }
        return encoded;
    }

    /**
     * 
     * 解碼方法
     * */
    public ScalarValue decode(InputStream in) {
        long value = 0;
        int byt;
        try {
            do {
                byt = in.read();
                if (byt < 0) {
                    Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                    return null; // short circuit if global error handler does not throw exception
                }
                value = (value << 7) | (byt & 0x7f);
            } while ((byt & 0x80) == 0);
        } catch (IOException e) {
            Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
            return null; // short circuit if global error handler does not throw exception
        }
        return createValue(value);
    }

AsciiString編碼類

org.openfast.template.type.codec.AsciiString


    public byte[] encodeValue(ScalarValue value) {
        if ((value == null) || value.isNull()) {
            throw new IllegalStateException("Only nullable strings can represent null values.");
        }
        String string = value.toString();
        if ((string != null) && (string.length() == 0)) {
            return TypeCodec.NULL_VALUE_ENCODING;
        }
        if (string.startsWith(ZERO_TERMINATOR)) {
            return ZERO_PREAMBLE;
        }
        return string.getBytes();
    }

    public ScalarValue decode(InputStream in) {
        int byt;
        ByteArrayOutputStream buffer = Global.getBuffer();
        try {
            do {
                byt = in.read();
                if (byt < 0) {
                    Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                    return null; // short circuit if global error handler does not throw exception
                }
                buffer.write(byt);
            } while ((byt & 0x80) == 0);
        } catch (IOException e) {
            Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
            return null; // short circuit if global error handler does not throw exception
        }
        byte[] bytes = buffer.toByteArray();
        //復原最后一個字節為真實數據
        bytes[bytes.length - 1] &= 0x7f;
        if (bytes[0] == 0) {
            if (!ByteUtil.isEmpty(bytes))
                Global.handleError(FastConstants.R9_STRING_OVERLONG, null);
            if (bytes.length > 1 && bytes[1] == 0)
                return new StringValue("\u0000");
            return new StringValue("");
        }
        return new StringValue(new String(bytes));
    }

字節流編碼類

org.openfast.template.type.codec.ByteVectorType

注意:字節流類型不使用停止位


    public byte[] encode(ScalarValue value) {
        byte[] bytes = value.getBytes();
        int lengthSize = IntegerCodec.getUnsignedIntegerSize(bytes.length);
        byte[] encoding = new byte[bytes.length + lengthSize];
        byte[] length = TypeCodec.UINT.encode(new IntegerValue(bytes.length));
        //數據流所占長度
        System.arraycopy(length, 0, encoding, 0, lengthSize);
        //數據
        System.arraycopy(bytes, 0, encoding, lengthSize, bytes.length);
        return encoding;
    }


    public ScalarValue decode(InputStream in) {
        //解析字節流的長度
        int length = ((IntegerValue) TypeCodec.UINT.decode(in)).value;
        byte[] encoding = new byte[length];
        //讀取字節流
        for (int i = 0; i < length; i++)
            try {
                int nextByte = in.read();
                if (nextByte < 0) {
                    Global.handleError(FastConstants.END_OF_STREAM, "The end of the input stream has been reached.");
                    return null; // short circuit if global error handler does not throw exception
                }
                encoding[i] = (byte) nextByte;
            } catch (IOException e) {
                Global.handleError(FastConstants.IO_ERROR, "A IO error has been encountered while decoding.", e);
                return null; // short circuit if global error handler does not throw exception
            }
        return new ByteVectorValue(encoding);
    }

Unicode字符串類型編碼類

org.openfast.template.type.codec.UnicodeString

這個類型和上面的字節流編碼類邏輯一樣。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM