cassandra高級操作之JMX操作


前言

  路漫漫其修遠兮,吾將上下而求索!

  github:https://github.com/youzhibing

  碼雲(gitee):https://gitee.com/youzhibing

需求場景

  項目中有這么個需求:統計集群中各個節點的數據量存儲大小,不是記錄數。

  一開始有點無頭緒,后面查看cassandra官方文檔看到Monitoring章節,里面說到:Cassandra中的指標使用Dropwizard Metrics庫進行管理。 這些指標可以通過JMX查詢,也可以使用多個內置和第三方報告插件推送到外部監控系統(Jconsole)。那么數據量存儲大小是不是也是cassandra的某項指標了? 帶着疑問,我通過Jconsole看到了cassandra的一些指標(先啟動cassandra, 運行  -> Jconsole),如下圖

  數據量存儲大小就在叫org.apache.cassandra.db的MBean中,具體會在之后介紹。

JMX定義

  引用JMX超詳細解讀中一段話:

JMX(Java Management Extensions)是一個為應用程序植入管理功能的框架。JMX是一套標准的代理和服務,實際上,用戶可以在任何Java應用程序中使用這些代理和服務實現管理。這是官方文檔上的定義,我看過很多次也無法很好的理解。
我個人的理解是JMX讓程序有被管理的功能,例如你開發一個WEB網站,它是在24小時不間斷運行,那么你肯定會對網站進行監控,如每天的UV、PV是多少;又或者在業務高峰的期間,你想對接口進行限流,就必須去修改接口並發的配置值。   應用場景:中間件軟件WebLogic的管理頁面就是基於JMX開發的,而JBoss則整個系統都基於JMX構架,另外包括cassandra中各項指標的管理。   對於一些參數的修改,網上有一段描述還是比較形象的:   1、程序初哥一般是寫死在程序中,到要改變的時候就去修改代碼,然后重新編譯發布。   2、程序熟手則配置在文件中(JAVA一般都是properties文件),到要改變的時候只要修改配置文件,但還是必須重啟系統,以便讀取配置文件里最新的值。   3、程序好手則會寫一段代碼,把配置值緩存起來,系統在獲取的時候,先看看配置文件有沒有改動,如有改動則重新從配置里讀取,否則從緩存里讀取。   4、程序高手則懂得物為我所用,用JMX把需要配置的屬性集中在一個類中,然后寫一個MBean,再進行相關配置。另外JMX還提供了一個工具頁,以方便我們對參數值進行修改。

  給我的感覺,jmx server進行監聽,jmx client進行請求的發送,以此達到通信的目的;cassandra的jmx server已經被cassandra實現,我們只需要實現jmx client,就能從cassandra進程中拿到我們需要的指標數據。

JMX Server

  MBean接口定義

    接口的命名規范為以具體的實現類為前綴(這個規范很重要),動態代理的過程中需要用到這點。

public interface HelloMBean
{
    String getName();
    void setName(String name);
    
    void print();
}

  MBean接口實現

    實現上面的接口:

public class Hello implements HelloMBean
{
    
    private String name;
    
    @Override
    public String getName()
    {
        return this.name;
    }
    
    @Override
    public void setName(String name)
    {
        this.name = name;
    }
    
    @Override
    public void print()
    {
        System.out.println("hello, print");
    }
    
}

  jmx server實現

    定義一個jmx server,並啟動它

public class HelloService
{
    private static final int RMI_PORT = 8099;  
    private static final String JMX_SERVER_NAME = "TestJMXServer";
    private static final String USER_NAME = "hello";
    private static final String PASS_WORD = "world";
    
    public static void main(String[] args) throws Exception
    {
        HelloService service = new HelloService(); 
        service.startJmxServer();
    }
    
    private void startJmxServer() throws Exception
    {
        //MBeanServer mbs = MBeanServerFactory.createMBeanServer(jmxServerName);  
        MBeanServer mbs = this.getMBeanServer(); 
      
        // 在本地主機上創建並輸出一個注冊實例,來接收特定端口的請求
        LocateRegistry.createRegistry(RMI_PORT, null, RMISocketFactory.getDefaultSocketFactory()); 
        
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + RMI_PORT + "/" + JMX_SERVER_NAME);  
        System.out.println("JMXServiceURL: " + url.toString());  
        
        Map<String, Object> env = this.putAuthenticator();
        
        //JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);   // 不加認證 
        JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, env, mbs);      // 加認證 
        jmxConnServer.start(); 
    }
    
    private MBeanServer getMBeanServer() throws Exception
    {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName objName = new ObjectName(JMX_SERVER_NAME + ":name=" + "hello");  
        mbs.registerMBean(new Hello(), objName);
        return mbs;
    }
    
    private Map<String, Object> putAuthenticator()
    {
        Map<String,Object> env = new HashMap<String,Object>();
        JMXAuthenticator auth = createJMXAuthenticator();
        env.put(JMXConnectorServer.AUTHENTICATOR, auth);

        env.put("com.sun.jndi.rmi.factory.socket", RMISocketFactory.getDefaultSocketFactory());
        return env;
    }
    
    private JMXAuthenticator createJMXAuthenticator() 
    {
        return new JMXAuthenticator() 
        {
            public Subject authenticate(Object credentials) 
            {
                String[] sCredentials = (String[]) credentials;
                if (null == sCredentials || sCredentials.length != 2)
                {
                    throw new SecurityException("Authentication failed!");
                }
                String userName = sCredentials[0];
                String password = sCredentials[1];
                if (USER_NAME.equals(userName) && PASS_WORD.equals(password)) 
                {
                    Set<JMXPrincipal> principals = new HashSet<JMXPrincipal>();
                    principals.add(new JMXPrincipal(userName));
                    return new Subject(true, principals, Collections.EMPTY_SET,
                            Collections.EMPTY_SET);
                }
                throw new SecurityException("Authentication failed!");
            }
        };
    }
}
View Code

  

    點下print按鈕,你會發現控制台會打印:hello, print

  cassandra的jmx server已經自己實現了,我們不需要實現它,我們需要實現的是調用它。

JMX client

  這個是我們需要關注和實現的

  client端接口定義

    接口中定義的方法是我們需要調用的,方法名必須與server端暴露的方法一樣,通過server端動態生成client端的實例,實例中的方法只包括client端接口中定義的方法(若server端暴露的是屬性,那么直接在屬性前加get,后面cassandra部分會講到)

public interface HelloClientMBean
{
    void print();          // 方法定義與server端暴露的方法一致
}

  連接jmx server

public class HelloClient implements AutoCloseable
{
    private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/TestJMXServer";
    private static final String ssObjName = "TestJMXServer:name=hello";
    private static final int defaultPort = 1099;                       // cassandra默認端口是7199
    final String host;
    final int port;
    private String username;
    private String password;
    
    private JMXConnector jmxc;
    private MBeanServerConnection mbeanServerConn;
    private HelloMBean hmProxy;
    
    /**
     * Creates a connection using the specified JMX host, port, username, and password.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host, int port, String username, String password) throws IOException
    {
        assert username != null && !username.isEmpty() && password != null && !password.isEmpty()
               : "neither username nor password can be blank";

        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        connect();
    }
    
    /**
     * Creates a connection using the specified JMX host and port.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host, int port) throws IOException
    {
        this.host = host;
        this.port = port;
        connect();
    }
    
    /**
     * Creates a connection using the specified JMX host and default port.
     *
     * @param host hostname or IP address of the JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host) throws IOException
    {
        this.host = host;
        this.port = defaultPort;
        connect();
    }
    
    /**
     * Create a connection to the JMX agent and setup the M[X]Bean proxies.
     *
     * @throws IOException on connection failures
     */
    private void connect() throws IOException
    {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        if (username != null)
        {
            String[] creds = { username, password };
            env.put(JMXConnector.CREDENTIALS, creds);
        }

        env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

        jmxc = JMXConnectorFactory.connect(jmxUrl, env);
        mbeanServerConn = jmxc.getMBeanServerConnection();

        try
        {
            ObjectName name = new ObjectName(ssObjName);
            hmProxy = JMX.newMBeanProxy(mbeanServerConn, name, HelloMBean.class);
        }
        catch (MalformedObjectNameException e)
        {
            throw new RuntimeException(
                    "Invalid ObjectName? Please report this as a bug.", e);
        }
    }
    
    private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException
    {
        if (Boolean.parseBoolean(System.getProperty("ssl.enable")))
            return new SslRMIClientSocketFactory();
        else
            return RMISocketFactory.getDefaultSocketFactory();
    }
    
    public void print()
    {
        hmProxy.print();
    }

    @Override
    public void close() throws Exception 
    {
        jmxc.close();
    }
}
View Code

  接口調用  

public class JmxClient
{
    public static void main(String[] args) throws Exception
    {
        HelloClient client = new HelloClient("localhost", 8099, "hello", "world");
        client.print();
        client.close();
    }
    
}

    會在控制台打印:hello, print。

統計cassandra集群中各個節點的數據量存儲大小

  回到我們的項目需求,如何實現呢? 也分3步

  client端接口定義

    因為我們只關心數據量存儲大小,所以我們只需要在接口定義一個方法

 

public interface StorageServiceMBean
{
    /** Human-readable load value.  Keys are IP addresses. */
    public Map<String, String> getLoadMap();            // cassandra端暴露的是屬性LoadMap,那么此方法名由get加LoadMap組成, 那么getLoad方法就可以獲取LoadMap的值
} 

  連接jmx server

    cassandra-env.sh配置文件中有cassandra的JMX默認端口:JMX_PORT="7199"

public class CassNodeProbe implements AutoCloseable
{
    private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/jmxrmi";
    private static final String ssObjName = "org.apache.cassandra.db:type=StorageService";
    private static final int defaultPort = 7199;
    final String host;
    final int port;
    private String username;
    private String password;
    
    private JMXConnector jmxc;
    private MBeanServerConnection mbeanServerConn;
    private StorageServiceMBean ssProxy;
    
    /**
     * Creates a NodeProbe using the specified JMX host, port, username, and password.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host, int port, String username, String password) throws IOException
    {
        assert username != null && !username.isEmpty() && password != null && !password.isEmpty()
               : "neither username nor password can be blank";

        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        connect();
    }
    
    /**
     * Creates a NodeProbe using the specified JMX host and port.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host, int port) throws IOException
    {
        this.host = host;
        this.port = port;
        connect();
    }
    
    /**
     * Creates a NodeProbe using the specified JMX host and default port.
     *
     * @param host hostname or IP address of the JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host) throws IOException
    {
        this.host = host;
        this.port = defaultPort;
        connect();
    }
    
    /**
     * Create a connection to the JMX agent and setup the M[X]Bean proxies.
     *
     * @throws IOException on connection failures
     */
    private void connect() throws IOException
    {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port));
        Map<String,Object> env = new HashMap<String,Object>();
        if (username != null)
        {
            String[] creds = { username, password };
            env.put(JMXConnector.CREDENTIALS, creds);
        }

        env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

        jmxc = JMXConnectorFactory.connect(jmxUrl, env);
        mbeanServerConn = jmxc.getMBeanServerConnection();

        try
        {
            ObjectName name = new ObjectName(ssObjName);
            ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
        }
        catch (MalformedObjectNameException e)
        {
            throw new RuntimeException(
                    "Invalid ObjectName? Please report this as a bug.", e);
        }
    }
    
    private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException
    {
        if (Boolean.parseBoolean(System.getProperty("ssl.enable")))
            return new SslRMIClientSocketFactory();
        else
            return RMISocketFactory.getDefaultSocketFactory();
    }
    
    public Map<String, String> getCassClusterStorage()
    {
        return ssProxy.getLoadMap();
    }

    @Override
    public void close() throws Exception 
    {
        jmxc.close();
    }
}
View Code

  接口調用

public class JMXTest
{
    
    public static void main(String[] args) throws Exception
    {
        CassNodeProbe prode = new CassNodeProbe("127.0.0.1");
        Map<String, String> nodeStorages = prode.getCassClusterStorage();
        System.out.println(nodeStorages);
        prode.close();
    }
    
}

  最后得到結果:{127.0.0.1=266.36 KB}

  cassandra的jmx 認證訪問我就不做演示了,大家自己去實現。

cassandra-all

  cassandra給我們提供了工具jar,也就是cassandra jmx client實現,jmx server暴露的在這個工具jar中都有對應的請求方式;

  如若大家用到的很少則可以自己實現,而不需要用cassandra-all,當然我們可以拷貝cassandra-all中我們需要的代碼到我們的工程中,那么我們就可以不用引用此jar,但是又滿足了我們的需求

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>2.1.14</version>
</dependency>

  工程附件

參考

  http://www.cnblogs.com/FlyAway2013/p/jmx.html

  http://www.cnblogs.com/dongguacai/p/5900507.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM