Netty源碼細節IO線程(EventLoop)(轉)


原文:http://budairenqin.iteye.com/blog/2215896

源碼來自Netty5.x版本, 本系列文章不打算從架構的角度去討論netty, 只想從源碼細節展開, 又不想通篇的貼代碼, 如果沒有太大的必要, 我會盡量避免貼代碼或是去掉不影響主流程邏輯的代碼, 盡量多用語言描述. 這個過程中我會把我看到的netty對代碼進行優化的一些細節提出來探討, 大家共同學習, 更希望能拋磚引玉.

java nio api細節這里不會討論, 不過推薦一個非常好入門系列 http://ifeve.com/overview/

先從一個簡單的代碼示例開始

服務端啟動代碼示例

    // Configure the server.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 p.addLast(new EchoServerHandler());
             }
         });
        // Start the server.
        ChannelFuture f = b.bind(PORT).sync();
        // Wait until the server socket is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down all event loops to terminate all threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

在看這個示例之前, 先拋出netty中幾個重要組件以及他們之間的簡單關系, 方便理解后續的代碼展開.

1.EventLoopGroup
2.EventLoop
3.boss/worker
4.channel
5.event(inbound/outbound)
6.pipeline
7.handler
--------------------------------------------------------------------
1.EventLoopGroup中包含一組EventLoop

2.EventLoop的大致數據結構是
    a.一個任務隊列
    b.一個延遲任務隊列(schedule)
    c.EventLoop綁定了一個Thread, 這直接避免了pipeline中的線程競爭(在這里更正一下4.1.x以及5.x由於引入了FJP[4.1.x現在又去掉了FJP], 線程模型已經有所變化, EventLoop.run()可能被不同的線程執行,但大多數scheduler(包括FJP)在EventLoop這種方式的使用下都能保證在handler中不會"可見性(visibility)"問題, 所以為了理解簡單, 我們仍可以理解為為EventLoop綁定了一個Thread)
    d.每個EventLoop有一個Selector, boss用Selector處理accept, worker用Selector處理read,write等

3.boss可簡單理解為Reactor模式中的mainReactor的角色, worker可簡單理解為subReactor的角色
    a.boss和worker共用EventLoop的代碼邏輯
    b.在不bind多端口的情況下bossEventLoopGroup中只需要包含一個EventLoop
    c.workerEventLoopGroup中一般包含多個EventLoop
    d.netty server啟動后會把一個監聽套接字ServerSocketChannel注冊到bossEventLoop中
    e.通過上一點我們知道bossEventLoop一個主要責任就是負責accept連接(channel)然后dispatch到worker
    f.worker接到boss爺賞的channel后負責處理此chanel后續的read,write等event

4.channel分兩大類ServerChannel和channel, ServerChannel對應着監聽套接字(ServerSocketChannel), channel對應着一個網絡連接

5.有兩大類event:inbound/outbound(上行/下行)

6.event按照一定順序在pipeline里面流轉, 流轉順序參見下圖

7.pipeline里面有多個handler, 每個handler節點過濾在pipeline中流轉的event, 如果判定需要自己處理這個event,則處理(用戶可以在pipeline中添加自己的handler)

IO線程組的創建:NioEventLoopGroup
構造方法:

public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) {
    super(nEventLoops, executor, selectorProvider);
}

nEventLoops:
    Group內EventLoop個數, 每個EventLoop都綁定一個線程, 默認值為cpu cores * 2, 對worker來說, 這是一個經驗值, 當然如果worker完全是在處理cpu密集型任務也可以設置成 cores + 1 或者是根據自己場景測試出來的最優值.
    一般boss group這個參數設置為1就可以了, 除非需要bind多個端口.
    boss和worker的關系可以參考Reactor模式,網上有很多資料.簡單的理解就是:boss負責accept連接然后將連接轉交給worker, worker負責處理read,write等

executor:
    Netty 4.1.x版本以及5.x版本采用Doug Lea在jsr166中的ForkJoinPool作為默認的executor, 每個EventLoop在一次run方法調用的生命周期內都是綁定在fjp中一個Thread身上(EventLoop父類SingleThreadEventExecutor中的thread實例變量)
    目前netty由於線程模型的關系並沒有利用fjp的work−stealing, 關於fjp可參考這個paper http://gee.cs.oswego.edu/dl/papers/fj.pdf

selectorProvider:
    group內每一個EventLoop都要持有一個selector, 就由它提供了

上面反復提到過每個EventLoop都綁定了一個Thread(可以這么理解,但5.x中實際不是這樣子), 這是netty4.x以及5.x版本相對於3.x版本最大變化之一, 這個改變從根本上避免了outBound/downStream事件在pipeline中的線程競爭

父類構造方法:

private MultithreadEventExecutorGroup(int nEventExecutors,
                                      Executor executor,
                                      boolean shutdownExecutor,
                                      Object... args) {
    // ......

    if (executor == null) {
        executor = newDefaultExecutorService(nEventExecutors); // 默認fjp
        shutdownExecutor = true;
    }

    children = new EventExecutor[nEventExecutors];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nEventExecutors; i++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args); // child即EventLoop
            success = true;
        } catch (Exception e) {
            // ......
        } finally {
            if (!success) {
                // 失敗處理......
            }
        }
    }
    // ......
}

同時參考《http://www.cnblogs.com/guazi/p/6612375.html》中關於這一塊的內容。

1.如果之前沒有指定executor默認為fjp, fjp的parallelism值即為nEventExecutors
    executor(scheduler)可以由用戶指定, 這給了第三方很大的自由度, 總會有高級用戶想完全的控制scheduler, 比如Twitter的Finagle. https://github.com/netty/netty/issues/2250

2.接下來創建children數組, 即EventLoop[],現在可以知道 EventLoop與EventLoopGroup的關系了.

3.后面會講到boss把一個就緒的連接轉交給worker時會從children中取模拿出一個EventLoop然后將連接交給它.
    值得注意的是由於這段代碼是熱點代碼, 作為"優化狂魔"netty團隊豈會放過這種優化細節? 如果children個數為2的n次方, 會采用和HashMap同樣的優化方式[位操作]來代替取模操作:
    children[childIndex.getAndIncrement() & children.length - 1]

4.接下來的newChild()是構造EventLoop, 下面會詳細展開
接下來我們分析NioEventLoop 

 PS:Netty 4.0.16版本開始由Norman Maurer提供了EpollEventLoop, 基於Linux Epoll ET實現的JNI(java nio基於Epoll LT)Edge Triggered(ET) VS Level Triggered(LT)http://linux.die.net/man/7/epoll.這在一定程度上提供了更高效的傳輸層, 同時也減少了java層的gc, 這里不詳細展開了, 感興趣的可看這里 Native transport for Linux wikihttp://netty.io/wiki/native-transports.html

NioEventLoop

 

接上面的newchild()
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
}

構造方法:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
    super(parent, executor, false);
    // ......
    provider = selectorProvider;
    selector = openSelector();
}

父類構造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
    super(parent);
    // ......
    this.addTaskWakesUp = addTaskWakesUp;
    this.executor = executor;
    taskQueue = newTaskQueue();
}

1.我們看到首先是打開一個selector, selector的優化細節我們下面會講到

2.接着在父類中會構造一個task queue, 這是一個lock-free的MPSC隊列, netty的線程(比如worker)一直在一個死循環狀態中(引入fjp后是不斷自己調度自己)去執行IO事件和非IO事件.
除了IO事件, 非IO事件都是先丟到這個MPSC隊列再由worker線程去異步執行.
    MPSC即multi-producer single-consumer(多生產者, 單消費者) 完美貼合netty的IO線程模型(消費者就是EventLoop自己咯), 情不自禁再給"優化狂魔"點32個贊.

    跑題一下:
        對lock-free隊列感興趣可以仔細看看MpscLinkedQueue的代碼, 其中一些比如為了避免偽共享的long padding優化也是比較有意思的.
        如果還對類似並發隊列感興趣的話請轉戰這里 https://github.com/JCTools/JCTools
        另外報個八卦料曾經也有人提出在這里引入disruptor后來不了了之, 相信用disruptor也會很有趣 https://github.com/netty/netty/issues/447

接下來展開openSelector()詳細分析

private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException ignored) {}

    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    try {
        SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Class<?> selectorImplClass =
                Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());

        // Ensure the current selector implementation is what we can instrument.
        if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
            return selector;
        }

        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);

        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);

        selectedKeys = selectedKeySet;
        logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
    } catch (Throwable t) {
        selectedKeys = null;
        logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
    }

    return selector;
}

1.首先openSelector, 這是jdk的api就不詳細展開了

2.接着DISABLE_KEYSET_OPTIMIZATION是判斷是否需要對sun.nio.ch.SelectorImpl中的selectedKeys進行優化, 不做配置的話默認需要優化.

3.哪些優化呢?原來SelectorImpl中的selectedKeys和publicSelectedKeys是個HashSet, 新的數據結構是雙數組A和B, 初始大小1024, 避免了HashSet的頻繁自動擴容,
processSelectedKeys時先使用數組A,再一次processSelectedKeys時調用flip的切換到數組B, 如此反復
另外我大膽胡說一下我個人對這個優化的理解, 如果對於這個優化只是看到避免了HashSet的自動擴容, 我還是認為這有點小看了"優化狂魔"們, 我們知道HashSet用拉鏈法解決哈希沖突, 也就是說它的數據結構是數組+鏈表,
而我們又知道, 對於selectedKeys, 最重要的操作是遍歷全部元素, 但是數組+鏈表的數據結構對於cpu的 cache line 來說肯定是不夠友好的.如果是直接遍歷數組的話, cpu會把數組中相鄰的元素一次加載到同一個cache line里面(一個cache line的大小一般是64個字節), 所以遍歷數組無疑效率更高.
有另一隊優化狂魔是上面論調的支持者及推廣者 disruptor https://github.com/LMAX-Exchange/disruptor

 

EventLoop構造方法的部分到此介紹完了, 接下來看看EventLoop怎么啟動的, 啟動后都做什么

 

EventLoop的父類SingleThreadEventExecutor中有一個startExecution()方法, 它最終會調用如下代碼:

private final Runnable asRunnable = new Runnable() {
    @Override
    public void run() {
        updateThread(Thread.currentThread());

        if (firstRun) {
            firstRun = false;
            updateLastExecutionTime();
        }

        try {
            SingleThreadEventExecutor.this.run();
        } catch (Throwable t) {
            cleanupAndTerminate(false);
        }
    }
};

這個Runnable不詳細解釋了, 它用來實現IO線程在fjp中死循環的自己調度自己, 只需要看 SingleThreadEventExecutor.this.run() 便知道, 接下來要轉戰EventLoop.run()方法了

protected void run() {
    boolean oldWakenUp = wakenUp.getAndSet(false);
    try {
        if (hasTasks()) {
            selectNow();
        } else {
            select(oldWakenUp);
            if (wakenUp.get()) {
                selector.wakeup();
            }
        }

        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        if (ioRatio == 100) {
            processSelectedKeys();
            runAllTasks();
        } else {
            final long ioStartTime = System.nanoTime();
            processSelectedKeys();
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }

        if (isShuttingDown()) {
            closeAll();
            if (confirmShutdown()) {
                cleanupAndTerminate(true);
                return;
            }
        }
    } catch (Throwable t) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {}
    }
    scheduleExecution();
}

為了避免代碼占用篇幅過大, 我去掉了注釋部分
首先強調一下EventLoop執行的任務分為兩大類:IO任務和非IO任務.
1)IO任務比如: OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE
2)非IO任務比如: bind、channelActive等

接下來看這個run方法的大致流程:
1.先調用hasTask()判斷是否有非IO任務, 如果有的話, 選擇調用非阻塞的selectNow()讓select立即返回, 否則以阻塞的方式調用select. 后續再分析select方法, 目前先把run的流程梳理完.

2.兩類任務執行的時間比例由ioRatio來控制, 你可以通過它來限制非IO任務的執行時間, 默認值是50, 表示允許非IO任務獲得和IO任務相同的執行時間, 這個值根據自己的具體場景來設置.

3.接着調用processSelectedKeys()處理IO事件, 后邊會再詳細分析.

4.執行完IO任務后就輪到非IO任務了runAllTasks().

5.最后scheduleExecution()是自己調度自己進入下一個輪回, 如此反復, 生命不息調度不止, 除非被shutDown了, isShuttingDown()方法就是去檢查state是否被標記為ST_SHUTTING_DOWN.

接下來分析阻塞select方法都做了什么, selectNow就略過吧

 private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            currentTimeNanos = time;
        }
        // ...
    } catch (CancelledKeyException ignored) {}
}

1.首先執行delayNanos(currentTimeNanos), 這個方法是做什么的呢?
    1)要了解delayNanos我們需要知道每個EventLoop都有一個延遲執行任務的隊列(在父類SingleThreadEventExecutor中), 是的現在我們知道EventLoop有2個隊列了.
    2)delayNanos就是去這個延遲隊列里面瞄一眼是否有非IO任務未執行, 如果沒有則返回1秒鍾
    3)如果很不幸延遲隊列里面有任務, delayNanos的計算結果就等於這個task的deadlineNanos到來之前的這段時間, 也即是說select在這個task按預約到期執行的時候就返回了, 不會耽誤這個task.
    4)如果最終計算出來的可以無憂無慮select的時間(selectDeadLineNanos - currentTimeNanos)小於500000L納秒, 就認為這點時間是干不出啥大事業的, 還是selectNow一下直接返回吧, 以免耽誤了延遲隊列里預約好的task.
    5)如果大於500000L納秒, 表示很樂觀, 就以1000000L納秒為時間片, 放肆的去執行阻塞的select了, 阻塞時間就是timeoutMillis(n * 1000000L納秒時間片).

2.阻塞的select返回后,如果遇到以下幾種情況則立即返回
    a)如果select到了就緒連接(selectedKeys > 0)
    b)被用戶waken up了
    c)任務隊列(上面介紹的那個MPSC)來了一個任務
    d)延遲隊列里面有個預約任務到期需要執行了

3.如果上面情況都不滿足, 代表select返回0了, 並且還有時間繼續愉快的玩耍

4.這其中有一個統計select次數的計數器selectCnt, select過多並且都返回0, 默認512就代表過多了, 這表示需要調用rebuildSelector()重建selector了, 為啥呢, 因為nio有個臭名昭著的epoll cpu 100%的bug, 為了規避這個bug, 無奈重建吧. 參考下面鏈接
        http://bugs.java.com/view_bug.do?bug_id=6403933
        https://github.com/netty/netty/issues/327   

5.rebuildSelector的實際工作就是:
    重新打開一個selector, 將原來的那個selector中已注冊的所有channel重新注冊到新的selector中, 並將老的selectionKey全部cancel掉, 最后將的selector關閉

6.重建selector后, 不死心的再selectNow一下
select過后, 有了一些就緒的讀啊寫啊等事件, 就需要processSelectedKeys()登場處理了, 我只分析一下優化了selectedKeys的處理方法processSelectedKeysOptimized(selectedKeys.flip())

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        selectedKeys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            for (;;) {
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
                i++;
            }

            selectAgain();
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

1.第一眼就看到這里要遍歷SelectionKey[]了, 上面提到HashSet-->array的優化就是為了這一步.

2.每次拿到一個之后SelectionKey立即釋放array對這個key的強引用
    selectedKeys[i] = null;
    這么做是為了幫助GC, 這個key處理完了就應該被GC回收了, 如果array對這個key繼續維持強引用, 在循環處理后續其他key的時候可能要消耗很長時間, 對GC, 還是能幫則幫吧, Doug lea在設計jsr166也就是jdk中juc包下面的代碼也有用到過類似小優化.

3.憑啥k.attachment()就是AbstractNioChannel呢?后續分析到register會看到如下一行代碼:
    selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
    其中this就是channel咯, 具體情況后續章節再詳細說

4.接下來拿到channel調用processSelectedKey(), 下面再詳細分析

5.有的時候需要select again, 比如被cancel的時候needsToSelectAgain被標記為true

6.接下來那個for循環中的處理同樣是 help gc

7. selectAgain()調用的是非阻塞的selectNow(), 然后重置index為-1重新開始新的循環

再看processSelectedKey方法:

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

1.終於見到熟悉的NIO處理代碼了, 首先netty中每個channel都有一個unsafe,
    1)作為NioSocketChannel它對應的unsafe是NioByteUnsafe
    2)作為NioServerSocketChannel它對應的unsafe是NioMessageUnsafe
    以上兩個的區別后續章節再詳細解釋, 先簡要說明下1)跟worker的channel相關, 2)跟boss的serverChannel相關

2.接下來就是根據readyOps來dispatch了, 后續都由unsafe來處理, unsafe留着以后章節分析

執行完IO任務以后, 輪到非IO任務了

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    this.lastExecutionTime = lastExecutionTime;
    return true;
}

1. 先是fetchFromScheduledTaskQueue, 將延遲任務隊列中已到期的task拿到非IO任務的隊列中,此隊列即為上文中提到的MPSC隊列.

2. task即是從MPSC queue中彈出的任務

3. 又是計算一個deadline

4. 注意到 0x3F 了吧?轉換成10進制就是64-1, 就是每執行64個任務就檢查下時間, 如果到了deadline, 就退出, 沒辦法, IO任務是親生的, 非IO任務是后媽生的, 資源肯定要先緊IO任務用.
我們使用netty時也要注意, 不要產生大量耗時的非IO任務, 以免影響了IO任務.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM