Node——微服務架構(一)


new ServiceBroker

default settings

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();

custom settings

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    logLevel: "info"
});

communicate with remote nodes

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    nodeID: "node-1",
    transporter: "nats://localhost:4222",
    logLevel: "debug",
    requestTimeout: 5 * 1000,
    requestRetry: 3
});

broker options

  • logLevel
    • type:string
    • default:info
    • des:可選項目還有 trace、debug、 info、 warn、 error、 fatal
  • middlewares
    • type:Array<Function>
    • default:null
    • des:中間件
  • created
    • type:Function
    • default:null
    • des:broker 實例被創建的時候將會觸發此函數
  • started
    • type:Function
    • default:null
    • des:broker 實例開始執行時觸發此函數
  • stopped
    • type:Function
    • default:null
    • des:broker 實例停止執行時觸發此函數
  • hotReload
    • type:Boolean
    • default:false
    • des:是否啟動熱加載
  • cacher
    • type:String、Object、Cacher
    • default:null
    • des:若是啟動緩存,兩個相同模型的 broker.call,只有第一個 call 會讓 action 中對應 handler 完整的執行一遍,第二個 call 就不會了,它會直接從緩存中取數據,不常用
    • https://moleculer.services/docs/0.13/caching.html
  • transporter
  • serializer
  • nodeID
    • type:string
    • default:hostname + PID
    • des:這是節點的id,掛載在 某一個 namespace 中是不能夠同名的
  • namespace
    • type:string
    • defalut:”“
    • des:分割一個網咯中的不同區域,基本上用不到,除非項目特別復雜,子服務特別多
  • requestTimeout
    • type:Number
    • default:0
    • des:請求超時設置,單位毫秒

createService

該服務表示Moleculer框架中的一個微服務。您可以定義操作並訂閱事件。若要創建服務,必須定義架構。服務模式類似於VueJS的一個組件

// 定義了兩個actions
broker.createService({
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        },

        sub(ctx) {
            return Number(ctx.params.a) - Number(ctx.params.b);
        }
    }
});

name

  • 強制屬性,最后去 call 某一個微服務的時候必須帶上 name

version

settings

  • 此屬性相當於倉庫
    • 賦值可以是對象,對象中設置任意鍵值對,action 中通過 this.settings.xxxx 能夠訪問到設置項
    • 遠程節點上可以獲得這些設置項
    • 有一些內部設置是由核心模塊使用的。這些設置名稱以$(美元符號)開頭
      • $noVersionPrefix
        • type:Boolean
        • default:false
        • des:禁用 action 版本前綴
      • $noServiceNamePrefix
        • type:Boolean
        • default:false
        • des:禁用 action 中的服務名稱前綴。
      • $dependencyTimeout
        • type:Number
        • default:0
        • des:依賴等待超時
      • $shutdownTimeout
        • type:Number
        • default:0
        • des:關閉時等待活動請求的超時

mixins

Mixins是一種為Moleculer服務分發可重用功能的靈活方法。服務構造函數將這些混合與當前架構合並。它是在您的服務中擴展其他服務。當服務使用混音時,混音中的所有屬性都將“混合”到當前服務中。

const ApiGwService = require("moleculer-web");

module.exports = {
    name: "api",
    mixins: [ApiGwService]
    settings: {
        // Change port setting
        port: 8080
    },
    actions: {
        myAction() {
            // Add a new action to apiGwService service
        }
    }
}

上面的示例創建了一個API服務,該服務繼承了ApiGwService的所有內容,但是覆蓋了端口設置,並使用新的myAction操作對其進行了擴展

actions

  • action 是服務中可調用的公共方法,broker.call 或 ctx.call,具體 action 必須在 action 中,可以是一個函數,可以是一個對象

events

  • 事件訂閱

lifecycle events

  • 有一些生命周期服務事件,這些事件將由代理觸發。它們被放置在模式的根中
    • created:broker.loadService 或者 broker.createService 觸發
    • started:broker.start() 觸發
    • stopped:broker.stop() 觸發

methods

  • 創建私有方法,以供 action、event、lifecycle event 使用

dependencies

  • 如果您的服務依賴於其他服務,請使用架構中的依賴項屬性。服務在調用已啟動的生命周期事件處理程序之前等待依賴服務
  • 除了配做中添加 dependencies 屬性,也可以用 broker 實例進行外部設置
    • broker.waitForServices(["posts", "users"]),返回以惡 promise 對象
    • broker.waitForServices("accounts", 10 * 1000, 500),設置超時事件和

metadata

  • 元數據屬性,您可以在這里存儲有關服務的任何元信息。在服務函數中可以訪問到元數據
  • 元數據時可以被遠程節點獲取的

this

broker.createService

// 創建微服務實例方式之一
broker.createService({
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        }
    }
});

load service from file

math.service.js

// Export the schema of service
module.exports = {
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        },
        sub(ctx) {
            return Number(ctx.params.a) - Number(ctx.params.b);
        }
    }
}
// Create broker
const broker = new ServiceBroker();

// Load service
broker.loadService("./math.service");

// Start broker
broker.start();

推薦使用這樣的方式,一目了然,不會在一個文件寫過多的代碼

Load multiple services from a folder

如果您有很多服務,建議將它們放到一個服務文件夾中,並使用 Serge.loadService s方法加載所有這些服務

broker.loadServices(folder = "./services", fileMask = "**/*.service.js");
// 從 ./services 文件夾(包括子文件夾)加載每個 *.service.js 文件
broker.loadServices();
// 從當前文件夾(包括子文件夾)加載每個 *.service.js 文件
broker.loadServices("./");
// 從“./svc”文件夾加載每個用戶*.service.js文件
broker.loadServices("./svc", "user*.service.js");

hot reloading services

Moleculer具有內置的熱重加載功能.在開發期間,注意只針對 service.js 文件的修改被啟動熱重啟,其他位置可以使用 nodemon

const broker = new ServiceBroker({
    hotReload: true
});

broker.loadService("./services/test.service.js");

Internal services

// 列出所有已知節點(包括本地節點)
broker.call("$node.list").then(res => console.log(res))

// 列出所有注冊的服務(本地和遠程)
broker.call("$node.services").then(res => console.log(res))

// 列出所有已注冊 action(本地和遠程)。
broker.call("$node.actions").then(res => console.log(res))

// 列出所有訂閱的事件
broker.call("$node.events").then(res => console.log(res))

// 列出本地節點的健康信息(包括進程和OS信息)
broker.call("$node.health").then(res => console.log(res));

action

action 是服務的可調用的公共方法。action 調用表示遠程過程調用(RPC)。它有請求參數&返回響應,就像HTTP請求一樣。如果您有多個服務實例,代理將在實例之間負載平衡請求

call services

若要調用服務,請使用 broke.Call 方法。代理查找具有給定 action service (可能在某一個節點上)並調用它。調用之后將會返回一個承諾

const res = await broker.call(actionName, params, opts)
  • params:參數是作為上下文的一部分傳遞給 action,action service 可以通過 ctx.params 訪問傳遞參數,這是可選的
  • ops:一個對象,用於設置或者覆蓋某些請求參數,例如:timeout、retry Count,這是可選的
    • tiemout:請求超時,以毫秒為單位。如果請求超時,而您沒有定義應急響應,將會報錯。若要禁用設置0,請執行以下操作。如果未定義,將會啟用 new ServiceBroker 中的 requestTimeout 設置
    • retries :請求重試次數,如果請求超時,代理將再次嘗試調用。若要禁用設置0。如果沒有定義,將啟用 new ServiceBroker 中的配置
    • fallbackResponse :若請求失敗就返回,這是一個 Function
    • nodeID:目標節點,如果設置,它將直接調用給定的節點
    • meta:請求元數據,通過操作處理程序中的 ctx.meta 訪問它,它也將在嵌套調用中被傳輸和合並
    • parentCtx:父親的上下文實例
    • requestID:請求ID或相關ID。它出現在標准事件中
broker.call("user.recommendation", { limit: 5 }, {
    timeout: 500,
    retries: 3,
    fallbackResponse: defaultRecommendation
}).then(res => console.log("Result: ", res));

meta

  • 元信息發送到具有元屬性的服務,通過 action 處理程序中的ctx.meta訪問它。請注意,在嵌套調用時,元被合並。

Streaming

  • Moleculer支持Node.js流作為請求參數和響應。使用它來傳輸從網關上傳的文件,或者編碼/解碼或壓縮/解壓縮流
const stream = fs.createReadStream(fileName);

broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});	
  • 請注意,參數應該是一個流,您不能向參數中添加更多的變量。使用元屬性傳輸其他數據。
  • 服務中接受流
module.exports = {
    name: "storage",
    actions: {
        save(ctx) {
            const s = fs.createWriteStream(`/tmp/${ctx.meta.filename}`);
            ctx.params.pipe(s);
        }
    }
};
  • 將流作為服務中的響應返回
module.exports = {
    name: "storage",
    actions: {
        get: {
            params: {
                filename: "string"
            },
            handler(ctx) {
                return fs.createReadStream(`/tmp/${ctx.params.filename}`);
            }
        }
    }
};
  • 調用方接受流
const filename = "avatar-123.jpg";
broker.call("storage.get", { filename })
    .then(stream => {
        const s = fs.createWriteStream(`./${filename}`);
        stream.pipe(s);
        s.on("close", () => broker.logger.info("File has been received"));
    })
  • AES編解碼示例服務
const crypto = require("crypto");
const password = "moleculer";

module.exports = {
    name: "aes",
    actions: {
        encrypt(ctx) {
            const encrypt = crypto.createCipher("aes-256-ctr", password);
            return ctx.params.pipe(encrypt);
        },

        decrypt(ctx) {
            const decrypt = crypto.createDecipher("aes-256-ctr", password);
            return ctx.params.pipe(decrypt);
        }
    }
};

action visibility

  • visibility:該屬性控制 action service 是否可見、可調用
    • published:公共的 action,它可以在本地調用,也可以遠程調用,並且可以通過API網關發布
    • pulic:公共的 action ,可以在本地或者遠程調用,但不能通過APIGW發布
    • protected:只能在本地 action service 調用(從本地服務調用)
    • private:只能在內部調用(通過 this.actions.xy() 內部服務)
    • 不設置,默認是 null,也就是 published,公共的
module.exports = {
    name: "posts",
    actions: {
        // It's published by default
        find(ctx) {},
        clean: {
            // Callable only via `this.actions.clean`
            visibility: "private",
            handler(ctx) {}
        }
    },
    methods: {
        cleanEntities() {
            // Call the action directly
            return this.actions.clean();
        }
    }
}

action hooks

  • 定義 action 鈎子來包裝來自混合器的某些 action
  • 有 before、after、error 鈎子,將其分配給指定的 action 或者所有 action service (*)
  • 鈎子可以是函數,也可以是字符串。字符串必須是本地服務方法名。
const DbService = require("moleculer-db");
// before hook
module.exports = {
    name: "posts",
    mixins: [DbService]
    hooks: {
        before: {
            // Define a global hook for all actions
            // The hook will call the `resolveLoggedUser` method.
            "*": "resolveLoggedUser",

            // Define multiple hooks
            remove: [
                function isAuthenticated(ctx) {
                    if (!ctx.user)
                        throw new Error("Forbidden");
                },
                function isOwner(ctx) {
                    if (!this.checkOwner(ctx.params.id, ctx.user.id))
                        throw new Error("Only owner can remove it.");
                }
            ]
        }
    },

    methods: {
        async resolveLoggedUser(ctx) {
            if (ctx.meta.user)
                ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });
        }
    }
}
const DbService = require("moleculer-db");
// after hook
// error hook
module.exports = {
    name: "users",
    mixins: [DbService]
    hooks: {
        after: {
            // Define a global hook for all actions to remove sensitive data
            "*": function(ctx, res) {
                // Remove password
                delete res.password;

                // Please note, must return result (either the original or a new)
                return res;
            },
            get: [
                // Add a new virtual field to the entity
                async function (ctx, res) {
                    res.friends = await ctx.call("friends.count", { query: { follower: res._id }});

                    return res;
                },
                // Populate the `referrer` field
                async function (ctx, res) {
                    if (res.referrer)
                        res.referrer = await ctx.call("users.get", { id: res._id });

                    return res;
                }
            ]
        },
        error: {
            // Global error handler
            "*": function(ctx, err) {
                this.logger.error(`Error occurred when '${ctx.action.name}' action was called`, err);

                // Throw further the error
                throw err;
            }
        }
    }
};
  • 推薦的用例是創建混合元素,用方法填充服務,並在鈎子中設置方法名
module.exports = {
    methods: {
        checkIsAuthenticated(ctx) {
            if (!ctx.meta.user)
                throw new Error("Unauthenticated");
        },
        checkUserRole(ctx) {
            if (ctx.action.role && ctx.meta.user.role != ctx.action.role)
                throw new Error("Forbidden");
        },
        checkOwner(ctx) {
            // Check the owner of entity
        }
    }
}
// Use mixin methods in hooks
const MyAuthMixin = require("./my.mixin");

module.exports = {
    name: "posts",
    mixins: [MyAuthMixin]
    hooks: {
        before: {
            "*": ["checkIsAuthenticated"],
            create: ["checkUserRole"],
            update: ["checkUserRole", "checkOwner"],
            remove: ["checkUserRole", "checkOwner"]
        }
    },

    actions: {
        find: {
            // No required role
            handler(ctx) {}
        },
        create: {
            role: "admin",
            handler(ctx) {}
        },
        update: {
            role: "user",
            handler(ctx) {}
        }
    }
};

context

  • 當你去 call 一個 action service,broker 就會創建一個上下文 context 實例,這個實例包含着所有的請求信息,最后這些信息都會被當做 action service 中 handler 的一個參數 ctx 進行傳遞使用
  • 在 handler 中可以點出的上下文信息(屬性或者方法)
    • ctx.id:context id
    • ctx.broker:broker 對象實例
    • ctx.action:action 定義實例
    • ctx.nodeID:caller 或者 目標節點 id
    • ctx.requestID:請求ID,如果在 nested-calls 中使用,它將是相同的ID。
    • ctx.parentID:父親上下文實例 id(在 nested-calls 中使用)
    • ctx.params:請求參數,也就是 broker.call 中第二個參數具體設置
    • ctx.meta:請求元數據,它將會傳遞到 nested-calls 中
    • ctx.level:請求等級(在 nested-calls 內部使用),第一層等級為1
    • ctx.call():在 nested-calls 中觸發 action service,參數形式與 broker.call 一樣
    • ctx.emit():emit an event,same as broker.emit
    • ctx.broadcast():Broadcast an event, same as broker.broadcast
  • 優雅地關閉服務,請在代理選項中啟用上下文跟蹤功能。如果啟用它,所有服務都將在關閉之前等待所有正在運行的上下文
    • 一個超時值可以通過關閉Timeout Broker選項來定義。默認值為5秒
    • 在 action services 中,關閉超時設置可以通過 $Shupdown Timeout 屬性重寫
const broker = new ServiceBroker({
    nodeID: "node-1",
    tracking: {
        enabled: true,
        shutdownTimeout: 10 * 1000
    }
});
broker.call("posts.find", {}, { tracking: false }) // 關閉追蹤

event

  • Broker 有一個內置的事件總線來支持事件驅動體系結構,並將事件發送到本地和遠程服務
  • 事件偵聽器被排列成邏輯組,這意味着每個組中只觸發一個偵聽器
  • 例如你有兩個主要服務 users、payments,這兩個服務都訂閱了 user.created 事件。此時,從 users 服務上注冊 3 個具體實例,同時從 paymengs 服務上注冊 2 個具體實例,當 emit 觸發 user.created 事件,只有一個 user 和一個 payments 服務會被觸發,效果如下

  • 組名來自服務名稱,但可以在服務中的事件定義中覆蓋它。
module.exports = {
    name: "payment",
    events: {
        "order.created": {
            // Register handler to the "other" group instead of "payment" group.
            group: "other",
            handler(payload) {
                // ...
            }
        }
    }
}

Emit balanced events

  • broker.emit 函數發送平衡的事件,第一個參數是事件的名稱,第二個參數是傳遞的載荷,如果是復雜數據,可以傳遞一個對象
// The `user` will be serialized to transportation.
broker.emit("user.created", user);
  • 指定哪些組/服務接收事件
// Only the `mail` & `payments` services receives it
broker.emit("user.created", user, ["mail", "payments"]);

Broadcast event

  • 廣播事件被發送到所有可用的本地和遠程服務,它是不平衡的,所有服務實例都會收到它

  • 利用 broker.broadcast 發送廣播
broker.broadcast("config.changed", config);
  • 指定哪些組/服務接收事件
// Send to all "mail" service instances
broker.broadcast("user.created", { user }, "mail");

// Send to all "user" & "purchase" service instances.
broker.broadcast("user.created", { user }, ["user", "purchase"]);

Local broadcast event

  • Send broadcast events to only all local services with broker.broadcastLocal method
broker.broadcastLocal("config.changed", config);

Subscribe to events

  • 通過 service 中的屬性 event 可以訂閱具體事件,在事件名稱中可以使用通配符
module.exports = {
    events: {
        // Subscribe to `user.created` event
        "user.created"(user) {
            console.log("User created:", user);
        },

        // Subscribe to all `user` events
        "user.*"(user) {
            console.log("User event:", user);
        }

        // Subscribe to all internal events
        "$**"(payload, sender, event) {
            console.log(`Event '${event}' received from ${sender} node:`, payload);
        }
    }
}

Internal events

  • broker broadcasts 廣播內部事件,這些事件總是以$前綴開頭
  • $services.changed
    • 如果本地節點或遠程節點加載或破壞服務,代理將發送此事件
  • $circuit-breaker.opened
    • The broker sends this event when the circuit breaker module change its state to open
  • $circuit-breaker.half-opened
    • The broker sends this event when the circuit breaker module change its state to half-open.
  • $circuit-breaker.closed
    • The broker sends this event when the circuit breaker module change its state to closed.
  • $node.connected
    • The broker sends this event when a node connected or reconnected.
  • $node.updated
    • The broker sends this event when it has received an INFO message from a node, (i.e. a service is loaded or destroyed).
  • $node.disconnected
    • The broker sends this event when a node disconnected (gracefully or unexpectedly).
  • $broker.started
    • The broker sends this event once broker.start() is called and all local services are started.
  • $broker.stopped
    • The broker sends this event once broker.stop() is called and all local services are stopped.
  • $transporter.connected
    • The transporter sends this event once the transporter is connected.
  • $transporter.disconnected
    • The transporter sends this event once the transporter is disconnected.

lifecycle

Broker lifecycle

  • starting logic
    • broker 啟動傳輸連接,但是不會將本地服務列表發送到遠程節點
    • 完成后,broker 將啟動所有服務(call service started handler)
    • 一旦所有服務啟動成功,broker 就會將本地服務列表發布到遠程節點上
    • 因此,遠程節點只有在所有本地服務正確啟動之后才能發送請求

  • avoid deadlocks
    • broker start...
    • user service has dependencies: ["post"]
    • posts service has dependencies: ["users"]
    • 這就死鎖了,按照順序加載,user 永元無法加載到依賴項 post
  • stopping logic
    • call broker.stop 或者停止進程
    • 首先,broker 會向遠程節點發送一個空的服務列表,所以他們可以將請求路由到其他實例而不是停止服務
    • 之后,broker 開始停止所有本地服務,之后 transporter 斷開連接

Service lifecycle

  • created event handler
    • broker.createService or broker.loadService 會觸發此事件
    • 函數內部拿到 broker 實例(this),還可以創建其他模塊實例,例如 http 服務器、數據庫模塊
const http = require("http");

module.exports = {
    name: "www",
    created() {
        // Create HTTP server
        this.server = http.createServer(this.httpHandler);
    }
};
// created function is sync event handler,can not use async/await
  • started event handler
    • 它被觸發的時候,代理會啟動所有的本地服務,而 broker 會啟動所有的本地服務。使用它連接到數據庫,偵聽服務器…等
module.exports = {
    name: "users",
    async started() {
        try {
            await this.db.connect();
        } catch(e) {
            throw new MoleculerServerError("Unable to connect to database.", e.message);
        }
    }
};
// started function is async handler. you can use async/await
  • stopped event handler
    • 它被觸發的時候,broker.stop 被調用和 broker 開始停止所有的本地服務。使用它關閉數據庫連接,關閉套接字…等
module.exports = {
    name: "users",
    async stopped() {
        try {
            await this.db.disconnect();
        } catch(e) {
            this.logger.warn("Unable to stop database connection gracefully.", e);
        }
    }
};
// stopped function is async handler. you can use async/await

logging

  • 在Moleculer框架中,所有核心模塊都有一個自定義記錄器實例。它們是從Broker記錄器實例繼承的,該實例可以在Broker選項中進行配置。

Built-in logger

  • Moleculer有一個內置控制台記錄器。這是默認的記錄器
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    nodeID: "node-100",
    // logger: true,
    logLevel: "info"
});

broker.createService({
    name: "posts",
    actions: {
        get(ctx) {
            this.logger.info("Log message via Service logger");
        }
    }
});

broker.start()
    .then(() => broker.call("posts.get"))
    .then(() => broker.logger.info("Log message via Broker logger"));
[2018-06-26T11:38:06.728Z] INFO  node-100/POSTS: Log message via Service logger
[2018-06-26T11:38:06.728Z] INFO  node-100/BROKER: Log message via Broker logger
[2018-06-26T11:38:06.730Z] INFO  node-100/BROKER: ServiceBroker is stopped. Good bye.
  • 可以使用Broker選項中的logLevel選項更改日志級別。只與內置控制台記錄器一起使用
const broker = new ServiceBroker({
    logger: true, // the `true` is same as `console`
    logLevel: "warn" // only logs the 'warn' & 'error' entries to the console
});
  • Available log levels: fatalerrorwarninfodebugtrace
  • 可以為每個Moleculer模塊設置日志級別。允許通配符使用
const broker = new ServiceBroker({
    logLevel: {
        "MY.**": false,         // Disable log
        "TRANS": "warn",        // Only 'warn ' and 'error' log entries
        "*.GREETER": "debug",   // All log entries
        "**": "info",           // All other modules use this level
    }
});
// 此設置是從上到下計算的,因此*級別必須是最后一項。
  • 有一些內置的日志格式化程序
    • default:[2018-06-26T13:36:05.761Z] INFO node-100/BROKER: Message
    • simple:INFO - Message
    • short:[13:36:30.968Z] INFO BROKER: Message
  • 可以為內置控制台記錄器設置自定義日志格式化程序函數
const broker = new ServiceBroker({ 
    logFormatter(level, args, bindings) {
        return level.toUpperCase() + " " + bindings.nodeID + ": " + args.join(" ");
    }
});
broker.logger.warn("Warn message");
broker.logger.error("Error message");
WARN dev-pc: Warn message
ERROR dev-pc: Error message
  • 自定義對象&數組打印格式化程序

    • 設置一個自定義格式化程序函數來打印對象和數組。默認函數將對象和數組打印到一行,以便便於使用外部日志工具進行處理。但是,當您正在開發時,將對象打印成人類可讀的多行格式將是有用的。為此,在代理選項中覆蓋logObjectPrint函數。
const util = require("util");

const broker = new ServiceBroker({  
    logObjectPrinter: o => util.inspect(o, { depth: 4, breakLength: 100 })
});
broker.logger.warn(process.release);
[2017-08-18T12:37:25.720Z] INFO  dev-pc/BROKER: { name: 'node',
  lts: 'Carbon',
  sourceUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0.tar.gz',
  headersUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0-headers.tar.gz' }

External loggers

  • 外部記錄器可以與Moleculer一起使用。在這種情況下,將創建者函數設置為LOGER。當一個新模塊繼承一個新的記錄器實例時,ServiceBroker將調用它
// pino
const pino = require("pino")({ level: "info" });
const broker = new ServiceBroker({ 
    logger: bindings => pino.child(bindings)
});
// bunyan
const bunyan = require("bunyan");
const logger = bunyan.createLogger({ name: "moleculer", level: "info" });
const broker = new ServiceBroker({ 
    logger: bindings => logger.child(bindings)
});

middlewares

networking

要通信其他節點(ServiceBrokers),您需要配置一個傳輸程序。大多數傳輸者連接到中心消息代理服務器,該服務器負責節點之間的消息傳輸。這些消息代理主要支持發布/訂閱消息傳遞模式

Transporters

如果要在多個節點上運行服務,傳輸程序是一個重要的模塊。傳送器與其他節點通信。它傳輸事件、調用請求和處理響應… 如果一個服務在不同節點上的多個實例上運行,則請求將在活動節點之間實現負載平衡

整個通信邏輯是在傳輸類之外的。這意味着在不改變代碼行的情況下,在傳送器之間切換是很容易的。

Moleculer框架中有幾個內置的運輸機。

NATS

NATS服務器是一個簡單、高性能的開源消息傳遞系統,用於雲本機應用程序、物聯網消息傳遞和微服務體系結構。

let { ServiceBroker } = require("moleculer");

const broker = new ServiceBroker({
    nodeID: "server-1",
    transporter: "nats://nats.server:4222"
});

使用 nats 傳輸需要安裝 nats 模塊 npm install nats

// Connect to 'nats://localhost:4222'
const broker = new ServiceBroker({
    transporter: "NATS"
});

// Connect to a remote NATS server
const broker = new ServiceBroker({
    transporter: "nats://nats-server:4222"
});

// Connect with options
const broker = new ServiceBroker({
    transporter: {
        type: "NATS",
        options: {
            url: "nats://localhost:4222"
            user: "admin",
            pass: "1234"
        }
    }
});

// Connect with TLS
const broker = new ServiceBroker({
    transporter: {
        type: "NATS",
        options: {
            url: "nats://localhost:4222"
            // More info: https://github.com/nats-io/node-nats#tls
            tls: {
                key: fs.readFileSync('./client-key.pem'),
                cert: fs.readFileSync('./client-cert.pem'),
                ca: [ fs.readFileSync('./ca.pem') ]
            }
        }
    }
});

Serialization

傳輸程序需要一個序列化模塊來序列化和反序列化傳輸的數據包。默認的串行化程序是JSONS序列化程序,但是有幾個內置的串行化程序。

const { ServiceBroker } = require("moleculer");

const broker = new ServiceBroker({
    nodeID: "server-1",
    transporter: "NATS",
    serializer: "ProtoBuf"
});
  • JSON serializer:這是內置的默認序列化程序。它將數據包序列化為JSON字符串,並將接收到的數據反序列化為數據包。
const broker = new ServiceBroker({
    // serializer: "JSON" // don't need to set, because it is the default
});

Load balancing

Built-in strategies

若要配置策略,請在注冊表屬性下設置策略代理選項。它可以是一個名稱(在內置策略的情況下),也可以是一個策略類(在自定義策略的情況下)。

Random strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "Random"
    }
});

RoundRobin strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "RoundRobin"
    }
});

CPU usage-based strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "CpuUsage"
    }
});

Fault tolerance

Circuit Breaker

  • Moleculer有一個內置的斷路器解決方案.這是一個基於閾值的實現。它使用一個時間窗口來檢查失敗的請求率。一旦達到閾值,它就會觸發斷路器。
  • 電路斷路器可以防止應用程序重復嘗試執行可能失敗的操作。允許它繼續,而不等待故障被修復或浪費CPU周期,而它確定故障是長期的。斷路器模式還允許應用程序檢測故障是否已經解決。如果問題似乎已經解決,應用程序可以嘗試調用操作。
  • 如果啟用它,所有服務調用都將受到此內置斷路器的保護。
  • 在代理選項中啟用它
const broker = new ServiceBroker({
    circuitBreaker: {
        enabled: true,
        threshold: 0.5,
        minRequestCount: 20,
        windowTime: 60, // in seconds
        halfOpenTime: 5 * 1000, // in milliseconds
        check: err => err && err.code >= 500
    }
});
  • settings
    • enabled:是否啟動此功能,默認是 false
    • threshold:閾值,默認0.5,意味着50%的跳閘失敗
    • minRequestCount:最小請求數,默認20,在它下面,回調函數不會觸發
    • windowTime:時間窗口的秒數,默認60秒
    • halfOpenTime:從打開狀態切換到半打開狀態的毫秒數,默認10000毫秒
    • check:檢查失敗請求的函數,默認 err && err.code >= 500
  • 如果斷路器狀態發生更改,ServiceBroker將發送內部事件
  • 這些全局選項也可以在操作定義中重寫。
// users.service.js
module.export = {
    name: "users",
    actions: {
        create: {
            circuitBreaker: {
                // All CB options can be overwritten from broker options.
                threshold: 0.3,
                windowTime: 30
            },
            handler(ctx) {}
        }
    }
};

Retry

  • 重試解決方案
const broker = new ServiceBroker({
    retryPolicy: {
        enabled: true,
        retries: 5,
        delay: 100,
        maxDelay: 2000,
        factor: 2,
        check: err => err && !!err.retryable
    }
});
  • settings
    • enabled:是否啟用,默認 false
    • retries:重試的次數,默認5次
    • delay:第一次延遲以毫秒為單位,默認 100
    • maxDelay:最大延遲(以毫秒為單位),默認 2000
    • factor:延遲退避系數,默認是 2,表示指數退避
    • check:檢查失敗請求的函數,err && !!err.retryable
  • 在調用選項中覆蓋retry值
broker.call("posts.find", {}, { retries: 3 });
  • 在操作定義中覆蓋重試策略值
// users.service.js
module.export = {
    name: "users",
    actions: {
        find: {
            retryPolicy: {
                // All Retry policy options can be overwritten from broker options.
                retries: 3,
                delay: 500
            },
            handler(ctx) {}
        },
        create: {
            retryPolicy: {
                // Disable retries for this action
                enabled: false
            },
            handler(ctx) {}
        }
    }
};

Timeout

  • 可以為服務調用設置超時。它可以在代理選項或調用選項中全局設置。如果定義了超時並且請求超時,代理將拋出RequestTimeoutError錯誤。
const broker = new ServiceBroker({
    requestTimeout: 5 * 1000 // in seconds
});
  • 覆蓋調用選項中的超時值
broker.call("posts.find", {}, { timeout: 3000 });
  • 分布式超時:Moleculer使用分布式超時。在嵌套調用的情況下,超時值會隨着時間的推移而遞減。如果超時值小於或等於0,則跳過下一個嵌套調用(RequestSkippedError),因為第一個調用已被RequestTimeoutError錯誤拒絕。

Bulkhead

  • 在Moleculer框架中實現了艙壁特性,以控制動作的並發請求處理。
const broker = new ServiceBroker({
    bulkhead: {
        enabled: true,
        concurrency: 3,
        maxQueueSize: 10,
    }
});
  • settings

    • enabled:是否啟動,默認 false

    • concurreny:最大限度的並行數量,默認3

    • maxQueueSize:最大隊列大小,默認10

    • concurreny 值限制並發請求執行

    • 如果 maxQueueSize大於0,則如果所有插槽都被占用,則 broker 將額外的請求存儲在隊列中

    • 如果隊列大小達到maxQueueSize限制或為0,則 Broker 將對每個添加請求拋出QueueIsFull異常

  • 這些全局選項也可以在操作定義中重寫

// users.service.js
// 在操作定義中覆蓋重試策略值
module.export = {
    name: "users",
    actions: {
        find: {
            bulkhead: {
                enabled: false
            },
            handler(ctx) {}
        },
        create: {
            bulkhead: {
                // Increment the concurrency value
                // for this action
                concurrency: 10
            },
            handler(ctx) {}
        }
    }
};

Fallback

  • 當您不想將錯誤返回給用戶時,回退功能是非常有用的。相反,調用其他操作或返回一些常見的內容。可以在調用選項或操作定義中設置回退響應。
  • 它應該是一個返回包含任何內容的承諾的函數。borker 將當前 context&Error對象作為參數傳遞給此函數。
// fallback settings in calling options 
const result = await broker.call("users.recommendation", { userID: 5 }, {
    timeout: 500,
    fallbackResponse(ctx, err) {
        // Return a common response from cache
        return broker.cacher.get("users.fallbackRecommendation:" + ctx.params.userID);
    }
});
  • 回退響應也可以在接收端,在 action 中定義
  • 請注意,只有在action 處理程序中發生錯誤時,才會使用此回退響應。如果從遠程節點調用請求,並且請求在遠程節點上超時,則不使用回退響應。在這種情況下,在調用選項中使用回退響應。
// fallback as a function
module.exports = {
    name: "recommends",
    actions: {
        add: {
            fallback: (ctx, err) => "Some cached result",
            //fallback: "fakeResult",
            handler(ctx) {
                // Do something
            }
        }
    }
};
// fallback as method name
module.exports = {
    name: "recommends",
    actions: {
        add: {
            // Call the 'getCachedResult' method when error occurred
            fallback: "getCachedResult",
            handler(ctx) {
                // Do something
            }
        }
    },

    methods: {
        getCachedResult(ctx, err) {
            return "Some cached result";
        }
    }
};

NATS

幫助文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM