java多線程向數據庫寫入數據


任務: 從sqlserver中將一個表A(約16W條數據)導到mysql中對應的一個表B中。

思路:分段獲取A表中的數據后,用多個線程同時向B表中寫入。

關鍵代碼

//將數據庫中的數據條數分段
public void division(){
//獲取要導入的總的數據條數
String sql3="SELECT count(*) FROM [CMD].[dbo].[mycopy1]";
try {
pss=cons.prepareStatement(sql3);
rss=pss.executeQuery();

while(rss.next()){
System.out.println("總記錄條數:"+rss.getInt(1));
sum=rss.getInt(1);
}
//每30000條記錄作為一個分割點
if(sum>=30000){
n=sum/30000;
residue=sum%30000;
}else{
residue=sum;
}

System.out.println(n+" "+residue);

} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
線程類
public MyThread(int start,int end) {
this.end=end;
this.start=start;
System.out.println("處理掉余數");
try {

System.out.println("--------"+Thread.currentThread().getName()+"------------");
Class.forName(SQLSERVERDRIVER);
System.out.println("加載sqlserver驅動...");
cons = DriverManager.getConnection(CONTENTS,UNS,UPS);
stas = cons.createStatement();
System.out.println("連接SQLServer數據庫成功!!");

System.out.println("加載mysql驅動.....");
Class.forName(MYSQLDRIVER);
con = DriverManager.getConnection(CONTENT,UN,UP);
sta = con.createStatement();
// 關閉事務自動提交
con.setAutoCommit(false);
System.out.println("連接mysql數據庫成功!!");

} catch (Exception e) {
e.printStackTrace();
}
// TODO Auto-generated constructor stub
}





public ArrayList<Member> getAll(){
Member member;
String sql1="select * from (select row_number() over (order by pmcode) as rowNum,*" +
" from [CMD].[dbo].[mycopy1]) as t where rowNum between "+start+" and "+end;
try {
System.out.println("正在獲取數據...");
allmembers=new ArrayList();
rss=stas.executeQuery(sql1);
while(rss.next()){
member=new Member();
member.setAddress1(rss.getString("address1"));
member.setBnpoints(rss.getString("bnpoints"));
member.setDbno(rss.getString("dbno"));
member.setExpiry(rss.getString("expiry"));
member.setHispoints(rss.getString("hispoints"));
member.setKypoints(rss.getString("kypoints"));
member.setLevels(rss.getString("levels"));
member.setNames(rss.getString("names"));
member.setPmcode(rss.getString("pmcode"));
member.setRemark(rss.getString("remark"));
member.setSex(rss.getString("sex"));
member.setTelephone(rss.getString("telephone"));
member.setWxno(rss.getString("wxno"));
member.setPmdate(rss.getString("pmdate"));
allmembers.add(member);
// System.out.println(member.getNames());
}
System.out.println("成功獲取sqlserver數據庫數據!");
return allmembers;

} catch (SQLException e) {
// TODO Auto-generated catch block
System.out.println("獲取sqlserver數據庫數據發送異常!");
e.printStackTrace();
}
try {
rss.close();
stas.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}

public void inputAll(ArrayList<Member> allmembers){
System.out.println("開始向mysql中寫入");
String sql2="insert into test.mycopy2 values (?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
ps=con.prepareStatement(sql2);
System.out.println("-------------------------等待寫入數據條數: "+allmembers.size());
for(int i=0;i<allmembers.size();i++){
ps.setString(1, allmembers.get(i).getPmcode());
ps.setString(2, allmembers.get(i).getNames());
//System.out.println(allmembers.get(i).getNames());
ps.setString(3, allmembers.get(i).getSex());
ps.setString(4, allmembers.get(i).getTelephone());
ps.setString(5, allmembers.get(i).getAddress1());
ps.setString(6, allmembers.get(i).getPmdate());
ps.setString(7, allmembers.get(i).getExpiry());
ps.setString(8, allmembers.get(i).getLevels());
ps.setString(9, allmembers.get(i).getDbno());
ps.setString(10, allmembers.get(i).getHispoints());
ps.setString(11, allmembers.get(i).getBnpoints());
ps.setString(12, allmembers.get(i).getKypoints());
ps.setString(13, allmembers.get(i).getWxno());
ps.setString(14, allmembers.get(i).getRemark());
//插入命令列表
//ps.addBatch();
ps.executeUpdate();
}
//ps.executeBatch();
con.commit();

ps.close();
con.close();
this.flag=false;
System.out.println(Thread.currentThread().getName()+"--->OK");
} catch (SQLException e) {
// TODO Auto-generated catch block
System.out.println("向mysql中更新數據時發生異常!");
e.printStackTrace();
}
}

@Override
public void run() {
// TODO Auto-generated method stub

while(true&&flag){
this.inputAll(getAll());
}
}

測試類:
public class Test1 {
DbManager dm=null;
MyThread my1=null;
public Test1(){
dm=new DbManager();
System.out.println(dm.n+"----"+dm.residue);

if(dm.n<1){//數據條數小於30000單線程處理
my1=new MyThread(1,dm.sum);
my1.start=1;
my1.end=dm.residue;
Thread t1=new Thread(my1);
t1.start();
}else{//大於30000時

//起n個線程 每個處理30000條數據
for (int i = 1; i <=dm.n; i++) {
new Thread(new MyThread(i)).start();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//處理掉余數
my1=new MyThread(dm.n*30000+1,dm.sum);
Thread t1=new Thread(my1);
t1.start();
}
}
public static void main(String[] args) {
//new Test1();
//遷移完數據,自動關機
try {
Runtime.getRuntime().exec("cmd /c Shutdown -t 10");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

經過多次測試:從sqlserver中讀取16w條數據並寫入mysql,耗時15min左右。
開始會報錯: java heap space

解決方案:(myeclipse)window->Preferences->Java->Installed JREs,選擇當前的JRE,然后edit它;在新窗口里設置Default VM Arguments為 -Xms512M -Xmx512M即可
————————————————
版權聲明:本文為CSDN博主「飛舞的鋤頭」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/bailin0007/article/details/11815177


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM