storm_jdbc 最完整的版本


開頭:我這里是根據bolt與trident進行分類的,寫入和讀取的方法可能會在同一個類中,最后會展示一個測試的類來說明怎么用。

 

JdbcSpout:這個類是我寫入數據和讀取數據的公用spout,細節注釋里說的比較詳細。

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;
import java.util.Random;
/**
 * @author cwc
 * @date 2018年5月31日
 * @version 1.0.0
 * @description: Test spout shared by both the write and the read topologies;
 *               emits random (name, age, sex) rows.
 */
public class JdbcSpout extends BaseRichSpout {
	private static final long serialVersionUID = 1L;
	public static Random random = new Random();
	private SpoutOutputCollector collector;
	// Sample rows used as test data.
	public static final List<Values> rows = Lists.newArrayList(
	            new Values("peter",random.nextInt(80),1),
	            new Values("bob",random.nextInt(60),2),
	            new Values("alice",random.nextInt(100),2));

	@Override
	public void nextTuple() {
		// Fix: nextInt(rows.size()) makes every row reachable — the original
		// nextInt(rows.size() - 1) could never select the last row. Also reuse
		// the shared static Random instead of allocating one per call.
		Values row = rows.get(random.nextInt(rows.size()));

//		this.collector.emit(new Values("bob"));// field used by the placeholder lookup query
		this.collector.emit(row);// full row used by the insert topologies
		System.out.println(row);
		Thread.yield();
	}

	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("name","age","sex"));// schema for inserts
//		declarer.declare(new Fields("name"));// schema for the placeholder lookup
	}

}

  

Jdbc_bolt類:注意看注釋

import java.util.List;
import java.util.Objects;

import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.tuple.Fields;

import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils;

import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;

/**
 * @author cwc
 * @date 2018年9月28日
 * @version 1.0.0
 * @description: JDBC bolt factory.
 *               Writing: either via a placeholder INSERT or a whole-table insert.
 *               Reading: lookup via SQL placeholders bound to tuple fields.
 */
public class JdbcOperationBolt {
    // Connection provider shared by every bolt this factory creates.
    private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider();

    private static Logger logger = LoggerFactory.getLogger(JdbcOperationBolt.class);

    /**
     * Builds an insert bolt driven by a parameterized SQL statement.
     *
     * @param columnSchema columns bound, in order, to the SQL placeholders
     * @param sqlString parameterized INSERT statement
     * @return configured bolt, or null when an argument is missing/empty
     */
    public static JdbcInsertBolt getInsertBolt(List<Column> columnSchema,String sqlString){
        // Fix: the original "!= null ||" validation would NPE on a null list
        // and accepted empty arguments; require both non-null and non-empty.
        if(columnSchema!=null && !columnSchema.isEmpty() && sqlString!=null && !sqlString.isEmpty()){
            JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
            return new JdbcInsertBolt(cp, simpleJdbcMapper)
                    .withInsertQuery(sqlString)
                    .withQueryTimeoutSecs(30);
        }
        logger.error("列名或sql語句不能為空!");
        return null;
    }


    /**
     * Builds an insert bolt that writes whole rows into the given table.
     *
     * @param tableName target table name
     * @return configured bolt, or null when the table name is missing/empty
     */
    public static JdbcInsertBolt getInsertBolt(String tableName){
        // Fix: the original "tableName != null ||" short-circuited, so an
        // empty table name always passed validation.
        if(tableName!=null && !tableName.isEmpty()){
            JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider());
            return new JdbcInsertBolt(cp, simpleJdbcMapper)
                    .withTableName(tableName)
                    .withQueryTimeoutSecs(30);
        }
        logger.error("表名不能為空!");
        return null;
    }

    /**
     * Builds a lookup bolt that queries rows by binding tuple fields to SQL placeholders.
     *
     * @param outputFields fields the bolt emits (declared output schema)
     * @param queryParamColumns tuple fields bound to the SQL placeholders
     * @param sqlString parameterized SELECT statement
     * @return configured bolt, or null when an argument is missing/empty
     */
    public static JdbcLookupBolt getJdbcLookupBolt(Fields outputFields,List<Column> queryParamColumns,String sqlString){
        // Fix: the original required Objects.equals(sqlString, "") — i.e. it
        // only built the bolt when the SQL was EMPTY. The check is inverted here.
        if(outputFields!=null && queryParamColumns!=null && sqlString!=null
                && outputFields.size()>0 && queryParamColumns.size()>0 && !sqlString.isEmpty()){
            SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
            return new JdbcLookupBolt(cp, sqlString, lookupMapper)
                    .withQueryTimeoutSecs(30);
        }
        logger.error("輸出字段,輸入字段集合,sql查詢語句都不能為空!");
        return null;
    }
}

 

我將上面獲取數據庫連接的代碼單獨貼出來,因為封裝的比較深。

/**
 * Builds the HikariCP-backed ConnectionProvider configuration that
 * storm-jdbc requires.
 *
 * @return a HikariCPConnectionProvider configured from this class's JDBC constants
 */
    public static ConnectionProvider getConnectionProvider(){
        // Plain HashMap instead of double-brace initialization: the anonymous
        // subclass that idiom creates is a known anti-pattern (extra class per
        // call site, surprising equals(), potential serialization issues).
        Map<String,Object> hikariConfigMap = new HashMap<String, Object>();
        hikariConfigMap.put("dataSourceClassName", JdbcClassName);
        hikariConfigMap.put("dataSource.url", JdbcdbUrl);
        hikariConfigMap.put("dataSource.user", JdbcUserName);
        hikariConfigMap.put("dataSource.password", JdbcPassWord);
        return new HikariCPConnectionProvider(hikariConfigMap);
    }

 

Jdbc_trident 類

import java.util.List;
import java.util.Objects;

import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.tuple.Fields;

import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils;
import parquet.org.slf4j.Logger;
import parquet.org.slf4j.LoggerFactory;

/**
 * @author cwc
 * @date 2018年9月28日
 * @version 1.0.0
 * @description: JDBC Trident state factories: insert via placeholder SQL,
 *               insert via table name, and lookup via placeholder SELECT.
 */
public class JdbcTridentStates {

    // Connection provider shared by every state factory this class builds.
    private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider();

    private static Logger logger = LoggerFactory.getLogger(JdbcTridentStates.class);

    /**
     * Builds a Trident state factory that inserts rows via a parameterized SQL statement.
     *
     * @param columnSchema columns bound, in order, to the SQL placeholders
     * @param sqlString parameterized INSERT statement
     * @return configured factory, or null when an argument is missing/empty
     */
    public static JdbcStateFactory getJdbcStateFactory (List<Column> columnSchema,String sqlString){
        // Fix: the original "!= null ||" validation would NPE on a null list
        // and accepted empty arguments; require both non-null and non-empty.
        if(columnSchema!=null && !columnSchema.isEmpty() && sqlString!=null && !sqlString.isEmpty()){
            JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
            JdbcState.Options options = new JdbcState.Options()
                    .withConnectionProvider(cp)
                    .withMapper(simpleJdbcMapper)
                    .withInsertQuery(sqlString)
                    .withQueryTimeoutSecs(200);
            return new JdbcStateFactory(options);
        }
        logger.error("列名或sql為空!");
        return null;
    }

    /**
     * Builds a Trident state factory that inserts whole rows into the given table.
     *
     * @param tableName target table name
     * @return configured factory, or null when the table name is missing/empty
     */
    public static JdbcStateFactory getJdbcStateFactory(String tableName){
        // Fix: the original "tableName != null ||" short-circuited, so an
        // empty table name always passed validation.
        if(tableName!=null && !tableName.isEmpty()){
            JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider());
            JdbcState.Options options = new JdbcState.Options()
                    .withConnectionProvider(cp)
                    .withMapper(simpleJdbcMapper)
                    .withTableName(tableName)
                    .withQueryTimeoutSecs(200);
            return new JdbcStateFactory(options);
        }
        logger.error("表名為空!");
        return null;
    }

    /**
     * Builds a Trident state factory for lookups via a placeholder SELECT.
     *
     * @param outputFields fields emitted with the query results
     * @param queryParamColumns tuple fields bound to the SQL placeholders
     * @param sqlString parameterized SELECT statement
     * @return configured factory, or null when an argument is missing/empty
     */
    public static JdbcStateFactory getJdbcSelectState(Fields outputFields,List<Column> queryParamColumns,String sqlString){
        // Fix: the original demanded Objects.equals(sqlString, "") — it only
        // built the factory when the SQL was EMPTY. The check is inverted here.
        if(outputFields!=null && queryParamColumns!=null && sqlString!=null
                && outputFields.size()>0 && queryParamColumns.size()>0 && !sqlString.isEmpty()){
            SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
            JdbcState.Options options = new JdbcState.Options()
                    .withConnectionProvider(cp)
                    .withJdbcLookupMapper(lookupMapper)
                    .withSelectQuery(sqlString)
                    .withQueryTimeoutSecs(30);
            return new JdbcStateFactory(options);
        }
        logger.error("輸出字段,輸入字段集合,sql查詢語句都不能為空!");
        return null;
    }

}

 

測試類:

import java.util.List;
import java.util.UUID;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.jdbc.trident.state.JdbcUpdater;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import java.sql.Types;
import com.google.common.collect.Lists;
import com.sunsheen.jfids.bigdata.storm.bolt.JdbcOperationBolt;
import com.sunsheen.jfids.bigdata.storm.trident.JdbcTridentStates;
/**
 * @author cwc
 * @date 2018年9月31日  
 * @description: Read/write test driver for the storm-jdbc integration
 *               (plain bolts and Trident, local cluster only).
 * @version 3.0.0 
 */
public class JdbcMain {
    
    // NOTE(review): "insterSql" is a typo for "insertSql"; kept as-is since it is referenced below.
    private static String insterSql=" insert into jdbc_test(name,age,sex) values (?,?,?) ";
    private static String selectSql="select age,sex from jdbc_test where name = ?";
    private static String tableName="jdbc_test";
    private static Fields outputFields = new Fields("age", "sex");// fields produced by the lookup query
    
    private static List<Column> queryParamColumns = Lists.newArrayList(new Column("name", Types.VARCHAR));// column bound to the SQL placeholder
    
    // Column schema matching the INSERT placeholders, in order.
    private static List<Column> columnSchema = Lists.newArrayList(
            new Column("name", java.sql.Types.VARCHAR),
            new Column("age", java.sql.Types.INTEGER),
             new Column("sex", java.sql.Types.INTEGER));
    
    
    public static void main(String[] args){
            JdbcWrite(columnSchema,insterSql,tableName);
            JdbcTrident(columnSchema,insterSql,tableName);
            JdbcRead(outputFields,queryParamColumns,selectSql);
            JdbcReadTrident(outputFields,queryParamColumns,selectSql);
    }
    
    /**
     * Writes to the database through the plain-bolt JDBC path.
     * @param columnSchema columns used for placeholder-based inserts
     * @param sqlString parameterized INSERT paired with columnSchema
     * @param tableName table name used for whole-table inserts
     */
    public static void JdbcWrite(List<Column> columnSchema,String sqlString,String tableName){
        Config conf = new Config();
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("jdbc-save", new JdbcSpout(), 2);
        builder.setBolt("save", JdbcOperationBolt.getInsertBolt(tableName), 1).shuffleGrouping("jdbc-save");// getInsertBolt switches between column-based and whole-table insert depending on its arguments
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", conf, builder.createTopology());
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        cluster.shutdown();
    }
    
    /**
     * Writes to the database through the Trident JDBC path.
     * @param columnSchema columns used for placeholder-based inserts
     * @param sqlString parameterized INSERT paired with columnSchema
     * @param tableName table name used for whole-table inserts
     */
    public static void  JdbcTrident(List<Column> columnSchema,String sqlString,String tableName){
        TridentTopology topology = new TridentTopology();
        Config config = new Config();
        
//        JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(columnSchema, insterSql);// column-based insert mode
        JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(tableName);
        
        Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout());
        TridentState state = topology.newStaticState(jdbcStateFactory);
        
        // Persist the stream in batches into the table configured on jdbcStateFactory.
        stream.partitionPersist(jdbcStateFactory, new Fields("name", "age","sex"), new JdbcUpdater(), new Fields());
       
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(UUID.randomUUID().toString(), config, topology.build());
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }// keeps the local cluster alive so the topology can finish; too short a sleep aborts the run early
        cluster.shutdown();
    }
    
    /**
     * Reads data through the plain-bolt JDBC lookup path and prints results via JdbcOutBolt.
     * @param outputFields fields the lookup bolt emits
     * @param queryParamColumns tuple fields bound to the SQL placeholder
     * @param selectSql parameterized SELECT statement
     */
    public static void JdbcRead(Fields outputFields,List<Column> queryParamColumns,String selectSql){
        JdbcLookupBolt JdbcLookupBolt = JdbcOperationBolt.getJdbcLookupBolt(outputFields, queryParamColumns, selectSql);
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("jdbc-reader", new JdbcSpout(), 2);
        builder.setBolt("read",  JdbcLookupBolt, 1).shuffleGrouping("jdbc-reader");
        builder.setBolt("JdbcOutBolt",new JdbcOutBolt(), 1).shuffleGrouping("read");
        Config conf = new Config();
        
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(UUID.randomUUID().toString(), conf, builder.createTopology());
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }// keeps the local cluster alive so the topology can finish; too short a sleep aborts the run early
            cluster.shutdown();        
    }
    
    /**
     * Reads data through the Trident JDBC path.
     * @param outputFields storm-side field names for the emitted results (not DB column names)
     * @param queryParamColumns placeholder fields fed from the spout, used to run the query
     * @param selectSql parameterized SELECT (column names are fixed inside the SQL itself)
     */
    public static void JdbcReadTrident(Fields outputFields,List<Column> queryParamColumns,String selectSql){
        TridentTopology topology = new TridentTopology();
        JdbcStateFactory jdbcStateFactory = JdbcTridentStates.getJdbcSelectState(outputFields, queryParamColumns, selectSql);
        
        Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout());
        TridentState state = topology.newStaticState(jdbcStateFactory);
//         stream.partitionPersist(jdbcStateFactory, outputFields, new JdbcUpdater(),outputFields);// customize here as needed
        
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(UUID.randomUUID().toString(), conf, topology.build());
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }// keeps the local cluster alive so the topology can finish; too short a sleep aborts the run early
            cluster.shutdown();        
    }
    
    
}

 補充:打印讀取出來的數據

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

/**
 * @author cwc
 * @date 2018年5月30日
 * @version 1.0.0
 * @description: Debug bolt that prints the tuples it receives.
 */
public class JdbcOutBolt extends BaseRichBolt{

    private OutputCollector collector;

    @Override
    public void execute(Tuple tuple) {
        // Print the first two tuple values in the form "value0-->value1".
        Object first = tuple.getValue(0);
        Object second = tuple.getInteger(1);
        System.out.println(first + "-->" + second);
    }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector outCollector) {
        this.collector = outCollector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // One declared field; note that execute() never actually emits anything.
        declarer.declare(new Fields("JdbcOutBolt"));
    }

}

 

主要內容大家看代碼就清楚了,有問題大家可以在我博客下留言。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM