Bulk Moving Elasticsearch Indices from localhost with Python

Having several large indices on localhost, and constraints that only allowed me to POST to the remote server — not to move data through a shared resource or via the cloud — I used this script to copy the non-system indices.

from elasticsearch import Elasticsearch, helpers

# Set up connections to the source and destination clusters with authentication.
# NOTE(review): `http_auth` is deprecated in elasticsearch-py 8.x in favor of
# `basic_auth` — confirm the installed client version before upgrading.
# Credentials and hosts are hard-coded placeholders; replace with real values
# (or read them from the environment) before running.
src_es = Elasticsearch(
    hosts=["http://source-cluster-ip:9200"],
    http_auth=("source_username", "source_password")
)
dest_es = Elasticsearch(
    hosts=["http://target-cluster-ip:9200"],
    http_auth=("destination_username", "destination_password")
)

# Step 1: Collect every non-system index on the source cluster, paired with
# its on-disk size in bytes. System indices are skipped — their names start
# with a dot (e.g. ".kibana").
indices_stats = src_es.indices.stats(index="*")['indices']
non_system_indices = []
for name, stats in indices_stats.items():
    if name.startswith('.'):
        continue
    non_system_indices.append((name, stats['total']['store']['size_in_bytes']))

# Step 2: Order the work list from smallest index to largest.
sorted_indices = sorted(non_system_indices, key=lambda pair: pair[1])

# Step 3: For each index (smallest first), mirror its settings and mappings on
# the destination cluster, then copy its documents across in bulk.

def _doc_actions(index_name):
    """Yield bulk 'index' actions for every document in *index_name* on the source.

    Streams documents via helpers.scan (scroll API) so the whole index is never
    held in memory at once; each document keeps its original _id and _source.
    """
    for doc in helpers.scan(client=src_es, index=index_name):
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_id": doc.get('_id'),
            "_source": doc.get('_source'),
        }

for index_name, size in sorted_indices:
    # Fetch the source index's explicitly-set settings and its mappings.
    # (get_settings without include_defaults omits cluster defaults.)
    settings = src_es.indices.get_settings(index=index_name)[index_name]["settings"]
    mappings = src_es.indices.get_mapping(index=index_name)[index_name]["mappings"]

    # Copy only the settings that are safe to carry over; other index settings
    # (uuid, creation_date, version, provided_name) must not be replayed.
    index_body = {
        "settings": {
            "index": {
                "number_of_shards": settings["index"].get("number_of_shards", 1),
                "number_of_replicas": settings["index"].get("number_of_replicas", 1),
                "analysis": settings["index"].get("analysis", {}),  # custom analyzers, if any
            }
        },
        "mappings": mappings,
    }

    # Create the destination index only when it does not already exist, so the
    # script can be safely re-run after a partial failure.
    if not dest_es.indices.exists(index=index_name):
        dest_es.indices.create(index=index_name, body=index_body)
        print(f"Created index '{index_name}' on the destination cluster with custom analyzers.")

    # Step 4: Stream documents across in batches of 1000.
    # Fix: the action generator is a module-level helper instead of a function
    # redefined inside the loop body on every iteration.
    helpers.bulk(client=dest_es, actions=_doc_actions(index_name), chunk_size=1000)
    print(f"Transferred all documents from '{index_name}' to the destination cluster in bulk.")

print("Data transfer complete.")

Leave a Reply

Your email address will not be published. Required fields are marked *

Autonomous Navigation

Previous article

The T.A.N.K.