Bulk Moving Elasticsearch Indices from localhost with Python
I had several large indices on localhost and constraints that only let me POST to the remote server, with no option to move the data through a shared resource or via the cloud, so I used the following script to copy every non-system index to the destination cluster.
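The script uses the official elasticsearch Python client and its bulk helpers (pip install elasticsearch). One note on versions: the http_auth keyword below is the 7.x client's spelling; the 8.x client renames it to basic_auth.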
from elasticsearch import Elasticsearch, helpers

# Set up connections to the source and destination clusters with authentication
src_es = Elasticsearch(
    hosts=["http://source-cluster-ip:9200"],
    http_auth=("source_username", "source_password")
)
dest_es = Elasticsearch(
    hosts=["http://target-cluster-ip:9200"],
    http_auth=("destination_username", "destination_password")
)
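# Optional sanity check: ping() is part of the client API and returns a
# boolean rather than raising, so this fails fast if either cluster is
# unreachable or rejects the credentials.
if not src_es.ping() or not dest_es.ping():
    raise ConnectionError("Cannot reach the source or destination cluster")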
# Step 1: Retrieve all non-system indices from the source cluster with their sizes
indices_stats = src_es.indices.stats(index="*")['indices']
non_system_indices = [
    (index, indices_stats[index]['total']['store']['size_in_bytes'])
    for index in indices_stats if not index.startswith('.')
]
# Step 2: Sort indices by size (smallest to largest)
sorted_indices = sorted(non_system_indices, key=lambda x: x[1])
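# At this point sorted_indices holds (name, size) tuples from smallest to
# largest, e.g. [('products', 1048576), ('logs-2023', 5368709120)]
# (hypothetical names and byte counts), so the cheap transfers finish first.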
# Step 3: For each sorted index, get its mapping and settings, create it on
# the destination cluster, and transfer its documents in bulk
for index_name, size in sorted_indices:
    # Get the settings (without defaults) and mappings for the current index from the source cluster
    settings = src_es.indices.get_settings(index=index_name)[index_name]["settings"]
    mappings = src_es.indices.get_mapping(index=index_name)[index_name]["mappings"]
    # Prepare the body for the new index, copying over only the relevant
    # settings rather than node-specific defaults
    index_body = {
        "settings": {
            "index": {
                "number_of_shards": settings["index"].get("number_of_shards", 1),
                "number_of_replicas": settings["index"].get("number_of_replicas", 1),
                "analysis": settings["index"].get("analysis", {})  # include custom analyzers, if any
            }
        },
        "mappings": mappings
    }
    # Create the index with the same settings and mappings on the destination
    # cluster, skipping it if it already exists
    if not dest_es.indices.exists(index=index_name):
        dest_es.indices.create(index=index_name, body=index_body)
        print(f"Created index '{index_name}' on the destination cluster.")
    # Step 4: Transfer documents in bulk from the source index to the destination index
    def bulk_transfer_actions():
        # helpers.scan scrolls through every document in the source index
        docs = helpers.scan(client=src_es, index=index_name)
        for doc in docs:
            yield {
                "_op_type": "index",
                "_index": index_name,
                "_id": doc['_id'],         # keep the original document ID
                "_source": doc['_source']  # and the original document body
            }

    # helpers.bulk sends the documents in batches; adjust chunk_size as needed
    helpers.bulk(client=dest_es, actions=bulk_transfer_actions(), chunk_size=1000)
    print(f"Transferred all documents from '{index_name}' to the destination cluster in bulk.")

print("Data transfer complete.")