Redis學習記錄(六)redis客戶端jedis多線程操作時報異常


  jedis不是線程安全的:

public class RedisLockTest {

    private Integer inventory = 1000;
    private int num  = 1000;
    private int corePoolsize = 100;
    private int maximumPoolSize = 1000;
    private long keepAliveTime = 10000;
    private LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque();

   /**
     * jedis--redis客戶端,在多線程操作時會報異常
     * @throws InterruptedException
     */
    @Test
    public void redismethodError() throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolsize,maximumPoolSize,keepAliveTime,
                java.util.concurrent.TimeUnit.SECONDS,linkedBlockingDeque);
        num =20;
        CountDownLatch countDownLatch = new CountDownLatch(num);
        Jedis jedis = new Jedis("localhost",6379);
        jedis.auth("root123456");
        for (int i = 0; i < num; i++) {
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try{
                        jedis.set("aaa","value");
                    }catch (Exception e){
                        System.out.println(e);
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        countDownLatch.await();
        threadPoolExecutor.shutdown();
    }
}

  該程序可能會報異常:

    1、redis.clients.jedis.exceptions.JedisDataException: ERR Protocol error: expected '$', got ' '

    2、redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Connection reset

    3、redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketException: Connection reset by peer: socket write error

    4、 redis.clients.jedis.exceptions.JedisDataException: RR Protocol error: invalid bulk length

  打印出錯誤棧:

Exception in thread "pool-1-thread-7" Exception in thread "pool-1-thread-10" redis.clients.jedis.exceptions.JedisDataException: RR Protocol error: invalid bulk length
	at redis.clients.jedis.Protocol.processError(Protocol.java:127)
	at redis.clients.jedis.Protocol.process(Protocol.java:161)
	at redis.clients.jedis.Protocol.read(Protocol.java:215)
	at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340)
	at redis.clients.jedis.Connection.getStatusCodeReply(Connection.java:239)
	at redis.clients.jedis.Jedis.set(Jedis.java:121)
	at com.yhq.redis.RedisLockTest$2.run(RedisLockTest.java:76)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

  這種報錯是因為客戶端向redis發送的命令,redis發現接收的命令不滿足RESP協議(Redis服務器與客戶端通過RESP(REdis Serialization Protocol)協議通信)返回以字節 "-"開頭的字節流,客戶端jedis接收到后,拋出異常。

  在 RESP 中, 一些數據的類型通過它的第一個字節進行判斷:

    單行回復:回復的第一個字節是 "+"

    錯誤信息:回復的第一個字節是 "-"

    整形數字:回復的第一個字節是 ":"

    多行字符串:回復的第一個字節是 "$"

    數組:回復的第一個字節是 "*" 

public final class Protocol {
  public static final byte DOLLAR_BYTE = '$';
  public static final byte ASTERISK_BYTE = '*';
  public static final byte PLUS_BYTE = '+';
  public static final byte MINUS_BYTE = '-';
  public static final byte COLON_BYTE = ':';
}

  分析命令不滿足RESP協議的原因:

  jedis.set("aaa","value");    ---->   client.set(key, value);    ----->

public class Connection implements Closeable {

  private RedisOutputStream outputStream;
  private RedisInputStream inputStream;

  protected Connection sendCommand(final Command cmd, final byte[]... args) {
    try {
      //獲取連接
      connect();
      //發送命令(向outputStream寫入拼接的命令,但是沒有執行輸出流刷新,redis還接收不到命令)
      Protocol.sendCommand(outputStream, cmd, args);
      pipelinedCommands++;
      return this;
    } catch (JedisConnectionException ex) {
           //......................
      }
      broken = true;
      throw ex;
    }
 }    

  ------>

public final class Protocol {
  private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();
      // 如果一個線程剛執行到這里,另一個線程執行到 outputStream.flush();操作,那么就報該異常
      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM