Elasticsearch Sink

Sink that indexes documents into Elasticsearch.

This Elasticsearch sink supports indexing JSON documents only. It consumes data from an input destination and indexes it into Elasticsearch. The input can be a plain JSON string or a java.util.Map that represents the JSON. It also accepts an Elasticsearch XContentBuilder, although this is rare because the middleware is unlikely to carry records as XContentBuilder; that form is provided mainly for direct invocation of the consumer.
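
As a minimal sketch of the two most common payload forms, the following Java snippet builds the same document once as a java.util.Map and once as a plain JSON string. The class name PayloadForms and the sample field are illustrative only; how the payload reaches the input destination depends on your binder and producer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PayloadForms {

    // The document as a java.util.Map; keys become JSON field names.
    static Map<String, Object> asMap() {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("foo", "bar");
        return doc;
    }

    // The same document as a plain JSON string.
    static String asJson() {
        return "{\"foo\":\"bar\"}";
    }

    public static void main(String[] args) {
        System.out.println(asMap());
        System.out.println(asJson());
    }
}
```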

Options

The Elasticsearch sink has the following options:

Properties grouped by prefix:

elasticsearch.consumer

async

Indicates whether indexing is performed asynchronously. By default, indexing is synchronous. (Boolean, default: false)

batch-size

Number of items to index per request. For values greater than 1, the bulk indexing API is used. (Integer, default: 1)

group-timeout

Timeout in milliseconds after which a message group is flushed when bulk indexing is active. The default of -1 means that idle message groups are never flushed automatically. (Long, default: -1)

id

The id of the document to index. The INDEX_ID header, if present on a message, overrides this property for that message. (Expression, default: <none>)

index

Name of the index. The INDEX_NAME header, if present on a message, overrides this property for that message. (String, default: <none>)

routing

Indicates the shard to route to. If not provided, Elasticsearch will default to a hash of the document id. (String, default: <none>)

timeout-seconds

Timeout, in seconds, to wait for the shard to become available. If not set, the Elasticsearch client's own default of 1 minute applies. (Long, default: 0)
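
To make the interplay of batch-size and group-timeout concrete, here is a simplified, standalone Java sketch of the semantics described above. It is not the sink's actual implementation: the class BatchBuffer, its methods, and the assumption that the idle timeout is measured from the most recently added item are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBuffer {
    private final int batchSize;
    private final long groupTimeoutMillis; // -1 disables the idle flush
    private final List<String> pending = new ArrayList<>();
    private long lastAddedAt;

    BatchBuffer(int batchSize, long groupTimeoutMillis) {
        this.batchSize = batchSize;
        this.groupTimeoutMillis = groupTimeoutMillis;
    }

    // Adds an item; returns the full batch once batchSize items are pending, else an empty list.
    List<String> add(String item, long nowMillis) {
        pending.add(item);
        lastAddedAt = nowMillis;
        return pending.size() >= batchSize ? drain() : List.of();
    }

    // Returns the pending items if the group has been idle for at least the timeout, else an empty list.
    List<String> flushIfIdle(long nowMillis) {
        boolean timeoutEnabled = groupTimeoutMillis >= 0;
        boolean idle = nowMillis - lastAddedAt >= groupTimeoutMillis;
        return (timeoutEnabled && !pending.isEmpty() && idle) ? drain() : List.of();
    }

    // Hands out a copy of the pending items and clears the buffer.
    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchBuffer buffer = new BatchBuffer(3, 1000);
        System.out.println(buffer.add("{\"n\":1}", 0));   // batch not yet full
        System.out.println(buffer.add("{\"n\":2}", 100)); // still not full
        System.out.println(buffer.flushIfIdle(500));      // not idle long enough
        System.out.println(buffer.flushIfIdle(1100));     // idle for 1000 ms, so both items are flushed
    }
}
```

With a group timeout of -1, flushIfIdle never returns anything, which mirrors the default behaviour in which a partially filled batch waits until enough messages arrive.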

spring.elasticsearch

connection-timeout

Connection timeout used when communicating with Elasticsearch. (Duration, default: 1s)

password

Password for authentication with Elasticsearch. (String, default: <none>)

path-prefix

Prefix added to the path of every request sent to Elasticsearch. (String, default: <none>)

socket-keep-alive

Whether to enable socket keep alive between client and Elasticsearch. (Boolean, default: false)

socket-timeout

Socket timeout used when communicating with Elasticsearch. (Duration, default: 30s)

uris

Comma-separated list of the Elasticsearch instances to use. (List<String>, default: [http://localhost:9200])

username

Username for authentication with Elasticsearch. (String, default: <none>)

spring.elasticsearch.restclient.sniffer

delay-after-failure

Delay of a sniff execution scheduled after a failure. (Duration, default: 1m)

interval

Interval between consecutive ordinary sniff executions. (Duration, default: 5m)

spring.elasticsearch.restclient.ssl

bundle

SSL bundle name. (String, default: <none>)
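
All of the options above are ordinary Spring Boot properties, so they can be passed on the command line in the same --name=value form used in the run example below. The following sketch assembles such arguments from a map. The class SinkArgs and the chosen values are illustrative assumptions, not recommended settings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SinkArgs {

    // Turns property name/value pairs into --name=value command-line arguments.
    static List<String> toArgs(Map<String, String> properties) {
        List<String> args = new ArrayList<>();
        properties.forEach((name, value) -> args.add("--" + name + "=" + value));
        return args;
    }

    public static void main(String[] unused) {
        // LinkedHashMap keeps the arguments in insertion order.
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("elasticsearch.consumer.index", "testing");
        properties.put("elasticsearch.consumer.batch-size", "10");
        properties.put("elasticsearch.consumer.group-timeout", "5000");
        properties.put("spring.elasticsearch.uris", "http://localhost:9200");
        System.out.println(String.join(" ", toArgs(properties)));
    }
}
```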

Examples of running this sink

  1. From the folder elasticsearch-sink: ./mvnw clean package

  2. cd apps

  3. cd into the generated app for your binder (Kafka or RabbitMQ)

  4. ./mvnw clean package

  5. Make sure that Elasticsearch is running. For example, you can run it as a Docker container with the following command: docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

  6. Start the middleware (Kafka or RabbitMQ) if it is not already running.

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing

  8. Send some JSON data to the middleware destination (els-in). For example: {"foo":"bar"}

  9. Verify that the data is indexed: curl localhost:9200/testing/_search
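
The verification in the last step can also be done from Java. The sketch below uses the JDK's java.net.http client (Java 11 or later) to issue the same _search request. The class name SearchCheck is illustrative, and the base URI and index name match the values used in the steps above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SearchCheck {

    // Builds a GET request for <baseUri>/<index>/_search.
    static HttpRequest searchRequest(String baseUri, String index) {
        return HttpRequest.newBuilder(URI.create(baseUri + "/" + index + "/_search"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                searchRequest("http://localhost:9200", "testing"),
                HttpResponse.BodyHandlers.ofString());
        // Prints the HTTP status and the raw JSON search response.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```

Running main requires the Elasticsearch instance from step 5 to be reachable. If the document was indexed, it should appear among the hits in the response body.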