多節點部署保證HA,分布式鎖代碼
public class DistributedLock implements Watcher,Runnable{
private static final Logger logger = LoggerFactory.getLogger(DistributedLock.class);
private int threadId;
private ZKConnector zkClient;
private String selfPath;
private String waitPath;
private String LOG_PREFIX_OF_THREAD;
private AbstractApplicationContext ctx;
private static boolean hascreated = false;
//確保連接zookeeper成功
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
//確保每個進程運行結束
private static final CountDownLatch threadSemaphore = new CountDownLatch(Constant.THREAD_NUM);
public ZKConnector getZkClient() {
return zkClient;
}
public void setZkClient(ZKConnector zkClient) {
this.zkClient = zkClient;
}
public DistributedLock(int id,AbstractApplicationContext context,ZKConnector zkClient){
this.threadId = id;
this.zkClient = zkClient;
LOG_PREFIX_OF_THREAD = Thread.currentThread().getName().concat("_").concat(String.valueOf(Thread.currentThread().getId()));
try{
zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT);
//GROUP_PATH 不存在的話,由一個線程創建即可
synchronized (threadSemaphore) {
if(!zkClient.exist(Constant.GROUP_PATH)){
zkClient.createPersistNode(Constant.GROUP_PATH, "該節點由線程"+threadId+"創建");
}
}
ctx = context;
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void run() {
getLock();
}
@Override
public void process(WatchedEvent event) {
if(event ==null){
return;
}
if(KeeperState.SyncConnected ==event.getState()){
if(EventType.None == event.getType()){
connectedSemaphore.countDown();
}else if(event.getType()==EventType.NodeDeleted && event.getPath().equals(waitPath)){
if(checkMinPath()){
getLockSuccess();
}
}
}
}
/**
* 獲取鎖邏輯:
* 首先是上來先在zookeeper上注冊一把屬於自己的鎖,然后修改狀態為已創建
* 第二步,檢查自己是否是最小id的鎖,若是則獲取鎖,不是則繼續等待
*/
private void getLock(){
if(!hascreated){
selfPath = this.getZkClient().createEsquentialNode(Constant.SUB_PATH, "");
hascreated = true;
}
if(checkMinPath()){
getLockSuccess();
}else{
Executor.run(this, 1, 1,TimeUnit.SECONDS);
}
}
/**
* 檢查自己是不是最小路徑
* @return
*/
public boolean checkMinPath(){
List<String> subNodes = this.getZkClient().getChildren(Constant.GROUP_PATH);
Collections.sort(subNodes);
//查找"/syncLocks"后面的路徑
int index = subNodes.indexOf(selfPath.substring(Constant.GROUP_PATH.length()+1));
switch(index){
case -1:{
return false;
}
case 0:{
return true;
}
default:{
this.waitPath = Constant.GROUP_PATH+"/"+subNodes.get(index-1);
//Logger.info("waitPath: "+waitPath);
this.getZkClient().readData(waitPath);
if(!this.getZkClient().exist(waitPath)){
return checkMinPath();
}
}
}
return false;
}
/**
* 獲取鎖成功
*/
public void getLockSuccess(){
if(!this.getZkClient().exist(selfPath)){
logger.error(LOG_PREFIX_OF_THREAD+"本節點已不存在.");
return;
}
logger.info(LOG_PREFIX_OF_THREAD + "獲取鎖成功,進行同步工作!");
try{
new Worker(ctx).doWork();
}catch(Exception ex){
logger.info(ex.getMessage());
Executor.run(this, 1, 1, TimeUnit.SECONDS);
return;
}
logger.info(LOG_PREFIX_OF_THREAD+"刪除本節點:"+selfPath);
this.getZkClient().deleteNode(selfPath);
this.getZkClient().releaseConnection();
threadSemaphore.countDown();
}
}
執行同步工作代碼
public class Worker {
private static final Logger logger = LoggerFactory.getLogger(Worker.class);
private static JdbcTemplate jdbcTemplate;
private final ObjectMapper mapper = new ObjectMapper();
private ZKConnector zkClient =null;
private TransportClient client =null;
private Timestamp currentTimestamp = null;
private Timestamp previousTimestamp = null;
private static final String oggSql = "select * from t_order t0 left join t_order_attachedinfo t1 on t0.order_id = t1.order_id where ";
private String sql;
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
private TransportClient getClient() {
Settings settings = Settings.settingsBuilder().put("cluster.name", Constant.CLUSTER).build();
TransportClient client = TransportClient.builder().settings(settings).build();
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(Constant.ESHOST), Constant.ESPORT));
} catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
}
public Worker(AbstractApplicationContext ctx){
//初始化Oracle連接
jdbcTemplate = (JdbcTemplate) ctx.getBean("jdbcTemplate");
client = getClient();
zkClient = new ZKConnector();
zkClient.createConnection(Constant.ZKSERVER, Constant.SESSION_TIMEOUT);
//初始化zookeeper鎖,由於zookeeper不能聯級創建
if(!zkClient.exist(Constant.ZK_PATH)){
zkClient.createPersistNode(Constant.ZK_PATH,"");
}
/**
* 獲取zookeeper的最后同步時間
*/
if(currentTimestamp == null){
String zkTimestamp = zkClient.readData(Constant.NODE_PATH);
if(zkTimestamp != null && !zkTimestamp.equals(""))
{
try
{
currentTimestamp = Timestamp.valueOf(zkTimestamp);
logger.info("獲取zookeeper最后同步時間: "+currentTimestamp);
}catch(Exception e){
zkClient.deleteNode(Constant.NODE_PATH);
}
}
}
}
/**
* 同步work的邏輯:
* 將Oracle里面的規則表同步到緩存當中
* 首先是訪問Oracle里面數據,通過訪問最小鎖里面的同步時間戳,查詢出大於同步時間戳的數據
* 如果在zookeeper中獲取的時間戳為空,則查詢條件增加時間戳,寫入存儲框架
* 寫入成功之后,將最后一條記錄的同步時間戳寫到zookeeper集群中
* 若寫入失敗,和zookeeper握手失敗,會話鎖消失
* 然后導入ElasticSearch中
*/
public void doWork(){
logger.info("start ...");
//一直進行同步工作
while(true){
String sqlwhere = "";
//根據時間戳獲取Mycat中規則表數據
String sql = "";
//若最后一次同步時間為空,則按最后更新時間排序,取最小的時間作為當前時間戳
if(currentTimestamp != null){
sql = "select order_id,timestamp from t_order_changes where rownum <= 10 and timestamp > to_timestamp('" + currentTimestamp.toString() + "','yyyy-mm-dd hh24:mi:ss.ff6')";
}else{
sql = "select order_id,timestamp from t_order_changes where rownum <= 10 order by timestamp";
}
//查詢該時間段的訂單id
List<String> ids = new ArrayList<String>();
//升序會將最后一次的時間也就是最大的時間作為當前的currentTimeStamp
ids = jdbcTemplate.query(sql, new Object[] {}, new RowMapper<String>()
{
public String mapRow(ResultSet result, int rowNum) throws SQLException {
currentTimestamp = result.getTimestamp("timestamp");
return result.getString("order_id");
}
});
if(ids.size() ==0){
continue;
}
int i =0;
List<String> checkIds = new ArrayList<String>();
for (String id : ids) {
//若存在更新的id則跳過
if (checkIds.contains(id)) {
continue;
}
if (i == 0) {
sqlwhere = sqlwhere.concat(" t0.order_id = '" + id + "'");
} else {
sqlwhere = sqlwhere.concat(" or t0.order_id = '" + id + "'");
}
checkIds.add(id);
i++;
}
System.out.println(oggSql.concat(sqlwhere));
//objs 即是Oracle里面查詢出來需要同步的數據
List<JSONObject> objs = jdbcTemplate.query(oggSql.concat(sqlwhere), new Object[] {}, new RowMapper<JSONObject>()
{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public JSONObject mapRow(ResultSet result, int rowNum) throws SQLException {
int c = result.getMetaData().getColumnCount();
JSONObject obj = new JSONObject();
for(int t =1 ;t <= c;t++)
{
if(result.getObject(t) == null)
{
continue;
}
if(result.getMetaData().getColumnType(t) == Types.DATE)
{
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(), result.getDate(t));
}else if(result.getMetaData().getColumnType(t) == Types.TIMESTAMP)
{
Date date = new Date(result.getTimestamp(t).getTime());
String f = sdf.format(date);
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(),sdf.format(date));
}else
{
obj.put(result.getMetaData().getColumnLabel(t).toLowerCase(), result.getObject(t));
}
}
return obj;
}
});
/*for (JSONObject obj : objs) {
System.out.println(obj.toJSONString());
}*/
/**
* 將查詢出來的數據寫入到elasticsearch中
*/
BulkRequestBuilder bulkRequest =null;
try {
bulkRequest = client.prepareBulk();
for (JSONObject obj : objs) {
byte[] json;
try {
json = mapper.writeValueAsBytes(obj);
bulkRequest.add(new IndexRequest(Constant.INDEX, Constant.INDEX, obj.getString("order_id"))
.source(json));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
logger.info("====================批量創建索引過程中出現錯誤 下面是錯誤信息==========================");
long count = 0L;
for (BulkItemResponse bulkItemResponse : bulkResponse) {
System.out.println("發生錯誤的 索引id為 : "+bulkItemResponse.getId()+" ,錯誤信息為:"+ bulkItemResponse.getFailureMessage());
count++;
}
logger.info("====================批量創建索引過程中出現錯誤 上面是錯誤信息 共有: "+count+" 條記錄==========================");
currentTimestamp = previousTimestamp;
} else {
logger.info("The lastest currenttimestamp : ".concat(currentTimestamp.toString()));
previousTimestamp = currentTimestamp;
//將寫入成功后的時間寫到zookeeper中
zkClient.writeData(Constant.NODE_PATH, String.valueOf(currentTimestamp));
}
} catch (NoNodeAvailableException e) {
currentTimestamp = previousTimestamp;
e.printStackTrace();
}
}
}
}
調度工具代碼
public class Executor {
private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
public static void run(Runnable r,long init,long delay,TimeUnit u){
service.scheduleWithFixedDelay(r, init, delay, u);
}
}
啟動類
public class StartRcpSync {
private static final Logger Logger = LoggerFactory.getLogger(StartRcpSync.class);
private static AbstractApplicationContext appContext = null;
private static String confPath = null;
static{
//后續來讀取命令中的conf 例如 java -Dconf=conf/*.xml -classpath .:lib/*
if(System.getProperty("conf") !=null){
System.out.println(System.getProperty("user.dir"));
confPath = System.getProperty("conf");
System.out.println("讀取配置路徑conf目錄:"+confPath);
appContext = new FileSystemXmlApplicationContext(confPath.concat("/applicationContext*.xml"));
}else{
confPath = "E:/aa/bb/src/main/resources/conf";
appContext = new FileSystemXmlApplicationContext(confPath.concat("/applicationContext*.xml"));
}
}
public static void main(String[] args) {
Logger.info("Sync will starting ...");
//加載配置文件
appContext.registerShutdownHook();
appContext.start();
Logger.info("Sync has been started successfully.");
//獲取zookeeper的連接
ZKConnector zkClient = new ZKConnector();
DistributedLock dl = new DistributedLock(new Random().nextInt(),appContext,zkClient);
dl.run();
}
//just for Test
public static void DoTask(){
Worker w =new Worker(appContext);
w.doWork();
}
}

