
1.發布者接口
package com.shoshana.publishsubscribe;
public interface IPublisher<M> {
public void publish(SubscribePublish subscribePublish, M message, boolean isInstantMsg);
}
2.訂閱者接口
package com.shoshana.publishsubscribe;
public interface ISubcriber<M> {
public void subcribe(SubscribePublish subscribePublish);
public void unSubcribe(SubscribePublish subscribePublish);
public void update(String publisher, M message);
}
3.訂閱器類
package com.shoshana.publishsubscribe; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class SubscribePublish<M> { //訂閱器名稱 private String name; //訂閱器隊列容量 final int QUEUE_CAPACITY = 20; //訂閱器存儲隊列 private BlockingQueue<Msg> queue = new ArrayBlockingQueue<Msg>(QUEUE_CAPACITY); //訂閱者 private List<ISubcriber> subcribers = new ArrayList<ISubcriber>(); public SubscribePublish(String name) { this.name = name; } public void publish(String publisher, M message, boolean isInstantMsg) { if (isInstantMsg) { update(publisher, message); return; } Msg<M> m = new Msg<M>(publisher, message); if (!queue.offer(m)) { update(); } } public void subcribe(ISubcriber subcriber) { subcribers.add(subcriber); } public void unSubcribe(ISubcriber subcriber) { subcribers.remove(subcriber); } public void update() { Msg m = null; while ((m = queue.peek()) != null) { this.update(m.getPublisher(), (M) m.getMsg()); } } public void update(String publisher, M Msg) { for (ISubcriber subcriber : subcribers) { subcriber.update(publisher, Msg); } } } class Msg<M> { private String publisher; private M m; public Msg(String publisher, M m) { this.publisher = publisher; this.m = m; } public String getPublisher() { return publisher; } public void setPublisher(String publisher) { this.publisher = publisher; } public M getMsg() { return m; } public void setMsg(M m) { this.m = m; } }
4.發布者實現類
package com.shoshana.publishsubscribe; public class PublisherImpOne<M> implements IPublisher<M> { private String name; public PublisherImpOne(String name) { super(); this.name = name; } public void publish(SubscribePublish subscribePublish, M message, boolean isInstantMsg) { subscribePublish.publish(this.name, message, isInstantMsg); } }
5.訂閱者實現類
package com.shoshana.publishsubscribe; public class SubcriberImpOne<M> implements ISubcriber<M> { public String name; public SubcriberImpOne(String name) { super(); this.name = name; } public void subcribe(SubscribePublish subscribePublish) { subscribePublish.subcribe(this); } public void unSubcribe(SubscribePublish subscribePublish) { subscribePublish.unSubcribe(this); } public void update(String publisher, M message) { System.out.println(this.name + "收到" + publisher + "發來的消息:" + message.toString()); } }
6.測試類
package com.shoshana.publishsubscribe; public class SubPubTest { public static void main(String[] args) { SubscribePublish<String> subscribePublish = new SubscribePublish<String>("訂閱器"); IPublisher<String> publisher1 = new PublisherImpOne<String>("發布者1"); ISubcriber<String> subcriber1 = new SubcriberImpOne<String>("訂閱者1"); ISubcriber<String> subcriber2 = new SubcriberImpOne<String>("訂閱者2"); subcriber1.subcribe(subscribePublish); subcriber2.subcribe(subscribePublish); publisher1.publish(subscribePublish, "welcome", true); publisher1.publish(subscribePublish, "to", true); publisher1.publish(subscribePublish, "yy", false); } }
測試結果:
com.shoshana.publishsubscribe.SubPubTest 訂閱者1收到發布者1發來的消息:welcome 訂閱者2收到發布者1發來的消息:welcome 訂閱者1收到發布者1發來的消息:to 訂閱者2收到發布者1發來的消息:to Process finished with exit code 0