Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## [Unreleased 3.x]
### Added
- Added BulkIngester helper for efficient bulk operations with buffering, retries, and backpressure. Ported from elasticsearch-java (commit e7120d4) ([#1809](https://github.com/opensearch-project/opensearch-java/pull/1809))

### Dependencies

37 changes: 37 additions & 0 deletions guides/bulk.md
@@ -8,6 +8,43 @@ The [Bulk API](https://opensearch.org/docs/latest/api-reference/document-apis/bu

## Bulk Indexing

The `BulkIngester` is a helper class that simplifies bulk indexing by automatically buffering operations and flushing them to OpenSearch based on configurable thresholds. It provides:

- Automatic flushing based on number of operations, total size in bytes, or time interval
- Backpressure control to prevent overwhelming the cluster
- Automatic retries with configurable backoff policies for failed operations
- Thread-safe concurrent operation
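
The count-based flushing in the list above can be sketched in plain Java, independent of the client. This is only an illustration of the buffering idea, not the `BulkIngester` implementation: `ThresholdBuffer`, its `sink` callback, and the class name `ThresholdBufferSketch` are hypothetical names introduced here, and the sketch leaves out size-based and time-based flushing, retries, and backpressure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: this is NOT the BulkIngester implementation.
public class ThresholdBufferSketch {

    /** Buffers items and hands them to a sink once maxOperations items are pending. */
    static final class ThresholdBuffer<T> implements AutoCloseable {
        private final int maxOperations;
        private final Consumer<List<T>> sink;
        private List<T> pending = new ArrayList<>();

        ThresholdBuffer(int maxOperations, Consumer<List<T>> sink) {
            if (maxOperations < 1) {
                throw new IllegalArgumentException("maxOperations must be >= 1");
            }
            this.maxOperations = maxOperations;
            this.sink = sink;
        }

        /** Adds one item and flushes when the count threshold is reached. */
        synchronized void add(T item) {
            pending.add(item);
            if (pending.size() >= maxOperations) {
                flush();
            }
        }

        /** Sends the pending items as one batch; does nothing if the buffer is empty. */
        synchronized void flush() {
            if (pending.isEmpty()) {
                return;
            }
            List<T> batch = pending;
            pending = new ArrayList<>();
            sink.accept(batch);
        }

        /** Flushes whatever is still buffered. */
        @Override
        public synchronized void close() {
            flush();
        }
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        try (ThresholdBuffer<String> buffer = new ThresholdBuffer<>(2, sent::add)) {
            buffer.add("id1");
            buffer.add("id2"); // threshold reached: the first batch is flushed
            buffer.add("id3"); // stays buffered until close()
        }
        System.out.println(sent); // prints [[id1, id2], [id3]]
    }
}
```

The sketch flushes synchronously on the calling thread, which keeps it easy to follow; a real ingester would typically send batches asynchronously and limit the number of in-flight requests.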

```java
String indexName = "sample-index";

// Create a BulkIngester with custom settings
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(client)
    .maxOperations(1000)                 // Flush every 1000 operations
    .flushInterval(5, TimeUnit.SECONDS)  // Or every 5 seconds
    .maxConcurrentRequests(2)            // Allow 2 concurrent bulk requests
);

// Add operations - they are automatically buffered and flushed
IndexData doc1 = new IndexData("Document 1", "The text of document 1");
ingester.add(op -> op.index(i -> i.index(indexName).id("id1").document(doc1)));

IndexData doc2 = new IndexData("Document 2", "The text of document 2");
ingester.add(op -> op.index(i -> i.index(indexName).id("id2").document(doc2)));

IndexData doc3 = new IndexData("Document 3", "The text of document 3");
ingester.add(op -> op.index(i -> i.index(indexName).id("id3").document(doc3)));

// Close the ingester - this flushes any remaining buffered operations
ingester.close();
```

[IndexData](../samples/src/main/java/org/opensearch/client/samples/util/IndexData.java) is the sample data class used above.

You can find a working sample of the above code in [BulkIngesterBasics.java](../samples/src/main/java/org/opensearch/client/samples/BulkIngesterBasics.java).


## Bulk requests

```java
@@ -126,6 +126,7 @@
import org.opensearch.client.opensearch.security.OpenSearchSecurityAsyncClient;
import org.opensearch.client.opensearch.snapshot.OpenSearchSnapshotAsyncClient;
import org.opensearch.client.opensearch.tasks.OpenSearchTasksAsyncClient;
import org.opensearch.client.opensearch.ubi.OpenSearchUbiAsyncClient;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportOptions;
@@ -212,6 +213,10 @@ public OpenSearchTasksAsyncClient tasks() {
        return new OpenSearchTasksAsyncClient(this.transport, this.transportOptions);
    }

    public OpenSearchUbiAsyncClient ubi() {
        return new OpenSearchUbiAsyncClient(this.transport, this.transportOptions);
    }

// ----- Endpoint: clear_scroll

/**
@@ -125,6 +125,7 @@
import org.opensearch.client.opensearch.security.OpenSearchSecurityClient;
import org.opensearch.client.opensearch.snapshot.OpenSearchSnapshotClient;
import org.opensearch.client.opensearch.tasks.OpenSearchTasksClient;
import org.opensearch.client.opensearch.ubi.OpenSearchUbiClient;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportOptions;
@@ -211,6 +212,10 @@ public OpenSearchTasksClient tasks() {
        return new OpenSearchTasksClient(this.transport, this.transportOptions);
    }

    public OpenSearchUbiClient ubi() {
        return new OpenSearchUbiClient(this.transport, this.transportOptions);
    }

// ----- Endpoint: clear_scroll

/**