SpringData ES中一些底層原理的分析


之前寫過一篇SpringData ES 關於字段名和索引中的列名字不一致導致的查詢問題,順便深入學習下Spring Data Elasticsearch。
 
Spring Data ElasticsearchSpring Data針對Elasticsearch的實現。

它跟Spring Data一樣,提供了Repository接口,我們只需要定義一個新的接口並繼承這個Repository接口,然后就可以注入這個新的接口使用了。
 
定義接口:
 

@Repository public interface TaskRepository extends ElasticsearchRepository<Task, String> { }


注入接口進行使用:
 

@Autowired private TaskRepository taskRepository; .... taskRepository.save(task);


Repository接口的代理生成
 
上面的例子中TaskRepository是個接口,而我們卻直接注入了這個接口並調用方法;很明顯,這是錯誤的。

其實SpringData ES內部基於這個TaskRepository接口構造一個SimpleElasticsearchRepository,真正被注入的是這個SimpleElasticsearchRepository。

這個過程是如何實現的呢?  來分析一下。

ElasticsearchRepositoriesAutoConfiguration自動化配置類會導入ElasticsearchRepositoriesRegistrar這個ImportBeanDefinitionRegistrar。

ElasticsearchRepositoriesRegistrar繼承自AbstractRepositoryConfigurationSourceSupport,是個ImportBeanDefinitionRegistrar接口的實現類,會被Spring容器調用registerBeanDefinitions進行自定義bean的注冊。

ElasticsearchRepositoriesRegistrar委托給RepositoryConfigurationDelegate完成bean的解析。

整個解析過程可以分3個步驟:
 

  1. 找出模塊中的org.springframework.data.repository.Repository接口的實現類或者org.springframework.data.repository.RepositoryDefinition注解的修飾類,並會過濾掉org.springframework.data.repository.NoRepositoryBean注解的修飾類。找出后封裝到RepositoryConfiguration中
  2. 遍歷這些RepositoryConfiguration,然后構造成BeanDefinition並注冊到Spring容器中。需要注意的是這些RepositoryConfiguration會以beanClass為ElasticsearchRepositoryFactoryBean這個類的方式被注冊,並把對應的Repository接口當做構造參數傳遞給ElasticsearchRepositoryFactoryBean,還會設置相應的屬性比如elasticsearchOperations、evaluationContextProvider、namedQueries、repositoryBaseClass、lazyInitqueryLookupStrategyKey
  3. ElasticsearchRepositoryFactoryBean被實例化的時候設置對應的構造參數和屬性。設置完畢以后調用afterPropertiesSet方法(實現了InitializingBean接口)。在afterPropertiesSet方法內部會去創建RepositoryFactorySupport類,並進行一些初始化,比如namedQueries、repositoryBaseClass等。然后通過這個RepositoryFactorySupport的getRepository方法基於Repository接口創建出代理類,並使用AOP添加了幾個MethodInterceptor

 

// 遍歷基於第1步條件得到的RepositoryConfiguration集合 for (RepositoryConfiguration<? extends RepositoryConfigurationSource> configuration : extension .getRepositoryConfigurations(configurationSource, resourceLoader, inMultiStoreMode)) { // 構造出BeanDefinitionBuilder BeanDefinitionBuilder definitionBuilder = builder.build(configuration); extension.postProcess(definitionBuilder, configurationSource); if (isXml) { // 設置elasticsearchOperations屬性 extension.postProcess(definitionBuilder, (XmlRepositoryConfigurationSource) configurationSource); } else { // 設置elasticsearchOperations屬性 extension.postProcess(definitionBuilder, (AnnotationRepositoryConfigurationSource) configurationSource); } // 使用命名策略生成bean的名字 AbstractBeanDefinition beanDefinition = definitionBuilder.getBeanDefinition(); String beanName = beanNameGenerator.generateBeanName(beanDefinition, registry); if (LOGGER.isDebugEnabled()) { LOGGER.debug(REPOSITORY_REGISTRATION, extension.getModuleName(), beanName, configuration.getRepositoryInterface(), extension.getRepositoryFactoryClassName()); } beanDefinition.setAttribute(FACTORY_BEAN_OBJECT_TYPE, configuration.getRepositoryInterface()); // 注冊到Spring容器中 registry.registerBeanDefinition(beanName, beanDefinition); definitions.add(new BeanComponentDefinition(beanDefinition, beanName)); } // build方法 public BeanDefinitionBuilder build(RepositoryConfiguration<?> configuration) { Assert.notNull(registry, "BeanDefinitionRegistry must not be null!"); Assert.notNull(resourceLoader, "ResourceLoader must not be null!"); // 得到factoryBeanName,這里會使用extension.getRepositoryFactoryClassName()去獲得 // extension.getRepositoryFactoryClassName()返回的正是ElasticsearchRepositoryFactoryBean String factoryBeanName = configuration.getRepositoryFactoryBeanName(); factoryBeanName = StringUtils.hasText(factoryBeanName) ? factoryBeanName : extension.getRepositoryFactoryClassName(); // 基於factoryBeanName構造BeanDefinitionBuilder BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(factoryBeanName); builder.getRawBeanDefinition().setSource(configuration.getSource()); // 設置ElasticsearchRepositoryFactoryBean的構造參數,這里是對應的Repository接口 // 設置一些的屬性值 builder.addConstructorArgValue(configuration.getRepositoryInterface()); builder.addPropertyValue("queryLookupStrategyKey", configuration.getQueryLookupStrategyKey()); builder.addPropertyValue("lazyInit", configuration.isLazyInit()); builder.addPropertyValue("repositoryBaseClass", configuration.getRepositoryBaseClassName()); NamedQueriesBeanDefinitionBuilder definitionBuilder = new NamedQueriesBeanDefinitionBuilder( extension.getDefaultNamedQueryLocation()); if (StringUtils.hasText(configuration.getNamedQueriesLocation())) { definitionBuilder.setLocations(configuration.getNamedQueriesLocation()); } builder.addPropertyValue("namedQueries", definitionBuilder.build(configuration.getSource())); // 查找是否有對應Repository接口的自定義實現類 String customImplementationBeanName = registerCustomImplementation(configuration); // 存在自定義實現類的話,設置到屬性中 if (customImplementationBeanName != null) { builder.addPropertyReference("customImplementation", customImplementationBeanName); builder.addDependsOn(customImplementationBeanName); } RootBeanDefinition evaluationContextProviderDefinition = new RootBeanDefinition( ExtensionAwareEvaluationContextProvider.class); evaluationContextProviderDefinition.setSource(configuration.getSource()); // 設置一些的屬性值 builder.addPropertyValue("evaluationContextProvider", evaluationContextProviderDefinition); return builder; } // RepositoryFactorySupport的getRepository方法,獲得Repository接口的代理類 public <T> T getRepository(Class<T> repositoryInterface, Object customImplementation) { // 獲取Repository的元數據 RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface); // 獲取Repository的自定義實現類 Class<?> customImplementationClass = null == customImplementation ? null : customImplementation.getClass(); // 根據元數據和自定義實現類得到Repository的RepositoryInformation信息類 // 獲取信息類的時候如果發現repositoryBaseClass是空的話會根據meta中的信息去自動匹配 // 具體匹配過程在下面的getRepositoryBaseClass方法中說明 RepositoryInformation information = getRepositoryInformation(metadata, customImplementationClass); // 驗證 validate(information, customImplementation); // 得到最終的目標類實例,會通過repositoryBaseClass去查找 Object target = getTargetRepository(information); // 創建代理工廠 ProxyFactory result = new ProxyFactory(); result.setTarget(target); result.setInterfaces(new Class[] { repositoryInterface, Repository.class }); // 進行aop相關的設置 result.addAdvice(SurroundingTransactionDetectorMethodInterceptor.INSTANCE); result.addAdvisor(ExposeInvocationInterceptor.ADVISOR); if (TRANSACTION_PROXY_TYPE != null) { result.addInterface(TRANSACTION_PROXY_TYPE); } // 使用RepositoryProxyPostProcessor處理 for (RepositoryProxyPostProcessor processor : postProcessors) { processor.postProcess(result, information); } if (IS_JAVA_8) { // 如果是JDK8的話,添加DefaultMethodInvokingMethodInterceptor result.addAdvice(new DefaultMethodInvokingMethodInterceptor()); } // 添加QueryExecutorMethodInterceptor result.addAdvice(new QueryExecutorMethodInterceptor(information, customImplementation, target)); // 使用代理工廠創建出代理類,這里是使用jdk內置的代理模式 return (T) result.getProxy(classLoader); } // 目標類的獲取 protected Class<?> getRepositoryBaseClass(RepositoryMetadata metadata) { // 如果Repository接口屬於QueryDsl,拋出異常。目前還不支持 if (isQueryDslRepository(metadata.getRepositoryInterface())) { throw new IllegalArgumentException("QueryDsl Support has not been implemented yet."); } // 如果主鍵是數值類型的話,repositoryBaseClass為NumberKeyedRepository if (Integer.class.isAssignableFrom(metadata.getIdType()) || Long.class.isAssignableFrom(metadata.getIdType()) || Double.class.isAssignableFrom(metadata.getIdType())) { return NumberKeyedRepository.class; } else if (metadata.getIdType() == String.class) { // 如果主鍵是String類型的話,repositoryBaseClass為SimpleElasticsearchRepository return SimpleElasticsearchRepository.class; } else if (metadata.getIdType() == UUID.class) { // 如果主鍵是UUID類型的話,repositoryBaseClass為UUIDElasticsearchRepository return UUIDElasticsearchRepository.class; } else { // 否則報錯 throw new IllegalArgumentException("Unsupported ID type " + metadata.getIdType()); } }


ElasticsearchRepositoryFactoryBean是一個FactoryBean接口的實現類,getObject方法返回的上面提到的getRepository方法返回的代理對象;getObjectType方法返回的是對應Repository接口類型。

我們文章一開始提到的注入TaskRepository的時候,實際上這個對象是ElasticsearchRepositoryFactoryBean類型的實例,只不過ElasticsearchRepositoryFactoryBean實現了FactoryBean接口,所以注入的時候會得到一個代理對象,這個代理對象是由jdk內置的代理生成的,並且它的target對象是SimpleElasticsearchRepository(主鍵是String類型)。
 
 
SpringData ES中ElasticsearchOperations的介紹
 
ElasticsearchTemplate實現了ElasticsearchOperations接口。

ElasticsearchOperations接口是SpringData對Elasticsearch操作的一層封裝,比如有創建索引createIndex方法、獲取索引的設置信息getSetting方法、查詢對象queryForObject方法、分頁查詢方法queryForPage、刪除文檔delete方法、更新文檔update方法等等。

ElasticsearchTemplate是具體的實現類,它有這些屬性:
 

// elasticsearch提供的基於java的客戶端連接接口。java對es集群的操作使用這個接口完成 private Client client; // 一個轉換器接口,定義了2個方法,分別可以獲得MappingContext和ConversionService // MappingContext接口用於獲取所有的持久化實體和這些實體的屬性 // ConversionService目前在SpringData ES中沒有被使用 private ElasticsearchConverter elasticsearchConverter; // 內部使用EntityMapper完成對象到json字符串和json字符串到對象的映射。默認使用jackson完成映射,可自定義 private ResultsMapper resultsMapper; // 查詢超時時間 private String searchTimeout;


Client接口在ElasticsearchAutoConfiguration自動化配置類里被構造:
 

@Bean @ConditionalOnMissingBean public Client elasticsearchClient() { try { return createClient(); } catch (Exception ex) { throw new IllegalStateException(ex); } }


ElasticsearchTemplate、ElasticsearchConverter以及SimpleElasticsearchMappingContext在ElasticsearchDataAutoConfiguration自動化配置類里被構造:
 

@Bean @ConditionalOnMissingBean public ElasticsearchTemplate elasticsearchTemplate(Client client, ElasticsearchConverter converter) { try { return new ElasticsearchTemplate(client, converter); } catch (Exception ex) { throw new IllegalStateException(ex); } } @Bean @ConditionalOnMissingBean public ElasticsearchConverter elasticsearchConverter( SimpleElasticsearchMappingContext mappingContext) { return new MappingElasticsearchConverter(mappingContext); } @Bean @ConditionalOnMissingBean public SimpleElasticsearchMappingContext mappingContext() { return new SimpleElasticsearchMappingContext(); }


 需要注意的是這個bean被自動化配置類構造的前提是它們在Spring容器中並不存在。
 
Repository的調用過程
 
以自定義的TaskRepository的save方法為例,大致的執行流程如下所示:



SimpleElasticsearchRepository的save方法具體的分析在SpringData ES 關於字段名和索引中的列名字不一致導致的查詢問題中分析過。

像自定義的Repository查詢方法,或者Repository接口的自定義實現類的操作這些底層,可以去QueryExecutorMethodInterceptor中查看,大家有興趣的可以自行查看源碼。

http://spring4all.com/article/17

最近工作中使用了Spring Data Elasticsearch。發生它存在一個問題:

Document對應的POJO的屬性跟es里面文檔的字段名字不一樣,這樣Repository里面編寫自定義的查詢方法就會查詢不出結果。

比如有個Person類,它有2個屬性goodFace和goodAt。這2個屬性在es的索引里對應的字段表為good_face和good_at:

1
2
3
4
5
6
7
8
9
10
11
@Document(replicas = 1, shards = 1, type = "person", indexName = "person")
@Getter
@Setter
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class Person {
@Id
private String id;
private String name;
private boolean goodFace;
private String goodAt;
}

Repository中的自定義查詢:

1
2
3
4
5
@Repository
public interface PersonRepository extends ElasticsearchRepository<Person, String> {
List<Person> findByGoodFace(boolean isGoodFace);
List<Person> findByName(String name);
}

方法findByGoodFace是查詢不出結果的,而findByName是ok的。

為什么findByGoodFace不行而findByName可以呢,來探究一下。

Person類的name屬性跟ES中的字段名是一模一樣的,而goodFace字段在ES中的字段是good_face(因為我們使用了SnakeCaseStrategy策略)。

所以產生這個問題的原因在於ES中文檔的字段名跟POJO中的字段名不統一造成的。

但是我們使用PersonRepository的save方法保存文檔的時候屬性和字段是可以對上的。

那為什么使用repository的save方法能對應上文檔和字段,而自定義的find方法卻不行呢?

ES是使用jackson來完成POJO到json的映射關系的。

在Person類上使用@JsonNaming注解完成POJO和json的映射,我們使用了SnakeCaseStrategy策略,這個策略會把屬性從駝峰方式改成小寫帶下划線的方式。

比如goodAt屬性映射的時候就會變成good_at,good_face變成good_face,name變成name。

Spring Data Elasticsearch把對ES的操作封裝成了一個ElasticsearchOperations接口。比如queryForObject、queryForPage、count、queryForList方法。

ElasticsearchOperations接口目前有一個實現類ElasticsearchTemplate。

ElasticsearchTemplate內部有個ResultsMapper屬性,這個ResultsMapper目前只有一個實現類DefaultResultMapper,DefaultResultMapper內部使用DefaultEntityMapper完成映射。DefaultEntityMapper是個EntityMapper接口的實現類,它的定義如下:

1
2
3
4
public interface EntityMapper {
public String mapToString(Object object) throws IOException;
public <T> T mapToObject(String source, Class<T> clazz) throws IOException;
}

方法很明白:對象到json字符串的轉換和json字符串倒對象的轉換。

DefaultEntityMapper內部使用jackson的ObjectMapper完成。

自定義的Repository繼承自ElasticsearchRepository,最后會使用代理映射成SimpleElasticsearchRepository。

SimpleElasticsearchRepository內部有個屬性ElasticsearchOperations用於完成與ES的交互。

我們看下SimpleElasticsearchRepository的save方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public <S extends T> S save(S entity) {
Assert.notNull(entity, "Cannot save 'null' entity.");
// createIndexQuery方法會構造一個IndexQuery,然后調用ElasticsearchOperations的index方法
elasticsearchOperations.index(createIndexQuery(entity));
elasticsearchOperations.refresh(entityInformation.getIndexName());
return entity;
}
 
// ElasticsearchTemplate的index方法
@Override
public String index(IndexQuery query) {
// 調用prepareIndex方法構造一個IndexRequestBuilder
String documentId = prepareIndex(query).execute().actionGet().getId();
// 設置保存文檔的id
if (query.getObject() != null) {
setPersistentEntityId(query.getObject(), documentId);
}
return documentId;
}
 
private IndexRequestBuilder prepareIndex(IndexQuery query) {
try {
// 從@Document注解中得到索引的名字
String indexName = isBlank(query.getIndexName()) ? retrieveIndexNameFromPersistentEntity(query.getObject()
.getClass())[ 0] : query.getIndexName();
// 從@Document注解中得到索引的類型
String type = isBlank(query.getType()) ? retrieveTypeFromPersistentEntity(query.getObject().getClass())[ 0]
: query.getType();
 
IndexRequestBuilder indexRequestBuilder = null;
 
if (query.getObject() != null) { // save方法這里保存的object就是POJO
// 得到id字段
String id = isBlank(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
if (id != null) { // 如果設置了id字段
indexRequestBuilder = client.prepareIndex(indexName, type, id);
} else { // 如果沒有設置id字段
indexRequestBuilder = client.prepareIndex(indexName, type);
}
// 使用ResultsMapper映射POJO到json字符串
indexRequestBuilder.setSource(resultsMapper.getEntityMapper().mapToString(query.getObject()));
} else if (query.getSource() != null) { // 如果自定義了source屬性,直接賦值
indexRequestBuilder = client.prepareIndex(indexName, type, query.getId()).setSource(query.getSource());
} else { // 沒有設置object屬性或者source屬性,拋出ElasticsearchException異常
throw new ElasticsearchException("object or source is null, failed to index the document [id: " + query.getId() + "]");
}
if (query.getVersion() != null) { // 設置版本
indexRequestBuilder.setVersion(query.getVersion());
indexRequestBuilder.setVersionType(EXTERNAL);
}
 
if (query.getParentId() != null) { // 設置parentId
indexRequestBuilder.setParent(query.getParentId());
}
 
return indexRequestBuilder;
} catch (IOException e) {
throw new ElasticsearchException("failed to index the document [id: " + query.getId() + "]", e);
}
}

save方法使用ResultsMapper完成了POJO到json的轉換,所以save方法保存成功對應的文檔數據:

1
indexRequestBuilder.setSource(resultsMapper.getEntityMapper().mapToString(query.getObject()));

自定義的findByGoodFace方法:

由於是Repository中的自定義方法,會被Spring Data通過代理進行構造,內部還是用了AOP,最終在QueryExecutorMethodInterceptor中並解析成ElasticsearchPartQuery這個RepositoryQuery接口的實現類,然后調用execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
public Object execute(Object[] parameters) {
ParametersParameterAccessor accessor = new ParametersParameterAccessor(queryMethod.getParameters(), parameters);
CriteriaQuery query = createQuery(accessor);
if(tree.isDelete()) { // 如果是刪除方法
Object result = countOrGetDocumentsForDelete(query, accessor);
elasticsearchOperations.delete(query, queryMethod.getEntityInformation().getJavaType());
return result;
} else if (queryMethod.isPageQuery()) { // 如果是分頁查詢
query.setPageable(accessor.getPageable());
return elasticsearchOperations.queryForPage(query, queryMethod.getEntityInformation().getJavaType());
} else if (queryMethod.isStreamQuery()) { // 如果是流式查詢
Class<?> entityType = queryMethod.getEntityInformation().getJavaType();
if (query.getPageable() == null) {
query.setPageable( new PageRequest(0, 20));
}
 
return StreamUtils.createStreamFromIterator((CloseableIterator<Object>) elasticsearchOperations.stream(query, entityType));
 
} else if (queryMethod.isCollectionQuery()) { // 如果是集合查詢
if (accessor.getPageable() == null) {
int itemCount = (int) elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
query.setPageable( new PageRequest(0, Math.max(1, itemCount)));
} else {
query.setPageable(accessor.getPageable());
}
return elasticsearchOperations.queryForList(query, queryMethod.getEntityInformation().getJavaType());
} else if (tree.isCountProjection()) { // 如果是count查詢
return elasticsearchOperations.count(query, queryMethod.getEntityInformation().getJavaType());
}
// 單個查詢
return elasticsearchOperations.queryForObject(query, queryMethod.getEntityInformation().getJavaType());
}

findByGoodFace方法是個集合查詢,最終會調用ElasticsearchOperations的queryForList方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public <T> List<T> queryForList(CriteriaQuery query, Class<T> clazz) {
// 調用queryForPage方法
return queryForPage(query, clazz).getContent();
}
 
@Override
public <T> Page<T> queryForPage(CriteriaQuery criteriaQuery, Class<T> clazz) {
// 查詢解析器進行語法的解析
QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(criteriaQuery.getCriteria());
QueryBuilder elasticsearchFilter = new CriteriaFilterProcessor().createFilterFromCriteria(criteriaQuery.getCriteria());
SearchRequestBuilder searchRequestBuilder = prepareSearch(criteriaQuery, clazz);
 
if (elasticsearchQuery != null) {
searchRequestBuilder.setQuery(elasticsearchQuery);
} else {
searchRequestBuilder.setQuery(QueryBuilders.matchAllQuery());
}
 
if (criteriaQuery.getMinScore() > 0) {
searchRequestBuilder.setMinScore(criteriaQuery.getMinScore());
}
 
if (elasticsearchFilter != null)
searchRequestBuilder.setPostFilter(elasticsearchFilter);
if (logger.isDebugEnabled()) {
logger.debug( "doSearch query:\n" + searchRequestBuilder.toString());
}
 
SearchResponse response = getSearchResponse(searchRequestBuilder
.execute());
// 最終的結果是用ResultsMapper進行映射
return resultsMapper.mapResults(response, clazz, criteriaQuery.getPageable());
}

自定義的方法使用ElasticsearchQueryCreator去創建CriteriaQuery,內部做一些詞法的分析,有了CriteriaQuery之后,使用CriteriaQueryProcessor基於Criteria構造了QueryBuilder,最后使用QueryBuilder去做rest請求得到es的查詢結果。這些過程中是沒有用到ResultsMapper,而只是用反射得到POJO的屬性,只有在得到查詢結果后才會用ResultsMapper去做映射。

如果出現了這種情況,解決方案目前有兩種:

1.使用repository的search方法,參數可以是QueryBuilder或者SearchQuery

1
2
3
4
personRepository.search(
QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery( "good_face", true))
)

2.使用@Query注解

1
2
@Query("{\"bool\" : {\"must\" : {\"term\" : {\"good_face\" : \"?0\"}}}}")
List<Person> findByGoodFace(boolean isGoodFace);

暫時發現這兩種解決方法,不知還有否更好的解決方案。http://fangjian0423.github.io/2017/05/24/spring-data-es-query-problem/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM