new ServiceBroker
default settings
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();
custom settings
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
logLevel: "info"
});
communicate with remote nodes
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "node-1",
transporter: "nats://localhost:4222",
logLevel: "debug",
requestTimeout: 5 * 1000,
requestRetry: 3
});
broker options
- logLevel
- type:string
- default:info
- des:可選項目還有 trace、debug、 info、 warn、 error、 fatal
- middlewares
- type:Array<Function>
- default:null
- des:中間件
- created
- type:Function
- default:null
- des:broker 實例被創建的時候將會觸發此函數
- started
- type:Function
- default:null
- des:broker 實例開始執行時觸發此函數
- stopped
- type:Function
- default:null
- des:broker 實例停止執行時觸發此函數
- hotReload
- type:Boolean
- default:false
- des:是否啟動熱加載
- cacher
- type:String、Object、Cacher
- default:null
- des:若是啟動緩存,兩個相同模型的 broker.call,只有第一個 call 會讓 action 中對應 handler 完整的執行一遍,第二個 call 就不會了,它會直接從緩存中取數據,不常用
- https://moleculer.services/docs/0.13/caching.html
- transporter
- type:string、object、Transporter
- default:null
- des:多個節點通訊,需要一個傳輸中心,這就是消息代理服務器,這是微服務核心。一般選擇 NATS,這需要安裝 NATS 依賴包
- https://moleculer.services/docs/0.13/networking.html#Transporters
- serializer
- type:string、Serializer
- default:JSONSerializer
- des:系統默認以 json 格式序列化,不常用
- https://moleculer.services/docs/0.13/networking.html#Serialization
- nodeID
- type:string
- default:hostname + PID
- des:這是節點的id,掛載在 某一個 namespace 中是不能夠同名的
- namespace
- type:string
- defalut:”“
- des:分割一個網咯中的不同區域,基本上用不到,除非項目特別復雜,子服務特別多
- requestTimeout
- type:Number
- default:0
- des:請求超時設置,單位毫秒
createService
該服務表示Moleculer框架中的一個微服務。您可以定義操作並訂閱事件。若要創建服務,必須定義架構。服務模式類似於VueJS的一個組件
// 定義了兩個actions
broker.createService({
name: "math",
actions: {
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
},
sub(ctx) {
return Number(ctx.params.a) - Number(ctx.params.b);
}
}
});
name
- 強制屬性,最后去 call 某一個微服務的時候必須帶上 name
version
- 可以區分同一個 action 不同版本,一般不帶
broker.call("v1.math.add")
- restful call:https://moleculer.services/docs/0.13/moleculer-web.html
settings
- 此屬性相當於倉庫
- 賦值可以是對象,對象中設置任意鍵值對,action 中通過
this.settings.xxxx
能夠訪問到設置項 - 遠程節點上可以獲得這些設置項
- 有一些內部設置是由核心模塊使用的。這些設置名稱以$(美元符號)開頭
- $noVersionPrefix
- type:Boolean
- default:false
- des:禁用 action 版本前綴
- $noServiceNamePrefix
- type:Boolean
- default:false
- des:禁用 action 中的服務名稱前綴。
- $dependencyTimeout
- type:Number
- default:0
- des:依賴等待超時
- $shutdownTimeout
- type:Number
- default:0
- des:關閉時等待活動請求的超時
- $noVersionPrefix
- 賦值可以是對象,對象中設置任意鍵值對,action 中通過
mixins
Mixins是一種為Moleculer服務分發可重用功能的靈活方法。服務構造函數將這些混合與當前架構合並。它是在您的服務中擴展其他服務。當服務使用混音時,混音中的所有屬性都將“混合”到當前服務中。
const ApiGwService = require("moleculer-web");
module.exports = {
name: "api",
mixins: [ApiGwService]
settings: {
// Change port setting
port: 8080
},
actions: {
myAction() {
// Add a new action to apiGwService service
}
}
}
上面的示例創建了一個API服務,該服務繼承了ApiGwService的所有內容,但是覆蓋了端口設置,並使用新的myAction操作對其進行了擴展
actions
-
action 是服務中可調用的公共方法,broker.call 或 ctx.call,具體 action 必須在 action 中,可以是一個函數,可以是一個對象
- 調用時
- const res = await broker.call("math.add", { a: 5, b: 7 })
- const res = await broker.call("math.mult", { a: 10, b: 31 })
- https://moleculer.services/docs/0.13/services.html#Actions
- 調用時
events
- 事件訂閱
lifecycle events
- 有一些生命周期服務事件,這些事件將由代理觸發。它們被放置在模式的根中
- created:broker.loadService 或者 broker.createService 觸發
- started:broker.start() 觸發
- stopped:broker.stop() 觸發
methods
- 創建私有方法,以供 action、event、lifecycle event 使用
dependencies
- 如果您的服務依賴於其他服務,請使用架構中的依賴項屬性。服務在調用已啟動的生命周期事件處理程序之前等待依賴服務
- 除了配做中添加 dependencies 屬性,也可以用 broker 實例進行外部設置
- broker.waitForServices(["posts", "users"]),返回以惡 promise 對象
- broker.waitForServices("accounts", 10 * 1000, 500),設置超時事件和
metadata
- 元數據屬性,您可以在這里存儲有關服務的任何元信息。在服務函數中可以訪問到元數據
- 元數據時可以被遠程節點獲取的
this
- 微服務中 this 指向微服務的實例,有些方法是可以直接被點出來的,具體查看請點擊
- https://moleculer.services/docs/0.13/services.html#Properties-of-Service-instances
broker.createService
// 創建微服務實例方式之一
broker.createService({
name: "math",
actions: {
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
}
}
});
load service from file
math.service.js
// Export the schema of service
module.exports = {
name: "math",
actions: {
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
},
sub(ctx) {
return Number(ctx.params.a) - Number(ctx.params.b);
}
}
}
// Create broker
const broker = new ServiceBroker();
// Load service
broker.loadService("./math.service");
// Start broker
broker.start();
推薦使用這樣的方式,一目了然,不會在一個文件寫過多的代碼
Load multiple services from a folder
如果您有很多服務,建議將它們放到一個服務文件夾中,並使用 Serge.loadService s方法加載所有這些服務
broker.loadServices(folder = "./services", fileMask = "**/*.service.js");
// 從 ./services 文件夾(包括子文件夾)加載每個 *.service.js 文件
broker.loadServices();
// 從當前文件夾(包括子文件夾)加載每個 *.service.js 文件
broker.loadServices("./");
// 從“./svc”文件夾加載每個用戶*.service.js文件
broker.loadServices("./svc", "user*.service.js");
hot reloading services
Moleculer具有內置的熱重加載功能.在開發期間,注意只針對 service.js 文件的修改被啟動熱重啟,其他位置可以使用 nodemon
const broker = new ServiceBroker({
hotReload: true
});
broker.loadService("./services/test.service.js");
Internal services
// 列出所有已知節點(包括本地節點)
broker.call("$node.list").then(res => console.log(res))
// 列出所有注冊的服務(本地和遠程)
broker.call("$node.services").then(res => console.log(res))
// 列出所有已注冊 action(本地和遠程)。
broker.call("$node.actions").then(res => console.log(res))
// 列出所有訂閱的事件
broker.call("$node.events").then(res => console.log(res))
// 列出本地節點的健康信息(包括進程和OS信息)
broker.call("$node.health").then(res => console.log(res));
action
action 是服務的可調用的公共方法。action 調用表示遠程過程調用(RPC)。它有請求參數&返回響應,就像HTTP請求一樣。如果您有多個服務實例,代理將在實例之間負載平衡請求
call services
若要調用服務,請使用 broke.Call 方法。代理查找具有給定 action service (可能在某一個節點上)並調用它。調用之后將會返回一個承諾
const res = await broker.call(actionName, params, opts)
- params:參數是作為上下文的一部分傳遞給 action,action service 可以通過 ctx.params 訪問傳遞參數,這是可選的
- ops:一個對象,用於設置或者覆蓋某些請求參數,例如:timeout、retry Count,這是可選的
- tiemout:請求超時,以毫秒為單位。如果請求超時,而您沒有定義應急響應,將會報錯。若要禁用設置0,請執行以下操作。如果未定義,將會啟用 new ServiceBroker 中的 requestTimeout 設置
- retries :請求重試次數,如果請求超時,代理將再次嘗試調用。若要禁用設置0。如果沒有定義,將啟用 new ServiceBroker 中的配置
- fallbackResponse :若請求失敗就返回,這是一個 Function
- nodeID:目標節點,如果設置,它將直接調用給定的節點
- meta:請求元數據,通過操作處理程序中的 ctx.meta 訪問它,它也將在嵌套調用中被傳輸和合並
- parentCtx:父親的上下文實例
- requestID:請求ID或相關ID。它出現在標准事件中
broker.call("user.recommendation", { limit: 5 }, {
timeout: 500,
retries: 3,
fallbackResponse: defaultRecommendation
}).then(res => console.log("Result: ", res));
meta
- 元信息發送到具有元屬性的服務,通過 action 處理程序中的ctx.meta訪問它。請注意,在嵌套調用時,元被合並。
Streaming
- Moleculer支持Node.js流作為請求參數和響應。使用它來傳輸從網關上傳的文件,或者編碼/解碼或壓縮/解壓縮流
const stream = fs.createReadStream(fileName);
broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});
- 請注意,參數應該是一個流,您不能向參數中添加更多的變量。使用元屬性傳輸其他數據。
- 服務中接受流
module.exports = {
name: "storage",
actions: {
save(ctx) {
const s = fs.createWriteStream(`/tmp/${ctx.meta.filename}`);
ctx.params.pipe(s);
}
}
};
- 將流作為服務中的響應返回
module.exports = {
name: "storage",
actions: {
get: {
params: {
filename: "string"
},
handler(ctx) {
return fs.createReadStream(`/tmp/${ctx.params.filename}`);
}
}
}
};
- 調用方接受流
const filename = "avatar-123.jpg";
broker.call("storage.get", { filename })
.then(stream => {
const s = fs.createWriteStream(`./${filename}`);
stream.pipe(s);
s.on("close", () => broker.logger.info("File has been received"));
})
- AES編解碼示例服務
const crypto = require("crypto");
const password = "moleculer";
module.exports = {
name: "aes",
actions: {
encrypt(ctx) {
const encrypt = crypto.createCipher("aes-256-ctr", password);
return ctx.params.pipe(encrypt);
},
decrypt(ctx) {
const decrypt = crypto.createDecipher("aes-256-ctr", password);
return ctx.params.pipe(decrypt);
}
}
};
action visibility
- visibility:該屬性控制 action service 是否可見、可調用
- published:公共的 action,它可以在本地調用,也可以遠程調用,並且可以通過API網關發布
- pulic:公共的 action ,可以在本地或者遠程調用,但不能通過APIGW發布
- protected:只能在本地 action service 調用(從本地服務調用)
- private:只能在內部調用(通過 this.actions.xy() 內部服務)
- 不設置,默認是 null,也就是 published,公共的
module.exports = {
name: "posts",
actions: {
// It's published by default
find(ctx) {},
clean: {
// Callable only via `this.actions.clean`
visibility: "private",
handler(ctx) {}
}
},
methods: {
cleanEntities() {
// Call the action directly
return this.actions.clean();
}
}
}
action hooks
- 定義 action 鈎子來包裝來自混合器的某些 action
- 有 before、after、error 鈎子,將其分配給指定的 action 或者所有 action service (*)
- 鈎子可以是函數,也可以是字符串。字符串必須是本地服務方法名。
const DbService = require("moleculer-db");
// before hook
module.exports = {
name: "posts",
mixins: [DbService]
hooks: {
before: {
// Define a global hook for all actions
// The hook will call the `resolveLoggedUser` method.
"*": "resolveLoggedUser",
// Define multiple hooks
remove: [
function isAuthenticated(ctx) {
if (!ctx.user)
throw new Error("Forbidden");
},
function isOwner(ctx) {
if (!this.checkOwner(ctx.params.id, ctx.user.id))
throw new Error("Only owner can remove it.");
}
]
}
},
methods: {
async resolveLoggedUser(ctx) {
if (ctx.meta.user)
ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });
}
}
}
const DbService = require("moleculer-db");
// after hook
// error hook
module.exports = {
name: "users",
mixins: [DbService]
hooks: {
after: {
// Define a global hook for all actions to remove sensitive data
"*": function(ctx, res) {
// Remove password
delete res.password;
// Please note, must return result (either the original or a new)
return res;
},
get: [
// Add a new virtual field to the entity
async function (ctx, res) {
res.friends = await ctx.call("friends.count", { query: { follower: res._id }});
return res;
},
// Populate the `referrer` field
async function (ctx, res) {
if (res.referrer)
res.referrer = await ctx.call("users.get", { id: res._id });
return res;
}
]
},
error: {
// Global error handler
"*": function(ctx, err) {
this.logger.error(`Error occurred when '${ctx.action.name}' action was called`, err);
// Throw further the error
throw err;
}
}
}
};
- 推薦的用例是創建混合元素,用方法填充服務,並在鈎子中設置方法名
module.exports = {
methods: {
checkIsAuthenticated(ctx) {
if (!ctx.meta.user)
throw new Error("Unauthenticated");
},
checkUserRole(ctx) {
if (ctx.action.role && ctx.meta.user.role != ctx.action.role)
throw new Error("Forbidden");
},
checkOwner(ctx) {
// Check the owner of entity
}
}
}
// Use mixin methods in hooks
const MyAuthMixin = require("./my.mixin");
module.exports = {
name: "posts",
mixins: [MyAuthMixin]
hooks: {
before: {
"*": ["checkIsAuthenticated"],
create: ["checkUserRole"],
update: ["checkUserRole", "checkOwner"],
remove: ["checkUserRole", "checkOwner"]
}
},
actions: {
find: {
// No required role
handler(ctx) {}
},
create: {
role: "admin",
handler(ctx) {}
},
update: {
role: "user",
handler(ctx) {}
}
}
};
context
- 當你去 call 一個 action service,broker 就會創建一個上下文 context 實例,這個實例包含着所有的請求信息,最后這些信息都會被當做 action service 中 handler 的一個參數 ctx 進行傳遞使用
- 在 handler 中可以點出的上下文信息(屬性或者方法)
- ctx.id:context id
- ctx.broker:broker 對象實例
- ctx.action:action 定義實例
- ctx.nodeID:caller 或者 目標節點 id
- ctx.requestID:請求ID,如果在 nested-calls 中使用,它將是相同的ID。
- ctx.parentID:父親上下文實例 id(在 nested-calls 中使用)
- ctx.params:請求參數,也就是 broker.call 中第二個參數具體設置
- ctx.meta:請求元數據,它將會傳遞到 nested-calls 中
- ctx.level:請求等級(在 nested-calls 內部使用),第一層等級為1
- ctx.call():在 nested-calls 中觸發 action service,參數形式與 broker.call 一樣
- ctx.emit():emit an event,same as broker.emit
- ctx.broadcast():Broadcast an event, same as broker.broadcast
- 優雅地關閉服務,請在代理選項中啟用上下文跟蹤功能。如果啟用它,所有服務都將在關閉之前等待所有正在運行的上下文
- 一個超時值可以通過關閉Timeout Broker選項來定義。默認值為5秒
- 在 action services 中,關閉超時設置可以通過 $Shupdown Timeout 屬性重寫
const broker = new ServiceBroker({
nodeID: "node-1",
tracking: {
enabled: true,
shutdownTimeout: 10 * 1000
}
});
broker.call("posts.find", {}, { tracking: false }) // 關閉追蹤
event
- Broker 有一個內置的事件總線來支持事件驅動體系結構,並將事件發送到本地和遠程服務
- 事件偵聽器被排列成邏輯組,這意味着每個組中只觸發一個偵聽器
- 例如你有兩個主要服務 users、payments,這兩個服務都訂閱了 user.created 事件。此時,從 users 服務上注冊 3 個具體實例,同時從 paymengs 服務上注冊 2 個具體實例,當 emit 觸發 user.created 事件,只有一個 user 和一個 payments 服務會被觸發,效果如下
- 組名來自服務名稱,但可以在服務中的事件定義中覆蓋它。
module.exports = {
name: "payment",
events: {
"order.created": {
// Register handler to the "other" group instead of "payment" group.
group: "other",
handler(payload) {
// ...
}
}
}
}
Emit balanced events
- broker.emit 函數發送平衡的事件,第一個參數是事件的名稱,第二個參數是傳遞的載荷,如果是復雜數據,可以傳遞一個對象
// The `user` will be serialized to transportation.
broker.emit("user.created", user);
- 指定哪些組/服務接收事件
// Only the `mail` & `payments` services receives it
broker.emit("user.created", user, ["mail", "payments"]);
Broadcast event
- 廣播事件被發送到所有可用的本地和遠程服務,它是不平衡的,所有服務實例都會收到它
- 利用
broker.broadcast
發送廣播
broker.broadcast("config.changed", config);
- 指定哪些組/服務接收事件
// Send to all "mail" service instances
broker.broadcast("user.created", { user }, "mail");
// Send to all "user" & "purchase" service instances.
broker.broadcast("user.created", { user }, ["user", "purchase"]);
Local broadcast event
- Send broadcast events to only all local services with
broker.broadcastLocal
method
broker.broadcastLocal("config.changed", config);
Subscribe to events
- 通過 service 中的屬性 event 可以訂閱具體事件,在事件名稱中可以使用通配符
module.exports = {
events: {
// Subscribe to `user.created` event
"user.created"(user) {
console.log("User created:", user);
},
// Subscribe to all `user` events
"user.*"(user) {
console.log("User event:", user);
}
// Subscribe to all internal events
"$**"(payload, sender, event) {
console.log(`Event '${event}' received from ${sender} node:`, payload);
}
}
}
Internal events
- broker broadcasts 廣播內部事件,這些事件總是以$前綴開頭
- $services.changed
- 如果本地節點或遠程節點加載或破壞服務,代理將發送此事件
- $circuit-breaker.opened
- The broker sends this event when the circuit breaker module change its state to
open
- The broker sends this event when the circuit breaker module change its state to
- $circuit-breaker.half-opened
- The broker sends this event when the circuit breaker module change its state to
half-open
.
- The broker sends this event when the circuit breaker module change its state to
- $circuit-breaker.closed
- The broker sends this event when the circuit breaker module change its state to
closed
.
- The broker sends this event when the circuit breaker module change its state to
- $node.connected
- The broker sends this event when a node connected or reconnected.
- $node.updated
- The broker sends this event when it has received an INFO message from a node, (i.e. a service is loaded or destroyed).
- $node.disconnected
- The broker sends this event when a node disconnected (gracefully or unexpectedly).
- $broker.started
- The broker sends this event once
broker.start()
is called and all local services are started.
- The broker sends this event once
- $broker.stopped
- The broker sends this event once
broker.stop()
is called and all local services are stopped.
- The broker sends this event once
- $transporter.connected
- The transporter sends this event once the transporter is connected.
- $transporter.disconnected
- The transporter sends this event once the transporter is disconnected.
lifecycle
Broker lifecycle
- starting logic
- broker 啟動傳輸連接,但是不會將本地服務列表發送到遠程節點
- 完成后,broker 將啟動所有服務(call service
started
handler) - 一旦所有服務啟動成功,broker 就會將本地服務列表發布到遠程節點上
- 因此,遠程節點只有在所有本地服務正確啟動之后才能發送請求
- avoid deadlocks
- broker start...
user
service hasdependencies: ["post"]
posts
service hasdependencies: ["users"]
- 這就死鎖了,按照順序加載,user 永元無法加載到依賴項 post
- stopping logic
- call
broker.stop
或者停止進程 - 首先,broker 會向遠程節點發送一個空的服務列表,所以他們可以將請求路由到其他實例而不是停止服務
- 之后,broker 開始停止所有本地服務,之后 transporter 斷開連接
- call
Service lifecycle
- created event handler
broker.createService
orbroker.loadService
會觸發此事件- 函數內部拿到 broker 實例(this),還可以創建其他模塊實例,例如 http 服務器、數據庫模塊
const http = require("http");
module.exports = {
name: "www",
created() {
// Create HTTP server
this.server = http.createServer(this.httpHandler);
}
};
// created function is sync event handler,can not use async/await
- started event handler
- 它被觸發的時候,代理會啟動所有的本地服務,而 broker 會啟動所有的本地服務。使用它連接到數據庫,偵聽服務器…等
module.exports = {
name: "users",
async started() {
try {
await this.db.connect();
} catch(e) {
throw new MoleculerServerError("Unable to connect to database.", e.message);
}
}
};
// started function is async handler. you can use async/await
- stopped event handler
- 它被觸發的時候,
broker.stop
被調用和 broker 開始停止所有的本地服務。使用它關閉數據庫連接,關閉套接字…等
- 它被觸發的時候,
module.exports = {
name: "users",
async stopped() {
try {
await this.db.disconnect();
} catch(e) {
this.logger.warn("Unable to stop database connection gracefully.", e);
}
}
};
// stopped function is async handler. you can use async/await
logging
- 在Moleculer框架中,所有核心模塊都有一個自定義記錄器實例。它們是從Broker記錄器實例繼承的,該實例可以在Broker選項中進行配置。
Built-in logger
- Moleculer有一個內置控制台記錄器。這是默認的記錄器
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "node-100",
// logger: true,
logLevel: "info"
});
broker.createService({
name: "posts",
actions: {
get(ctx) {
this.logger.info("Log message via Service logger");
}
}
});
broker.start()
.then(() => broker.call("posts.get"))
.then(() => broker.logger.info("Log message via Broker logger"));
[2018-06-26T11:38:06.728Z] INFO node-100/POSTS: Log message via Service logger
[2018-06-26T11:38:06.728Z] INFO node-100/BROKER: Log message via Broker logger
[2018-06-26T11:38:06.730Z] INFO node-100/BROKER: ServiceBroker is stopped. Good bye.
- 可以使用Broker選項中的logLevel選項更改日志級別。只與內置控制台記錄器一起使用
const broker = new ServiceBroker({
logger: true, // the `true` is same as `console`
logLevel: "warn" // only logs the 'warn' & 'error' entries to the console
});
- Available log levels:
fatal
,error
,warn
,info
,debug
,trace
- 可以為每個Moleculer模塊設置日志級別。允許通配符使用
const broker = new ServiceBroker({
logLevel: {
"MY.**": false, // Disable log
"TRANS": "warn", // Only 'warn ' and 'error' log entries
"*.GREETER": "debug", // All log entries
"**": "info", // All other modules use this level
}
});
// 此設置是從上到下計算的,因此*級別必須是最后一項。
- 有一些內置的日志格式化程序
- default:[2018-06-26T13:36:05.761Z] INFO node-100/BROKER: Message
- simple:INFO - Message
- short:[13:36:30.968Z] INFO BROKER: Message
- 可以為內置控制台記錄器設置自定義日志格式化程序函數
const broker = new ServiceBroker({
logFormatter(level, args, bindings) {
return level.toUpperCase() + " " + bindings.nodeID + ": " + args.join(" ");
}
});
broker.logger.warn("Warn message");
broker.logger.error("Error message");
WARN dev-pc: Warn message
ERROR dev-pc: Error message
-
自定義對象&數組打印格式化程序
- 設置一個自定義格式化程序函數來打印對象和數組。默認函數將對象和數組打印到一行,以便便於使用外部日志工具進行處理。但是,當您正在開發時,將對象打印成人類可讀的多行格式將是有用的。為此,在代理選項中覆蓋logObjectPrint函數。
const util = require("util");
const broker = new ServiceBroker({
logObjectPrinter: o => util.inspect(o, { depth: 4, breakLength: 100 })
});
broker.logger.warn(process.release);
[2017-08-18T12:37:25.720Z] INFO dev-pc/BROKER: { name: 'node',
lts: 'Carbon',
sourceUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0.tar.gz',
headersUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0-headers.tar.gz' }
External loggers
- 外部記錄器可以與Moleculer一起使用。在這種情況下,將創建者函數設置為LOGER。當一個新模塊繼承一個新的記錄器實例時,ServiceBroker將調用它
// pino
const pino = require("pino")({ level: "info" });
const broker = new ServiceBroker({
logger: bindings => pino.child(bindings)
});
// bunyan
const bunyan = require("bunyan");
const logger = bunyan.createLogger({ name: "moleculer", level: "info" });
const broker = new ServiceBroker({
logger: bindings => logger.child(bindings)
});
middlewares
networking
要通信其他節點(ServiceBrokers),您需要配置一個傳輸程序。大多數傳輸者連接到中心消息代理服務器,該服務器負責節點之間的消息傳輸。這些消息代理主要支持發布/訂閱消息傳遞模式
Transporters
如果要在多個節點上運行服務,傳輸程序是一個重要的模塊。傳送器與其他節點通信。它傳輸事件、調用請求和處理響應… 如果一個服務在不同節點上的多個實例上運行,則請求將在活動節點之間實現負載平衡
整個通信邏輯是在傳輸類之外的。這意味着在不改變代碼行的情況下,在傳送器之間切換是很容易的。
Moleculer框架中有幾個內置的運輸機。
NATS
NATS服務器是一個簡單、高性能的開源消息傳遞系統,用於雲本機應用程序、物聯網消息傳遞和微服務體系結構。
let { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "server-1",
transporter: "nats://nats.server:4222"
});
使用 nats 傳輸需要安裝 nats 模塊 npm install nats
// Connect to 'nats://localhost:4222'
const broker = new ServiceBroker({
transporter: "NATS"
});
// Connect to a remote NATS server
const broker = new ServiceBroker({
transporter: "nats://nats-server:4222"
});
// Connect with options
const broker = new ServiceBroker({
transporter: {
type: "NATS",
options: {
url: "nats://localhost:4222"
user: "admin",
pass: "1234"
}
}
});
// Connect with TLS
const broker = new ServiceBroker({
transporter: {
type: "NATS",
options: {
url: "nats://localhost:4222"
// More info: https://github.com/nats-io/node-nats#tls
tls: {
key: fs.readFileSync('./client-key.pem'),
cert: fs.readFileSync('./client-cert.pem'),
ca: [ fs.readFileSync('./ca.pem') ]
}
}
}
});
Serialization
傳輸程序需要一個序列化模塊來序列化和反序列化傳輸的數據包。默認的串行化程序是JSONS序列化程序,但是有幾個內置的串行化程序。
const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
nodeID: "server-1",
transporter: "NATS",
serializer: "ProtoBuf"
});
- JSON serializer:這是內置的默認序列化程序。它將數據包序列化為JSON字符串,並將接收到的數據反序列化為數據包。
const broker = new ServiceBroker({
// serializer: "JSON" // don't need to set, because it is the default
});
Load balancing
Built-in strategies
若要配置策略,請在注冊表屬性下設置策略代理選項。它可以是一個名稱(在內置策略的情況下),也可以是一個策略類(在自定義策略的情況下)。
Random strategy
const broker = new ServiceBroker({
registry: {
strategy: "Random"
}
});
RoundRobin strategy
const broker = new ServiceBroker({
registry: {
strategy: "RoundRobin"
}
});
CPU usage-based strategy
const broker = new ServiceBroker({
registry: {
strategy: "CpuUsage"
}
});
Fault tolerance
Circuit Breaker
- Moleculer有一個內置的斷路器解決方案.這是一個基於閾值的實現。它使用一個時間窗口來檢查失敗的請求率。一旦達到閾值,它就會觸發斷路器。
- 電路斷路器可以防止應用程序重復嘗試執行可能失敗的操作。允許它繼續,而不等待故障被修復或浪費CPU周期,而它確定故障是長期的。斷路器模式還允許應用程序檢測故障是否已經解決。如果問題似乎已經解決,應用程序可以嘗試調用操作。
- 如果啟用它,所有服務調用都將受到此內置斷路器的保護。
- 在代理選項中啟用它
const broker = new ServiceBroker({
circuitBreaker: {
enabled: true,
threshold: 0.5,
minRequestCount: 20,
windowTime: 60, // in seconds
halfOpenTime: 5 * 1000, // in milliseconds
check: err => err && err.code >= 500
}
});
- settings
- enabled:是否啟動此功能,默認是 false
- threshold:閾值,默認0.5,意味着50%的跳閘失敗
- minRequestCount:最小請求數,默認20,在它下面,回調函數不會觸發
- windowTime:時間窗口的秒數,默認60秒
- halfOpenTime:從打開狀態切換到半打開狀態的毫秒數,默認10000毫秒
- check:檢查失敗請求的函數,默認
err && err.code >= 500
- 如果斷路器狀態發生更改,ServiceBroker將發送內部事件
- 這些全局選項也可以在操作定義中重寫。
// users.service.js
module.export = {
name: "users",
actions: {
create: {
circuitBreaker: {
// All CB options can be overwritten from broker options.
threshold: 0.3,
windowTime: 30
},
handler(ctx) {}
}
}
};
Retry
- 重試解決方案
const broker = new ServiceBroker({
retryPolicy: {
enabled: true,
retries: 5,
delay: 100,
maxDelay: 2000,
factor: 2,
check: err => err && !!err.retryable
}
});
- settings
- enabled:是否啟用,默認 false
- retries:重試的次數,默認5次
- delay:第一次延遲以毫秒為單位,默認 100
- maxDelay:最大延遲(以毫秒為單位),默認 2000
- factor:延遲退避系數,默認是 2,表示指數退避
- check:檢查失敗請求的函數,
err && !!err.retryable
- 在調用選項中覆蓋retry值
broker.call("posts.find", {}, { retries: 3 });
- 在操作定義中覆蓋重試策略值
// users.service.js
module.export = {
name: "users",
actions: {
find: {
retryPolicy: {
// All Retry policy options can be overwritten from broker options.
retries: 3,
delay: 500
},
handler(ctx) {}
},
create: {
retryPolicy: {
// Disable retries for this action
enabled: false
},
handler(ctx) {}
}
}
};
Timeout
- 可以為服務調用設置超時。它可以在代理選項或調用選項中全局設置。如果定義了超時並且請求超時,代理將拋出RequestTimeoutError錯誤。
const broker = new ServiceBroker({
requestTimeout: 5 * 1000 // in seconds
});
- 覆蓋調用選項中的超時值
broker.call("posts.find", {}, { timeout: 3000 });
- 分布式超時:Moleculer使用分布式超時。在嵌套調用的情況下,超時值會隨着時間的推移而遞減。如果超時值小於或等於0,則跳過下一個嵌套調用(RequestSkippedError),因為第一個調用已被RequestTimeoutError錯誤拒絕。
Bulkhead
- 在Moleculer框架中實現了艙壁特性,以控制動作的並發請求處理。
const broker = new ServiceBroker({
bulkhead: {
enabled: true,
concurrency: 3,
maxQueueSize: 10,
}
});
-
settings
-
enabled:是否啟動,默認 false
-
concurreny:最大限度的並行數量,默認3
-
maxQueueSize:最大隊列大小,默認10
-
concurreny 值限制並發請求執行
-
如果 maxQueueSize大於0,則如果所有插槽都被占用,則 broker 將額外的請求存儲在隊列中
-
如果隊列大小達到maxQueueSize限制或為0,則 Broker 將對每個添加請求拋出QueueIsFull異常
-
-
這些全局選項也可以在操作定義中重寫
// users.service.js
// 在操作定義中覆蓋重試策略值
module.export = {
name: "users",
actions: {
find: {
bulkhead: {
enabled: false
},
handler(ctx) {}
},
create: {
bulkhead: {
// Increment the concurrency value
// for this action
concurrency: 10
},
handler(ctx) {}
}
}
};
Fallback
- 當您不想將錯誤返回給用戶時,回退功能是非常有用的。相反,調用其他操作或返回一些常見的內容。可以在調用選項或操作定義中設置回退響應。
- 它應該是一個返回包含任何內容的承諾的函數。borker 將當前 context&Error對象作為參數傳遞給此函數。
// fallback settings in calling options
const result = await broker.call("users.recommendation", { userID: 5 }, {
timeout: 500,
fallbackResponse(ctx, err) {
// Return a common response from cache
return broker.cacher.get("users.fallbackRecommendation:" + ctx.params.userID);
}
});
- 回退響應也可以在接收端,在 action 中定義
- 請注意,只有在action 處理程序中發生錯誤時,才會使用此回退響應。如果從遠程節點調用請求,並且請求在遠程節點上超時,則不使用回退響應。在這種情況下,在調用選項中使用回退響應。
// fallback as a function
module.exports = {
name: "recommends",
actions: {
add: {
fallback: (ctx, err) => "Some cached result",
//fallback: "fakeResult",
handler(ctx) {
// Do something
}
}
}
};
// fallback as method name
module.exports = {
name: "recommends",
actions: {
add: {
// Call the 'getCachedResult' method when error occurred
fallback: "getCachedResult",
handler(ctx) {
// Do something
}
}
},
methods: {
getCachedResult(ctx, err) {
return "Some cached result";
}
}
};
NATS
幫助文檔
-
Node.js 微服務,這是一本外文書籍,淘寶上有