17

search(2)- elasticsearch scala终端:elastic4s

 4 years ago
source link: http://www.cnblogs.com/tiger-xc/p/12538488.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

上篇谈到:elasticsearch本身是一个完整的后台系统,对其的操作使用是通过终端api进行的。elasticsearch本身提供了多种编程语言的api,包括java的esjava。而elastic4s是一套基于esjava之上的scala api。

先看看scala 终端 ElasticClient的构建过程:

  import com.sksamuel.elastic4s.ElasticDsl._
  val esjava =  JavaClient(ElasticProperties("http://localhost:9200"))
  val client = ElasticClient(esjava)

先构建JavaClient,JavaClient包嵌了个esjava的RestClient进行具体的操作:

class JavaClient(client: RestClient) extends HttpClient {
...
//send request to elasticsearch
override def send(req: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit = {
    if (logger.isDebugEnabled) {
      logger.debug("Executing elastic request {}", Show[ElasticRequest].show(req))
    }

    val l = new ResponseListener {
      override def onSuccess(r: org.elasticsearch.client.Response): Unit = callback(Right(fromResponse(r)))
      override def onFailure(e: Exception): Unit = e match {
        case re: ResponseException => callback(Right(fromResponse(re.getResponse)))
        case t => callback(Left(JavaClientExceptionWrapper(t)))
      }
    }

    val request = new Request(req.method, req.endpoint)
    req.params.foreach { case (key, value) => request.addParameter(key, value) }
    req.entity.map(apacheEntity).foreach(request.setEntity)
//perform actual request sending
    client.performRequestAsync(request, l)
  }
...
}

上面这个RestClient即是elasticsearch提供的javaClient。而elastic4s的具体操作是通过RestClient.performRequestAsync进行的,如下:

public class RestClient implements Closeable {
...
    /**
     * Sends a request to the Elasticsearch cluster that the client points to.
     * The request is executed asynchronously and the provided
     * {@link ResponseListener} gets notified upon request completion or
     * failure. Selects a host out of the provided ones in a round-robin
     * fashion. Failing hosts are marked dead and retried after a certain
     * amount of time (minimum 1 minute, maximum 30 minutes), depending on how
     * many times they previously failed (the more failures, the later they
     * will be retried). In case of failures all of the alive nodes (or dead
     * nodes that deserve a retry) are retried until one responds or none of
     * them does, in which case an {@link IOException} will be thrown.
     *
     * @param request the request to perform
     * @param responseListener the {@link ResponseListener} to notify when the
     *      request is completed or fails
     */
    public void performRequestAsync(Request request, ResponseListener responseListener) {
        try {
            FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
            InternalRequest internalRequest = new InternalRequest(request);
            performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
        } catch (Exception e) {
            responseListener.onFailure(e);
        }
    }
...

}

另外,ElasticProperties是一个javaClient与ES连接的参数结构,包括IP地址:

/**
  * Contains the endpoints of the nodes to connect to, as well as connection properties.
  */
case class ElasticProperties(endpoints: Seq[ElasticNodeEndpoint], options: Map[String, String] = Map.empty)

ElasticProperties包含了ES地址ElasticNodeEndPoint及其它连接参数(如果需要的话),如下:

 it should "support prefix path with trailing slash" in {
    ElasticProperties("https://host1:1234,host2:2345/prefix/path/") shouldBe
      ElasticProperties(Seq(ElasticNodeEndpoint("https", "host1", 1234, Some("/prefix/path")), ElasticNodeEndpoint("https", "host2", 2345, Some("/prefix/path"))))
  }

当elastic4s完成了与elasticsearch的连接之后,就可以把按ES要求组合的Json指令发送到后台ES去执行了。elastic4s提供了一套DSL, 一种嵌入式语言,可以帮助用户更方便的用编程模式来组合ES的指令Json。当然,用户也可以直接把字符类的Json直接通过ElasticClient发送到后台ES。下面是一个简单可以运行的elastic4s示范:

import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.requests.common.RefreshPolicy
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}

object HttpClientExampleApp extends App {

  // you must import the DSL to use the syntax helpers
  import com.sksamuel.elastic4s.ElasticDsl._
  val esjava =  JavaClient(ElasticProperties("http://localhost:9200"))
  val client = ElasticClient(esjava)


    client.execute {
      bulk(
        indexInto("books" ).fields("title" -> "重庆火锅的十种吃法", "content" -> "在这部书里描述了火锅的各种烹饪方式"),
        indexInto("books" ).fields("title" -> "中国火锅大全", "content" -> "本书是全国中式烹饪中有关火锅的各种介绍")
      ).refresh(RefreshPolicy.WaitFor)
    }.await

  val json =
    """
      |{
      |  "query" : {
      |    "match" : {"title" : "火锅"}
      |  }
      |}
      |""".stripMargin
  val response = client.execute {
    search("books").source(json)   //      .matchQuery("title", "火锅")
  }.await

  // prints out the original json
  println(response.result.hits.hits.head.sourceAsString)

  client.close()

}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK