簡介
服務網格本質上還是遠程方法調用(RPC),而在ignite中注冊的服務本質體現還是以cache的形式存在,集群中的節點可以相互調用部署在其它節點上的服務,而且ignite集群會負責部署服務的容錯和負載均衡,並且服務可以在集群節點間傳播(前提是節點類路徑中包含服務類),並且給服務的部署方式提供了多種選擇。
ignite服務部署的最常見的兩種方式: 集群單例和節點單例
節點單例(deployNodeSingleton) : 在節點范圍內的單例,表示針對同一個服務集群中每個節點上只有一個實例。當在集群組中啟動了新的節點時,Ignite會自動地在每個新節點上部署一個新的服務實例。
集群單例(deployClusterSingleton) :在集群范圍內的單例,表示一個服務在整個集群中只有一個實例。當部署該服務的節點故障或者停止時,Ignite會自動在另一個節點上重新部署該服務。然而,如果部署該服務的節點仍然在網絡中,那么服務會一直部署在該節點上,除非拓撲發生了變化。
服務節點
- 定義服務
public interface MyCounterService {
/**
* Increment counter value and return the new value.
*/
int increment() throws CacheException;
/**
* Get current counter value.
*/
int get() throws CacheException;
}
@Component
public class MyCounterServiceImpl implements Service, MyCounterService {
/** Auto-injected instance of Ignite. */
@IgniteInstanceResource
private Ignite ignite;
/** Distributed cache used to store counters. */
private IgniteCache<String, Integer> cache;
/** Service name. */
private String svcName;
/**
* Service initialization.
*/
@Override public void init(ServiceContext ctx) {
// Pre-configured cache to store counters.
cache = ignite.cache("myCounterCache");
svcName = ctx.name();
System.out.println("Service was initialized: " + svcName);
}
/**
* Cancel this service.
*/
@Override public void cancel(ServiceContext ctx) {
// Remove counter from cache.
cache.remove(svcName);
System.out.println("Service was cancelled: " + svcName);
}
/**
* Start service execution.
*/
@Override public void execute(ServiceContext ctx) {
// Since our service is simply represented by a counter
// value stored in cache, there is nothing we need
// to do in order to start it up.
System.out.println("Executing distributed service: " + svcName);
}
@Override public int get() throws CacheException {
Integer i = cache.get(svcName);
return i == null ? 0 : i;
}
@Override public int increment() throws CacheException {
return cache.invoke(svcName, new CounterEntryProcessor());
}
/**
* Entry processor which atomically increments value currently stored in cache.
*/
private static class CounterEntryProcessor implements EntryProcessor<String, Integer, Integer> {
@Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
int newVal = e.exists() ? e.getValue() + 1 : 1;
// Update cache.
e.setValue(newVal);
return newVal;
}
}
}
- 配置服務節點過濾器並注冊服務
public class ServiceNodeFilter implements IgnitePredicate<ClusterNode>{
public boolean apply(ClusterNode node) {
Boolean dataNode = node.attribute("service.node");
return dataNode != null && dataNode;
}
}
<property name="userAttributes">
<map key-type="java.lang.String" value-type="java.lang.Boolean">
<entry key="service.node" value="true"/> <!--服務節點屬性-->
</map>
</property>
<property name="serviceConfiguration">
<list>
<!--Setting up MaintenanceService. -->
<bean class="org.apache.ignite.services.ServiceConfiguration">
<!-- Unique service name -->
<property name="name" value="myCounterService"/>
<!-- Service implementation's class -->
<property name="service">
<!--<bean class="org.cord.ignite.servicegrid.MyCounterServiceImpl"/>-->
<ref bean="myCounterServiceImpl" />
</property>
<!-- Only one instance of the service will be deployed cluster wide. -->
<property name="totalCount" value="1"/>
<!-- Only one instance of the service can be deployed on a single node. -->
<property name="maxPerNodeCount" value="1"/>
<!-- Enabling a special nodes filter for this service.-->
<property name="nodeFilter">
<bean class="org.cord.ignite.initial.ServiceNodeFilter"/>
</property>
</bean>
</list>
</property>
- 調用服務
@GetMapping("/test1")
public String test1() {
//分布式計算如果不指定集群組的話則會傳播到所有節點
IgniteCompute compute = ignite.compute(ignite.cluster().forAttribute("service.node", true));
// IgniteCompute compute = ignite.compute(); //未部署服務的節點會拋出空指針
compute.run(new IgniteRunnable() {
@ServiceResource(serviceName = "myCounterService", proxySticky = false) //非粘性代理
private MyCounterService counterService;
@Override
public void run() {
int newValue = counterService.increment();
System.out.println("Incremented value : " + newValue);
}
});
return "all executed.";
}
分布式計算默認會傳播到集群中的所有節點,如果某個節點沒有部署相關服務,則調此服務負載均衡到該節點的時候會報空指針異常,因為該節點找不到此服務。針對此情景有兩種解決辦法:
1.獲取分布式計算對象的時候過濾節點,則計算只會傳播到過濾后的節點上
ignite.compute(ignite.cluster().forAttribute("service.node", true))
2.設置粘性代理
如果代理是粘性的,Ignite會總是訪問同一個集群節點的服務,如果代理是非粘性的,那么Ignite會在服務部署的所有集群節點內對遠程服務代理的調用進行負載平衡。
服務網格故障轉移
只有在分布式計算中使用服務網格調用才能實現服務調用故障轉移
所謂故障轉移,也就是服務調用過程中節點宕機,這時候會在其它節點繼續執行。另外,如果服務是集群單例的話,那么如果節點宕機,首先發生的是服務的故障轉移,這個時候分布式計算的故障轉移會出錯,因為其它節點的服務不一定會已經初始化成功。所以如果要保證服務調用能故障轉移,最好在服務部署的時候保證服務是集群內有多個實例,並且在不同的節點,這樣在節點故障的時候進行服務調用可以進行故障轉移。
分布式計算中使用服務網格(可以故障轉移):
IgniteCompute compute = ignite.compute(ignite.cluster().forServers());
compute.run(new IgniteRunnable() {
@ServiceResource(serviceName = "simpleMapService", proxyInterface = SimpleMapService.class, proxySticky = true)
private SimpleMapService simpleMapService;
@Override
public void run() {
if (simpleMapService != null) {
Object ret = simpleMapService.get("sleep");
} else {
System.out.println("simpleMapService is null");
}
}
});
正常調用服務網格(無法故障轉移):
Ignite ignite = Ignition.ignite();
IgniteServices svcs = ignite.services(ignite.cluster().forRemotes());
//非粘性,負載均衡
SimpleMapService sms = svcs.serviceProxy("simpleMapService", SimpleMapService.class, false);
Object ret = sms.get("sleep");
數據節點
ignite節點可以配置成server模式或者client模式,主要區別在於client模式無法存儲數據,而server能存儲數據,但是無法更細粒度控制數據,比如控制數據在某部分節點上存儲。通過節點過濾器可以實現集群中划分部分節點作為某緩存的節點。
注意:如果一個節點本身是client模式,那么即使這個節點配置成了數據節點,它也是無法存儲數據的。
1,實現數據節點過濾器
public class DataNodeFilter implements IgnitePredicate<ClusterNode>{
public boolean apply(ClusterNode node) {
Boolean dataNode = node.attribute("data.node");
return dataNode != null && dataNode;
}
}
該過濾器通過檢查節點屬性中是否含有data.node==true
的過濾標志來確定該節點是否會被當做數據節點。
過濾器的實現類需要放到每個節點的classpath
路徑下,不管該節點是否會成為數據節點。
2,緩存配置中添加過濾器
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<property name="name" value="student"/>
<property name="cacheMode" value="REPLICATED"/>
<property name="backups" value="1"/>
<property name="atomicityMode" value="ATOMIC"/>
<!--Force cache to return the instance that is stored in cache instead of creating a copy. -->
<property name="copyOnRead" value="false"/>
<property name="dataRegionName" value="Default_Region"/>
<property name="sqlSchema" value="PUBLIC"/>
<property name="indexedTypes">
<list>
<value>java.lang.Long</value>
<value>org.cord.ignite.data.domain.Student</value>
</list>
</property>
<property name="nodeFilter"> <!--配置節點過濾器-->
<bean class="org.cord.ignite.initial.DataNodeFilter"/>
</property>
</bean>
這個過濾器會在緩存啟動時調用,它會定義一個要存儲緩存數據的集群節點的子集--數據節點。同樣的過濾器在網絡拓撲發生變化時也會被調用,比如新節點加入集群或者舊節點離開集群。
3,為需要作為數據節點的節點添加過濾標志屬性
<property name="userAttributes">
<map key-type="java.lang.String" value-type="java.lang.Boolean">
<entry key="data.node" value="true"/> <!--數據節點屬性-->
</map>
</property>
示例:啟動兩個節點,其中一個cache配置了過濾器,並且只有一個節點添加了過濾標志,最后數據的存儲狀態如下:
visor> cache
Time of the snapshot: 11/14/18, 21:57:06
+=========================================================================
| Name(@) | Mode | Nodes | Entries (Heap / Off-heap) |
+=========================================================================
| myCounterCache(@c0) | REPLICATED | 2 | min: 0 (0 / 0) |
| | | | avg: 0.50 (0.00 / 0.50) |
| | | | max: 1 (0 / 1) |
+---------------------+------------+-------+-----------------------------+
| student(@c1) | REPLICATED | 1 | min: 500 (0 / 500) |
| | | | avg: 500.00 (0.00 / 500.00) |
| | | | max: 500 (0 / 500) |
+-------------------------------------------------------------------------
可見student
緩存只分布在一個節點上了,並沒有像普通情況一樣發生數據的再平衡,說明節點的過濾器起作用了。
通過節點過濾器,可以將數據節點和服務節點進行分離,因為部署新服務需要部署新的服務相關的包,涉及到重啟之類的,這樣便於維護。另外,通過部署一系列的服務,這就形成了一套微服務的解決方案,而且不用考慮負載均衡和容錯處理,這些ignite都會處理,我們需要做的就是實現服務並將服務部署到ignite集群,就能實現服務的拆分,和服務的治理,達到微服務的效果。
附: