Adding Changeable/Mutable State with Dask Actors

Dask is focused on scaling analytic use cases, but you can use it to scale many other types of problems. So far, most of the tools you have used in Dask have been functional. Functional programming means that previous calls do not impact future calls. Stateless functions are common in distributed systems like Dask, as they can safely be re-executed multiple times on failure. Some problems, however, require mutable state; updating the weights of a model during training is a common example in data science. One of the most common ways of handling state in a distributed system is with the actor model. This chapter will introduce both the general actor model and Dask’s specific implementation.

Dask futures offer an immutable form of distributed state, where values are stored on the workers. However, this doesn’t work well for situations in which you want to update the state, like changing a bank account balance (as we’ll do in Making a bank account actor) or updating machine learning model weights during training.

Tip

Dask actors have a number of limitations, and we believe that in many cases the right answer is to keep mutable state outside of Dask (like in a database).

Of course, you don’t have to use distributed mutable state. You can instead keep all of the state in your main program, but this can quickly lead to bottlenecks on the node responsible for it. Other options include storing your state outside of Dask, like in a database, which has its own trade-offs. While this chapter focuses on how to use the actor model, we conclude with when not to use Dask actors and alternatives for handling state, which is of equal importance.

Tip

Dask also has distributed mutable objects, covered in [dds_scheduling].

What Is the Actor Model?

In the actor model, actors do the following:

  • Store data

  • Receive and respond to messages, including from other actors and external systems

  • Pass messages

  • Create new actors

The actor model is a technique for dealing with state in parallel and distributed systems that avoids locks. While proper locking ensures that only one piece of code modifies a given value, it can be very expensive and difficult to get right. A common problem with locking is deadlock, where resources are acquired or released in an order such that the program can block forever. The slowness and difficulty of locks only increase in distributed systems.[1] The actor model was introduced in 1973 and has since been implemented in most programming languages,[2] with some popular modern implementations including Akka in Scala and the .NET languages.

It can be helpful to think of each actor as a person holding a note about their state, and that person is the only one allowed to read or update the note. Whenever another part of the code wants to access or modify the state, it must ask the actor to do this.

Conceptually, this is very similar to classes in object-oriented programming. However, unlike generic classes, actors process one request at a time to ensure the consistency of the actor’s state. To improve throughput, people often create a pool of actors (assuming they can shard or replicate the actor’s state). We’ll cover an example in the next section.

The actor model is a good fit for many distributed systems scenarios. Here are some typical use cases in which the actor model can be advantageous:

  • You need to deal with a large distributed state that is hard to synchronize between invocations (e.g., ML model weights or counters).

  • You want to work with single-threaded objects that do not require significant interaction from external components. This is especially useful for legacy code that is not fully understood.[3]

Now that you have an understanding of the actor model in general, it’s time to learn how Dask implements it and what trade-offs it makes.

Dask Actors

Dask actors are one implementation of the actor model, and some of their properties differ from those of other systems. Unlike the rest of Dask, Dask actors are not resilient to failures: if the node or process running an actor fails, the data inside the actor is lost, and Dask is not able to recover it.

Your First Actor (It’s a Bank Account)

Creating an actor in Dask is relatively simple. To start with, you make a normal Python class with functions that you will call. These functions are responsible for receiving and responding to messages in the actor model. Once you have your class, you submit it to Dask, along with the flag actor=True, and Dask gives you back a future representing a reference to the actor. When you get the result of this future, Dask creates and returns to you a proxy object, which passes any function calls as messages to the actor.

Note

Note that this is effectively an object-oriented bank account implementation, except without any locks, since only a single thread ever changes the values.

Let’s take a look at how you can implement a common example actor for a bank account. In Making a bank account actor, we define three methods—balance, deposit, and withdrawal—that can be used to interact with the actor. Once the actor is defined, we ask Dask to schedule the actor so that we can call it.

Example 1. Making a bank account actor
link:./examples/dask/Dask-Explore-Actors.py[role=include]
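
The full example lives in the linked file; as a minimal sketch (assuming a running dask.distributed client, with illustrative class and variable names), creating the actor looks roughly like this:

[source,python]
----
from dask.distributed import Client

client = Client()  # assumes a local or already-running Dask cluster


class BankAccount:
    """A plain Python class; Dask turns it into an actor at submit time."""

    def __init__(self, balance: float = 0.0):
        self._balance = balance

    def balance(self) -> float:
        return self._balance

    def deposit(self, amount: float) -> float:
        if amount < 0:
            raise ValueError("Cannot deposit a negative amount")
        self._balance += amount
        return self._balance

    def withdrawal(self, amount: float) -> float:
        if amount > self._balance:
            raise ValueError("Insufficient funds")
        self._balance -= amount
        return self._balance


# actor=True asks Dask to instantiate the class on a worker as an actor.
account_future = client.submit(BankAccount, 100.0, actor=True)
account = account_future.result()  # resolves to a proxy, not the instance
----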

When you call methods on the resulting proxy object (see Using the bank account actor), Dask dispatches a remote procedure call and returns a special ActorFuture immediately. This allows you to use actors in a non-blocking fashion. Unlike the generic @dask.delayed calls, these are all routed to the same process, namely the one where Dask has scheduled the actor.

Example 2. Using the bank account actor
link:./examples/dask/Dask-Explore-Actors.py[role=include]
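
Continuing the illustrative sketch above, a method call on the proxy returns immediately, and you block only when you need the value:

[source,python]
----
# The call is dispatched to the worker hosting the actor and returns
# an ActorFuture right away, without blocking.
deposit_future = account.deposit(10)

# Block only when the value is actually needed.
print(deposit_future.result())       # 110.0
print(account.balance().result())    # 110.0
----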

The ActorFuture is not serializable, so if you need to transfer the result of calling an actor, you need to block and get its value, as shown in ActorFutures are not serializable.

Example 3. ActorFutures are not serializable
link:./examples/dask/Dask-Explore-Actors.py[role=include]
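
In sketch form, the workaround is to resolve the ActorFuture into a plain value before it crosses any serialization boundary (the helper name here is illustrative):

[source,python]
----
def report_balance(account):
    # Returning account.balance() directly would hand back an
    # ActorFuture, which cannot be pickled and shipped to another
    # process. Blocking with .result() yields a plain float, which can.
    return account.balance().result()
----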

While having one actor per bank account does a good job of avoiding bottlenecks, since each bank account likely won’t have too many transactions queued, it is slightly inefficient, as each actor carries non-zero overhead. One solution is to extend our bank account actor to support multiple accounts by using a key and hashmap, but if all accounts are inside one actor, this can lead to scaling problems.

Scaling Dask Actors

The actor model described earlier in this chapter typically assumes that actors are lightweight, meaning they contain a single piece of state and do not require scaling/parallelization. In Dask and similar systems (including Akka), actors are often used for coarser-grained implementations and can require scaling.[4]

As with dask.delayed, you can scale actors horizontally (across processes/machines) by creating multiple actors or vertically (with more resources). Scaling actors horizontally is not as simple as just adding more machines or workers, since Dask cannot break up a single actor across multiple processes.

When scaling actors horizontally, it is up to you to break up the state in such a way that multiple actors can handle it. One technique is to use actor pools (see Scaled actor model using consistent hashing). These pools can have a static mapping of, say, user → actor, or, when the actors share a database, round-robin or other non-deterministic balancing can be used.

Figure 1. Scaled actor model using consistent hashing

We extend the bank account example to a "bank" where an actor may be responsible for multiple accounts (but not for all of the accounts in the bank). We can then use an actor pool with hashing to route the requests to the correct "branch" or actor, as shown in Hashing actor pool example for a bank.

Example 4. Hashing actor pool example for a bank
link:./examples/dask/Dask-Explore-Actors.py[role=include]
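
The linked example aside, the general shape of the technique is sketched below. The AccountBranch and Bank names, the branch count, and the use of a simple modulo over an MD5 digest (rather than a full consistent-hashing ring) are all illustrative assumptions:

[source,python]
----
import hashlib


class AccountBranch:
    """Actor holding one shard of accounts, keyed by account ID."""

    def __init__(self):
        self._balances = {}

    def deposit(self, account_id, amount):
        self._balances[account_id] = self._balances.get(account_id, 0.0) + amount
        return self._balances[account_id]

    def balance(self, account_id):
        return self._balances.get(account_id, 0.0)


class Bank:
    """Client-side router that hashes account IDs onto a pool of branch actors."""

    def __init__(self, client, num_branches=4):
        self._branches = [
            client.submit(AccountBranch, actor=True).result()
            for _ in range(num_branches)
        ]

    def _branch_for(self, account_id):
        # A stable hash ensures the same account always reaches the same actor.
        digest = hashlib.md5(account_id.encode()).hexdigest()
        return self._branches[int(digest, 16) % len(self._branches)]

    def deposit(self, account_id, amount):
        return self._branch_for(account_id).deposit(account_id, amount)

    def balance(self, account_id):
        return self._branch_for(account_id).balance(account_id)


bank = Bank(client)  # reuses the client from the earlier sketch
print(bank.deposit("alice", 50.0).result())  # 50.0
----

Because each branch is still a single actor that processes one request at a time, accounts that hash to the same branch are serialized; the branch count bounds the available parallelism.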

Limitations

As previously mentioned, Dask actors are not resilient to machine or process failure. This is a design decision in Dask and is not true for all actor systems. Many, but not all, actor systems offer different options for the persistence and recovery of actors during failure. For example, Ray has the concept of recoverable actors (managed automatically inside of workflows or manually).

Warning

Calls to dask.delayed functions may be retried on failure, and if they call functions on actors, those function calls will then be duplicated. If a function cannot safely be replayed, you need to ensure it is called only from inside other actors.
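
To make the hazard concrete, here is a sketch (with illustrative names, assuming the actor proxy is passed into the task):

[source,python]
----
import dask


@dask.delayed
def credit_interest(account):
    # If Dask retries this task after a worker failure, the deposit
    # below runs a second time: the actor call is not idempotent.
    return account.deposit(5.0).result()
----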

Dask’s actor model is less full-featured than Ray’s actor model, much as Ray’s DataFrame is less full-featured than Dask’s. You may wish to consider running Dask on Ray to get the best of both worlds. While Holden is biased, she suggests you check out her book Scaling Python with Ray if you are interested in Ray.

When to Use Dask Actors

A common problem in the industry is not realizing when our cool new tool is not the right tool for the job. As the saying goes, "When you have a hammer, the whole world looks like a nail." If you are not mutating state, you likely do not need actors and should stick with tasks. It is important to remember that there are other options for handling state, as shown in Comparison of techniques for managing mutable state.

Table 1. Comparison of techniques for managing mutable state

[cols="1,2,2,2",options="header"]
|===
|
|Local state (e.g., driver)
|Dask actors
|External distributed state (e.g., ZooKeeper, Ray, or Akka)

|Scalability
|No; all state must fit on a single machine.
|State within each actor must fit on a machine, but actors are spread out.
|Yes[5]

|Resilience
|Medium, but no increase in resilience cost (e.g., loss of the driver is already catastrophic)
|No; loss of any worker with an actor becomes catastrophic.
|Yes; loss of the entire cluster can be recovered from.

|Performance overhead
|RPC to driver
|Same as dask.delayed
|RPC to external system + the external system’s overhead

|Code complexity
|Low
|Medium
|High (new library to learn and integrate), extra logic for avoiding duplicate execution

|Deployment complexity
|Low
|Low
|High (new system to maintain)
|===

As with most things in life, picking the right technique is a compromise specific to the problem you are trying to solve. We believe that either local (e.g., driver) state or Ray actors used in conjunction with Dask for its analytic capabilities can handle most cases in which you need mutable state.

Conclusion

In this chapter you have learned the basics of how the actor model works, as well as how Dask implements it. You’ve also learned some alternatives for dealing with state in a distributed system and how to choose among them. Dask actors are a relatively new part of Dask and do not have the same resilience properties as delayed functions: the failure of a worker containing an actor cannot be recovered from. Many other actor systems offer some ability to recover from failures, and if you find yourself depending heavily on actors, you may wish to explore alternatives.


1. See the ZooKeeper documentation for an understanding of ZooKeeper’s distributed performance.
2. The actor model was extended in 1985 for concurrent computation; see "Actors: A Model of Concurrent Computation in Distributed Systems" by Gul Abdulnabi Agha.
3. Think COBOL, where the author left and the documentation was lost, but when you tried to turn it off accounting came running, literally.
4. A coarse-grained actor is one that may contain multiple pieces of state; a fine-grained actor is one where each piece of state would be represented as a separate actor. This is similar to the concept of coarse-grained locking.
5. Ray actors still require that the state within an actor must fit on a single machine. Ray has additional tools to shard or create pools of actors.