Kafka Partitions Assignment Optimizer

Pointers

Usage Example

Set $ZK

To run the examples below as written, set the ZK variable to your ZooKeeper server(s):

ZK=daplab-wn-22.fri.lan:2181

Retrieve current assignment

$ echo '{"topics": [{"topic": "public.tweets"},{"topic": "trumpet"}], "version":1}' > topics-to-move.json
$ /usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh  --zookeeper $ZK --generate --topics-to-move-json-file topics-to-move.json --broker-list 0,1,2,3
Current partition replica assignment

{"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[2,1]},{"topic":"public.tweets","partition":5,"replicas":[1,0]},{"topic":"public.tweets","partition":0,"replicas":[0,2]},{"topic":"trumpet","partition":0,"replicas":[1,0,2]},{"topic":"public.tweets","partition":3,"replicas":[3,1]},{"topic":"public.tweets","partition":8,"replicas":[0,1]},{"topic":"public.tweets","partition":1,"replicas":[1,3]},{"topic":"public.tweets","partition":7,"replicas":[3,2]},{"topic":"public.tweets","partition":2,"replicas":[2,0]},{"topic":"public.tweets","partition":9,"replicas":[1,2]},{"topic":"public.tweets","partition":4,"replicas":[0,3]}]}

Generate REST payload

Copy the JSON printed under "Current partition replica assignment" in the output above and paste it as the value of the partitions attribute in a payload.json file, for example:

{
    "brokers": "0:a,1:b,2:a,3:b",
    "partitions": {"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]}
}

See the payload attributes definition below.
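The copy-and-paste step can also be scripted. The following is a minimal sketch, not part of the tool itself: the function names are hypothetical, and it assumes the output of kafka-reassign-partitions.sh has the layout shown above (a "Current partition replica assignment" header followed by a single line of JSON), which may differ between Kafka versions.

```python
import json

CURRENT_HEADER = "Current partition replica assignment"


def extract_current_assignment(generate_output: str) -> dict:
    """Return the JSON object printed after the 'current assignment' header."""
    lines = generate_output.splitlines()
    for i, line in enumerate(lines):
        if line.strip().startswith(CURRENT_HEADER):
            # The assignment is the first non-empty line after the header.
            for candidate in lines[i + 1:]:
                if candidate.strip():
                    return json.loads(candidate)
    raise ValueError("no assignment found after header: " + CURRENT_HEADER)


def build_payload(generate_output: str, brokers: str, new_replication_factor=None) -> dict:
    """Assemble the payload with the attributes used in this README."""
    payload = {
        "brokers": brokers,
        "partitions": extract_current_assignment(generate_output),
    }
    # newReplicationFactor is optional, so only add it when requested.
    if new_replication_factor is not None:
        payload["newReplicationFactor"] = new_replication_factor
    return payload
```

Calling json.dump(build_payload(output, "0:a,1:b,2:a,3:b"), open("payload.json", "w")) on the captured command output would then produce a file equivalent to the one shown above.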

Call the REST API

POST the previously generated payload:

$ curl -X POST --data @payload.json https://kafka-optimizer.sqooba.io/submit
{"version":1,"partitions":[{"topic":"public.tweets","partition":4,"replicas":[3,2]},{"topic":"public.tweets","partition":5,"replicas":[0,1]},{"topic":"public.tweets","partition":6,"replicas":[1,0]},{"topic":"public.tweets","partition":7,"replicas":[2,1]}]}
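One way to read this response is to look at the racks in the brokers string (0:a,1:b,2:a,3:b). The sketch below is only an observation about the sample data, not a description of the optimizer's algorithm. The helper name single_rack_partitions is hypothetical, and the racks dictionary is transcribed by hand from the payload above. It lists the partitions whose replicas all sit on the same rack.

```python
def single_rack_partitions(assignment: dict, racks: dict) -> list:
    """List (topic, partition) pairs whose replicas all sit on one rack."""
    flagged = []
    for p in assignment["partitions"]:
        used_racks = {racks[broker] for broker in p["replicas"]}
        if len(p["replicas"]) > 1 and len(used_racks) == 1:
            flagged.append((p["topic"], p["partition"]))
    return sorted(flagged)


# Rack of each broker, transcribed from "brokers": "0:a,1:b,2:a,3:b".
racks = {0: "a", 1: "b", 2: "a", 3: "b"}

# Current assignment from the example above, written compactly.
tweets = {0: [3, 0], 1: [0, 1], 2: [1, 2], 3: [2, 3], 4: [3, 1],
          5: [0, 2], 6: [1, 3], 7: [2, 0], 8: [3, 2], 9: [0, 3]}
current = {"version": 1, "partitions": (
    [{"topic": "public.tweets", "partition": n, "replicas": r}
     for n, r in tweets.items()]
    + [{"topic": "trumpet", "partition": 0, "replicas": [1, 3, 0]}]
)}

print(single_rack_partitions(current, racks))
```

On the sample data this flags partitions 4, 5, 6 and 7 of public.tweets, which are the same four partitions the service returned, and the new replica lists in the response each span both racks.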

Copy the output of the curl command above into a file named reassignment-file.json and run:

/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $ZK --reassignment-json-file reassignment-file.json --execute

You can then verify the reassignment by calling:

/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper $ZK --reassignment-json-file reassignment-file.json --verify

Increase Number of Replicas

Another common use case is increasing the replication factor. This is done by setting the newReplicationFactor attribute in the payload:

{
    "brokers": "0,1,2,3,4,5,6,7",
    "partitions": {"version":1,"partitions":[{"topic":"public.tweets","partition":6,"replicas":[1,3]},{"topic":"public.tweets","partition":5,"replicas":[0,2]},{"topic":"public.tweets","partition":0,"replicas":[3,0]},{"topic":"trumpet","partition":0,"replicas":[1,3,0]},{"topic":"public.tweets","partition":3,"replicas":[2,3]},{"topic":"public.tweets","partition":8,"replicas":[3,2]},{"topic":"public.tweets","partition":7,"replicas":[2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,1]},{"topic":"public.tweets","partition":2,"replicas":[1,2]},{"topic":"public.tweets","partition":9,"replicas":[0,3]},{"topic":"public.tweets","partition":4,"replicas":[3,1]}]},
    "newReplicationFactor": 3
}

Call the service again:

$ curl -X POST --data @payload.json https://kafka-optimizer.sqooba.io/submit
{"version":1,"partitions":[{"topic":"public.tweets","partition":0,"replicas":[3,2,0]},{"topic":"public.tweets","partition":1,"replicas":[0,2,1]},{"topic":"public.tweets","partition":2,"replicas":[1,3,2]},{"topic":"public.tweets","partition":3,"replicas":[2,3,1]},{"topic":"public.tweets","partition":4,"replicas":[3,0,1]},{"topic":"public.tweets","partition":5,"replicas":[0,2,1]},{"topic":"public.tweets","partition":6,"replicas":[1,0,3]},{"topic":"public.tweets","partition":7,"replicas":[2,1,0]},{"topic":"public.tweets","partition":8,"replicas":[3,0,2]},{"topic":"public.tweets","partition":9,"replicas":[0,1,3]}]}

All the partitions now have 3 replicas, and the existing replicas are preserved as much as possible (in this example, all the existing replicas are preserved, although the replica order, and hence possibly the preferred leader, may change).
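The "preserved as much as possible" property can be checked mechanically by comparing the payload's partitions with the response. This is a minimal sketch under that reading. Both function names are hypothetical and are not part of the service.

```python
def index_by_partition(assignment: dict) -> dict:
    """Map (topic, partition) to its replica list."""
    return {(p["topic"], p["partition"]): p["replicas"]
            for p in assignment["partitions"]}


def dropped_replicas(current: dict, changes: dict) -> dict:
    """For each changed partition, list brokers that held a replica before but not after."""
    before = index_by_partition(current)
    dropped = {}
    for key, after in index_by_partition(changes).items():
        lost = [broker for broker in before.get(key, []) if broker not in after]
        if lost:
            dropped[key] = lost
    return dropped
```

Applied to the current assignment and the response shown above, it should return an empty mapping, since every partition keeps its two original brokers and gains a third.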

No changes

Please note that the API returns only the changes. If the current assignment is already optimal, the API simply answers with an empty list, as follows:

{"version":1,"partitions":[]}
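A script that automates the steps above may want to skip the reassignment when the response contains no changes. A minimal sketch, where has_changes is a hypothetical helper name:

```python
import json


def has_changes(response_body: str) -> bool:
    """Return True when the response lists at least one partition to reassign."""
    return bool(json.loads(response_body)["partitions"])
```

When it returns False, there is nothing to write to reassignment-file.json and no need to run kafka-reassign-partitions.sh.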

Payload attributes

| Name | Mandatory | Value | Example(s) |
|---|---|---|---|
| brokers | yes | brokerId[:rack][,brokerId[:rack]]*, i.e. a list of comma-separated broker ids and optional :rack assignment. | 0,1,2,3 or 0:a,1:b,2:a,3:b |
| partitions | yes | copy-paste of the first part of the output of kafka-reassign-partitions command | {"version":1,"partitions":[{"topic":"...","partition":0,"replicas":[0,1]},...]} |
| newReplicationFactor | no | a number. If not set, the replication factor is derived from the partitions | 3 |
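To make the brokers format concrete, here is a small sketch of a parser for it. It is an illustration only. The name parse_brokers is hypothetical, and it assumes broker ids are non-negative integers and that a rack is any non-empty string without commas.

```python
import re

# One entry of the list: a numeric broker id, optionally followed by ":rack".
_BROKER_ENTRY = re.compile(r"^(\d+)(?::(.+))?$")


def parse_brokers(spec: str) -> dict:
    """Parse 'brokerId[:rack][,brokerId[:rack]]*' into {broker_id: rack or None}."""
    brokers = {}
    for item in spec.split(","):
        match = _BROKER_ENTRY.match(item.strip())
        if match is None:
            raise ValueError(f"invalid broker entry: {item!r}")
        brokers[int(match.group(1))] = match.group(2)
    return brokers
```

With the examples from the table, "0,1,2,3" maps every broker to None, and "0:a,1:b,2:a,3:b" maps brokers 0 and 2 to rack a and brokers 1 and 3 to rack b.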