
Confluent Kafka Multi Node Cluster Setup on CentOS 7

Kafka is a distributed system, and data is read from and written to the partition leader. The leader can be on any broker in the cluster. When a client (producer or consumer) starts, it requests metadata about which broker is the leader for a partition; this metadata request can be served by any broker. The returned metadata includes the available endpoints for the lead broker of that partition, and the client uses those endpoints to connect to the broker to read or write data as required.

1. Architecture Design

1.1. Architecture Diagram


1.2. Kafka Broker Disks Sizing

Because Kafka persists messages to log segment files, excellent disk I/O performance is a requirement. Writing Kafka logs to multiple disks improves performance through parallelism. Since Kafka requires that an entire partition fit on a single disk, this is an upper bound on the amount of data that can be stored by Kafka for a given topic partition. Hence, the number of topics, the number of partitions per topic, the size of the records, etc. all affect the log retention limits. So, if you have a high volume partition, allocate it to a dedicated drive. For resilience, also be sure to allocate enough disk space cluster-wide for partition replicas.

danger

Kafka does not degrade gracefully if it runs out of disk space. Make sure your sizing estimates are generous, monitor disk utilization, and take corrective action well before disk exhaustion occurs! In a default deployment Kafka can consume all available space on the disks it uses, which not only causes Kafka to fail but can also take down other applications and services on the node.

Based on the anticipated data rate, you can calculate how much disk space may be required and how to trade off the following variables to meet your needs and keep Kafka healthy.

  • Disk Size - Multi-TB hard drives have more capacity compared to SSDs, but trade off read and write performance.

  • Number of Partitions - Using more partitions increases throughput per topic through parallelism. It also allows more drives to be used, one per partition, which increases capacity. Note that Kafka only preserves message order per partition, not over a whole, multi-partition topic. Topic messages can be partitioned randomly or by hashing on a key.

  • Replication Factor - The replication factor provides resiliency for a partition. Replicas are spread across available brokers, so the replication factor is always <= the number of brokers in your cluster. It must be accounted for when determining disk requirements, because a leader partition and a replica partition use the same amount of space. The total number of partitions for a given topic is the number of partitions * the replication factor.

  • Dedicated Drives - Using dedicated drives for the Kafka brokers not only improves performance by removing other activity from the drive; it also means that if the drive becomes full, other services that need drive space are not adversely affected.

  • Do not use RAID for fault tolerance. Kafka provides fault tolerance guarantees at the application layer (with topic replication factor) making fault tolerance at the hard disk level redundant.

To estimate per-broker disk requirements, begin by estimating the projected usage. Here is an example.

| Quantity | Example | Discussion |
| --- | --- | --- |
| Average topic message throughput (MT) | 500 msg/sec | For each topic, how many messages arrive on average per second. |
| Average message size (MS) | 5 KB | Note that Kafka works best with messages under 1 MB in size. |
| Broker message retention length (RL) | 24 hours (86400 seconds) | The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if log retention is set to 24 hours, a message is available for consumption for 24 hours after it is published, after which it is discarded to free up space. |
| Number of topics (NT) | 1 | |
| Average replication factor (NR) | 2 | |
| Number of Kafka Brokers (KB) | 3 | |
Disk size per broker = ( MT x MS x RL x NT x NR ) / KB

= (500 x 5 x 86400 x 1 x 2) / 3
= 144,000,000 KB
≈ 137 GiB

1.3. Kafka Broker Memory (RAM) Sizing

tip

Assuming messages stay in memory for at most 1 hour (3600 seconds) in the worst case:

Memory (RAM) per broker = (MT x MS x 3600)

= 9,000,000 KB
≈ 9 GB
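
Since both estimates are straight multiplications, they are easy to recompute in the shell. A minimal sketch using the example figures from the table above; substitute your own values:

# Sizing sketch (all sizes in KB); values are the example figures above
MT=500     # average messages per second per topic
MS=5       # average message size in KB
RL=86400   # retention length in seconds (24 hours)
NT=1       # number of topics
NR=2       # average replication factor
KB=3       # number of Kafka brokers

echo "Disk per broker: $(( MT * MS * RL * NT * NR / KB )) KB"
echo "RAM per broker:  $(( MT * MS * 3600 )) KB"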

2. System Requirements

| Component | Description |
| --- | --- |
| Number of VMs | 3 |
| Operating System | CentOS 7 x64 |
| File System | XFS |
| Privileges | ROOT access preferred |

3. Hardware Specifications (Per VM)

| Component | Description |
| --- | --- |
| CPU | 4 Cores |
| Memory | 16 GB |
| OS Disk Size | 40 GB SSD |
| Data Disk Size | 250 GB SSD |

4. Preparing the VMs

info
  • Verify the MAC address and product_uuid are unique for every node. You can get the MAC address of the network interfaces using the command ip link | grep link/ether
  • The product_uuid can be checked by using the command cat /sys/class/dmi/id/product_uuid

4.1. Install prerequisites.

# Clean YUM repository cache
yum clean all -y

# Update packages
yum update -y

# Install prerequisites (chrony is used for time synchronization below)
yum install -y curl which chrony

4.2. Install JDK 8.

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

4.3. Set up JAVA_HOME by adding the following line at the bottom of /etc/bashrc.

export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which javac))))

4.4. Get the updated JAVA_HOME into current shell.

source /etc/bashrc

4.5. Verify the Java version.

java -version

4.6. Synchronize server time with Google NTP server.

# Add Google NTP Server
sed -i '/^pool/c\pool time.google.com iburst' /etc/chrony.conf

# Set timezone to Asia/Colombo
timedatectl set-timezone Asia/Colombo

# Enable NTP time synchronization
timedatectl set-ntp true

4.7. Start and enable chronyd service.

# Start and enable chronyd service
systemctl enable --now chronyd && systemctl status chronyd

4.8. Display time synchronization status.

# Verify synchronisation state
chronyc tracking

# Check Chrony Source Statistics
chronyc sourcestats -v

4.9. Since chronyd handles time synchronization, make sure the legacy ntpd service is not running (two NTP daemons must not run at the same time).

systemctl disable --now ntpd.service 2>/dev/null || true

4.10. Add the following lines to the /etc/hosts file on ALL nodes.

cat <<EOF | sudo tee /etc/hosts > /dev/null
# localhost
127.0.0.1 localhost localhost.localdomain

# When DNS records are updated in the DNS server, remove these entries.
192.168.16.100 KAFKA01 KAFKA01.EXAMPLE.LOCAL
192.168.16.101 KAFKA02 KAFKA02.EXAMPLE.LOCAL
192.168.16.102 KAFKA03 KAFKA03.EXAMPLE.LOCAL
EOF
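
To confirm the entries are in effect, getent can be used; it resolves names through /etc/hosts as well as DNS:

# Verify name resolution on every node
for h in KAFKA01 KAFKA02 KAFKA03; do getent hosts "$h"; done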

4.11. Tweak the system for high concurrency and security.

cat <<EOF | sudo tee /etc/sysctl.d/00-sysctl.conf > /dev/null
#############################################################################################
# Tweak virtual memory
#############################################################################################

# Default: 30
# 0 - Never swap under any circumstances.
# 1 - Do not swap unless there is an out-of-memory (OOM) condition.
vm.swappiness = 1

# vm.dirty_background_ratio is used to adjust how the kernel handles dirty pages that must be flushed to disk.
# Default value is 10.
# The value is a percentage of the total amount of system memory, and setting this value to 5 is appropriate in many situations.
# This setting should not be set to zero.
vm.dirty_background_ratio = 5

# The total number of dirty pages that are allowed before the kernel forces synchronous operations to flush them to disk
# can also be increased by changing the value of vm.dirty_ratio, increasing it to above the default of 30 (also a percentage of total system memory)
# vm.dirty_ratio value in-between 60 and 80 is a reasonable number.
vm.dirty_ratio = 60

# vm.max_map_count limits the maximum number of memory map areas a process may have.
# Kafka memory-maps a file for each log segment, so brokers hosting many partitions need a high limit.
# As a rule of thumb, allow about one map area per 128 KB of system memory, e.g. 262144 on a 32 GB system.
# Reference: https://docs.confluent.io/current/kafka/deployment.html
# Default: 65530
vm.max_map_count = 2097152

#############################################################################################
# Tweak file handles
#############################################################################################

# Increases the size of file handles and inode cache and restricts core dumps.
fs.file-max = 2097152
fs.suid_dumpable = 0

#############################################################################################
# Tweak network settings
#############################################################################################

# Default amount of memory allocated for the send and receive buffers for each socket.
# This will significantly increase performance for large transfers.
net.core.wmem_default = 25165824
net.core.rmem_default = 25165824

# Maximum amount of memory allocated for the send and receive buffers for each socket.
# This will significantly increase performance for large transfers.
net.core.wmem_max = 25165824
net.core.rmem_max = 25165824

# In addition to the socket settings, the send and receive buffer sizes for
# TCP sockets must be set separately using the net.ipv4.tcp_wmem and net.ipv4.tcp_rmem parameters.
# These are set using three space-separated integers that specify the minimum, default, and maximum sizes, respectively.
# The maximum size cannot be larger than the values specified for all sockets using net.core.wmem_max and net.core.rmem_max.
# The values below use a 20 KiB minimum, 12 MiB default, and 24 MiB maximum buffer.
net.ipv4.tcp_wmem = 20480 12582912 25165824
net.ipv4.tcp_rmem = 20480 12582912 25165824

# Maximum total buffer space allocatable, given as three values: low, pressure, high.
# Measured in units of pages (4096 bytes); low <= pressure <= high must hold.
net.ipv4.tcp_mem = 65536 131072 262144
net.ipv4.udp_mem = 65536 131072 262144

# Minimum amount of memory allocated for the send and receive buffers for each socket.
net.ipv4.udp_wmem_min = 16384
net.ipv4.udp_rmem_min = 16384

# Enabling TCP window scaling by setting net.ipv4.tcp_window_scaling to 1 will allow
# clients to transfer data more efficiently, and allow that data to be buffered on the broker side.
net.ipv4.tcp_window_scaling = 1

# Increasing the value of net.ipv4.tcp_max_syn_backlog above the default of 1024 will allow
# a greater number of simultaneous connections to be accepted.
net.ipv4.tcp_max_syn_backlog = 10240

# Increasing the value of net.core.netdev_max_backlog to greater than the default of 1000
# can assist with bursts of network traffic, specifically when using multigigabit network connection speeds,
# by allowing more packets to be queued for the kernel to process them.
net.core.netdev_max_backlog = 65536

# Increase the maximum amount of option memory buffers
net.core.optmem_max = 25165824

# Number of times SYN+ACKs are retransmitted for a passive TCP connection.
net.ipv4.tcp_synack_retries = 2

# Allowed local port range.
net.ipv4.ip_local_port_range = 2048 65535

# Protect Against TCP Time-Wait
# Default: net.ipv4.tcp_rfc1337 = 0
net.ipv4.tcp_rfc1337 = 1

# Decrease the time default value for tcp_fin_timeout connection
net.ipv4.tcp_fin_timeout = 15

# The maximum number of backlogged sockets.
# Default is 128.
net.core.somaxconn = 4096

# Turn on syncookies for SYN flood attack protection.
net.ipv4.tcp_syncookies = 1

# Avoid a smurf attack
net.ipv4.icmp_echo_ignore_broadcasts = 1

# Turn on protection for bad icmp error messages
net.ipv4.icmp_ignore_bogus_error_responses = 1

# Turn on and log spoofed, source routed, and redirect packets
net.ipv4.conf.all.log_martians = 1
net.ipv4.conf.default.log_martians = 1

# Tells the kernel how many TCP sockets that are not attached to any
# user file handle to maintain. In case this number is exceeded,
# orphaned connections are immediately reset and a warning is printed.
# Default: net.ipv4.tcp_max_orphans = 65536
net.ipv4.tcp_max_orphans = 65536

# Do not cache metrics on closing connections
net.ipv4.tcp_no_metrics_save = 1

# Enable timestamps as defined in RFC1323:
# Default: net.ipv4.tcp_timestamps = 1
net.ipv4.tcp_timestamps = 1

# Enable select acknowledgments.
# Default: net.ipv4.tcp_sack = 1
net.ipv4.tcp_sack = 1

# Increase the tcp-time-wait buckets pool size to prevent simple DOS attacks.
# net.ipv4.tcp_tw_recycle has been removed from Linux 4.12. Use net.ipv4.tcp_tw_reuse instead.
net.ipv4.tcp_max_tw_buckets = 1440000
net.ipv4.tcp_tw_reuse = 1

# The accept_source_route option causes network interfaces to accept packets with the Strict Source Route (SSR) or Loose Source Routing (LSR) option set.
# The following setting will drop packets with the SSR or LSR option set.
net.ipv4.conf.all.accept_source_route = 0
net.ipv4.conf.default.accept_source_route = 0

# Turn on reverse path filtering
net.ipv4.conf.all.rp_filter = 1
net.ipv4.conf.default.rp_filter = 1

# Disable ICMP redirect acceptance
net.ipv4.conf.all.accept_redirects = 0
net.ipv4.conf.default.accept_redirects = 0
net.ipv4.conf.all.secure_redirects = 0
net.ipv4.conf.default.secure_redirects = 0

# Disables sending of all IPv4 ICMP redirected packets.
net.ipv4.conf.all.send_redirects = 0
net.ipv4.conf.default.send_redirects = 0

# Disable IP forwarding.
# IP forwarding is the ability for an operating system to accept incoming network packets on one interface,
# recognize that it is not meant for the system itself, but that it should be passed on to another network, and then forwards it accordingly.
net.ipv4.ip_forward = 0

# Disable IPv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1

#############################################################################################
# Tweak kernel parameters
#############################################################################################

# Address Space Layout Randomization (ASLR) is a memory-protection process for operating systems that guards against buffer-overflow attacks.
# It helps to ensure that the memory addresses associated with running processes on systems are not predictable,
# thus flaws or vulnerabilities associated with these processes will be more difficult to exploit.
# Accepted values: 0 = Disabled, 1 = Conservative Randomization, 2 = Full Randomization
kernel.randomize_va_space = 2

# Allow for more PIDs (to reduce rollover problems)
kernel.pid_max = 65536
EOF

4.12. Reload all sysctl variables without rebooting the server.

sysctl -p /etc/sysctl.d/00-sysctl.conf
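
You can spot-check a few of the applied values; sysctl accepts multiple keys at once:

# Confirm the new settings are active
sysctl vm.swappiness vm.max_map_count net.core.somaxconn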

4.13. Configure firewall for ZooKeeper.

sudo firewall-cmd --permanent --add-port={2888,3888,2181}/tcp
sudo firewall-cmd --reload

4.14. Configure firewall for Kafka.

sudo firewall-cmd --permanent --add-port=9092/tcp
sudo firewall-cmd --reload

4.15. Verify the changes.

firewall-cmd --list-ports

5. Creating XFS File System based on LVM for DATA

The XFS filesystem is a high-performance journalling filesystem and the default file system for RedHat/CentOS Linux 7, where it supports file systems of up to 500 TB.

5.1. In order to continue further, you must attach a new disk to your server. The lsblk command lists all the block storage devices attached to the system. This includes raw disks, primary partitions, logical partitions and even network attached storage. Here, we have attached a new 250 GB disk which shows up as device sdb. The sda device holds the operating system.

[root@kafka01 ~]# lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
fd0 2:0 1 4K 0 disk
sda 8:0 0 30G 0 disk
├─sda1 8:1 0 500M 0 part /boot
└─sda2 8:2 0 29.5G 0 part
├─centos_vg-swap 253:0 0 2G 0 lvm
└─centos_vg-root 253:1 0 27.5G 0 lvm /
sdb 8:16 0 250G 0 disk
sr0 11:0 1 1024M 0 rom

5.2. To create a new XFS file system based on LVM, you first need a partition to format. You can use fdisk as in the example below: invoke fdisk with the name of the disk you wish to partition, then use the "n" command to create a new partition. After you have set the size, change the partition type from "Linux" to "Linux LVM" by typing "t" and then "8e". Finally, use the "w" command to write the new partition table to disk.

[root@kafka01 ~]# fdisk /dev/sdb
Welcome to fdisk (util-linux 2.23.2).

Changes will remain in memory only, until you decide to write them.
Be careful before using the write command.

Device does not contain a recognized partition table
Building a new DOS disklabel with disk identifier 0x382e7ba5.

Command (m for help): n
Partition type:
p primary (0 primary, 0 extended, 4 free)
e extended
Select (default p): p
Partition number (1-4, default 1):
First sector (2048-524287999, default 2048):
Last sector, +sectors or +size{K,M,G} (2048-524287999, default 524287999):
Partition 1 of type Linux and of size 250 GiB is set

Command (m for help): t
Selected partition 1
Hex code (type L to list all codes): 8e
Changed type of partition 'Linux' to 'Linux LVM'

Command (m for help): w
The partition table has been altered!

Calling ioctl() to re-read partition table.
Syncing disks.

5.3. Inform the operating system kernel of partition table changes, by requesting that the operating system re-read the partition table.

partprobe

5.4. Verify the changes using lsblk.

[root@kafka01 ~]# lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
fd0 2:0 1 4K 0 disk
sda 8:0 0 30G 0 disk
├─sda1 8:1 0 500M 0 part /boot
└─sda2 8:2 0 29.5G 0 part
├─centos_vg-swap 253:0 0 2G 0 lvm
└─centos_vg-root 253:1 0 27.5G 0 lvm /
sdb 8:16 0 250G 0 disk
└─sdb1 8:17 0 250G 0 part
sr0 11:0 1 1024M 0 rom

5.5. Create a physical volume.

A physical volume is the actual storage device used in an LVM configuration. It can be an entire disk, a partition on a disk, or a LUN on a SAN. Use the pvcreate command to initialize the device so that it can become part of a volume group.

[root@kafka01 ~]# pvcreate /dev/sdb1
Physical volume "/dev/sdb1" successfully created.

5.6. Display Physical Volumes using pvdisplay.

[root@kafka01 ~]# pvdisplay
--- Physical volume ---
PV Name /dev/sda2
VG Name centos_vg
PV Size 29.51 GiB / not usable 3.00 MiB
Allocatable yes (but full)
PE Size 4.00 MiB
Total PE 7554
Free PE 0
Allocated PE 7554
PV UUID GARu2F-XnFS-tOxz-Ri5A-ao8W-h2Tu-IZmwNJ

"/dev/sdb1" is a new physical volume of "<250.00 GiB"
--- NEW Physical volume ---
PV Name /dev/sdb1
VG Name
PV Size <250.00 GiB
Allocatable NO
PE Size 0
Total PE 0
Free PE 0
Allocated PE 0
PV UUID 6jTcKd-ITid-jxkZ-mQDV-2YHr-cFZT-wFf98q

5.7. Create a Volume Group.

Physical volumes are combined into volume groups (VGs), creating a pool of disk space out of which logical volumes can be allocated. The disk space available for allocation in a volume group is divided into fixed-size units called extents; an extent is the smallest unit of storage that can be allocated. Within a physical volume, extents are referred to as physical extents. A logical volume is allocated into logical extents of the same size as the physical extents, so the extent size is the same for all logical volumes in the volume group. The volume group maps the logical extents to physical extents.

[root@kafka01 ~]# vgcreate kafka_vg /dev/sdb1
Volume group "kafka_vg" successfully created

5.8. Display Volume Groups using vgdisplay.

[root@kafka01 ~]# vgdisplay
--- Volume group ---
VG Name kafka_vg
System ID
Format lvm2
Metadata Areas 1
Metadata Sequence No 1
VG Access read/write
VG Status resizable
MAX LV 0
Cur LV 0
Open LV 0
Max PV 0
Cur PV 1
Act PV 1
VG Size <250.00 GiB
PE Size 4.00 MiB
Total PE 63999
Alloc PE / Size 0 / 0
Free PE / Size 63999 / <250.00 GiB
VG UUID jG0Rxk-wwuq-WdRd-vFxL-cXTH-b8Cb-zY3CXn

--- Volume group ---
VG Name centos_vg
System ID
Format lvm2
Metadata Areas 1
Metadata Sequence No 5
VG Access read/write
VG Status resizable
MAX LV 0
Cur LV 2
Open LV 1
Max PV 0
Cur PV 1
Act PV 1
VG Size <29.51 GiB
PE Size 4.00 MiB
Total PE 7554
Alloc PE / Size 7554 / <29.51 GiB
Free PE / Size 0 / 0
VG UUID 8ASRqW-YyA2-xTBD-54Zo-hPd7-8Vyy-D7w71O

5.9. Create Logical Volume.

A volume group is divided up into logical volumes. Having created kafka_vg earlier, you can now create logical volumes from it. The amount of space to allocate depends on your requirements; you might create an LV of 200 MB, 1 GB, and so on. Here, we create a logical volume that uses the entire volume group.

[root@kafka01 ~]# lvcreate -n kafka_lv -l 100%FREE kafka_vg
Logical volume "kafka_lv" created.

5.10. Display Logical Volumes using lvdisplay.

[root@kafka01 ~]# lvdisplay
--- Logical volume ---
LV Path /dev/kafka_vg/kafka_lv
LV Name kafka_lv
VG Name kafka_vg
LV UUID Btk9AG-jpvM-WqWn-TGVU-45xl-3Zrc-8inmIu
LV Write Access read/write
LV Creation host, time kafka01, 2020-02-16 01:12:19 +0530
LV Status available
# open 0
LV Size <250.00 GiB
Current LE 63999
Segments 1
Allocation inherit
Read ahead sectors auto
- currently set to 256
Block device 253:2

--- Logical volume ---
LV Path /dev/centos_vg/swap
LV Name swap
VG Name centos_vg
LV UUID 96S1dS-hnIt-cB4L-JhBB-Cq8s-Ehxe-zmqX8g
LV Write Access read/write
LV Creation host, time localhost, 2016-03-02 11:36:51 +0530
LV Status available
# open 0
LV Size <2.02 GiB
Current LE 516
Segments 1
Allocation inherit
Read ahead sectors auto
- currently set to 256
Block device 253:0

--- Logical volume ---
LV Path /dev/centos_vg/root
LV Name root
VG Name centos_vg
LV UUID aTfjzg-5qff-b20m-PI8f-KHRX-sjOY-inkuze
LV Write Access read/write
LV Creation host, time localhost, 2016-03-02 11:36:51 +0530
LV Status available
# open 1
LV Size 27.49 GiB
Current LE 7038
Segments 1
Allocation inherit
Read ahead sectors auto
- currently set to 256
Block device 253:1

5.11. Format logical partition to XFS filesystem.

[root@kafka01 ~]# mkfs.xfs /dev/kafka_vg/kafka_lv
meta-data=/dev/kafka_vg/kafka_lv isize=512 agcount=4, agsize=16383744 blks
= sectsz=512 attr=2, projid32bit=1
= crc=1 finobt=0, sparse=0
data = bsize=4096 blocks=65534976, imaxpct=25
= sunit=0 swidth=0 blks
naming =version 2 bsize=4096 ascii-ci=0 ftype=1
log =internal log bsize=4096 blocks=31999, version=2
= sectsz=512 sunit=0 blks, lazy-count=1
realtime =none extsz=4096 blocks=0, rtextents=0

5.12. Create a directory where this XFS file system will be mounted.

mkdir /data

5.13. Find the Universally Unique Identifier (UUID) of the XFS filesystem using the blkid command. This UUID will be used to mount the newly created logical volume under the /data directory.

[root@kafka01 ~]# blkid
/dev/sda1: UUID="6a16b2c9-009d-47a8-ac3d-ee7a8c6430b2" TYPE="xfs"
/dev/sda2: UUID="GARu2F-XnFS-tOxz-Ri5A-ao8W-h2Tu-IZmwNJ" TYPE="LVM2_member"
/dev/sdb1: UUID="6jTcKd-ITid-jxkZ-mQDV-2YHr-cFZT-wFf98q" TYPE="LVM2_member"
/dev/mapper/centos_vg-swap: UUID="464ca698-fb34-4887-bab0-3276ed86b5ca" TYPE="swap"
/dev/mapper/centos_vg-root: UUID="03c97344-9b3d-45e2-9140-cbbd57b6f085" TYPE="xfs"
/dev/mapper/kafka_vg-kafka_lv: UUID="8c926153-41f9-422b-96a1-a13c753872c8" TYPE="xfs"

5.14. Make the mount persistent by adding it to /etc/fstab.

# Example: Mount kafka_lv under /data
UUID="8c926153-41f9-422b-96a1-a13c753872c8" /data xfs defaults,noatime,nodiratime,allocsize=64m 0 0

5.15. Mount the file system.

mount -a

5.16. Verify the changes using lsblk.

[root@kafka01 ~]# lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
fd0 2:0 1 4K 0 disk
sda 8:0 0 30G 0 disk
├─sda1 8:1 0 500M 0 part /boot
└─sda2 8:2 0 29.5G 0 part
├─centos_vg-swap 253:0 0 2G 0 lvm
└─centos_vg-root 253:1 0 27.5G 0 lvm /
sdb 8:16 0 250G 0 disk
└─sdb1 8:17 0 250G 0 part
└─kafka_vg-kafka_lv 253:2 0 250G 0 lvm /data
sr0 11:0 1 1024M 0 rom

5.17. To permanently disable swap space, open the /etc/fstab file, find the swap line, and comment out the entire line by adding a "#" sign in front of it.

# Example:
# UUID="464ca698-fb34-4887-bab0-3276ed86b5ca" swap swap defaults 0 0

5.18. Execute the following command to turn off all swap devices and files.

swapoff -a
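
Verify that no swap remains active:

# free should report 0 swap; swapon -s prints nothing when no swap is in use
free -h
swapon -s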

5.19. Disable file access time logging and reduce fragmentation to enhance XFS file system performance: add noatime,nodiratime,allocsize=64m to all XFS volumes.

# Example:
UUID="03c97344-9b3d-45e2-9140-cbbd57b6f085" / xfs defaults,noatime,nodiratime,allocsize=64m 0 0

5.20. The servers need to be restarted before continuing further.

reboot

6. Install and Configure Confluent Community Components

6.1. Setting up Confluent repository

6.1.1. Install the Confluent Platform public key. This key is used to sign packages in the YUM repository.

rpm --import https://packages.confluent.io/rpm/5.4/archive.key

6.1.2. Add the Confluent repository.

cat <<EOF | sudo tee /etc/yum.repos.d/confluent.repo > /dev/null
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.4/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.4
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1
EOF

6.1.3. Update the YUM caches and install Confluent platform using only Confluent Community components.

yum update -y && yum install -y confluent-community-2.12

6.2. Configure ZooKeeper

6.2.1. Create the zookeeper-data directory.

mkdir /data/zookeeper-data

6.2.2. Each of our cluster nodes needs a UNIQUE SERVER ID. ZooKeeper looks up this information from the file /data/zookeeper-data/myid.

# In the KAFKA01.EXAMPLE.LOCAL server
echo "1" > /data/zookeeper-data/myid

# In the KAFKA02.EXAMPLE.LOCAL server
echo "2" > /data/zookeeper-data/myid

# In the KAFKA03.EXAMPLE.LOCAL server
echo "3" > /data/zookeeper-data/myid

6.2.3. Verify all nodes have a UNIQUE SERVER ID.

cat /data/zookeeper-data/myid

6.2.4. Set ownership of the zookeeper-data directory.

chown -R cp-kafka:confluent /data/zookeeper-data

6.2.5. Backup existing zookeeper.properties.

# Create a backup of existing configurations
cp /etc/kafka/zookeeper.properties /etc/kafka/zookeeper.properties.backup

6.2.6. Configure ZooKeeper.

# Configure ZooKeeper
cat <<EOF | sudo tee /etc/kafka/zookeeper.properties > /dev/null
# The unit of time for ZooKeeper translated to milliseconds.
# This governs all ZooKeeper time dependent operations. It is used for heartbeats and timeouts especially.
# Note that the minimum session timeout will be two ticks.
tickTime=2000

# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this.
admin.enableServer=false

# The port at which the clients will connect.
clientPort=2181

# The initLimit is the amount of time to allow followers to connect with a leader.
# The syncLimit value limits how out-of-sync followers can be with the leader.
# Both values are a number of tickTime units, which makes the initLimit 5 * 2000 ms, or 10 seconds.
initLimit=5
syncLimit=2

# When enabled, ZooKeeper auto purge feature retains the autopurge.snapRetainCount most recent snapshots
# and the corresponding transaction logs in the dataDir and dataLogDir respectively and deletes the rest.
autopurge.snapRetainCount=3

# The time interval in hours for which the purge task has to be triggered.
# Set to a positive integer (1 and above) to enable the auto purging.
autopurge.purgeInterval=24

# The maximum allowed number of client connections for a ZooKeeper server.
# To avoid running out of allowed connections set this to 0 (unlimited).
maxClientCnxns=0

# The directory where ZooKeeper in-memory database snapshots and, unless specified in dataLogDir,
# the transaction log of updates to the database. This location should be a dedicated disk that is ideally an SSD.
dataDir=/data/zookeeper-data

# The servers are specified in the format server.X=hostname:peerPort:leaderPort, with the following parameters.
# The myid must match the X in server.X parameter.
# X - The ID number of the server. This must be an integer, but it does not need to be zero-based or sequential.
# hostname - The hostname or IP address of the server.
# peerPort - The TCP port over which servers in the ensemble communicate with each other.
# leaderPort - The TCP port over which leader election is performed.
# Clients only need to be able to connect to the ensemble over the clientPort,
# but the members of the ensemble must be able to communicate with each other over all three ports.
server.1=KAFKA01.EXAMPLE.LOCAL:2888:3888
server.2=KAFKA02.EXAMPLE.LOCAL:2888:3888
server.3=KAFKA03.EXAMPLE.LOCAL:2888:3888
EOF

6.2.7. Start and enable ZooKeeper.

# Start and enable ZooKeeper
systemctl enable --now confluent-zookeeper

# Check service is started
systemctl status confluent-zookeeper
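
As a quick sanity check of the ensemble, you can list the root znode from any node using the zookeeper-shell wrapper that ships with the Confluent packages:

# List the root znode; a reply confirms the ensemble is answering
zookeeper-shell KAFKA01.EXAMPLE.LOCAL:2181 ls /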

6.3. Configure Kafka

6.3.1. Create the kafka-data directory.

mkdir /data/kafka-data

6.3.2. Set ownership of the kafka-data directory.

chown -R cp-kafka:confluent /data/kafka-data

6.3.3. Backup existing Kafka server.properties.

# Create a backup of existing configurations
cp /etc/kafka/server.properties /etc/kafka/server.properties.backup

6.3.4. Configure Kafka.

  • Please note that the broker.id and advertised.listeners properties are unique for each cluster node. Change their values accordingly on every node; a per-node example follows the configuration block below.
# Configure Kafka
cat <<EOF | sudo tee /etc/kafka/server.properties > /dev/null
# References
# https://docs.confluent.io/current/kafka/deployment.html
# https://docs.confluent.io/1.0/kafka/post-deployment.html
# https://docs.confluent.io/current/installation/configuration/broker-configs.html#broker-configurations

# Enable auto creation of topic on the server
# Default: true
auto.create.topics.enable=true

# The id of the broker. This must be set to a unique integer for each broker.
# If unset, a unique broker id will be generated.
# To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids start from reserved.broker.max.id + 1.
broker.id=1

# Specify the final compression type for a given topic.
# This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd').
# It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.
compression.type=producer

# Disable Confluent Support Metrics
# Default: true
confluent.support.metrics.enable=false

# Migrate topic partition leadership before the broker is stopped.
controlled.shutdown.enable=true

# Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off.
# Default: true
delete.topic.enable=true

# The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance.
# A longer delay means potentially fewer rebalances, but increases the time until processing begins.
# Default: 3000
group.initial.rebalance.delay.ms=3000

# Default replication factors for automatically created topics
# Default: 1
default.replication.factor=2

# The maximum allowed session timeout for registered consumers.
# Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures.
# Default: 1800000
group.max.session.timeout.ms=900000

# The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion
# Default: 300000
log.retention.check.interval.ms=60000

# The number of hours to keep a log file before deleting it; tertiary to the log.retention.ms property
# Default: 168
log.retention.hours=24

# The maximum size of a single log file
# Default: 1073741824
log.segment.bytes=67108864

# The amount of time to wait before deleting a file from the filesystem
# Default: 60000
log.segment.delete.delay.ms=1000

# The number of threads that the server uses for processing requests, which may include disk I/O
# Default: 8
num.io.threads=8

# The number of threads that the server uses for receiving requests from the network and sending responses to the network.
# Default: 3
num.network.threads=6

# The default number of log partitions per topic.
# Default: 1
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown
# Default: 1
num.recovery.threads.per.data.dir=4

# The replication factor for the offsets topic (set higher to ensure availability).
# Internal topic creation will fail until the cluster size meets this replication factor requirement.
# Default: 3
offsets.topic.replication.factor=3

# The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
# OS default /proc/sys/net/core/rmem_default
# Default: 102400
socket.receive.buffer.bytes=102400

# The maximum number of bytes in a socket request
# Default: 104857600
socket.request.max.bytes=104857600

# The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used.
# OS default /proc/sys/net/core/wmem_default
# Default: 102400
socket.send.buffer.bytes=102400

# Overridden min.insync.replicas config for the transaction topic.
# Default: 2
transaction.state.log.min.isr=2

# The replication factor for the transaction topic (set higher to ensure availability).
# Internal topic creation will fail until the cluster size meets this replication factor requirement.
# Default: 3
transaction.state.log.replication.factor=3

# Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.
# Default: false
unclean.leader.election.enable=false

# The max time that the client waits to establish a connection to zookeeper. If not set, the value in zookeeper.session.timeout.ms is used.
# Default: null
zookeeper.connection.timeout.ms=6000

# The directories in which the log data is kept. If not set, the value in log.dir (Default: /tmp/kafka-logs) is used.
log.dirs=/data/kafka-data

# Listener List - Comma-separated list of URIs we will listen on and the listener names.
# If the listener name is not a security protocol, listener.security.protocol.map must also be set.
# Specify hostname as 0.0.0.0 to bind to all interfaces. Leave hostname empty to bind to default interface.
# Examples of legal listener lists: PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
listeners=PLAINTEXT://:9092

# Listeners to publish to ZooKeeper for clients to use, if different than the listeners config property.
# In IaaS environments, this may need to be different from the interface to which the broker binds.
# If this is not set, the value for listeners will be used.
# Unlike listeners it is not valid to advertise the 0.0.0.0 meta-address.
advertised.listeners=PLAINTEXT://KAFKA01.EXAMPLE.LOCAL:9092

# Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server.
# To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify
# multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=KAFKA01.EXAMPLE.LOCAL:2181,KAFKA02.EXAMPLE.LOCAL:2181,KAFKA03.EXAMPLE.LOCAL:2181
EOF
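
Since the block above hard-codes the values for KAFKA01, adjust the two per-node properties on the other brokers. A sketch for KAFKA02 (broker.id=2); KAFKA03 follows the same pattern with broker.id=3:

# On KAFKA02: set the unique broker id and the advertised listener
sed -i 's/^broker.id=.*/broker.id=2/' /etc/kafka/server.properties
sed -i 's|^advertised.listeners=.*|advertised.listeners=PLAINTEXT://KAFKA02.EXAMPLE.LOCAL:9092|' /etc/kafka/server.properties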

6.3.5. Start and enable Kafka.

# Start and enable Kafka
systemctl enable --now confluent-kafka

# Check service is started
systemctl status confluent-kafka
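
Once all three brokers are up, a quick smoke test verifies that topics can be created and fully replicated (this assumes the PLAINTEXT listener on port 9092 configured above):

# Create a test topic spread across all three brokers and inspect its placement
kafka-topics --bootstrap-server KAFKA01.EXAMPLE.LOCAL:9092 --create --topic smoke-test --partitions 3 --replication-factor 3
kafka-topics --bootstrap-server KAFKA01.EXAMPLE.LOCAL:9092 --describe --topic smoke-test

# Clean up the test topic afterwards
kafka-topics --bootstrap-server KAFKA01.EXAMPLE.LOCAL:9092 --delete --topic smoke-test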

7. Maintenance

tip
  • The confluent-zookeeper service must be started before the confluent-kafka service.
  • Every confluent-zookeeper instance in the ensemble must be up and running before starting the confluent-kafka service.

7.1. Server Reboot Procedure

7.1.1. Stop the confluent-kafka service on all nodes ONE BY ONE.

7.1.2. Stop confluent-zookeeper on one node and reboot the server.

7.1.3. Once the rebooted server is back online and healthy, stop confluent-zookeeper on the next server and reboot it.

7.1.4. Follow the above steps for all nodes in the cluster.

7.1.5. Verify all confluent services are up and running using systemctl status confluent* command.
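
The procedure above boils down to the following commands; a sketch, run manually and node by node:

# Step 1 - on every node, one at a time:
systemctl stop confluent-kafka

# Step 2 - on ONE node at a time (wait until it is back online and healthy before the next):
systemctl stop confluent-zookeeper
reboot

# Step 3 - after all nodes are back, verify on each node:
systemctl status confluent*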

7.2. How to clean up a Kafka Cluster

7.2.1. Stop the confluent-kafka service on all nodes ONE BY ONE.

7.2.2. Stop confluent-zookeeper on all nodes ONE BY ONE.

7.2.3. Remove the Kafka log.dirs data on all nodes using the following command.

find /data/kafka-data -mindepth 1 -maxdepth 1 -exec rm -rf {} +

7.2.4. Remove the ZooKeeper dataDir data on all nodes, excluding the myid file, using the command below.

find /data/zookeeper-data -mindepth 1 -maxdepth 1 ! -name 'myid' -exec rm -rf {} +

7.2.5. Reboot all nodes.

7.2.6. Verify all Confluent services are up and running using the systemctl status confluent* command. Kafka brokers will most likely fail after the reboot, since the ZooKeeper nodes do not all come back at the same time; start the service with systemctl start confluent-kafka on any failed nodes.

8. References

  1. 20 Best Practices for Working With Apache Kafka at Scale
  2. Broker Configurations
  3. Kafka Performance Tuning
  4. Apache Kafka broker configuration example
  5. Manual Install using Systemd on RHEL and CentOS
  6. Kafka Clusters
  7. Running Kafka in Production
  8. Apache Kafka
  9. Kafka The Definitive Guide
  10. Apache Kafka Cookbook