Getting Started with JGroups


Introduction

      JGroups is an open-source, reliable group-communication library written in pure Java. It is built on IP multicast, extended with reliability and group-membership management. Its design is flexible, providing a protocol stack that can combine many protocols.

      JGroups coordinates its protocols with multiple worker threads; common ones include heartbeat (failure detection) and diagnostics.

      Communicating between machines with JGroups generally involves maintaining group state, a group communication protocol, and reliable transmission of group data.

      A JGroups group does have a "coordinator" node. More precisely, at any given time one node (usually the first one started) is responsible for maintaining membership state and message-reliability checks.
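By convention the coordinator is the first member of the current view, i.e. the oldest member. A minimal plain-Java sketch of that rule, using made-up member names and no JGroups dependency:

```java
import java.util.Arrays;
import java.util.List;

public class CoordinatorRule {

    // JGroups convention: the coordinator is the first member of the view,
    // which is normally the node that joined first.
    static String coordinatorOf(List<String> viewMembers) {
        return viewMembers.isEmpty() ? null : viewMembers.get(0);
    }

    public static void main(String[] args) {
        // Hypothetical member names, ordered by join time.
        List<String> view = Arrays.asList("node-A", "node-B", "node-C");
        System.out.println(coordinatorOf(view)); // node-A
    }
}
```

When node-A leaves, the next view starts with node-B, which then takes over as coordinator.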

      The distributed caches of JBoss and Ehcache are currently built on JGroups communication.

      If JGroups communicates over UDP, you may need to enable UDP-related settings on the machine, such as opening the UDP ports.

      Tip: the configuration files for the various JGroups protocols can all be found inside JGroups-x.x.x.Final.jar.

JGroups Resources

     http://www.jgroups.org/tutorial/index.html (official site)

     http://sourceforge.net/projects/javagroups/ (JGroups project & discussion forum)

JGroups Starter Examples

    1. Node communication (TCP/IP or UDP).

    2. Channels and message delivery.

    3. Node state synchronization.

 

TCP/IP vs. UDP

    The best-known difference between TCP and UDP is reliability: TCP is connection-oriented and reliable, while UDP is connectionless. See the Baidu Baike entry for details (http://baike.baidu.com/view/1161229.htm?fr=aladdin).

    In JGroups, UDP is the preferred transport: a node does not need to know any other node's IP, since multicast discovery is enough to "find" its peers. With TCP, the peers must be listed explicitly in the configuration file.
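Since the examples that follow hardcode a TCP stack file, one convenient pattern is to choose the stack file through a system property, so the same code can run over either transport. A small sketch; the property name `jgroups.config` here is our own choice, not a JGroups convention:

```java
public class ConfigChooser {

    // Resolve the protocol-stack file: running with -Djgroups.config=network-udp.xml
    // switches the transport without recompiling; the default is the TCP stack.
    static String configFile() {
        return System.getProperty("jgroups.config", "network-tcp.xml");
    }

    public static void main(String[] args) {
        System.out.println(configFile());
    }
}
```

The resolved name can then be fed to `getClass().getClassLoader().getResourceAsStream(...)` exactly as the Node constructor below does.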

 

Example code (the tests that follow use TCP, because testing across machines over UDP failed due to UDP port issues)

   TCP configuration file network-tcp.xml

<!--
    TCP based stack, with flow control and message bundling. This is usually used when IP
    multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast).
    Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system properties, e.g.
    -Djgroups.bind_addr=192.168.5.2 and -Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
    author: Bela Ban
-->
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.3.xsd">
    <TCP bind_addr="192.168.19.112"
         bind_port="7800"
         loopback="false"
         recv_buf_size="${tcp.recv_buf_size:5M}"
         send_buf_size="${tcp.send_buf_size:640K}"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         use_send_queues="true"
         sock_conn_timeout="300"

         timer_type="new3"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"
         
         thread_pool.enabled="true"
         thread_pool.min_threads="1"
         thread_pool.max_threads="10"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="false"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="discard"/>
                         
    <TCPPING timeout="3000"
             initial_hosts="${jgroups.tcpping.initial_hosts:192.168.19.112[7800],192.168.19.112[7801]}"
             port_range="1"
             num_initial_members="10"/>
    <MERGE2  min_interval="10000"
             max_interval="30000"/>
    <FD_SOCK/>
    <FD timeout="3000" max_tries="3" />
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST3 />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <!--RSVP resend_interval="2000" timeout="10000"/-->
    <pbcast.STATE_TRANSFER/>
</config>
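Note the host[port] syntax in TCPPING's initial_hosts above: a comma-separated list of peers, each with an explicit port in brackets. As an illustration of the format only (this parser is a hypothetical helper, not part of the JGroups API):

```java
import java.util.ArrayList;
import java.util.List;

public class InitialHostsFormat {

    // Split "host[port],host[port],..." into "host:port" strings.
    static List<String> parse(String initialHosts) {
        List<String> result = new ArrayList<>();
        for (String entry : initialHosts.split(",")) {
            int open = entry.indexOf('[');
            String host = entry.substring(0, open).trim();
            String port = entry.substring(open + 1, entry.length() - 1);
            result.add(host + ":" + port);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("192.168.19.112[7800],192.168.19.112[7801]"));
        // → [192.168.19.112:7800, 192.168.19.112:7801]
    }
}
```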

 

   UDP configuration file network-udp.xml

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.5.xsd">

    <UDP
         mcast_addr="${jgroups.udp.mcast_addr:235.5.5.5}"
         mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000" num_initial_members="3"/>
    <MERGE2 max_interval="30000" min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK use_mcast_xmit="true"
                   retransmit_timeout="300,600,1200"
                   discard_delivered_msgs="true"/>

    <pbcast.STABLE stability_delay="1000" 
                   desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true"
                print_physical_addrs="true"
                join_timeout="3000"
                view_bundling="true"
                max_join_attempts="3"/>
                
    <UFC max_credits="2M" min_threshold="0.4"/>
    <MFC max_credits="2M" min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />

</config>

 

    The node class, Node.java

package org.wit.ff;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.log4j.Logger;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.util.Util;

/**
 * 
 * <pre>
 * A cluster node.
 * </pre>
 * 
 * @author F.Fang
 * @version $Id: CacheNode.java, v 0.1 2014-10-17 05:27:11 F.Fang Exp $
 */
public class Node extends ReceiverAdapter {

    private final static Logger LOG = Logger.getLogger(Node.class);

    /**
     * Configuration file.
     */
    private static final String CONFIG_XML = "network-tcp.xml";

    /**
     * Cluster name.
     */
    private static final String CLUSTER_NAME = "FF";

    /**
     * The node's channel.
     */
    private JChannel channel = null;

    /**
     * Initial data synchronized between nodes.
     */
    private Map<String, String> cacheData = new HashMap<String, String>();

    private ReentrantLock lock = new ReentrantLock();

    public Node() {
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
            channel = new JChannel(is);
            channel.setReceiver(this);
            channel.connect(CLUSTER_NAME);
            channel.getState(null, 50000);
        } catch (Exception e) {
            LOG.error("Failed to start node!", e);
            // A custom RuntimeException type would be preferable!
            throw new RuntimeException("Failed to start node!", e);
        }
    }

    /**
     * 
     * <pre>
     * Send a message to the given destination.
     * </pre>
     * 
     * @param dest    null means send to all nodes.
     * @param textMsg the message.
     */
    public void sendMsg(Address dest, Object textMsg) {
        Message msg = new Message(dest, null, textMsg);
        try {
            channel.send(msg);
        } catch (Exception e) {
            LOG.error("Failed to send message!", e);
            // A custom Exception type would be preferable!
            throw new RuntimeException("Failed to send message!", e);
        }
    }

    @Override
    public void getState(OutputStream output) throws Exception {
        // If cacheData is large, state transfer may take a long time.
        lock.lock();
        try {
            Util.objectToStream(cacheData, new DataOutputStream(output));
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void receive(Message msg) {
        // This node ignores messages it sent to the channel itself.
        if (msg.getSrc().equals(channel.getAddress())) {
            return;
        }
        LOG.info(msg.getObject());
    }

    @Override
    public void setState(InputStream input) throws Exception {
        lock.lock();
        try {
            @SuppressWarnings("unchecked")
            Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(input));
            this.cacheData.putAll(cacheData);
        } catch (Exception e) {
            LOG.error("Failed to sync state from the coordinator to this node!", e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void viewAccepted(View view) {
        LOG.info("Current member [" + this.channel.getAddressAsString() + "]");
        LOG.info(view.getCreator());
        LOG.info(view.getMembers());
        LOG.info("Current node data: " + cacheData);
    }

    /**
     * 
     * <pre>
     * A simple helper for initializing data.
     * </pre>
     *
     */
    public void addData(String key, String val) {
        if (key != null && !key.isEmpty()) {
            cacheData.put(key, val);
        }
    }
}
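The getState/setState pair in Node.java moves the cacheData map with Util.objectToStream/objectFromStream. For a Serializable object such as a HashMap, those helpers behave much like plain JDK serialization, which the following self-contained sketch (no JGroups dependency) demonstrates:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class StateRoundTrip {

    // Serialize the state map, as the coordinator does in getState(...).
    static byte[] write(Map<String, String> state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(state);
        }
        return bos.toByteArray();
    }

    // Read it back, as a joining node does in setState(...).
    @SuppressWarnings("unchecked")
    static Map<String, String> read(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, String>) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> state = new HashMap<>();
        state.put("hello", "world");
        System.out.println(read(write(state))); // {hello=world}
    }
}
```

This also shows why a very large cacheData slows down joins: the whole map is serialized and shipped to every new member.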

 

    Example node 1: Node1.java

package org.wit.ff;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

/**
 * 
 * <pre>
 * TCP mode:
 * When testing on a single machine, adjust the ports in the
 * initial_hosts attribute of the TCPPING element, e.g.:
 * "${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]}"
 * When testing across machines, adjust the IPs in initial_hosts
 * (any port will do), e.g.:
 * "${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]}"
 * 
 * UDP mode:
 * Different (dynamically assigned) ports on the same machine can communicate.
 * Across machines, IP multicast may be blocked by the environment,
 * preventing nodes from discovering each other.
 * </pre>
 *
 * @author F.Fang
 * @version $Id: Node1.java, v 0.1 2014-10-15 05:31:32 F.Fang Exp $
 */
public class Node1 {

    public static void main(String[] args) {
        Node node = new Node();
        node.addData("hello", "world");
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Send console input as messages to Node2.
        Scanner scanner = new Scanner(System.in);
        while(true){
            String text = scanner.next();
            if("exit".equals(text)){
                break;
            }
            node.sendMsg(null,"hello "+text+",node2!");
        }

    }

}

 

Example node 2: Node2.java

package org.wit.ff;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

/**
 * 
 * <pre>
 * TCP mode:
 * When testing on a single machine, adjust the ports in the
 * initial_hosts attribute of the TCPPING element, e.g.:
 * "${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.100[7801]}"
 * When testing across machines, adjust the IPs in initial_hosts
 * (any port will do), e.g.:
 * "${jgroups.tcpping.initial_hosts:192.168.19.100[7800],192.168.19.178[7800]}"
 * </pre>
 *
 * @author F.Fang
 * @version $Id: Node2.java, v 0.1 2014-10-15 05:31:44 F.Fang Exp $
 */
public class Node2 {

    public static void main(String[] args) {
        Node node = new Node();
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // Send console input as messages to Node1.
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String text = scanner.next();
            if ("exit".equals(text)) {
                break;
            }
            node.sendMsg(null,"hello " + text + ",node1!");
        }
        
    }

}

 

Test Case

     Start Node1; once it has stabilized, start Node2.

     Node1's output:

DEBUG Configurator                   - set property TCP.diagnostics_addr to default value /ff0e:0:0:0:0:0:75:75

-------------------------------------------------------------------
GMS: address=DSH07fFang-18185, cluster=FF, physical address=192.168.19.112:7800
-------------------------------------------------------------------
DEBUG NAKACK2                        - 
[DSH07fFang-18185 setDigest()]
existing digest:  []
new digest:       DSH07fFang-18185: [0 (0)]
resulting digest: DSH07fFang-18185: [0 (0)]
DEBUG GMS                            - DSH07fFang-18185: installing view [DSH07fFang-18185|0] (1) [DSH07fFang-18185]
DEBUG STABLE                         - resuming message garbage collection
DEBUG FD_SOCK                        - VIEW_CHANGE received: [DSH07fFang-18185]
INFO  Node                           - Current member [DSH07fFang-18185]
INFO  Node                           - DSH07fFang-18185
INFO  Node                           - [DSH07fFang-18185]
INFO  Node                           - Current node data: {}
DEBUG STABLE                         - resuming message garbage collection
DEBUG GMS                            - created cluster (first member). My view is [DSH07fFang-18185|0], impl is org.jgroups.protocols.pbcast.CoordGmsImpl
DEBUG STABLE                         - suspending message garbage collection
DEBUG STABLE                         - DSH07fFang-18185: resume task started, max_suspend_time=33000
DEBUG GMS                            - DSH07fFang-18185: installing view [DSH07fFang-18185|1] (2) [DSH07fFang-18185, DSH07fFang-2882]
DEBUG FD_SOCK                        - VIEW_CHANGE received: [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - Current member [DSH07fFang-18185]
INFO  Node                           - DSH07fFang-18185
INFO  Node                           - [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - Current node data: {hello=world}
DEBUG FD_SOCK                        - ping_dest is DSH07fFang-2882, pingable_mbrs=[DSH07fFang-18185, DSH07fFang-2882]
DEBUG STABLE                         - resuming message garbage collection
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882
DEBUG FD                             - DSH07fFang-18185: sending are-you-alive msg to DSH07fFang-2882

     The output mainly shows IP transport information, state transfer, heartbeats, and so on.

     Node2's output:

DEBUG Configurator                   - set property TCP.diagnostics_addr to default value /ff0e:0:0:0:0:0:75:75

-------------------------------------------------------------------
GMS: address=DSH07fFang-2882, cluster=FF, physical address=192.168.19.112:7801
-------------------------------------------------------------------
DEBUG GMS                            - DSH07fFang-2882: sending JOIN(DSH07fFang-2882) to DSH07fFang-18185
DEBUG NAKACK2                        - 
[DSH07fFang-2882 setDigest()]
existing digest:  []
new digest:       DSH07fFang-18185: [0 (0)], DSH07fFang-2882: [0 (0)]
resulting digest: DSH07fFang-18185: [0 (0)], DSH07fFang-2882: [0 (0)]
DEBUG GMS                            - DSH07fFang-2882: installing view [DSH07fFang-18185|1] (2) [DSH07fFang-18185, DSH07fFang-2882]
DEBUG FD_SOCK                        - VIEW_CHANGE received: [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - Current member [DSH07fFang-2882]
INFO  Node                           - DSH07fFang-18185
INFO  Node                           - [DSH07fFang-18185, DSH07fFang-2882]
INFO  Node                           - Current node data: {hello=world}
DEBUG FD_SOCK                        - ping_dest is DSH07fFang-18185, pingable_mbrs=[DSH07fFang-18185, DSH07fFang-2882]
DEBUG FD                             - DSH07fFang-2882: sending are-you-alive msg to DSH07fFang-18185
DEBUG FD                             - DSH07fFang-2882: sending are-you-alive msg to DSH07fFang-18185

 The nodes communicate and synchronize state; send messages from the console to observe how the nodes react.

