php 使用 ElasticSearch/es 的最佳實踐


PHP 使用elasticsearch

composer require elasticsearch/elasticsearch

Composer 會根據當前的 PHP 版本自動選擇合適的客戶端版本。例如我的 PHP 是 7.1,安裝到的是 7.9 版本的 elasticsearch-php 客戶端。

elasticsearch 的安裝:

https://www.cnblogs.com/-mrl/p/13854210.html

 

elasticsearch的簡單使用:

例子1:

<?php

require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;

// Quick-start guide:
// https://www.elastic.co/guide/cn/elasticsearch/php/current/_quickstart.html#_quickstart
// Calling ClientBuilder::create()->build() without setHosts() makes the client
// fall back to its default host; the host is spelled out here for clarity.
$client = Elasticsearch\ClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();

// --- Index a document ---------------------------------------------------------
// No 'type' key is sent: mapping types are deprecated in Elasticsearch 7 and the
// second example in this article already omits them.
// 'refresh' => true makes the new document searchable right away; without it the
// search further down can run before the index is refreshed and return no hits.
echo '索引一個文檔'.PHP_EOL;
$params = [
    'index' => 'my_index',
    'id' => 'my_id',
    'refresh' => true,
    'body' => ['testField' => 'abc'],
];
$response = $client->index($params);
pr($response);

// --- Get the document by id -----------------------------------------------------
echo '獲取一個文檔'.PHP_EOL;
$params = [
    'index' => 'my_index',
    'id' => 'my_id',
];

$response = $client->get($params);
pr($response);

// --- Search for the document ----------------------------------------------------
echo '搜索一個文檔'.PHP_EOL;
$params = [
    'index' => 'my_index',
    'body' => [
        'query' => [
            'match' => [
                'testField' => 'abc',
            ],
        ],
    ],
];

$response = $client->search($params);
pr($response);

// --- Delete the document --------------------------------------------------------
echo '刪除一個文檔'.PHP_EOL;
$params = [
    'index' => 'my_index',
    'id' => 'my_id',
];
$response = $client->delete($params);
pr($response);

// --- Delete the whole index -----------------------------------------------------
echo '刪除一個索引'.PHP_EOL;
$deleteParams = [
    'index' => 'my_index',
];
$response = $client->indices()->delete($deleteParams);
pr($response);

// --- Re-create the index with explicit settings ---------------------------------
echo '創建一個索引'.PHP_EOL;
$params = [
    'index' => 'my_index',
    'body' => [
        'settings' => [
            'number_of_shards' => 2,
            'number_of_replicas' => 0,
        ],
    ],
];

$response = $client->indices()->create($params);
pr($response);

/**
 * Print a value in print_r() format wrapped in <pre> tags.
 *
 * @param mixed $response Value to dump (the script passes Elasticsearch responses).
 */
function pr($response)
{
    // Render to a string first so the wrapper and payload go out in one echo.
    echo '<pre>' . print_r($response, true) . '</pre>';
}
View Code

例子2:

先在 MySQL 中創建一張數據表 product,建表語句如下:

-- Product table used by example 2: the "index" action of test.php inserts a row
-- here and then writes the same fields to Elasticsearch.
-- The column COMMENT strings are stored as table metadata and are left untouched.
CREATE TABLE `product` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '商品 ID',
  `title` varchar(64) NOT NULL DEFAULT '' COMMENT '商品名稱',
  `long_title` varchar(64) NOT NULL DEFAULT '' COMMENT '商品長名稱',
  `sku` varchar(32) NOT NULL DEFAULT '' COMMENT '商品 SKU',
  `money` int(10) NOT NULL DEFAULT '0' COMMENT '商品價格',
  `sales` int(11) NOT NULL DEFAULT '0' COMMENT '商品銷量',
  `created_at` datetime DEFAULT NULL COMMENT '創建時間',
  `updated_at` datetime DEFAULT NULL COMMENT '修改時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4;

創建test.php

<?php

require 'vendor/autoload.php';
require 'Db.php';

use Elasticsearch\ClientBuilder;

// Index that every action in this script operates on.
const ES_INDEX = 'product_index';
// Mapping-type name; not referenced by any of the actions visible in this script.
const ES_TYPE = 'product';

// One timestamp per request, reused for created_at / updated_at values.
$nowTime = date('Y-m-d H:i:s');

// Quick-start guide:
// https://www.elastic.co/guide/cn/elasticsearch/php/current/_quickstart.html#_quickstart
// Without setHosts() the builder would use the client's default host instead.
$client = ClientBuilder::create()
    ->setHosts(['127.0.0.1:9200'])
    ->build();

// MySQL helper declared in Db.php.
$db = new Db();
// Action "create": http://localhost/es/test.php?do=create
// Creates the index together with its settings and field mappings. The original
// author notes the index can only be created once; a BadRequest400Exception from
// the client is decoded and printed instead of aborting the script.
if (($_GET['do'] ?? null) == 'create') {
    // Reusable field definitions for the mapping below.
    $integerField = ['type' => 'integer'];
    $textField = ['type' => 'text'];
    $dateField = [
        'type' => 'date',
        'format' => 'yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || epoch_millis',
    ];

    $params = [
        'index' => ES_INDEX, // index name (the author compares it to a MySQL database)
        'body' => [
            'settings' => [
                'number_of_shards' => 2, // shard count
                'number_of_replicas' => 0,
            ],
            'mappings' => [
                // Field types (the author compares these to MySQL column types).
                'properties' => [
                    'product_id' => $integerField,
                    'title' => $textField,
                    'long_title' => $textField,
                    'sku' => $textField,
                    'money' => $integerField,
                    'sales' => $integerField,
                    'created_at' => $dateField,
                    'updated_at' => $dateField,
                ],
            ],
        ],
    ];

    try {
        echo '創建一個索引' . PHP_EOL;
        pr($client->indices()->create($params));
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        // The exception message is decoded as JSON and dumped as an array.
        pr(json_decode($e->getMessage(), true));
    }
}
// Action "index": http://localhost/es/test.php?do=index
// Generates one random product, stores it in MySQL and, when the insert yields an
// id, indexes the same fields in Elasticsearch under that id.
if (($_GET['do'] ?? null) == 'index') {
    $titles = ['蘋果', '香蕉', '橙子', '馬蹄', '草莓'];
    $row = [
        'title' => $titles[array_rand($titles)],
        'long_title' => 'long_title' . $nowTime,
        'sku' => rand(1, 3),
        'money' => rand(1, 10000000),
        'sales' => rand(1, 1000),
        'created_at' => $nowTime,
        'updated_at' => $nowTime,
    ];

    // Write to the database first; a falsy id means nothing is sent to Elasticsearch.
    $productId = $db->table('product')->insert($row);
    if ($productId) {
        // The document is the row plus its database id.
        $document = $row;
        $document['product_id'] = $productId;

        try {
            // Passing an explicit id ties the document to the database row; the
            // original author notes that omitting it lets Elasticsearch generate one.
            pr($client->index([
                'index' => ES_INDEX,
                'id' => $productId,
                'body' => $document,
            ]));
        } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
            pr(json_decode($e->getMessage(), true));
        }
    }
}
// Action "delete": http://localhost/es/test.php?do=delete&id=14
// Removes the product row from MySQL and then the matching Elasticsearch document.
if (isset($_GET['do']) && $_GET['do'] == 'delete' && isset($_GET['id'])) {
    // The id ends up inside a SQL WHERE clause, so accept nothing but a positive
    // integer; anything else (including array input) is rejected up front.
    $productId = filter_var($_GET['id'], FILTER_VALIDATE_INT, ['options' => ['min_range' => 1]]);
    if ($productId === false) {
        pr('Invalid id: a positive integer is required');
    } elseif ($db->table('product')->where("id = {$productId}")->delete()) {
        $params = [
            'index' => ES_INDEX,
            'id' => $productId,
        ];
        try {
            echo '刪除一個文檔' . PHP_EOL;
            // Delete the document from Elasticsearch.
            $response = $client->delete($params);
            pr($response);
        } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception | Elasticsearch\Common\Exceptions\Missing404Exception $e) {
            // Missing404Exception covers a document that is absent from the index;
            // both exception messages are decoded as JSON and dumped.
            $msg = json_decode($e->getMessage(), true);
            pr($msg);
        }
    }
}
// Action "deleteindex": http://localhost/es/test.php?do=deleteindex
// Drops the whole index. The original author notes that a wildcard such as
// 'test_index*' would delete every index starting with that prefix.
if (($_GET['do'] ?? null) == 'deleteindex') {
    try {
        echo '刪除一個索引' . PHP_EOL;
        pr($client->indices()->delete(['index' => ES_INDEX]));
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        // Decode the JSON error message and dump it.
        pr(json_decode($e->getMessage(), true));
    }
}
// Action "update": http://localhost/es/test.php?do=update&id=14
// Overwrites a product with fresh random values in MySQL, then applies the same
// partial update to the Elasticsearch document.
if (isset($_GET['do']) && $_GET['do'] == 'update' && isset($_GET['id'])) {
    // The id ends up inside a SQL WHERE clause, so accept nothing but a positive
    // integer; anything else (including array input) is rejected up front.
    $productId = filter_var($_GET['id'], FILTER_VALIDATE_INT, ['options' => ['min_range' => 1]]);
    if ($productId === false) {
        pr('Invalid id: a positive integer is required');
    } else {
        $title = ['蘋果', '香蕉', '橙子', '馬蹄', '草莓'];
        // created_at is deliberately left out: an update must not reset the
        // creation time of an existing product.
        $data = [
            'title' => $title[array_rand($title, 1)],
            'long_title' => 'long_title' . $nowTime,
            'sku' => rand(1, 3),
            'money' => rand(1, 10000000),
            'sales' => rand(1, 1000),
            'updated_at' => $nowTime,
        ];
        $res = $db->table('product')->where("id = {$productId}")->update($data);
        if ($res) {
            // Partial update: fields under 'doc' are merged into the stored document.
            $body = array_merge($data, [
                'product_id' => $productId,
            ]);
            $params = [
                'body' => [
                    'doc' => $body,
                ],
                'id' => $productId,
                'index' => ES_INDEX,
            ];
            try {
                echo '更新一個文檔' . PHP_EOL;
                $response = $client->update($params);
                pr($response);
            } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception | Elasticsearch\Common\Exceptions\Missing404Exception $e) {
                // Missing404Exception covers a document that is absent from the
                // index; both exception messages are decoded as JSON and dumped.
                $msg = json_decode($e->getMessage(), true);
                pr($msg);
            }
        }
    }
}
// Action "get": http://localhost/es/test.php?do=get&id=13
// Fetches a single document by id and dumps the response.
if (isset($_GET['do']) && $_GET['do'] == 'get' && isset($_GET['id'])) {
    $params = [
        'index' => ES_INDEX,
        'id' => $_GET['id'],
    ];
    try {
        echo '獲取一個文檔' . PHP_EOL;
        $response = $client->get($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception | Elasticsearch\Common\Exceptions\Missing404Exception $e) {
        // An unknown id raises Missing404Exception; catching it here prints the
        // decoded error instead of ending the request with an uncaught exception.
        $msg = json_decode($e->getMessage(), true);
        pr($msg);
    }
}
// Action "mget": http://localhost/es/test.php?do=mget&ids=24,25
// Fetches several documents at once; ids arrive as a comma-separated list.
if (($_GET['do'] ?? null) == 'mget' && isset($_GET['ids'])) {
    $requestedIds = explode(',', $_GET['ids']);
    try {
        echo '獲取多個文檔' . PHP_EOL;
        pr($client->mget([
            'index' => ES_INDEX,
            'body' => ['ids' => $requestedIds],
            '_source' => ['money', 'title'], // only return these fields
        ]));
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        // Decode the JSON error message and dump it.
        pr(json_decode($e->getMessage(), true));
    }
}
// Action "search": http://localhost/es/test.php?do=search
// Runs a match query on the title field. The original author likens it to:
//   select * from product where title='香蕉' limit 0,10;
if (($_GET['do'] ?? null) == 'search') {
    $searchBody = [
        'query' => [
            'match' => [
                'title' => '香蕉',
            ],
        ],
        'sort' => [['money' => ['order' => 'desc']]], // highest price first
        'from' => 0,
        'size' => 10,
    ];
    $params = [
        // An array of names, e.g. ['my_index1', 'my_index2'], searches several indices.
        'index' => [ES_INDEX],
        '_source' => ['money', 'title','sku'], // only return these fields
        'body' => $searchBody,
    ];
    try {
        echo '搜索文檔' . PHP_EOL;
        $response = $client->search($params);
        // Dump the full response, then just the list of hits.
        pr($response);
        pr($response['hits']['hits']);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        pr(json_decode($e->getMessage(), true));
    }
}
// Action "putSettings": http://localhost/es/test.php?do=putSettings
// Changes a setting of the existing index (here: the replica count).
if (($_GET['do'] ?? null) == 'putSettings') {
    $newSettings = ['number_of_replicas' => 2];
    try {
        echo '更改索引的配置參數' . PHP_EOL;
        pr($client->indices()->putSettings([
            'index' => ES_INDEX,
            'body' => ['settings' => $newSettings],
        ]));
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        // Decode the JSON error message and dump it.
        pr(json_decode($e->getMessage(), true));
    }
}
// Action "getSource": http://localhost/es/test.php?do=getSource&id=24
// Prints only the stored source (field values) of one document.
// The id is required, matching the guard used by the other id-based actions;
// without it the request would be built from an undefined array key.
if (isset($_GET['do']) && $_GET['do'] == 'getSource' && isset($_GET['id'])) {
    $params = [
        'index' => ES_INDEX,
        'id' => $_GET['id'],
    ];
    try {
        echo '獲取指定文檔的sourse內容(即字段的信息)' . PHP_EOL;
        $response = $client->getSource($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception | Elasticsearch\Common\Exceptions\Missing404Exception $e) {
        // An unknown id raises Missing404Exception; print the decoded error
        // instead of ending the request with an uncaught exception.
        $msg = json_decode($e->getMessage(), true);
        pr($msg);
    }
}
// Action "getSettings": http://localhost/es/test.php?do=getSettings
// Reads and prints the current settings of the index.
if (isset($_GET['do']) && $_GET['do'] == 'getSettings') {
    $params = [
        'index' => [ES_INDEX],
    ];
    try {
        // The label previously said "change settings" (copied from the
        // putSettings action); this action only reads them.
        echo '獲取索引的配置參數' . PHP_EOL;
        $response = $client->indices()->getSettings($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    } catch (Exception $e) {
        // Any other failure is dumped as the raw exception object.
        pr($e);
    }
}
// Action "exists": http://localhost/es/test.php?do=exists&id=13
// Asks Elasticsearch whether a document with the given id is present.
if (($_GET['do'] ?? null) == 'exists' && isset($_GET['id'])) {
    try {
        echo '判斷文檔是否存在' . PHP_EOL;
        pr($client->exists([
            'index' => ES_INDEX,
            'id' => $_GET['id'],
        ]));
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        // Decode the JSON error message and dump it.
        pr(json_decode($e->getMessage(), true));
    }
}
// Action "getMapping": http://localhost/es/test.php?do=getMapping
// Prints the field mapping of the index.
if (($_GET['do'] ?? null) == 'getMapping') {
    try {
        echo '獲取查看映射' . PHP_EOL;
        pr($client->indices()->getMapping(['index' => ES_INDEX]));
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        // Decode the JSON error message and dump it.
        pr(json_decode($e->getMessage(), true));
    }
}

/**
 * Dump a value between <pre> tags using print_r() formatting.
 *
 * @param mixed $response Value to dump (responses and decoded error messages).
 */
function pr($response)
{
    // The dump is passed as an argument, never as the format string.
    printf('<pre>%s</pre>', print_r($response, true));
}
View Code

創建Db.php

<?php
// Declares the response as UTF-8 HTML. This runs as a side effect of including
// this file, so it must happen before any output is sent.
header("Content-Type:text/html;charset=utf-8");

/**
 * Minimal MySQL helper built on the procedural mysqli API.
 *
 * table(), where(), order(), group() and limit() store SQL fragments on the
 * instance and return $this so calls can be chained; select(), insert(),
 * update() and delete() assemble the statement and run it.
 *
 * NOTE: stored fragments are not cleared after a statement runs; they stay in
 * effect until the corresponding setter is called again.
 * NOTE: table names and string arguments to where(), order(), group() and
 * limit() are placed into the SQL verbatim. Only values passed through the
 * array form of where(), insert() and update() are escaped, so never pass
 * unvalidated user input as a raw string.
 */
class Db
{
    private $_db = null;    // mysqli connection handle; stays null when connecting failed
    private $_table = null; // table name used by select/insert/update/delete
    private $_where = null; // stored "WHERE ..." fragment
    private $_order = null; // stored "ORDER BY ..." fragment
    private $_limit = null; // stored "LIMIT ..." fragment
    private $_group = null; // stored "GROUP BY ..." fragment
    // Connection settings (demo values hard-coded by the article).
    private $_configs = array(
        'hostname' => 'localhost',
        'dbname' => 'test',
        'username' => 'root',
        'password' => 'root'
    );

    /**
     * Open the database connection.
     *
     * On failure the error is printed and the handle stays null, which makes
     * every other method return false.
     */
    public function __construct()
    {
        // This class detects errors through false return values. PHP 8.1+
        // throws mysqli exceptions by default, so reporting is switched off
        // explicitly (this is already the default on older versions).
        mysqli_report(MYSQLI_REPORT_OFF);

        $db = mysqli_connect($this->_configs['hostname'], $this->_configs['username'], $this->_configs['password'], $this->_configs['dbname']);
        if (!$db) {
            $this->ShowException("錯誤信息" . mysqli_connect_error());
            return;
        }
        // Set the charset only after the handle is known to be valid, and via
        // the API call so mysqli_real_escape_string() uses the same charset.
        mysqli_set_charset($db, 'utf8');
        $this->_db = $db;
    }

    /**
     * Fetch every row of a table.
     *
     * @param string $table Table name (inserted verbatim).
     *
     * @return array|false Rows as numerically indexed arrays, or false when
     *                     there is no connection or no table name.
     */
    public function getAll($table = null)
    {
        if (!$this->_db || !is_string($table) || $table === '') {
            return false;
        }
        return mysqli_fetch_all($this->execute("SELECT * FROM {$table}"));
    }

    /**
     * Choose the table for the next statement.
     *
     * @param string $table Table name.
     *
     * @return $this
     */
    public function table($table)
    {
        $this->_table = $table;
        return $this;
    }

    /**
     * Run a SELECT using the stored table and clauses.
     *
     * @param string|array $fields Column list as a string or an array of
     *                             column names; empty input selects "*".
     *
     * @return array|false Rows as numerically indexed arrays, or false when
     *                     there is no connection.
     */
    public function select($fields = "*")
    {
        if (!$this->_db) {
            return false;
        }
        $fieldsStr = '*';
        if (is_array($fields) && !empty($fields)) {
            $fieldsStr = implode(',', $fields);
        } elseif (is_string($fields) && trim($fields) !== '') {
            $fieldsStr = $fields;
        }
        // The normalised column list is used (an array argument previously
        // rendered as "Array") and the stored GROUP BY is included.
        $sql = "SELECT {$fieldsStr} FROM {$this->_table} {$this->_where} {$this->_group} {$this->_order} {$this->_limit}";
        return mysqli_fetch_all($this->execute($sql));
    }

    /**
     * Store an ORDER BY clause.
     *
     * @param string $order Ordering expression (inserted verbatim); an empty
     *                      value clears the clause.
     *
     * @return $this|false False when there is no connection.
     */
    public function order($order = '')
    {
        if (!$this->_db) {
            return false;
        }
        $orderStr = '';
        if (is_string($order) && !empty($order)) {
            $orderStr = "ORDER BY " . $order;
        }
        $this->_order = $orderStr;
        return $this;
    }

    /**
     * Store a WHERE clause.
     *
     * @param string|array $where Either a raw condition string (inserted
     *                            verbatim) or a column => value map whose
     *                            entries are escaped and joined with AND.
     *                            Empty input clears the clause.
     *
     * @return $this|false False when there is no connection.
     */
    public function where($where = '')
    {
        if (!$this->_db) {
            return false;
        }
        $whereStr = '';
        if (is_array($where)) {
            // Collect the conditions and join them; comparing each value with
            // the last one to detect the end breaks on duplicate values.
            $conditions = array();
            foreach ($where as $column => $value) {
                $conditions[] = $this->quoteIdentifier($column) . " = " . $this->quoteValue($value);
            }
            if (!empty($conditions)) {
                $whereStr = "WHERE " . implode(' AND ', $conditions);
            }
        } elseif (is_string($where) && !empty($where)) {
            $whereStr = "WHERE " . $where;
        }
        $this->_where = $whereStr;
        return $this;
    }

    /**
     * Store a GROUP BY clause.
     *
     * @param string|array $group Column list as a string or an array of
     *                            column names (inserted verbatim); empty
     *                            input clears the clause.
     *
     * @return $this|false False when there is no connection.
     */
    public function group($group = '')
    {
        if (!$this->_db) {
            return false;
        }
        $groupStr = '';
        if (is_array($group) && !empty($group)) {
            $groupStr = "GROUP BY " . implode(',', $group);
        } elseif (is_string($group) && !empty($group)) {
            $groupStr = "GROUP BY " . $group;
        }
        $this->_group = $groupStr;
        return $this;
    }

    /**
     * Store a LIMIT clause.
     *
     * @param int|string $limit Row count, or a string such as "0,10"
     *                          (inserted verbatim); empty input clears the
     *                          clause.
     *
     * @return $this|false False when there is no connection.
     */
    public function limit($limit = '')
    {
        if (!$this->_db) {
            return false;
        }
        $limitStr = '';
        // An empty or blank string must not produce a bare "LIMIT " fragment.
        if (is_numeric($limit) || (is_string($limit) && trim($limit) !== '')) {
            $limitStr = "LIMIT " . trim((string) $limit);
        }
        $this->_limit = $limitStr;
        return $this;
    }

    /**
     * Run a SQL statement.
     *
     * A failed statement prints the MySQL error and terminates the script.
     *
     * @param string $sql Statement to run.
     *
     * @return mysqli_result|bool Query result, or false when there is no
     *                            connection.
     */
    public function execute($sql = null)
    {
        if (!$this->_db) {
            return false;
        }
        $res = mysqli_query($this->_db, $sql);
        if (!$res) {
            // Read the error fields directly so they are always defined, and
            // escape the message because it can echo parts of the statement.
            $this->ShowException("報錯啦!<br/>錯誤號:" . mysqli_errno($this->_db) . "<br/>SQL錯誤狀態:" . htmlspecialchars(mysqli_sqlstate($this->_db)) . "<br/>錯誤信息:" . htmlspecialchars(mysqli_error($this->_db)));
            die();
        }
        return $res;
    }

    /**
     * Insert one row into the stored table.
     *
     * @param array $data Column => value map; values are escaped.
     *
     * @return int|string|false The insert id reported by mysqli, or false
     *                          when there is no connection, $data is not a
     *                          non-empty array, or the statement fails.
     */
    public function insert($data)
    {
        if (!$this->_db || !is_array($data) || empty($data)) {
            return false;
        }
        $columns = array();
        $values = array();
        foreach ($data as $column => $value) {
            $columns[] = $this->quoteIdentifier($column);
            $values[] = $this->quoteValue($value);
        }
        $sql = "INSERT INTO `{$this->_table}`(" . implode(',', $columns) . ") VALUES(" . implode(',', $values) . ")";
        // Unlike execute(), a failed insert does not terminate the script;
        // the caller receives a falsy result instead.
        if (!mysqli_query($this->_db, $sql)) {
            return false;
        }
        return mysqli_insert_id($this->_db);
    }

    /**
     * Update rows of the stored table using the stored WHERE/ORDER/LIMIT.
     *
     * @param array $data Column => value map; values are escaped.
     *
     * @return bool Result of the statement, or false when there is no
     *              connection or $data is not a non-empty array.
     */
    public function update($data)
    {
        if (!$this->_db || !is_array($data) || empty($data)) {
            return false;
        }
        $assignments = array();
        foreach ($data as $column => $value) {
            $assignments[] = $this->quoteIdentifier($column) . "=" . $this->quoteValue($value);
        }
        $dataStr = implode(',', $assignments);
        $sql = "UPDATE `{$this->_table}` SET {$dataStr} {$this->_where} {$this->_order} {$this->_limit}";
        return $this->execute($sql);
    }

    /**
     * Delete rows of the stored table using the stored WHERE clause.
     *
     * NOTE: with no stored WHERE clause this deletes every row of the table.
     *
     * @return bool Result of the statement, or false when there is no
     *              connection.
     */
    public function delete()
    {
        if (!$this->_db) {
            return false;
        }
        $sql = "DELETE FROM `{$this->_table}` {$this->_where}";
        return $this->execute($sql);
    }

    /**
     * Wrap a column name in backticks, doubling any backtick it contains.
     *
     * @param string $name Column name.
     *
     * @return string
     */
    private function quoteIdentifier($name)
    {
        return "`" . str_replace("`", "``", (string) $name) . "`";
    }

    /**
     * Escape a value for the current connection and wrap it in single quotes.
     *
     * @param mixed $value Value to embed in a statement.
     *
     * @return string
     */
    private function quoteValue($value)
    {
        return "'" . mysqli_real_escape_string($this->_db, (string) $value) . "'";
    }

    /**
     * Print diagnostic output.
     *
     * Booleans and null are dumped with var_dump(); anything else is printed
     * inside a styled <pre> element. Callers pass HTML on purpose (for
     * example <br/>), so dynamic parts must be escaped before they get here.
     *
     * @param mixed $var Value or message to print.
     */
    private function ShowException($var)
    {
        if (is_bool($var)) {
            var_dump($var);
        } else if (is_null($var)) {
            var_dump(NULL);
        } else {
            echo "<pre style='position:relative;z-index:1000;padding:10px;border-radius:5px;background:#F5F5F5;border:1px solid #aaa;font-size:14px;line-height:18px;opacity:0.9;'>" . print_r($var, true) . "</pre>";
        }
    }

}
View Code

注意:

創建好索引之後,我們就可以把 MySQL 中的數據同步到 ES,同步的方案有如下三種:

1、在數據寫入 MySQL 之後,由應用程序立即寫入 ES(上面例子 2 的 index 操作就是這種做法)。

2、通過 Logstash 定時,從 MySQL 數據庫中拉取數據同步到 ES。

3、可以通過第三方中間件(例如:canal、go-mysql-elasticsearch)拉取 MySQL 的 binlog 日志,之後由中間件解析日志,將數據同步到 ES。

業務發展到中後期時,可能會發現字段越來越多,這時就想要刪除其中一些字段。但是,ES 的 Mapping 不支持直接刪除字段,只能重新創建索引。多數情況下,我們並不建議刪除字段,因為這會增加很多不必要的成本,並帶來風險。如果為了節省存儲空間,Boss 一定要刪除字段,那麼按照下面的步驟也是可以實現的。

1、創建一個新的索引

2、創建新的映射關系 mapping

3、將原索引的數據導入到新索引(例如使用 _reindex API)

4、為新索引創建與原索引同名的別名

5、刪除原索引

(注意:別名不能與仍然存在的索引同名,因此第 4、5 步需要在同一個 _aliases 請求中用 remove_index 和 add 兩個動作一起完成。)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM