实例讲解go操作ES索引增删查,以及封装查询工具
如果对ES还不甚了解,或者还无环境,可先移步 ES小白开局
本文将使用go语言,从连接ES服务器开始,讲解操作索引,以及具体查询案例。
话不多说,开干
连接ES
先按照插件 github.com/olivere/elastic/v7
由于我搭建的是docker环境,所以需要设置不用检测变量。
func EsConnect() (*elastic.Client, error) {
client, err := elastic.NewClient(elastic.SetURL("http://124.223.47.250:9200"), elastic.SetSniff(false), elastic.SetBasicAuth("", "")) // false设置:因为我们es使用docker搭建,和直连ip不一样,所以不用检测
if err != nil {
return nil, err
}
return client, nil
}
- setBasicAuth中是设置账号密码,如果为空,可以直接设置空字符串即可。
ES认证
大多数情况下,我们都需要进行认证的,不然人家直接连接上来了。除非是es客户端和服务端在同一实例下,端口由防火墙管控着。
如果es是对外提供的服务,那肯定是需要认证的,具体的认证可以移步给docker创建ES容器添加认证
如果设置了账号密码,则上述连接,需要添加账号密码。
索引相关操作
前期准备
我们一般是基于某个业务,做一个索引,比如用户User,就使用一个user结构体,实现对应的接口,如下:
type ESInfo interface {
IndexName() string // 索引名称
Mapping() string // 索引对应的mapping文件
}
- mapping文件是处理数据的方式和规则方面做一些限制
比如我们的例子,user结构体,如下:
type User struct {
ID int `json:"id"`
Name string `json:"name"`
Category string `json:"category"`
Info string `json:"info"`
CreateAt string `json:"createdAt"`
}
func (u *User) IndexName() string {
return "user_index"
}
func (u *User) Mapping() string {
return `{
"mappings": {
"properties": {
"category": {
"type": "keyword"
},
"id": {
"type": "long"
},
"info": {
"type": "text"
},
"name": {
"type": "keyword"
},
"createdAt": {
"type": "date",
"null_value": "null",
"format": "[yyyy-MM-dd HH:mm:ss]"
}
}
}
}`
}
索引操作
有了前面的准备,我们新起一个package为index,作为索引相关操作的包,文件为operate.go
func Create(client *elastic.Client, info contracts.ESInfo) error {
_, err := client.CreateIndex(info.IndexName()).BodyString(info.Mapping()).Do(context.Background())
return err
}
func Exist(client *elastic.Client, indexName string) bool {
exists, _ := client.IndexExists(indexName).Do(context.Background())
return exists
}
func Delete(client *elastic.Client, indexName string) error {
_, err := client.DeleteIndex(indexName).Do(context.Background())
return err
}
如上,分别是创建索引,判断索引是否存在,删除索引,也就可以直接使用index.Create(),index.Exist(),index.Delete(),有了包名作为前缀,方法名也就不需要前缀了,这样看上去简介。希望都有良好的代码习惯。
索引注意事项:
- 创建索引不支持幂等,也就是索引存在了就不能重新创建。
- 所以在创建索引的时候,在之前判断索引是否存在,有两种方式处理,一是存在即不创建,一是存在即删除再创建。具体要看业务如何处理。
如下代码:
user := models.User{}
exist := indexs.Exist(esClient, "user_index")
/*if !exist {
// 所以不存在,则创建
err = index.Create(esClient, &user)
if err != nil {
log.Fatal(err)
}
}*/
// 或者存在,删除再创建
if exist {
index.Delete(esClient, "user_index")
}
err = index.Create(esClient, &user)
if err != nil {
log.Fatal(err)
}
- 删除的时候注意,如果不存在索引,则会报404的error,可以针对性的捕获异常。
文档操作
文档操作,主要是增删改查,其中查为重点。
我们可以新起一个package叫做doc,新建一个operage.go文件。
文档创建
// DocCreateOrUpdate 创建文档信息
func DocCreateOrUpdate(esClient *elastic.Client, indexName, id string, obj interface{}) (*elastic.IndexResponse, error) {
if id == "" {
return esClient.Index().Index(indexName).BodyJson(obj).Do(context.Background()) // id随机
}
return esClient.Index().Index(indexName).Id(id).BodyJson(obj).Do(context.Background()) // id为user的id,支持幂等,有则更新,无则创建
}
// DocBatchCreate 批量加入数据,sources为map,key为id,value为加入的对象 ,只有id未创建的才加入,已有的id不修改
func DocBatchCreate(esClient *elastic.Client, indexName string, sources map[string]interface{}) (*elastic.BulkResponse, error) {
bulk := esClient.Bulk().Index(indexName).Refresh("true")
for id, obj := range sources {
req := elastic.NewBulkCreateRequest().Id(id).Doc(obj)
bulk.Add(req)
}
return bulk.Do(context.Background())
}
- 如上,文档可以创建单个文件,当文档id不存在的时候,则随机一个文档id。
- 创建单个文档的时候,如id还未创建,则创建,如id已经创建了,则全量修改当前文档。所以取名为CreateOrUpdate
- 在批量创建的时候,批量数据存在map中,id为key,value为结构体对象。
- 与批量不同,如id未创建过文档则创建,已创建过文档则忽略,并不更新。
例子:
user := models.User{
ID: 1,
Name: "张三",
Category: "程序员",
Info: "这是一个匿名的首选,任劳任怨,优秀的代号,张三同学。",
CreateAt: time.Now().Format("2006-01-02 15:04:05"),
}
// 单个添加
_, err = common.DocCreateOrUpdate(esClient, user.IndexName(), strconv.Itoa(user.ID), user)
if err != nil {
log.Fatal(err)
}
// 批量添加
uMap := make(map[string]interface{}, 0)
for i := 0; i < 11; i++ {
user := models.User{
ID: i + 100,
Name: fmt.Sprintf("张%d", i+10),
Category: "测试工程师",
Info: "吾道一以贯之",
CreateAt: time.Now().Format("2006-01-02 15:04:05"),
}
uMap[strconv.Itoa(user.ID)] = user
}
_, err = common.DocBatchCreate(esClient, "user_index", uMap)
if err != nil {
log.Fatal(err)
}
文档更新
其实刚刚的创建,也类似于更新了,但是有时候想更新文档中的某个字段,或者某几个字段,如下:
// DocUpdate 更新某些字段,在map中定义,而不是全量更新
func DocUpdate(esClient *elastic.Client, indexName, id string, updateField map[string]interface{}) (*elastic.UpdateResponse, error) {
return esClient.Update().Index(indexName).Id(id).Doc(updateField).Do(context.Background())
}
文档删除
// DocDeleteById 删除文档,如果id不存在,会报错。 resp的result为not_found
func DocDeleteById(esClient *elastic.Client, indexName, id string) (*elastic.DeleteResponse, error) {
return esClient.Delete().Index(indexName).Id(id).Refresh("true").Do(context.Background()) // refresh 表示是否立即删除,立即刷新索引
}
// DocBatchDeleteId 批量删除文档,如果id不存在,error也是nil,
// DocBatchDeleteId,分别对应ids的删除情况,如正常删除,result为delete,如id找不到则是not_found
func DocBatchDeleteId(esClient *elastic.Client, indexName string, ids []string) (*elastic.BulkResponse, error) {
bulk := esClient.Bulk().Index(indexName).Refresh("true") // 构建一个桶,然后一次性提交上去
for _, id := range ids {
req := elastic.NewBulkDeleteRequest().Id(id)
bulk.Add(req)
}
return bulk.Do(context.Background())
}
文档的删除也分为单个删除和批量删除
- 单个删除,如id不存在则会报错,在上层可以捕获,针对处理。
// 删除文档,如果id不存在,会报错。 resp的result为not_found
// 所以删除的时候,需要先判断id是否存在,或者兼容下错误信息
delResp, err := common.DocDeleteById(esClient, user.IndexName(), "dddd")
if err != nil && (delResp != nil && delResp.Result != "not_found") {
log.Fatal(err)
}
- 批量删除的时候,id不存在,error为nil,批量在es中都会新建一个bulk,然后统一提交。
文档查询
查询为核心操作,我们可以新建一个query.go的文件。
定义查询的结构体,以方便返回:
type QueryInfo struct {
Id string
Source json.RawMessage // 方便上层 unmarshal
}
分页查询索引数据
// DocQueryListPage 分页查询索引数据,以json返回 分页公式 from=(page-1)*size
func DocQueryListPage(esClient *elastic.Client, from, size int, indices ...string) ([]QueryInfo, error) {
query := elastic.NewBoolQuery()
return queryListPage(esClient, query, from, size, indices...)
}
该查询会把所有的数据返回,相当于select * 了,不建议这样,哪怕是有分页。
通过字段,精准匹配
// DocQueryTermListPage 通过字段,精确匹配 分页显示,
// 字段的类型一定要是keyword才可以,在mapping中配置的,如果是text则不行,text只能模糊匹配
// 但是如果配置了fields的话,text字段可以通过field.keyword的方式,进行精确匹配
func DocQueryTermListPage(esClient *elastic.Client, from, size int, field, key string, indices ...string) ([]QueryInfo, error) {
query := elastic.NewTermQuery(field, key)
return queryListPage(esClient, query, from, size, indices...)
}
模糊匹配
// DocQueryMatchListPage 模糊匹配,主要是查询text类型的字段,当然也是可以查询非text的,但是match查询keyword字段,需要填写完整
func DocQueryMatchListPage(esClient *elastic.Client, from, size int, field, key string, indices ...string) ([]QueryInfo, error) {
query := elastic.NewMatchQuery(field, key)
return queryListPage(esClient, query, from, size, indices...)
}
公共的查询 queryListPage
func queryListPage(esClient *elastic.Client, query elastic.Query, from, size int, indices ...string) ([]QueryInfo, error) {
res, err := esClient.Search(indices...).Query(query).From(from).Size(size).Do(context.Background())
if err != nil {
return nil, err
}
sources := make([]QueryInfo, 0, res.Hits.TotalHits.Value)
for _, hit := range res.Hits.Hits {
sources = append(sources, QueryInfo{
Id: hit.Id,
Source: hit.Source,
})
}
return sources, nil
}
最后
好了,本文主要是最最基础的go操作es,如需了解更多es相关,可看上述连接es小白开局。后续有时间会继续更新出更多查询相关,比如聚合、排序等等。