打通電商多模式支持的“任督二脈”


你聽說過任督二脈嗎?像這樣~

咳咳~今天不講武功,講電商平台設計的功夫~

背景

當今的電商可不僅僅是B2C商城,接下來還會有O2O,往后可能還會有商超、奧萊、二手交易。。。且稱之為業務模式~而每個業務模式下還會有預售、競拍、拼團等不同組合的子模式。

可是我商城的商品列表頁不想展示O2O的商品啊,商品列表的數據希望按一定規則相互隔離。其他模塊,有的出於操作習慣的考慮不隔離,有的出於用戶行為的考慮需要隔離。

各模塊數據隔離需求如下

 

列表頁

商詳頁

商品組

優惠券

活動

訂單

...

原商城

隔離

隔離

隔離

暫時不隔離

暫時不隔離

隔離

 

O2O

隔離

隔離

隔離

暫時不隔離

暫時不隔離

隔離

 

各模塊流程差異

 

新建商品

列表頁

購物車

訂單

...

原商城

店鋪創建,門店設置庫存

基於item建es文檔

跨門店

狀態流轉走快遞

 

O2O

門店創建(沿用原模型但弱化店鋪的概念)

基於item建es文檔

單個門店

狀態流轉走配送

 

於是我們就會面臨不同的改造的場景。

場景1,新建商品就是新建商品啊!!!

例如商品的新建保存,是基礎服務,已經具備通用存儲模型。為了支持新模式我還得改服務接口、發布二方包?咱可不可以這樣?

商品服務

Integer bizMode = BizModeContext.getBizMode();
itemDO.setBizMode(bizMode);
// ...
itemDAO.save(itemDO);

 

場景2,下單就是下單啊!!!

例如創建訂單,雖然商品維度、訂單類型、優惠方式有很多,但我修改一下B2C下單的字段計算,還要引發O2O模式的回歸測試?咱可不可以這樣?

甚至這樣~

實現類路由

@BizModeService( bizMode=BizMode.B2C, srvClz=OrderTradeService.class )
public class MallOrderTradeServiceImpl extends AbstractOrderTradeService { }

//使用時
Integer bizMode = BizModeContext.getBizMode();
OrderTradeService srv = BizModeRouter.routeBean(bizMode, OrderTradeService.class);

眼尖的小哥哥可能已經發現,要是能再搭配個熱加載的bean容器,都可以做成插件了!emmm...那是遠景~

 

如何打通任督二脈?

首先要舌尖抵住上顎,再來三個深呼吸~然后拿起一本《Thinking In Java》或《Core Java》假裝在修煉。。。等等。。。什么是任督二脈?

Java老司機都知道,我們通常會把ApplicationContext比作Spring的任督二脈,它貫穿始終,管理着bean的生命周期和傳遞。

所以電商平台的任督二脈就是BizModeContext啦!它的經脈圖大概長這樣~

文章出處

 

所以我們通過下面一二三四,入口處打標、dubbo服務間傳遞、RocketMQ傳遞、本機線程池內傳遞,一步一步打通整個標的透傳。

步驟1-打標

aop按包路徑切面+注解覆蓋,滿足你不同的定制需求~於是,在用戶點擊頁面操作的那一刻,每個接口都被打上了“模式標”。

注解打標

@Configuration
public class ControllerConfig {
        @Aspect
        @Component
        public static class CxcAdvice implements BizModeControllerAspect {
                 @Override
                 public Integer getBizMode() {
                         return 300;
                 }
                 @Override
                 @Pointcut("execution(* com.mall.web.controller..*(..))")
                 public void pointcut() {
                 }
        }
}
 
@Slf4j
@RestController
@MarkBizMode(bizMode = 200)
public class AdminOldController2 {
        @RequestMapping("/admin_anno_byclass")
        public String annoByClass() {
                 log.info("annoByClass got bizmode: " + BizModeContext.getBizMode());
                 return "this is " + this.getClass().toString();
        }
        @RequestMapping("/admin_anno_bymethod")
        @MarkBizMode(bizMode = 100)
        public String annoByMethod() {
                 log.info("annoByMethod got bizmode: " + BizModeContext.getBizMode());
                 return "this is " + this.getClass().toString();
        }
}

 

步驟2-dubbo服務傳遞

借助dubbo自帶的Filter和RpcContext可以輕松實現。那是因為dubbo的設計中已經充分考慮了。

Filter的使用

filter定義

@Activate(group = Constants.CONSUMER)
public class BizModeDubboConsumerFilter implements Filter { }

filter配置掃描發現: /src/main/resources/META-INF/dubbo/com.alibaba.dubbo.rpc.Filter

filter的裝配原理: List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

dubbo的SPI擴展機制就不具體展開啦~

RpcContext的生命周期

RpcContext -> RpcInvocation ---服務調用--- RpcInvocation -> RpcContext

業務擴展的調用:RpcContext.getContext().setAttachment("bizMode", (bizMode.toString()));

RpcContext.java

//創建一個線程隔離的上下文實例
    private static final InternalThreadLocal<RpcContext> LOCAL = new InternalThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };
    public static RpcContext getContext() {
        return LOCAL.get();
    }

 

dubbo對attachment的傳遞:

  • 本機(當前線程)的保存:RpcContext
  • 遠程調用的保存和傳遞:RpcInvocation
  • 將RpcContext存入RpcInvocation:AbstractInvoker
public abstract class AbstractInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation inv) throws RpcException {
//節選。。。
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
          invocation.addAttachmentsIfAbsent(context);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
          invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//節選。。。
// return ...
    }
    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
  • 序列化與反序列化:DubboCodec (此處不展開)
  • 從RpcInvocation取出,存入提供方的RpcContext:ContextFilter
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
//節選。。。
                RpcContext.getContext().getAttachments().putAll(attachments);
//節選。。。
        try {
            RpcResult result = (RpcResult) invoker.invoke(invocation);
            // pass attachments to result
            result.addAttachments(RpcContext.getServerContext().getAttachments());
            return result;
        } finally {
            RpcContext.removeContext();
            RpcContext.getServerContext().clearAttachments();
        }
    }
}

步驟3-RocketMQ傳遞

RocketMQ設計時也預留了擴展打標的能力,只需要把模式標存入屬性字段,就能跟隨MQ把標傳遞到消費方。

消息體數據結構

org.apache.rocketmq.common.message.Message

private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;

 

//填入屬性,僅包可見
void putProperty(final String name, final String value);

//填入自定義屬性,與其他屬性共享map,但對key過濾保留字
public void putUserProperty(final String name, final String value);

org.apache.rocketmq.common.message.MessageExt

是Message的子類

 private int queueId;

private int storeSize;

private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;

private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;

private long preparedTransactionOffset;

因此,可以在消息體的 Map<String, String> properties 屬性上附加打標信息。

 

發消息的擴展鈎子

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.registerSendMessageHook(SendMessageHook)

 

收消息的擴展鈎子

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.registerConsumeMessageHook(ConsumeMessageHook)

但由於收消息是一批一批收的,收到的是消息列表 List<MessageExt>,默認配置下只有一個元素,但允許配置多個,因此不能在這個鈎子上做擴展。

因此,對starter做改造,在單個消息消費的位置增加了類似的hook擴展點。

ConsumerHook

public interface ConsumeOneMessageAdvice {
    String hookName();
    void consumeMessageBefore(final MessageExt msg);
    void consumeMessageAfter(final MessageExt msg);
}

 

步驟4-線程池子線程傳遞

BizModeContext的原理是用ThreadLocal存儲線程范圍的上下文,可是實際場景中,總會有些異步和並發的問題,需要使用到線程池。那么問題來了。

父線程context如何傳遞給子線程

jdk自帶InheritableThreadLocal類解決了父子線程傳遞的問題。

Thread.init()

public class Thread implements Runnable {
    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
//節選。。。
        Thread parent = currentThread();
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
//節選。。。
    }
}
 
//子線程創建時會把父線程的ThreadLocalMap復制到子線程中
public class ThreadLocal<T> {
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];
            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }
} 

 

線程池中子線程復用時怎樣維護context

但如果使用了線程池,子線程運行完並不會銷毀,被另一個父線程復用時不會重新初始化。

這時候我們需要借助一個開源框架 TransmittableThreadLocal  https://github.com/alibaba/transmittable-thread-local

(圖片來自官網)

在獲取子線程時重新讀取父線程的上下文,子線程run()執行結束時清理子線程的上下文。

打通任督二脈后可以練什么武功?

打通模式標的透傳后,能怎么使用呢?大家可以盡情發揮下想象力~何時何地只需要 BizModeContext.getBizMode()

  • 日志MDC打標:可以統一給日志記錄加入模式標。
  • sql自動追加查詢條件:通過mybatis插件擴展或甚至是數據源代理,可以給sql自動追加隔離標條件(雖然具體業務中並不那么好用)。
  • 全鏈路監控或壓測:是的,如果打標的不是bizMode,而是traceId或影子標,就可以通過這個“任督二脈”透傳整個系統!
  • 新模式插件化接入:各業務板塊逐漸模塊化后,可以通過給擴展點開發實現類的形式接入新模式。

遠景-多模式插件化部署

我們期望,未來新的業務模式接入,就像安裝插件一樣無痛無感知。

 

新模式接入,只需要增加部署新的bizmodeX節點,其他業務不需要回歸測試。

某個業務,例如bizmode100,部署重啟時,其他業務不受影響。

這還需要一步一步來,目前我們先實現了“任督二脈”的打通,后面的故事,敬請期待哦~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM