開頭:我這里是根據bolt與trident進行分類的,寫入和讀取的方法可能會在同一個類中,最后會展示一個測試的類來說明怎么用。
JdbcSpout:這個類是我寫入數據和讀取數據的公用spout,細節注釋里說的比較詳細。
import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.Random; /** * @author cwc * @date 2018年5月31日 * @description:存儲數據的spout,我的讀與寫共用的這一個spout,用於測試 * @version 1.0.0 */ public class JdbcSpout extends BaseRichSpout { public static Random random =new Random(); private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; //模擬數據 public static final List<Values> rows = Lists.newArrayList( new Values("peter",random.nextInt(80),1), new Values("bob",random.nextInt(60),2), new Values("alice",random.nextInt(100),2)); @Override public void nextTuple() { Random rand = new Random(); Values row = rows.get(rand.nextInt(rows.size() - 1)); // this.collector.emit(new Values("bob"));//用於占位符查詢的字段 this.collector.emit(row);//用於存儲寫入 System.out.println(row); Thread.yield(); } @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector =collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("name","age","sex"));//用於存儲寫入 // declarer.declare(new Fields("name"));//用於占位符查詢的字段 } }
JdbcOperationBolt 類(Jdbc bolt 工廠):注意看注釋。
import java.util.List; import java.util.Objects; import org.apache.storm.jdbc.bolt.JdbcInsertBolt; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.apache.storm.tuple.Fields; import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils; import parquet.org.slf4j.Logger; import parquet.org.slf4j.LoggerFactory; /** * @author cwc * @date 2018年9月28日 * @version 1.0.0 * @description:jdbc對數據庫的操作 * 向jdbc中寫入數據,分別由sql寫入和全表寫入兩種bolt方式 * jdbc通過字段與sql語句占位符的方式查詢數據 */ public class JdbcOperationBolt { private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider(); private static Logger logger = LoggerFactory.getLogger(JdbcOperationBolt.class); /** * jdbc 根據字段向數據庫寫入數據 * 傳入兩個參數,根據占位符sql插入數據 * @param columnSchema 列名 * @param sqlString sql * @return */ public static JdbcInsertBolt getInsertBolt(List<Column> columnSchema,String sqlString){ if((columnSchema!=null||columnSchema.size()>0) && (sqlString!=null||!Objects.equals(sqlString, ""))){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); JdbcInsertBolt PersistanceBolt = new JdbcInsertBolt(cp, simpleJdbcMapper) .withInsertQuery(sqlString) .withQueryTimeoutSecs(30); return PersistanceBolt; } logger.error("列名或sql語句不能為空!"); return null; } /** * jdbc 根據表名向數據庫寫入數據 * 傳一個表名參數,進入全表寫入 * 注意,storm中傳入的 * @param tableName 表名 * @return */ public static JdbcInsertBolt getInsertBolt(String tableName){ if(tableName!=null||!Objects.equals(tableName, "")){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider()); JdbcInsertBolt PersistanceBolt = new JdbcInsertBolt(cp, simpleJdbcMapper) .withTableName(tableName) .withQueryTimeoutSecs(30); return PersistanceBolt; } logger.error("表名不能為空!"); return 
null; } /** * jdbc 讀取數據 * 根據sql與列名讀取數據庫數據 * @param outputFields 聲明要輸出的字段 * @param queryParamColumns 傳入占位符的字段 * @param sqlString 查詢sql * @return */ public static JdbcLookupBolt getJdbcLookupBolt(Fields outputFields,List<Column> queryParamColumns,String sqlString){ if(outputFields!=null&&queryParamColumns!=null&&sqlString!=null&&outputFields.size()>0&&queryParamColumns.size()>0&&Objects.equals(sqlString,"")){ SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); JdbcLookupBolt JdbcLookupBolt = new JdbcLookupBolt(cp, sqlString, lookupMapper) .withQueryTimeoutSecs(30); return JdbcLookupBolt; } logger.error("輸出字段,輸入字段集合,sql查詢語句都不能為空!"); return null; } }
我將上面獲取數據庫連接的代碼單獨貼出來,因為封裝的比較深。
/**
 * Builds the HikariCP-backed {@code ConnectionProvider} required by storm-jdbc.
 * The JDBC class name / url / user / password come from this class's config
 * fields (JdbcClassName, JdbcdbUrl, JdbcUserName, JdbcPassWord).
 *
 * @return a ConnectionProvider configured for the target database
 */
public static ConnectionProvider getConnectionProvider() {
    // IDIOM FIX: the original used double-brace initialization, which creates a
    // needless anonymous HashMap subclass (and can pin its enclosing instance);
    // plain puts are equivalent and cleaner.
    Map<String, Object> hikariConfigMap = new HashMap<String, Object>();
    hikariConfigMap.put("dataSourceClassName", JdbcClassName);
    hikariConfigMap.put("dataSource.url", JdbcdbUrl);
    hikariConfigMap.put("dataSource.user", JdbcUserName);
    hikariConfigMap.put("dataSource.password", JdbcPassWord);
    return new HikariCPConnectionProvider(hikariConfigMap);
}
JdbcTridentStates 類(Jdbc trident):
import java.util.List; import java.util.Objects; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.mapper.JdbcMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper; import org.apache.storm.jdbc.mapper.SimpleJdbcMapper; import org.apache.storm.jdbc.trident.state.JdbcState; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import org.apache.storm.tuple.Fields; import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils; import parquet.org.slf4j.Logger; import parquet.org.slf4j.LoggerFactory; /** * @author cwc * @date 2018年9月28日 * @version 1.0.0 * @description:jdbc Trident 類 */ public class JdbcTridentStates { private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider(); private static Logger logger = LoggerFactory.getLogger(JdbcTridentStates.class); /** * jdbc Trident 根據字段向數據庫寫入數據 * 傳入兩個參數,根據占位符sql插入數據 * @param columnSchema 列名 * @param sqlString sql * @return */ public static JdbcStateFactory getJdbcStateFactory (List<Column> columnSchema,String sqlString){ if((columnSchema!=null||columnSchema.size()>0) && (sqlString!=null||!Objects.equals(sqlString, ""))){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema); JdbcState.Options options = new JdbcState.Options() .withConnectionProvider(cp) .withMapper(simpleJdbcMapper) .withInsertQuery(sqlString) .withQueryTimeoutSecs(200); JdbcStateFactory jdbcStateFactory =new JdbcStateFactory(options); return jdbcStateFactory; } logger.error("列名或sql為空!"); return null; } /** * jdbc Trident 根據表名向數據庫寫入數據 * 傳一個表名參數,進入全表寫入 * 注意,storm中傳入的 * @param tableName 表名 * @return */ public static JdbcStateFactory getJdbcStateFactory(String tableName){ if(tableName!=null||!Objects.equals(tableName, "")){ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider()); JdbcState.Options options = new JdbcState.Options() .withConnectionProvider(cp) 
.withMapper(simpleJdbcMapper) .withTableName(tableName) .withQueryTimeoutSecs(200); JdbcStateFactory jdbcStateFactory =new JdbcStateFactory(options); return jdbcStateFactory; } logger.error("表名為空!"); return null; } /** * jdbc Trident 讀取數據 * @param outputFields 輸出列表 * @param queryParamColumns 占位符字段 * @param sqlString 查詢語句 * @return */ public static JdbcStateFactory getJdbcSelectState(Fields outputFields,List<Column> queryParamColumns,String sqlString){ if(outputFields!=null&&queryParamColumns!=null&&sqlString!=null&&outputFields.size()>0&&queryParamColumns.size()>0&&Objects.equals(sqlString,"")){ SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns); JdbcState.Options options = new JdbcState.Options() .withConnectionProvider(cp) .withJdbcLookupMapper(lookupMapper) .withSelectQuery(sqlString) .withQueryTimeoutSecs(30); JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options); return jdbcStateFactory; } logger.error("輸出字段,輸入字段集合,sql查詢語句都不能為空!"); return null; } }
測試類:
import java.util.List; import java.util.UUID; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.jdbc.bolt.JdbcLookupBolt; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.trident.state.JdbcStateFactory; import org.apache.storm.jdbc.trident.state.JdbcUpdater; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.tuple.Fields; import java.sql.Types; import com.google.common.collect.Lists; import com.sunsheen.jfids.bigdata.storm.bolt.JdbcOperationBolt; import com.sunsheen.jfids.bigdata.storm.trident.JdbcTridentStates; /** * @author cwc * @date 2018年9月31日 * @description: storm集成Jdbc讀寫測試類 * @version 3.0.0 */ public class JdbcMain { private static String insterSql=" insert into jdbc_test(name,age,sex) values (?,?,?) "; private static String selectSql="select age,sex from jdbc_test where name = ?"; private static String tableName="jdbc_test"; private static Fields outputFields = new Fields("age", "sex");//就是查詢出的數據 private static List<Column> queryParamColumns = Lists.newArrayList(new Column("name", Types.VARCHAR));//占位符的字段 private static List<Column> columnSchema = Lists.newArrayList( new Column("name", java.sql.Types.VARCHAR), new Column("age", java.sql.Types.INTEGER), new Column("sex", java.sql.Types.INTEGER)); public static void main(String[] args){ JdbcWrite(columnSchema,insterSql,tableName); JdbcTrident(columnSchema,insterSql,tableName); JdbcRead(outputFields,queryParamColumns,selectSql); JdbcReadTrident(outputFields,queryParamColumns,selectSql); } /** * 通過jdbc的方式向數據庫寫數據 * @param connectionProvider 連接數據庫 * @param columnSchema 需要插入的列名 * @param sqlString 配合列名進行字段插入 * @param tableName 通過表名整表插入 */ public static void JdbcWrite(List<Column> columnSchema,String sqlString,String tableName){ Config conf = new Config(); TopologyBuilder builder = new 
TopologyBuilder(); builder.setSpout("jdbc-save", new JdbcSpout(), 2); builder.setBolt("save", JdbcOperationBolt.getInsertBolt(tableName), 1).shuffleGrouping("jdbc-save");//getInsertBolt根據參數的不同,切換字段或全表插入的模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } cluster.shutdown(); } /** * 通過jdbc Trident的方式向數據庫寫數據 * @param connectionProvider 連接數據庫 * @param columnSchema 需要插入的列名 * @param sqlString 配合列名進行字段插入 * @param tableName 通過表名整表插入 */ public static void JdbcTrident(List<Column> columnSchema,String sqlString,String tableName){ TridentTopology topology = new TridentTopology(); Config config = new Config(); // JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(columnSchema, insterSql);//字段插入 JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(tableName); Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout()); TridentState state = topology.newStaticState(jdbcStateFactory); //將數據更新插入數據庫 jdbcStateFactory 根據設置的表名更新到對應的數據庫 批處理 一批一批的插入 stream.partitionPersist(jdbcStateFactory, new Fields("name", "age","sex"), new JdbcUpdater(), new Fields()); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(UUID.randomUUID().toString(), config, topology.build()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }//這個時為了防止你忘記關閉程序,造成內存爆炸,但是不要設置時間太小,太小程序沒跑完就終止了,要報錯。 cluster.shutdown(); } /** * 讀數據 * @param connectionProvider */ public static void JdbcRead(Fields outputFields,List<Column> queryParamColumns,String selectSql){ JdbcLookupBolt JdbcLookupBolt = JdbcOperationBolt.getJdbcLookupBolt(outputFields, queryParamColumns, selectSql); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("jdbc-reader", new JdbcSpout(), 2); builder.setBolt("read", 
JdbcLookupBolt, 1).shuffleGrouping("jdbc-reader"); builder.setBolt("JdbcOutBolt",new JdbcOutBolt(), 1).shuffleGrouping("read"); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(UUID.randomUUID().toString(), conf, builder.createTopology()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }//這個時為了防止你忘記關閉程序,造成內存爆炸,但是不要設置時間太小,太小程序沒跑完就終止了,要報錯。 cluster.shutdown(); } /** * jdbc Trident 查詢數據 * @param outputFields 要輸出傳遞的字段,這里的字段是storm中隨便命名的不是數據庫字段 * @param queryParamColumns 占位符的字段,也就是spout傳出過來的字段,通過該字段查詢數據 * @param selectSql 查詢語句,這里sql已經把字段名固定了,上面的字段名都是形參用於傳輸 */ public static void JdbcReadTrident(Fields outputFields,List<Column> queryParamColumns,String selectSql){ TridentTopology topology = new TridentTopology(); JdbcStateFactory jdbcStateFactory = JdbcTridentStates.getJdbcSelectState(outputFields, queryParamColumns, selectSql); Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout()); TridentState state = topology.newStaticState(jdbcStateFactory); // stream.partitionPersist(jdbcStateFactory, outputFields, new JdbcUpdater(),outputFields);//這里可以根據自己需要進行處理 Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(UUID.randomUUID().toString(), conf, topology.build()); try { Thread.sleep(100000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }//這個時為了防止你忘記關閉程序,造成內存爆炸,但是不要設置時間太小,太小程序沒跑完就終止了,要報錯。 cluster.shutdown(); } }
補充:打印讀取出來的數據
import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; /** * @author cwc * @date 2018年5月30日 * @description:打印拿到的數據 * @version 1.0.0 */ public class JdbcOutBolt extends BaseRichBolt{ private OutputCollector collector; @Override public void execute(Tuple tuple) { Object str =tuple.getValue(0); Object str2 =tuple.getInteger(1); System.out.println(str+"-->"+str2); } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) { // TODO Auto-generated method stub this.collector=collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("JdbcOutBolt")); } }
主要內容大家看代碼就清楚了,有問題大家可以在我博客下留言。