Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added bulk samples and explained error handling. #448

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions guides/bulk.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
- [Bulk Indexing](#bulk-indexing)
- [Use a Helper](#use-a-helper)
- [Line-Delimited JSON](#line-delimited-json)
- [Bulk Helper](#bulk-helper)

# Bulk Indexing

The [Bulk API](https://opensearch.org/docs/latest/api-reference/document-apis/bulk/) lets you add, update, or delete multiple documents in a single request.

## Line-Delimited JSON

The `bulk` API accepts line-delimited JSON. This method requires the caller to evaluate the return value and parse errors in the case of a failure or partial success. See [samples/bulk/bulk-ld.py](../samples/bulk/bulk-ld.py) for a working sample.

```python
from opensearchpy import OpenSearch

Expand All @@ -20,27 +25,47 @@ docs = '''
'''

response = client.bulk(docs)
print(response)
if response["errors"]:
print(f"There were errors!")
else:
print(f"Bulk-inserted {len(rc['items'])} items.")
```

## Use a Helper
The client can also serialize an array of data into bulk-delimited JSON for you. See [samples/bulk/bulk-array.py](../samples/bulk/bulk-array.py) for a working sample.

```python
data = [
{ "index": { "_index": "index-2022-06-08", "_id": 1 }}
{ "name": "foo"}
{ "index": { "_index": "index-2022-06-09", "_id": 2 }}
{ "name": "bar"}
{ "index": { "_index": "index-2022-06-10", "_id": 3 }}
{ "name": "baz"}
]

response = client.bulk(data)
if response["errors"]:
print(f"There were errors!")
else:
print(f"Bulk-inserted {len(rc['items'])} items.")
```

## Bulk Helper

A helper can generate the line-delimited JSON for you from a Python array that contains `_index` and `_id` fields, and parse errors. The `helpers.bulk` implementation will raise `BulkIndexError` if any error occurs. This may indicate a partially successful result. See [samples/bulk/bulk-helpers.py](../samples/bulk/bulk-helpers.py) for a working sample.

```python
from opensearchpy import OpenSearch, helpers

client = OpenSearch(...)

docs = []
def generate_data():
mywords = ['foo', 'bar', 'baz']
for index, word in enumerate(mywords):
docs.append({
"_index": "mywords",
"word": word,
"_id": index
})
return docs

response = helpers.bulk(client, generate_data(), max_retries=3)
docs = [
{ "_index": "words", "_id": "word1", word: "foo" },
{ "_index": "words", "_id": "word2", word: "bar" },
{ "_index": "words", "_id": "word3", word: "baz" },
]

response = helpers.bulk(client, docs, max_retries=3)
print(response)
```

64 changes: 64 additions & 0 deletions samples/bulk/bulk-array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import json

from opensearchpy import OpenSearch

# connect to an instance of OpenSearch

host = os.getenv('HOST', default='localhost')
port = int(os.getenv('PORT', 9200))
auth = (
os.getenv('USERNAME', 'admin'),
os.getenv('PASSWORD', 'admin')
)

client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_auth = auth,
use_ssl = True,
verify_certs = False,
ssl_show_warn = False
)

# check whether an index exists
index_name = "my-index"

if not client.indices.exists(index_name):

client.indices.create(index_name,
body={
"mappings":{
"properties": {
"value": {
"type": "float"
},
}
}
}
)

# index data
data = []
for i in range(100):
data.append({ "index": {"_index": index_name, "_id": i }})
data.append({ "value": i })

rc = client.bulk(data)
if rc["errors"]:
print(f"There were errors:")
for item in rc["items"]:
print(f"{item['index']['status']}: {item['index']['error']['type']}")
else:
print(f"Bulk-inserted {len(rc['items'])} items.")

# delete index
client.indices.delete(index=index_name)

58 changes: 58 additions & 0 deletions samples/bulk/bulk-helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import json

from opensearchpy import OpenSearch, helpers

# connect to an instance of OpenSearch

host = os.getenv('HOST', default='localhost')
port = int(os.getenv('PORT', 9200))
auth = (
os.getenv('USERNAME', 'admin'),
os.getenv('PASSWORD', 'admin')
)

client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_auth = auth,
use_ssl = True,
verify_certs = False,
ssl_show_warn = False
)

# check whether an index exists
index_name = "my-index"

if not client.indices.exists(index_name):

client.indices.create(index_name,
body={
"mappings":{
"properties": {
"value": {
"type": "float"
},
}
}
}
)

# index data
data = []
for i in range(100):
data.append({ "_index": index_name, "_id": i, "value": i })

rc = helpers.bulk(client, data)
print(f"Bulk-inserted {rc[0]} items.")

# delete index
client.indices.delete(index=index_name)

64 changes: 64 additions & 0 deletions samples/bulk/bulk-ld.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.

import os
import json

from opensearchpy import OpenSearch

# connect to an instance of OpenSearch

host = os.getenv('HOST', default='localhost')
port = int(os.getenv('PORT', 9200))
auth = (
os.getenv('USERNAME', 'admin'),
os.getenv('PASSWORD', 'admin')
)

client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_auth = auth,
use_ssl = True,
verify_certs = False,
ssl_show_warn = False
)

# check whether an index exists
index_name = "my-index"

if not client.indices.exists(index_name):

client.indices.create(index_name,
body={
"mappings":{
"properties": {
"value": {
"type": "float"
},
}
}
}
)

# index data
data = ''
for i in range(100):
data += json.dumps({ "index": {"_index": index_name, "_id": i }}) + "\n"
data += json.dumps({ "value": i }) + "\n"

rc = client.bulk(data)
if rc["errors"]:
print(f"There were errors:")
for item in rc["items"]:
print(f"{item['index']['status']}: {item['index']['error']['type']}")
else:
print(f"Bulk-inserted {len(rc['items'])} items.")

# delete index
client.indices.delete(index=index_name)

Loading