最近的一個舊項目重構過程中,使用到了gearman這個開源項目,簡單來講,這是一個類似MQ的異步系統,一邊派發任務,一邊處理任務(有類似MQ中的消息發送方與接收方),目前支持java,php等多種語言,缺點是存在單點問題(server的HA官方沒有提供方案,需要二次開發)。
下面是java語言的示例:
注:gearman的java客戶端實例有好幾個版本,不同的版本之間相差巨大,建議使用官方推薦的最新版,地址為https://github.com/gearman/java-service
一、spring配置
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 5 6 <bean id="gearmanImpl" class="org.gearman.impl.GearmanImpl"></bean> 7 8 <bean id="gearmanServer" class="org.gearman.impl.server.remote.GearmanServerRemote"> 9 <constructor-arg index="0" ref="gearmanImpl"/> 10 <constructor-arg index="1"> 11 <bean class="java.net.InetSocketAddress"> 12 <constructor-arg index="0" value="localhost"/> 13 <constructor-arg index="1" value="4730"/> 14 </bean> 15 </constructor-arg> 16 </bean> 17 18 <bean id="gearmanClient" class="org.gearman.impl.client.ClientImpl"> 19 <constructor-arg index="0" ref="gearmanImpl"/> 20 </bean> 21 22 <bean id="gearmanWorker" class="org.gearman.impl.worker.GearmanWorkerImpl"> 23 <constructor-arg index="0" ref="gearmanImpl"/> 24 </bean> 25 26 27 </beans>
二、任務發送方(也稱Client)
import org.gearman.GearmanJobEvent;
import org.gearman.GearmanJobReturn;
import org.gearman.GearmanServer;
import org.gearman.impl.client.ClientImpl;
import org.gearman.impl.server.remote.GearmanServerRemote;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.io.UnsupportedEncodingException;
/**
* Created by 菩提樹下的楊過 on 6/12/16.
*/
public class DemoClient {
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring-gearman-test.xml");
GearmanServer gearmanServer = ctx.getBean(GearmanServerRemote.class);
ClientImpl client = ctx.getBean(ClientImpl.class);
client.addServer(gearmanServer);
client.submitBackgroundJob("demoTask", "jimmy1".getBytes());//asynchronous commit
GearmanJobReturn jobReturn = client.submitJob("demoTask", "jimmy2".getBytes());//synchronous commit
while (!jobReturn.isEOF()) {
//next job event
GearmanJobEvent event = jobReturn.poll();
switch (event.getEventType()) {
case GEARMAN_JOB_SUCCESS: //job execute success
System.out.println(new String(event.getData(), "utf-8"));
break;
case GEARMAN_SUBMIT_FAIL: //job submit fail
case GEARMAN_JOB_FAIL: //job execute fail
System.err.println(event.getEventType() + ": "
+ new String(event.getData(), "utf-8"));
default:
}
}
client.shutdown();
}
}
三、任務處理方(也稱Worker)
import org.gearman.GearmanFunction;
import org.gearman.GearmanFunctionCallback;
import org.gearman.GearmanServer;
import org.gearman.GearmanWorker;
import org.gearman.impl.server.remote.GearmanServerRemote;
import org.gearman.impl.worker.GearmanWorkerImpl;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class DemoWorker implements GearmanFunction {
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring-gearman-test.xml");
GearmanServer gearmanServer = ctx.getBean(GearmanServerRemote.class);
GearmanWorker worker = ctx.getBean(GearmanWorkerImpl.class);
worker.addFunction("demoTask", new DemoWorker());
worker.addServer(gearmanServer);
}
@Override
public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {
if (data != null) {
String param = new String(data);
System.out.println("demoWorker => param :" + param);
return ("return value:" + param).getBytes("utf-8");
} else {
return "not receive data!".getBytes("utf-8");
}
}
}
