In this article, we will take you on a journey of integrating and leveraging LinkedIn’s Brooklin platform as a mirroring technology to replace Kafka Mirror Makers (KMM).
We will explain how we transitioned from configuring, operating, and maintaining 212 distinct Kafka Mirror Maker clusters to only 14 Brooklin clusters to support 33 local Kafka clusters and 17 aggregate Kafka clusters. Upon transitioning, we used 57 percent fewer CPUs while gaining new capabilities for successfully managing our Kafka environment. We carried out this migration with no global downtime in an environment composed of thousands of machines and many thousands of CPUs, processing over 15 million messages per second.
Before we dive into the implementation details, let’s set some context by discussing key technologies and challenges to get a handle on the lay of the land.
Kafka, Kafka everywhere...
Kafka, a distributed streaming platform from the Apache Software Foundation, is a key component in our Big Data platform at Wayfair. We use Kafka to build real-time stream applications and data pipelines serving a wide variety of use cases including event messaging, web activity tracking, change data capture (CDC) streaming, log and metrics aggregation, and stream processing.
Since its deployment 4 years ago, we have observed a dramatic increase in the usage and adoption of Kafka. This was primarily driven by two factors. Firstly, there has been sustained growth in our core Storefront web traffic, fueled by our rapidly growing customer base. Secondly, many teams moved away from legacy message-based and batch-based systems to real-time data pipelines that scale. Today, the users of Kafka publish to about 90K topics in total, creating around 15 million messages per second, or around 1.3 trillion messages per day. This high volume of traffic runs on 33 local Kafka clusters, each of which operates entirely within one of a handful of geographically distributed data centers (DCs), both on-premises and in Google Cloud (GCP).
Kafka was built to run within a single data center, although a cluster can span multiple availability zones within a single DC. We also run another 17 clusters that function as Aggregate Kafka clusters, combining data from multiple local Kafka clusters located within various data centers. For example, we have local Kafka clusters for Logging workloads in all of our data centers. Systems send their logs to the local Kafka cluster in their own DC; however, we also want to be able to process the data in aggregate. To accomplish this, we run two additional Aggregate Kafka clusters in two DCs dedicated solely to the Logging workloads. These two Aggregate Kafka clusters not only run our log ingestion, aggregation, and reporting systems but also help achieve high availability, giving us the ability to survive the loss of a single data center and keep logs flowing.
More Mirror Makers
Now the question is: how do we get data in Kafka from one data center to another? More specifically, how do we replicate data from local Kafka clusters to the Aggregate Kafka clusters? This is where a Mirror Maker fits in. Kafka Mirror Makers (KMM) are their own type of “cluster” of servers, so we’ll call them KMM clusters. To support replication of data from the 33 local Kafka clusters to the appropriate 17 Aggregate Kafka clusters, we needed to operate 212 (!) distinct KMM clusters, totalling around 8,000 vCPUs (virtual Central Processing Units). Not surprisingly, this complex topology led to high capital and operational costs.
We had to run so many KMMs because they only support a single source sending data to a single target. That is, a KMM can send data from one local Kafka cluster to one Aggregate Kafka cluster. The figure below depicts the Kafka mirroring process for two target DCs for simplicity:
To illustrate the maintenance burden and cost caused by the growing number of clusters, let’s take a simple example of two DCs. The figure below illustrates the topology of the KMM data aggregation using DC1 and DC2. In the given example, you will notice that a total of four KMM clusters (depicted as purple boxes) are being used. Recall that KMM supports only a one-to-one mapping of the source and the destination.
If we extrapolate to a single datastream with local Kafka clusters scattered across d data centers, replicating data from n sources to m targets requires n*m mirror-making clusters in total (where m and d have the same value).
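The n*m growth can be made concrete with a minimal sketch. The cluster names below are hypothetical placeholders for the two-DC example; the only rule taken from the text is that KMM needs one cluster per source:destination pair.

```python
from itertools import product


def kmm_clusters_needed(num_sources: int, num_targets: int) -> int:
    """One KMM cluster is required for every (source, target) pair."""
    return num_sources * num_targets


def kmm_pairs(sources, targets):
    """Enumerate the individual source -> target pipelines."""
    return list(product(sources, targets))


# Hypothetical names for the two-DC example: one local and one
# aggregate cluster per data center.
local_clusters = ["local-dc1", "local-dc2"]
aggregate_clusters = ["aggregate-dc1", "aggregate-dc2"]

pairs = kmm_pairs(local_clusters, aggregate_clusters)
for source, target in pairs:
    print(f"{source} -> {target}")

print(kmm_clusters_needed(len(local_clusters), len(aggregate_clusters)))
```

For two sources and two targets this enumerates four pipelines, matching the four KMM clusters in the two-DC example.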
It wasn’t long before the team realized it was time to look for a replacement, as the current system did not meet our quality standards and added huge operational toil. Specifically, the following immediate challenges and the need to support a growing user base became the driving factors for the change:
- Each of the KMM pipelines requires one-to-one replication (1 source : 1 destination), resulting in 212 clusters to support data aggregation use cases.
- The compute cost (# of vCPUs) was too high as we were using around 8K vCPUs across the KMM clusters.
- There were several issues with the KMM 1.0 version, leading to frequent service breakdowns. These included Kafka rebalances and subsequent Kafka topic lag build-up, missing topics in the target cluster, and unresolved bugs, to name a few. On average, we were dealing with ~12 issues per month.
- Frequent service outages and increased latency not only left our internal customers dissatisfied but also overstrained our support workflows.
We selected the following systems for evaluation based on our preliminary research:
- Confluent Replicator from Confluent (commercial offering)
- Uber uReplicator from Uber (open source)
- Mirus from Salesforce (open source)
- Brooklin from LinkedIn (open source)
To evaluate these offerings we used a requirements scorecard based on the MoSCoW (Must have, Should have, Could have, Won’t have) prioritization framework to rate the platforms against a predefined set of features. These features included the ability to scale well, ease of configuration, cost, fine-grained topic controls (selective topic replication), high performance (number of messages per second per server), and resiliency. Brooklin rated the highest among all of the platforms.
Better with Brooklin
Brooklin is an open-source system developed by the LinkedIn team as “an extensible distributed system for reliable nearline data streaming at scale.” It can be used as a data streaming platform to read from and publish to a heterogeneous set of systems. To learn more, refer to the blog post Open Sourcing Brooklin: Near Real-Time Data Streaming at Scale. The diagram below provides a high-level overview of the platform.
Brooklin was built as a general streaming platform, and it can be used as a Mirror Maker (MM) to replicate data across Kafka clusters. To draw a comparison with the KMM topology in our previous example (Figure 2), let’s take a look at the Brooklin Mirror Maker (BMM) topology for two DCs. As seen in the figure below, we were able to employ a single Brooklin cluster to aggregate data from multiple sources located in different data centers. The multi-tenancy model allowed us to create data streams from multiple sources to multiple destinations in a single Brooklin cluster. This greatly reduced the number of mirroring clusters.
Among the four streaming platforms, Brooklin emerged as a clear winner for the following reasons:
- Scalability: BMM allows for scaling the size of any given cluster in two different dimensions, both horizontally and vertically. Due to the flexibility of the task partitioning strategy, one can increase the memory and CPU on the nodes in an existing cluster (vertical scaling) or add new nodes to the cluster dynamically (horizontal scaling). That flexibility gives us a lot more control over our clusters and we can tune them to address specific needs of workloads.
- Multi-tenancy model: BMM allows the use of a single cluster to mirror from multiple sources to multiple destinations. This breaks from (and is a vast improvement over) KMM’s "one cluster per source:destination pairing" approach. Adding new streams no longer requires building a new cluster.
- Easy to set up: The configuration of Brooklin is straightforward and easily automated (via Puppet in our case), so adding new nodes to an existing cluster, or creating whole new clusters, is as simple as overriding the parameters in the configuration.
- RESTful APIs for data stream management: Adding a new mirror stream to Brooklin requires only a single one-line command to create a new datastream. Simple REST calls can perform all major administrative operations. One particular callout: the ability to pause a stream via the APIs is a really helpful feature for managing the operations of the Mirror Makers.
- Early partnership with LinkedIn: Early in the project, we reached out to LinkedIn with questions and with some fixes to issues we discovered in testing. The team welcomed us with open arms and gave us insight into its experience, roadmap, and configuration. An open and early collaboration with LinkedIn’s team greatly helped us in our implementation.
- High throughput: BMM clusters provided ~2.5 times more network throughput (bytes in/bytes out) than KMM clusters with identical setup and resources.
- Active development pipeline: An active development community meant bugs and features would continue to be fixed and added to the platform.
- Extensibility: The Brooklin architecture is a general data movement platform. If the migration goes well, we will have many other ways to leverage and extend Brooklin for our overall data movement needs.
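As an illustration of the REST-based stream management mentioned in the list above, here is a minimal sketch that builds (but does not send) a pause request. The host, port, datastream name, and the `/datastream/<name>?action=pause` path are assumptions made for illustration; check them against the REST documentation for your Brooklin version before relying on them.

```python
from urllib.parse import quote
from urllib.request import Request


def build_pause_request(base_url: str, datastream_name: str) -> Request:
    """Build an HTTP POST that asks Brooklin to pause a datastream.

    ASSUMPTION: the endpoint shape '/datastream/<name>?action=pause'
    is used here for illustration and is not confirmed by the article.
    """
    url = f"{base_url.rstrip('/')}/datastream/{quote(datastream_name)}?action=pause"
    return Request(
        url,
        data=b"{}",  # empty JSON body
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical host, port, and stream name.
req = build_pause_request("http://localhost:32311", "metrics-mirror")
print(req.get_method(), req.full_url)

# To actually send it: urllib.request.urlopen(req)
```

Building the request separately from sending it keeps the sketch safe to run anywhere and makes the call easy to wrap in operational tooling.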
After the evaluation, we started to implement the system to support our current business-critical messaging pipelines while also taking our future scaling needs into consideration. This needed to be achieved on a three-month timeline, before Wayfair’s large sales event in April. Our approach and key learnings are as follows:
1. Try and Apply
In the past, we had conducted various iterations of tuning KMM parameters. These efforts were not in vain. We started by drawing parallels and creating a map between the KMM and BMM parameters. We used an n1-standard-16 machine type against the Metrics workload. Our goal was to find settings for a stable platform, which we accomplished by adjusting record size, batch size, and timeouts, as outlined in Table 1.
A significant advantage of Brooklin was the ability to tune the producers’ workload properties (numProducersPerConnector in transportProvider), which helped to increase the overall throughput of the producers. The ability to use two producer threads per node allowed us to scale linearly, which resulted in a balance between the bytes-in and bytes-out of the producers and consumers.
The table below shows the results that helped reduce the number of rebalances and Out of Memory Exceptions (OOMs), and increased throughput while standing up an initial stable test environment:
Table 1: BMM parameter tuning results
|max.poll.records||The maximum number of records returned in a single call to poll()||1000-6000||500|
|max.poll.interval.ms||The maximum delay between invocations of poll() when using consumer group management||600,000 ms||120,000 ms|
| ||The timeout (in milliseconds) to spend waiting in poll calls to Kafka if no data is available||NA||900,000-1,505,000 ms|
|batch.size||The maximum size (in bytes) of a single producer batch||32K-100K||16,384|
|buffer.memory (megabytes)||The total bytes of memory the producer can use to buffer records that are waiting to be sent to the server||256-1024 MB||128-512 MB|
We ran a pilot in December 2019 by deploying Brooklin for the standby instance of the Metrics Aggregate Kafka cluster, running in one of our GCP data centers. A successful pilot during the peak load of the holiday weeks between Christmas and New Year further validated our work.
2. Solve the most complex problem first
At Wayfair, we have three primary Kafka workloads, Logging, Clickstream, and Metrics, that largely drive our requirements for Kafka and all other related services. The specifications for each vary, but they can generally be characterized as:
- High message rate, small messages (Metrics)
- Lower message rate, large messages (Logging)
- Lower message rate, heterogeneous message sizes (Clickstream)
The 2x2 matrix below maps the message sizes and input message rates for these three workloads and provides a sense of the volume and velocity of these systems.
In the table below, we have listed the specific message rate, total number of Kafka topics, average number of partitions, average message size, and average number of bytes received for each of the workloads per data center.
Table 2: Workload profile for Metrics, Logging, and Clickstream per data center
|Message rate (messages per second)||# of total topics||# of Avg partitions replicating||Avg message size (KB)|
For our first migration from KMM to BMM, we selected the Metrics Kafka clusters, which process more than 10.6 million messages per second across all DCs. We saw this as the most challenging workflow to address, due to the rapid rate of the messages that these clusters process. Moreover, these clusters have the tightest requirements for uptime and the least tolerance for topic lag. Hence, solving the challenges for the Metrics workflows first would serve as a litmus test for the rest of our data pipelines.
3. Collaborate and Contribute
During the course of this project, the team closely collaborated with internal teams and external partners at LinkedIn. The LinkedIn team was extremely helpful, and shared details about its architecture and environment that immensely helped us with our decisions. There were two major issues discovered during our implementation.
First, we hit a few bugs. We found a bug that caused the Brooklin cluster to go leaderless because of its handling of ZooKeeper session expiry events. We reported the bug and posted a pull request (PR): ZooKeeper Session Leak Fix. We encountered another bug that caused missing record keys in the destination topic, and again posted a PR to address the issue: Missing Record keys in Kafka destination topic.
Second, we found that during a partial cluster shutdown, load balancing would send too many tasks to some of the active nodes. This would occasionally overwhelm a subset of nodes, causing Linux OOMs and subsequent node failures. To fix this issue, we introduced a new parameter (see New maxTasksPerInstance option) to limit the number of tasks assigned to a single node instance per data stream. By restricting the number of tasks handled by any single node, we could now prevent the domino effect caused by node failures, ultimately reducing the number of rebalances and improving overall stability.
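The idea behind a per-instance task cap can be sketched as follows. This is not Brooklin's assignment code: the round-robin strategy, the function name, and the choice to leave over-cap tasks unassigned are all assumptions made for illustration.

```python
def assign_tasks(tasks, instances, max_tasks_per_instance):
    """Round-robin tasks across instances without exceeding the cap.

    Returns (assignment, unassigned). Tasks that do not fit under the
    cap are left unassigned rather than piled onto surviving nodes.
    """
    assignment = {instance: [] for instance in instances}
    unassigned = []
    if not instances:
        return assignment, list(tasks)

    cursor = 0
    for task in tasks:
        # Try each instance at most once, starting at the cursor.
        for offset in range(len(instances)):
            instance = instances[(cursor + offset) % len(instances)]
            if len(assignment[instance]) < max_tasks_per_instance:
                assignment[instance].append(task)
                cursor = (cursor + offset + 1) % len(instances)
                break
        else:
            unassigned.append(task)
    return assignment, unassigned


tasks = [f"task-{i}" for i in range(12)]

# Healthy cluster: four nodes, three tasks each.
healthy, leftover = assign_tasks(tasks, ["n1", "n2", "n3", "n4"], 4)

# Partial shutdown: two nodes remain, each stops at the cap of four.
degraded, waiting = assign_tasks(tasks, ["n1", "n2"], 4)
print({node: len(assigned) for node, assigned in degraded.items()}, len(waiting))
```

With the cap in place, the two surviving nodes take four tasks each and the remaining four wait, instead of each node absorbing six tasks and risking an OOM.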
During the different stages of the project, we also partnered with the three teams that owned Metrics, Logging, and Clickstream workloads to receive early feedback. In one case, we conducted a stress test with the Logging and Metrics teams to successfully test 3x their typical load. In another instance, we assisted the Metrics team with debugging an issue in which their back pressure mechanism kicked in. It turned out the team had assumed some lag while tuning the back pressure. Brooklin replication recovered so fast that it looked like a traffic spike, causing the back pressure mechanism to kick in. This led the Metrics team to perform a recalibration of their systems and set reasonable back pressure thresholds.
Through tight collaboration we ensured the transition from Kafka Mirror Maker was seamless for our customers.
4. Learn and Iterate!
Wayfair’s culture is try, measure, improve. We have two quick examples that demonstrate this mentality. First, we initially deployed Brooklin on VMs. When we mentioned this to the LinkedIn team, they were interested to hear how it went, given that they had deployed on physical machines but were also considering a migration to VMs. VMs are fine, but we wanted to do more. Given the numerous benefits of containers, we decided to pursue containerizing Brooklin. We achieved some initial success; however, we discovered that Brooklin services were very sensitive to the normal maintenance/rebuild cycles of K8s pods. The Wayfair K8s team only supported stateless applications at that point in time. In order to avoid instability in the environment, we decided to pivot away from K8s back to VMs.
Upon pivoting our deployment back to VMs, we needed to identify the appropriate settings to apply across our clusters. We wanted to pick a standardized cluster configuration which we could apply across workflows in order to simplify and streamline the management of those clusters. Each workflow was tested iteratively, tuning one setting at a time to gauge its impact and then moving to the next setting. After much testing, we worked to merge them into a single configuration that could apply across all three workflows. While these workflows differed in characteristics, we found that we were able to find a “sweet spot” in some of our settings without negatively impacting the performance.
To fine-tune the system for optimal performance, we worked closely with the performance team and systematically configured all the workloads, initially optimizing network traffic (bytes_in and bytes_out) for a single consumer and producer thread. After a second round of tuning, we were able to scale the Brooklin system to provide ~30 MB/second of throughput with a single n1-standard-16 node in GCP. This gives us a standard “unit of performance” which we can then use for scaling horizontally. Next, we used that unit to calculate the number of nodes needed to achieve a certain amount of throughput.
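The sizing calculation described above amounts to a ceiling division by the per-node unit. In this minimal sketch, the 600 MB/second target and the optional headroom factor are hypothetical; only the ~30 MB/second unit comes from our measurements.

```python
import math

# Measured "unit of performance": ~30 MB/second per n1-standard-16 node.
UNIT_THROUGHPUT_MB_PER_S = 30.0


def nodes_for_throughput(target_mb_per_s: float, headroom: float = 0.0) -> int:
    """Nodes needed to sustain a target throughput.

    `headroom` is an illustrative safety margin (0.25 means 25 percent
    extra capacity); it is an assumption, not a measured setting.
    """
    required = target_mb_per_s * (1.0 + headroom)
    return math.ceil(required / UNIT_THROUGHPUT_MB_PER_S)


# Hypothetical target of 600 MB/second, with and without headroom.
print(nodes_for_throughput(600))
print(nodes_for_throughput(600, headroom=0.25))
```

This assumes throughput scales linearly with node count, which is worth re-validating whenever the workload profile or machine type changes.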
The figure below depicts the impact of horizontal scaling with a lag of about 1.15 billion messages. The goal was to see how the rate of processing messages scaled with the number of nodes in the BMM cluster. To carry out this measurement, we generated a queue of unprocessed messages in a single data center across three different Kafka topics related to Metrics.
For the given total lag of 1.15 billion messages, a 30-node configuration brought the recovery time down to ~18 minutes, compared to ~40 minutes with 10 nodes. The red line depicts the slope, or rate, at which the messages are processed. A steeper (more negative) slope means the messages are processed faster and the recovery time is shorter. We confirmed that the message processing rate scales well with the number of nodes.
This node-level performance benchmark allowed us to easily calculate the number of nodes needed to recover from the lag in a certain amount of time based on specific Service Level Objectives (SLOs), keeping most other settings the same.
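A rough version of that calculation, using the two measurements above, might look like the sketch below. The 25-minute SLO is a hypothetical target. The model also ignores new messages arriving during recovery and assumes the per-node rate holds at other cluster sizes, so treat the result as a starting estimate.

```python
import math

LAG_MESSAGES = 1_150_000_000  # total lag used in the scaling test


def per_node_drain_rate(lag_messages, recovery_minutes, nodes):
    """Average messages per second drained by each node in a test run."""
    return lag_messages / (recovery_minutes * 60) / nodes


def nodes_for_recovery(lag_messages, slo_minutes, rate_per_node):
    """Nodes needed to drain `lag_messages` within `slo_minutes`."""
    return math.ceil(lag_messages / (slo_minutes * 60 * rate_per_node))


# Observed runs: 30 nodes in ~18 minutes, 10 nodes in ~40 minutes.
rate_30_nodes = per_node_drain_rate(LAG_MESSAGES, 18, 30)
rate_10_nodes = per_node_drain_rate(LAG_MESSAGES, 40, 10)

# Use the lower per-node rate as the conservative planning figure.
planning_rate = min(rate_30_nodes, rate_10_nodes)

# Hypothetical SLO: recover from the same lag within 25 minutes.
print(round(rate_30_nodes), round(rate_10_nodes))
print(nodes_for_recovery(LAG_MESSAGES, 25, planning_rate))
```

The two runs imply somewhat different per-node rates, so planning with the lower figure leaves some margin.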
5. Measure Once, Test Twice
We conducted rigorous and comprehensive testing with a focus on the stability, resilience, and measurable performance of the systems, and shared the results. The tests were performed for three different kinds of workflows (Metrics, Logging, and Clickstream), each serving a unique use case.
Apart from system based metrics such as CPU utilization, memory utilization, bytes_in, and bytes_out from Datadog, Kafka specific metrics like num_rebalances.count, producer.waiting_threads, and aggregate.events_byte_processed_rate.mean_rate were used to compare performance. Kibana/ElasticSearch logs were used for measuring error frequency at different levels, number of rebalances, and record processing rate.
The metrics listed above were captured for the following test scenarios:
- Restart Brooklin cluster
- Restart Brooklin cluster with 1 hour and 4 hours worth of lag
- Restart Brooklin with ~1 billion message lag
- Cause a Kafka rebalance by stopping source and target Kafka broker nodes
- Cause a Brooklin rebalance by stopping up to 5 nodes of Brooklin cluster
We further tuned max.poll.interval.ms, commitIntervalMs, daemonThreadIntervalInSeconds, and nonGoodStateThresholdMs to minimize the lag during a restart of the Brooklin service. Tuning also addressed the issue of Java task threads being killed by unexpected errors. The table below shows the final cluster-level configuration settings for the three workloads.
Table 3: Final cluster level configuration settings for Metrics, Logging and Clickstream workloads (Configuration settings that were different than the rest are marked in bold)
|Workload Name||Total #BMM nodes||Consumer properties||Producer properties|
|# of consumer tasks per cluster||max.poll.records||# of producer tasks per node||buffer.memory (MB)||batch.size|
Results and Conclusion
Migrating to Brooklin not only provided us with a stable Kafka mirroring platform with lower replication lag, but also reduced the count of Mirror Maker clusters from 212 to a mere 14 Brooklin clusters. This was a 93 percent reduction in the number of systems we had to configure and operate. The move to Brooklin mirroring also yielded a 57 percent reduction in the vCPUs needed. Overall, the migration led to annual savings of many hundreds of thousands of dollars post deployment.
In addition to the reduced infrastructure spending, the operational overhead of maintaining Mirror Makers was reduced by 90 percent thanks to fewer clusters, simplified configuration management, and a simpler deployment process. The migration to BMM cut the number of production issues per month by more than a factor of 10, improving both system availability and customer satisfaction.
The project was completed in March 2020, within three months from start to finish. This included the pilot, setup, testing, deployment, training, and documentation. After the successful deployment to production, we were ready for Wayfair’s April Save Big, Give Back sales event. The systems worked flawlessly during the large-scale event.
Building a new at-scale streaming pipeline and migrating to it within tight time constraints was a huge undertaking. It wouldn’t have been possible without management support, a razor-focused team, and strong stakeholder partnerships.
We would like to extend our thanks to our team members (Anil, Fred, Krishna, Santosh, Sukhvinder and Tom) and the project sponsors (Perry and Vinay). We would also like to thank all of the other cross-functional teams whose support and feedback helped us move quickly from PoC to pilot and then to the final rollout.
The Big Data Engineering (BDE) team at Wayfair is a core foundational team focused on building secure, scalable, and reliable next generation platforms. We drive the business’ ability to use data to create the best experience for millions of Wayfair users purchasing home furnishings online.