zl程序教程

您现在的位置是:首页 >  其他

当前栏目

中文全文检索技术路线(elasticsearch全文检索、中文分词ik、tika解析文档)

2023-04-18 12:51:32 时间

代码在开源仓库 3xxx/engineercms:https://github.com/3xxx/engineercms

https://github.com/3xxx/engineercms

总体思路就是用docker安装es和tika服务,在cms里上传word之类文档,用tika解析,得到纯文本,提交给es存储。前端检索,在es里查询,返回高亮文本和结果列表,点击定位到文档打开。

es里安装ik插件,用head和postman或curl进行调试。

因为首次使用postman,es总是返回缺少body之类的错误。解决办法是在postman的Headers里勾选上Content-Length。

win下的curl命令,也是,要用双引号,不能用单引号。json文件要存成文本文件,在命令里用@文件名.json,不能在命令里直接带上json文件内容提交。

curl -X POST "localhost:9200/customer/_analyze?pretty" -H "Content-Type: application/json" -d@2.json

2.json文件内容:

{
  "analyzer": "ik_max_word",
  "text": "中华人民共和国国歌"
}

中文分词ik放到es的插件目录里即可,ik的版本要和es的版本一一对应。其他没啥。

golang开发需要用到go-elasticsearch或olivere/elastic。它们有什么区别呢?issue里有说明,但不是很明白。技术选型很重要,涉及将来的修改:前者是官方的,后者是作者个人维护的;后者的star数是前者的2倍,不过两者的star数都很多。

前者的教程很少,只有它官方的example可以学习。本文用的就是前者(官方的go-elasticsearch)。

tika继续用docker安装。用go-tika来对接。

docker pull apache/tika
docker run -d -p 9998:9998 apache/tika:<tag>

engineercms需要做的就是上传、提交检索数据结构、返回和前端展示……

1.tika识别文档——提取文本数据

	f, err := os.Open("./test.pdf")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	fmt.Println(f.Name())
	client := tika.NewClient(nil, "http://localhost:9998")
	body, err := client.Parse(context.Background(), f)
	// body, err := client.Detect(context.Background(), f) //application/pdf
	// fmt.Println(err)
	// fmt.Println(body)

	dom, err := goquery.NewDocumentFromReader(strings.NewReader(body))
	if err != nil {
		log.Fatalln(err)
	}

	dom.Find("p").Each(func(i int, selection *goquery.Selection) {
		if selection.Text() != " " || selection.Text() != "
" {
			fmt.Println(selection.Text())
		}
	})

2.es插入n条数据

  // Taken from the go-elasticsearch example.
	var (
		articles        []*Article      // documents to be indexed
		countSuccessful uint64          // incremented atomically by the OnSuccess callback
		res             *esapi.Response // reused for the index delete/create responses
		// err error
	)

	// Banner line. \x1b[1m and \x1b[0m are the ANSI escape sequences that switch bold on and off;
	// without the backslashes the text "x1b[1m" would be printed literally.
	log.Printf(
		"\x1b[1mBulkIndexer\x1b[0m: documents [%s] workers [%d] flush [%s]",
		humanize.Comma(int64(numItems)), numWorkers, humanize.Bytes(uint64(flushBytes)))
	log.Println(strings.Repeat("▁", 65))

	// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
	//
	// Use a third-party package for implementing the backoff function
	//
	retryBackoff := backoff.NewExponentialBackOff()
	// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

	// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
	//
	// Create the Elasticsearch client——0.初始化一个client
	//
	// NOTE: For optimal performance, consider using a third-party HTTP transport package.
	//       See an example in the "benchmarks" folder.
	//
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		// Retry on 429 TooManyRequests statuses
		RetryOnStatus: []int{502, 503, 504, 429},
		// Configure the backoff function
		RetryBackoff: func(i int) time.Duration {
			if i == 1 {
				retryBackoff.Reset()
			}
			return retryBackoff.NextBackOff()
		},
		// Retry up to 5 attempts
		MaxRetries: 5,
	})
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

	// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
	//
	// Create the BulkIndexer——1.建立索引,相当于mysql的建表
	//
	// NOTE: For optimal performance, consider using a third-party JSON decoding package.
	//       See an example in the "benchmarks" folder.
	//
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         indexName,        // The default index name
		Client:        es,               // The Elasticsearch client
		NumWorkers:    numWorkers,       // The number of worker goroutines
		FlushBytes:    int(flushBytes),  // The flush threshold in bytes
		FlushInterval: 30 * time.Second, // The periodic flush interval
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}
	// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

	// Step 2: generate the articles collection, i.e. build a batch of numItems sample documents.
	//
	names := []string{"Alice", "John", "Mary"}
	for i := 1; i <= numItems; i++ {
		article := &Article{
			ID:    i,
			Title: strings.Join([]string{"Title", strconv.Itoa(i)}, " "),
			Body:  "Lorem ipsum dolor sit amet...",
			// Each article is dated i days after the current time, rounded to the second, in UTC.
			Published: time.Now().Round(time.Second).UTC().AddDate(0, 0, i),
			Author: Author{
				// Random first name, so that a search by first name matches only some documents.
				FirstName: names[rand.Intn(len(names))],
				LastName:  "Smith",
			},
		}
		articles = append(articles, article)
		// log.Print rather than log.Printf: the body is data, not a format string, so a '%'
		// in it must not be interpreted as a verb. Logging the article just built also
		// avoids relying on the slice having been empty before the loop.
		log.Print(article.Body)
	}
	log.Printf("→ Generated %s articles", humanize.Comma(int64(len(articles))))

	// Re-create the index——下面这个先删除以前建立的索引,实际没意义
	if res, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)); err != nil || res.IsError() {
		log.Fatalf("Cannot delete index: %s", err)
	}
	res.Body.Close()
	res, err = es.Indices.Create(indexName)
	if err != nil {
		log.Fatalf("Cannot create index: %s", err)
	}
	if res.IsError() {
		log.Fatalf("Cannot create index: %s", res)
	}
	res.Body.Close()

	// Start of the timed section; the elapsed time is reported once indexing has finished.
	start := time.Now().UTC()

	// Loop over the collection
	for _, a := range articles {
		// Prepare the data payload: encode article to JSON
		//
		data, err := json.Marshal(a)
		if err != nil {
			log.Fatalf("Cannot encode article %d: %s", a.ID, err)
		}
		// Log the payload through an explicit "%s" verb: the JSON is data, not a format
		// string, so any '%' it contains must not be interpreted by Printf.
		log.Printf("%s", data)
		// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
		//
		// Step 3: add an item to the BulkIndexer (records are added in bulk).
		//
		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				// Action field configures the operation to perform (index, create, delete, update)
				Action: "index",
				// DocumentID is the (optional) document ID
				DocumentID: strconv.Itoa(a.ID),
				// Body is an `io.Reader` with the payload
				Body: bytes.NewReader(data),
				// OnSuccess is called for each successful operation
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					// Atomic increment of the shared success counter.
					atomic.AddUint64(&countSuccessful, 1)
				},
				// OnFailure is called for each failed operation
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					// Log err when one is supplied; otherwise log the error carried by the response item.
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			},
		)
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
		// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
	}

	// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
	// Close the indexer
	//
	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}
	// <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

	biStats := bi.Stats()

	// Report the results: number of indexed docs, number of errors, duration, indexing rate
	//
	log.Println(strings.Repeat("▔", 65))

	dur := time.Since(start)

	if biStats.NumFailed > 0 {
		log.Fatalf(
			"Indexed [%s] documents with [%s] errors in %s (%s docs/sec)",
			humanize.Comma(int64(biStats.NumFlushed)),
			humanize.Comma(int64(biStats.NumFailed)),
			dur.Truncate(time.Millisecond),
			humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
		)
	} else {
		log.Printf(
			"Sucessfuly indexed [%s] documents in %s (%s docs/sec)",
			humanize.Comma(int64(biStats.NumFlushed)),
			dur.Truncate(time.Millisecond),
			humanize.Comma(int64(1000.0/float64(dur/time.Millisecond)*float64(biStats.NumFlushed))),
		)
	}

3.查询

// 同样来自example
// 3. Search for the indexed documents
	// Build the request body.——1.先构造一个查询结构体
	var buf bytes.Buffer
	query := map[string]interface{}{
		"query": map[string]interface{}{
			"match": map[string]interface{}{
				// "title": "Title 10",
				"author.first_name": "John",
			},
		},
	}
	// query := map[string]interface{}{
	// 	"query": map[string]interface{}{
	// 		"match_all": map[string]interface{}{},
	// 	},
	// }

	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		log.Fatalf("Error encoding query: %s", err)
	}
	// Perform the search request.——2.查询语句
	res, err := es.Search(
		es.Search.WithContext(context.Background()),
		es.Search.WithIndex(indexName), // default indexname
		es.Search.WithBody(&buf),
		es.Search.WithTrackTotalHits(true),
		es.Search.WithPretty(),
	)

	// const searchAll = `
	// "query" : { "match_all" : {} },
	// "size" : 25,
	// "sort" : { "published" : "desc", "_doc" : "asc" }`

	// var b strings.Builder
	// b.WriteString("{
")
	// b.WriteString(searchAll)
	// b.WriteString("
}")
	// strings.NewReader(b.String())

	// res, err = es.Search(
	// 	es.Search.WithIndex("test-bulk-example"),
	// 	es.Search.WithBody(strings.NewReader(b.String())),
	// 	// es.Search.WithQuery("{{{one OR two"), // <-- Uncomment to trigger error response
	// )

	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	log.Printf(res.String())// 打印查询结果
	// An error response carries a JSON body with an "error" object; report its type and reason and exit.
	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			log.Fatalf("Error parsing the response body: %s", err)
		}
		// Print the response status and error information.
		log.Fatalf("[%s] %s: %s",
			res.Status(),
			e["error"].(map[string]interface{})["type"],
			e["error"].(map[string]interface{})["reason"],
		)
	}

	// Decode the successful response into a generic map.
	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}

	// The "hits" object holds both the total count and the list of matching documents.
	hits := r["hits"].(map[string]interface{})

	// Print the response status, number of results, and request duration.
	log.Printf(
		"[%s] %d hits; took: %dms",
		res.Status(),
		int(hits["total"].(map[string]interface{})["value"].(float64)),
		int(r["took"].(float64)),
	)

	// Print the ID and document source for each hit.
	// _source is printed with %v: it is a decoded JSON object whose numbers are float64,
	// and %s would render those as "%!s(float64=...)".
	for _, hit := range hits["hits"].([]interface{}) {
		doc := hit.(map[string]interface{})
		log.Printf(" * ID=%s, %v", doc["_id"], doc["_source"])
	}

	log.Println(strings.Repeat("=", 37))

查询输出结果如下:"author.first_name": "John",

[200 OK] 4 hits; took: 1ms
 * ID=2, map[author:map[first_name:John last_name:Smith] body:Lorem ipsum dolor sit amet... id:%!s(float64=2) published:2021-10-29T11:34:32Z title:Title 2]
 * ID=3, map[author:map[first_name:John last_name:Smith] body:Lorem ipsum dolor sit amet... id:%!s(float64=3) published:2021-10-30T11:34:32Z title:Title 3]
 * ID=7, map[author:map[first_name:John last_name:Smith] body:Lorem ipsum dolor sit amet... id:%!s(float64=7) published:2021-11-03T11:34:32Z title:Title 7]
 * ID=8, map[author:map[first_name:John last_name:Smith] body:Lorem ipsum dolor sit amet... id:%!s(float64=8) published:2021-11-04T11:34:32Z title:Title 8]

调试的时候,如上述代码,先删除旧的index,然后新建index,再插入数据。坑:我把这些都放在一段代码中,删除索引、新建索引、插入数据之后立刻进行查询,始终获得不了结果。原因是es是近实时的:新写入的文档要等索引refresh(默认间隔1秒)之后才能被检索到,立刻查询时来不及查到数据。可以在查询前先refresh索引,或者稍等片刻再查询。

下面是example中的xkcdsearch例子跑起来的效果。

下面这个是engineercms的电子规范全文检索效果:

通过全文检索,定位到具体规范,打开规范,再次搜索关键字。