As part of the Systems @Scale event, engineers participated in a series of live Q&As about the engineering work presented in the technical talks. We’ve collected those questions and the engineers’ responses below.
Asynchronous computing @Facebook: Driving efficiency and developer productivity at Facebook scale
Carla Souza details how Facebook handles asynchronous processing at scale and challenges that come with maintaining the reliability of a large multitenant system. Read more about asynchronous computing.
Q: Why aren’t you afraid of machine idle time during off-peak hours when engineers add more machines to Async today’s pool? Since there are obvious peaks and off-peaks of traffic, why not just let autoscaling do the work? Were any trade-off made to decide not to do autoscaling?
A:Adding machines to Async’s worker pool happens when there is a business need for use cases that are not delay tolerant to run during peak hours. To utilize the idle worker resources during off-peak hours, we encourage users to defer computation to off-peak hours by making the off-peak capacity as close to freely available as possible. This already keeps the utilization close to peak usage throughout the day. In the future, if need arises, we can also think about potentially releasing the idle machines to off-peak computation use cases that happen on other systems inside Facebook.
Autoscaling works well when plenty of resources are available at that time. Unfortunately, during peak hours many Facebook services demand more resources to handle the traffic and leave very few resources available in the machine pool.
Instead of competing for resources with other services, Async stores workloads that can tolerate long delays and processes them during off-peak hours. Async also encourages use cases to opt for running during off-peak hours by making it easy to get off-peak capacity. That keeps the worker pool close to peak throughput throughout the day. This strategy helps us reduce high resource demand on peak hours effectively.
Q: Are all the workers identical? In other words, does every worker know how to handle all requests, or does each use case have its own dedicated workers?
A:It depends on worker runtimes. For runtimes like HHVM or Django, all use cases share the same source code repository and deployment, so all workers are identical and a job can run on any worker host. For runtimes in which the corresponding functions for each use case need to run in their own processes (because either it is impossible to launch all processes on one host or they need different deployment policies), Async uses dedicated workers for each use case. The worker is not exchangeable across use cases. Please note that worker hosts are homogeneous on hardware.
Q: How does Async provide fault tolerance?
A: The job information persists in the queue. When Async dispatcher dequeues the job from the queue, it is leased (instead of being deleted from the queue). When a job fails on a worker, Async dispatcher will send it to another worker until it exhausts all retries set by the use case owner or the job execution succeeds. After that, the job is removed from the queue.
The lease has a predefined timeout. When Async dispatchers hold a job for too long or the job execution takes too long due to bugs, the lease could expire and the queue would make the job available for dequeue again. Another dispatcher could pick it up and send it to a worker.
The system supports features like idempotent vs. non-idempotent jobs and max retry counts for how many times a job can be retried, and they are built on top of features provided by the queue, like AT_MOST_ONCE delivery semantics and tracking the delivery retry counts.
Q: The previous queues were represented by MySQL tables, which weren’t scalable. How are queues currently represented? Is there a distributed queue service that’s currently used?
A:The first version of the system had the dispatcher managing the MySQL databases directly. The dispatcher algorithm read jobs from the database treating it as a single logical queue and dispatched them to workers. When jobs could not be dispatched for any reason (e.g., lack of worker capacity), they were put back into the database. This behavior caused read and write amplification per database record, which overloaded the databases. We solved this problem with the following improvements in the architecture:
- Separating out the queue management layer into its own distributed queue service, which used concepts like in-memory index to efficiently fetch items from databases, and rate limiting access to database operations to prevent database overload issues. Stay tuned for a blog post on this service in near future.
- Moving to an improved dispatcher algorithm that modeled each use case with its own queue and fetched jobs based on the rate at which they were getting dispatched.
Regarding the queueing service, we use an in-house solution. We considered internal and external solutions, and the motivation for building in-house has been the ability to support unique features and Facebook scale, and the ease of integrating with other Facebook infra. We are trading off the significant work needed to adapt external solutions to meet the last two requirements for building and maintaining an in-house solution. Stay tuned for a blog on this service.
Scaling services with Shard Manager
Gerald Guo and Thawan Kooburat present Shard Manager, a generic shard management platform that facilitates hundreds of stateful sharded services hosted on hundreds of thousands of servers. Read more about Shard Manager.
Q: Did you make any trade-offs in designing the architecture of Shard Manager?
A: There are various trade-offs we made along the way when building Shard Manager (SM). We’ll cover two interesting ones:
- Fully integrated approach
SM provides a fully integrated end-to-end solution that computes optimal shard assignment, drives shard state transition, handles all operational events, and supports client-side request routing. This holistic approach hides the infrastructure detail from users, and makes the development and operation of sharded applications easier.But the downside is that onboarding SM means adopting all the contracts and design choices we offer as a whole. This turns out to be too rigid for complex applications that require customization. Therefore, we are working on improving the modularity and pluggability of our system.
- Pure control plane vs. data plane
We intentionally chose to make SM a pure control plane for multiple reasons: 1) Supporting both control plane and data plane in one system drastically increases the complexity; 2) Separating control plane and data plane enables them to evolve and scale independently; 3) Not being on data plane reduces the criticality of SM being unavailable. Even if SM is down, applications can keep functioning. Some functionalities, such as server failure or network partition detection, can be done faster on the data plane than the control plane. But what SM offers as a control plane is sufficient for most applications.
A:Thank you for asking this question! The most relevant systems to SM are Google Slicer (GS), Azure Service Fabric, and Apache Helix. Previously, we had in-person discussions with the developers of these systems based on public information.
SM started in 2011. To our knowledge, SM probably predated the other systems, but we were so excited in 2016 when noticing the publication of the GS paper. We learned a lot from that paper about both the similarity and difference, which will be further explained below. With SM’s hundreds of diverse applications running on hundreds of thousands of servers, we speculate that SM might be the largest deployment in the industry according to published numbers.
One advantage of GS is that it does not divide the key space into fixed shards ahead of time. In other words, the shards are transparent to applications. Internally, GS can dynamically merge or split shards and move shards around to balance the load as needed. This is more flexible than SM’s approach, which divides the key space into fixed shards. SM can move shards around to balance the load, but currently does not merge or split shards.
On the other hand, to enable shard merge or split, GS hashes application keys into its internal 64-bit random key space, which is then internally divided into shards.This approach means that it is hard to support certain constraints, e.g., preference for key locality, that are important for some of our user-facing global applications. In addition, to our knowledge, SM supports a wider range of load-balancing constraints by leveraging a generic constraint solver, which allows us to more easily introduce new constraints or change the load-balancing algorithm.
Facebook infrastructure takes a layered but fully integrated approach, which enables SM, as one layer of an integrated stack, to offer more out-of-the-box functionalities. For example, SM is integrated with our cluster management system, Twine, via TaskControl. This allows SM to better manage shard availability in the face of planned and unplanned infra churns, including container rolling upgrades, maintenance operations, and hardware failures.
There are certainly more similarities or differences among SM, GS, Azure Service Fabric, and Apache Helix, beyond what we can describe here. We hope to publish a paper in the future to dive more deeply into these topics.
Q: Does any Facebook application that can be sharded run natively on the application server? Or do all Facebook applications need to be run in a container?
A: Facebook’s infrastructure is moving toward running applications in containers. Currently, all shard manager–based applications are hosted in containers. Running on a common container framework (Twine) also allows us to have deeper standard integration with maintenance and deployment handling via TaskControl. In the early days, we could not offer this functionality out of the box, and services have to develop ad-hoc integration for maintenance handling.
Containerizing ZooKeeper: Powering container orchestration from within
Christopher Bunn lays out the 18-month journey that enabled hundreds of ZooKeeper ensembles to safely run atop the cluster management platform that they make possible. Read more about our work containerizing ZooKeeper.
Q: How did the newly designed SID assignment logic hold up after a network partition if two ZooKeeper participants accidentally came up with the same identifier during the partition (supposing the participants had the same history and ended up with the same result for the value in the 0–255 range that had been recently assigned), or was that an impossible scenario?
A: Excellent question. Unique SID assignment alone is insufficient protection if two partitioned tasks attempt to generate a new SID at the same instant using the same inputs, since there’s no randomized seed involved. The additional protection we need comes in the form of atomic, one-at-a-time reconfigurations.
ZooKeeper’s dynamic reconfiguration uses the quorum for agreement and propagation of the new configuration, which allows it to do two very useful things:
- Confirm that the new configuration will be able to support quorum before committing it
- Propagate quorum adjustments in a strictly serialized way, assigning a logical timestamp to each
Technically, we can replace more than one participant at a time, but since our task control gates one-at-a-time semantics, in practice we only replace sequentially. Each replacement either succeeds or fails atomically using a conditional transaction, so if two tasks try to join the ensemble concurrently, only one of them will win, and the other will retry. We use task version to break ties between two tasks with the same ID, so they don’t flap.
Q: Is there anything extra you need to do in terms of security for widely broadcasted services, assuming there are a number of widely broadcasted services?
A: This question touches on two topics, so I’ll try to address both.
If the question is about the BGP Virtual IP broadcasting: We’ve been working with our routing and traffic teams to develop a low-dependency ACL-based protection for which entities can broadcast the VIPs we use. This ACL is based on x.509 certificates that are provided to all Twine containers via our asymmetric cryptography platform and that assert provable authentication of the service in question. In short, this protection ensures that critical VIPs cannot be broadcast by any entity other than our containers.
If the question is about secure access to ZooKeeper itself: Similarly, we use these certificates to authenticate both endpoints of each connection to ZooKeeper, following standard TLS mutual authentication protocols. Additionally, within ZooKeeper, we apply an authorization scheme that enforces rules based on which identities are allowed to access the ensemble. This also becomes more granular and tunable as we split shared ensembles apart and into separate ensembles dedicated to single use cases.
Q: I was wondering if you’re using ZooKeeper’s reconfiguration APIs to move ZooKeeper participants around? If so, it sounds like you’ve built a layer to make reconfiguration more robust/recover and undo when failures happen. Would be cool if you could contribute this layer to ZooKeeper!
A: Yep! We’re using the reconfiguration API, which is actually a special operation that can be invoked through the standard client interface, so long as the client has sufficient privilege within the ensemble.
The reconfiguration is handled completely by the newly joining participant, however, and I should clarify that there is no separate service that orchestrates membership swaps from an omnipotent viewpoint. The fact that we can safely and correctly manage member swaps using only information known to the new member was indeed one of the most important insights we had early in the project.
When a container starts up, before it initiates the ZooKeeper process, it works through a startup sequence that handles a number of different cases, including the “happy path” in which the ensemble is healthy and available for a member swap, along with a slew of “unhappy path” cases in which the ensemble might be in an unhealthy state.
In the happy path case, our containers are able to determine whether they should join the ensemble or if they’ve already been preempted by some replacement container that’s been scheduled as the result of a network partition. This is possible because the 64-bit server ID we generate (and record in the ensemble’s membership list) contains version information for each task identifier. In short, a participant can see whether it has been superseded simply by reading the membership list, which it can do before deciding whether it should attempt to replace the member in the list that shares its task identifier (if one exists) or simply add itself to the list (if one does not).
Fault tolerance through optimal workload placement
Elisa Shibley demonstrated how Facebook optimizes for the failure domain spread of hardware, services, and data, minimizing the negative impact of any failure domain loss within a data center region. Read more about fault tolerance through optimal workload placement.
Q: With all the constraints in the system, how do you ensure that adding new constraints won’t have unintended consequences?
A: We’ve built a schema for expressing constraints in our solver. This schema is intent-based, allowing services to describe the reason behind a constraint. For example, instead of a constraint limiting the number of instances of a service that can run in the same rack, a service owner can ask that each instance of their service is guaranteed 10 Mbps of network bandwidth. This grants our systems more flexibility in finding an optimal placement plan that satisfies all our constraints.
This schema also allows us to easily test the effect a new constraint has on our infra. We can simulate a constraint to see what would happen if it were introduced into production. If we see adverse effects, we can work with the service owners to build a safer version of their requirements.
Q: If you could design an ideal data center plan, what would its features be?
A: An ideal placement plan would be one that enables perfect and even spread of capacity across all fault domains. Such a plan is not yet possible. It would need perfectly symmetric data center designs, and service architectures that don’t place custom constraints on placement. We are working with our data center teams and service owners to evolve and improve placement. But change takes time, as we have to make physical changes to data centers.
Q: Do you test and validate your fault tolerance regularly? How?
A: We do! We must regularly simulate failures to ensure that our systems function properly when a real failure happens. We test that they safely move workloads to buffer capacity and help our infra survive such a failure. Facebook runs regular tests to simulate a loss of a fault domain. While some of these tests are run without prior notice to more accurately simulate a failure, we work closely with service owners to run tests with advance notice, with humans on hand to fix anything that goes awry.
Throughput autoscaling: Dynamic sizing for facebook.com
Daniel Boeve and Anca Agape compared throughput autoscaling with other approaches, demonstrating how autoscaling keeps Facebook’s web tier both safe and efficient throughout the day. Read more about throughput autoscaling here.
Q: What kind of infrastructure challenges, if any, did you face when developing the various models for throughput autoscaling?
A: There were a variety of challenges that needed to be solved in order to implement throughput autoscaling. On the supply modeling side, we need load test data. In order to get high-quality results, we needed to implement an effective load control mechanism capable of pushing services to their limits, but not beyond them, in order to measure maximum safe throughput. We also needed to have high-fidelity data collection mechanisms with minimal impact of service performance to perform these measurements.
On the demand side, we need to gather a lot of historical data. As a result, we are now the number-one client of our internal metrics service by a significant margin. However, our heavy API usage is not limited to data collection. We also are one of Twine’s and demand control’s largest API users. Executing resizes required resolving some challenges with the interaction between the autoscaling system and the continuous deployment system. Finally, we needed to work with Twine to ensure that the correct hardware was prioritized to be freed for elastic compute use cases.
Q: When estimating regional supply, is there an undo button if a load test returns the wrong maximum safe throughput for a machine?
A: We use a system that continually produces up-to-datee maximum throughput data by load testing a few dozen machines. This maximum throughput information is very important to us, as we use it in other places too. For example, we use it in our global load balancing to decide how much user traffic to send to each region.
We already had safety checks in this load testing system to prevent producing bogus data by filtering out outlier measurements and validating the variance in results. The load testing system will stop producing any data instead of producing bogus results, and then we would use the historical max throughput data in both traffic load balancing and autoscaling. However, if the throughput data was bad from an unexpected problem in the system, then having cautious limits for downsizing steps as opposed to upsizing can help mitigate the danger of a bad estimate while still allowing for quickly adding capacity in case changes in demand warrant it.
Q: What prevents services to keep growing their size when their performance degrades?
A: Autoscale can grow a service only within the capacity that is reserved to a service. But we also have this component that can file additional capacity requests if needed. If a service regresses too much, this could kick in. While these requests are filed automatically, they usually require the service owner and our capacity team to take a look, and decide if the service should be allocated more capacity or the team needs to get hands-on to fix the regressions.
If it’s urgent and the service might get in critical state, emergency capacity can be granted to the service temporarily. But the service owner needs to follow up on the regressions and return that capacity. This is how we detect and react to large regressions, if they happen, to keep the service reliable, but we also keep them in check.