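"A couple of days ago, while bulk-deleting a large amount of ES data, only part of the data was deleted and the rest was left behind. The cause was the offset."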
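Approach
Before deleting, I planned to use ES's "_bulk" API, but not by dumping tens or hundreds of thousands of records into a single request. Also, only records that match certain conditions should be deleted, so the data has to be queried by those conditions first and then deleted.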
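As a rough illustration of the batching idea above, the sketch below builds the newline-delimited body that a "_bulk" delete request expects, one small batch at a time. The function name buildBulkDeleteBody and the index name my_index are hypothetical and not part of the original code; the sketch also leaves out "_type", which older ES versions with mapping types would need.

```php
<?php
// Hypothetical helper (not from the original post): builds the NDJSON body
// for an Elasticsearch _bulk request that deletes the given document IDs.
function buildBulkDeleteBody(string $index, array $ids): string
{
    $lines = [];
    foreach ($ids as $id) {
        // One action line per document; delete actions carry no source line.
        $lines[] = json_encode(['delete' => ['_index' => $index, '_id' => (string) $id]]);
    }
    // A _bulk body must end with a newline.
    return $lines ? implode("\n", $lines) . "\n" : '';
}

// Split a large ID list into small batches instead of sending everything at once.
$ids = range(1, 250);
$bodies = array_map(
    fn(array $chunk) => buildBulkDeleteBody('my_index', $chunk),
    array_chunk($ids, 100)
);
echo count($bodies), " bulk requests\n";
```

Each element of $bodies would be sent as one "_bulk" request, so no single request carries more than 100 delete actions.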
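The approach that caused the problem
TestController.php (file name)
public $page = 1; // start at 1 so that the first offset, ($page - 1) * $limit, is 0
... everything above is class-level information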
public function actionDelEsData($limit = 1000, $chunkLimit = 100) {
    $esModel = $this->container->get('EsModel');
    $index = $esModel->index; // there can be several indexes here
    $type = $esModel->type; // the type(s) belonging to the index; there can be several
    $query = [ // how an ES query is written in the Yii2.0 framework
        'bool' => [
            'must' => [
                'term' => []
            ],
            'must_not' => [
                'range' => []
            ]
        ]
    ];
    $sort = ['_id' => SORT_ASC];
    while ($list = $esModel->getList($index, $type, $query, $sort, $limit, $this->page)) {
        $idList = array_column($list, '_id');
        // Split the IDs with array_chunk and delete them batch by batch
        $chunkIdList = array_chunk($idList, $chunkLimit);
        foreach ($chunkIdList as $item) {
            $esModel->batchDelEsData($index, $type, $item);
        }
        // Move on to the next page (this is what causes the problem)
        $this->page += 1;
    }
}
EsModel.php (file name)
/**
 * Fetch one page of the list
 **/
public function getList(string $index, string $type, array $query = [], array $sort = [], $limit = 100, $page = 1) {
    self::$index = $index;
    self::$type = $type;
    $esModel = self::find();
    if ($query) {
        $esModel->query($query);
    }
    if (!$sort) {
        $sort = ['_id' => SORT_ASC]; // default sort: pick a field and SORT_ASC or SORT_DESC
    }
    $offset = ($page - 1) * $limit;
    return $esModel->orderBy($sort)->offset($offset)->limit($limit)->asArray()->all();
}
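As the code above shows, I used limit and offset to page through the results and bulk-deleted each page. At roughly the halfway point, however, the while loop exited and the ES data was not fully deleted. The cause is the shifting of the data: once a page has been deleted, the remaining documents move forward to fill the gap, so the next offset skips over documents that still match the query.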
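The effect can be reproduced without ES at all. The following sketch replaces the index with a plain PHP array (an assumption made purely for illustration) and runs the same "query a page, delete it, go to the next page" loop.

```php
<?php
// In-memory stand-in for the index: 10 document IDs, sorted ascending.
$docs = range(1, 10);
$limit = 2;
$page = 1;

while (true) {
    $offset = ($page - 1) * $limit;
    $batch = array_slice($docs, $offset, $limit);
    if (!$batch) {
        break; // the query came back empty, so the loop ends
    }
    // "Delete" the batch; the remaining documents move up to fill the gap.
    $docs = array_values(array_diff($docs, $batch));
    $page++;
}

print_r($docs); // the IDs that were never deleted
```

With 10 documents and a page size of 2, the loop stops after three pages and leaves IDs 3, 4, 7 and 8 behind, because each growing offset jumps over documents that moved into the earlier positions.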
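The fixed approach
- Keep the query fixed and add "_id" as an extra query condition, asking for "> $minId" each time
TestController.php (file name)
public $minId = 0;
... everything above is class-level information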
public function actionDelEsData($limit = 1000, $chunkLimit = 100) {
    $esModel = $this->container->get('EsModel');
    $index = $esModel->index; // there can be several indexes here
    $type = $esModel->type; // the type(s) belonging to the index; there can be several
    $query = [ // how an ES query is written in the Yii2.0 framework
        'bool' => [
            'must' => [
                'term' => []
            ],
            'must_not' => [
                'range' => []
            ]
        ]
    ];
    $sort = ['_id' => SORT_ASC];
    while ($list = $esModel->getList($index, $type, $query, $sort, $limit, $this->minId)) {
        $idList = array_column($list, '_id');
        // Split the IDs with array_chunk and delete them batch by batch
        $chunkIdList = array_chunk($idList, $chunkLimit);
        foreach ($chunkIdList as $item) {
            $esModel->batchDelEsData($index, $type, $item);
        }
        // Advance the minimum ID to the last ID of this batch
        $this->minId = end($list)['_id'];
    }
}
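EsModel.php (file name)
/**
 * Fetch the next batch of documents whose ID is greater than $minId
 **/
public function getList(string $index, string $type, array $query = [], array $sort = [], $limit = 100, $minId = 0) {
    self::$index = $index;
    self::$type = $type;
    $esModel = self::find();
    // Assumes each document also stores its ID in an `id` field with the same value as `_id`
    $range = ['range' => ['id' => ['gt' => $minId]]];
    if ($query) {
        // Append the range clause to the existing must conditions instead of overwriting them
        $must = $query['bool']['must'] ?? [];
        if ($must && !isset($must[0])) {
            $must = [$must]; // wrap a single clause so that must becomes a list of clauses
        }
        $must[] = $range;
        $query['bool']['must'] = $must;
    } else {
        $query = ['bool' => ['must' => [$range]]];
    }
    $esModel->query($query);
    if (!$sort) {
        $sort = ['_id' => SORT_ASC]; // default sort: this approach needs ascending order on the ID
    }
    return $esModel->orderBy($sort)->limit($limit)->asArray()->all();
}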
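To see why the "> $minId" condition avoids the skipping, here is a minimal sketch that runs the same loop against a plain PHP array instead of ES. The array and the filter standing in for the range query are assumptions for illustration only.

```php
<?php
// In-memory stand-in for the index: 10 document IDs, sorted ascending.
$docs = range(1, 10);
$limit = 3;
$minId = 0;
$rounds = 0;

while (true) {
    // Stand-in for: id > $minId, ordered ascending, limited to $limit
    $matching = array_values(array_filter($docs, fn($id) => $id > $minId));
    $batch = array_slice($matching, 0, $limit);
    if (!$batch) {
        break; // nothing above $minId is left
    }
    // "Delete" the batch, then continue after its last ID.
    $docs = array_values(array_diff($docs, $batch));
    $minId = end($batch);
    $rounds++;
}

echo count($docs), " documents left after ", $rounds, " rounds\n";
```

Because each round starts from the last deleted ID rather than from a position in the result set, deleting earlier documents no longer changes which documents the next query returns.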
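- When paging with an offset, let the offset always start from 0: every deleted batch removes the current first page, so the next batch is again at the front. Example:
TestController.php (file name)
public $page = 1; // never incremented, so the offset, ($page - 1) * $limit, stays 0
... everything above is class-level information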
public function actionDelEsData($limit = 1000, $chunkLimit = 100) {
    $esModel = $this->container->get('EsModel');
    $index = $esModel->index; // there can be several indexes here
    $type = $esModel->type; // the type(s) belonging to the index; there can be several
    $query = [ // how an ES query is written in the Yii2.0 framework
        'bool' => [
            'must' => [
                'term' => []
            ],
            'must_not' => [
                'range' => []
            ]
        ]
    ];
    $sort = ['_id' => SORT_ASC];
    // $this->page is not changed inside the loop, so every query reads the first page
    while ($list = $esModel->getList($index, $type, $query, $sort, $limit, $this->page)) {
        $idList = array_column($list, '_id');
        // Split the IDs with array_chunk and delete them batch by batch
        $chunkIdList = array_chunk($idList, $chunkLimit);
        foreach ($chunkIdList as $item) {
            $esModel->batchDelEsData($index, $type, $item);
        }
    }
}
EsModel.php (file name)
/**
 * Fetch one page of the list
 **/
public function getList(string $index, string $type, array $query = [], array $sort = [], $limit = 100, $page = 1) {
    self::$index = $index;
    self::$type = $type;
    $esModel = self::find();
    if ($query) {
        $esModel->query($query);
    }
    if (!$sort) {
        $sort = ['_id' => SORT_ASC]; // default sort: pick a field and SORT_ASC or SORT_DESC
    }
    $offset = ($page - 1) * $limit; // 0 as long as the caller keeps $page at 1
    return $esModel->orderBy($sort)->offset($offset)->limit($limit)->asArray()->all();
}
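The same in-memory stand-in (a plain PHP array, assumed here only for illustration) shows that reading from offset 0 every time drains the whole set. Note that the loop only ends if each delete is already visible to the next query; otherwise the same first page would keep coming back.

```php
<?php
// In-memory stand-in for the index: 10 document IDs, sorted ascending.
$docs = range(1, 10);
$limit = 4;
$rounds = 0;

// Always read the first page (offset 0) and delete it.
while ($batch = array_slice($docs, 0, $limit)) {
    $docs = array_values(array_diff($docs, $batch));
    $rounds++;
}

echo count($docs), " documents left after ", $rounds, " rounds\n";
```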