In the computer science literature, actors respond to messages, store state internally to change future responses, and can launch other actors. We support all of these, plus a publish-subscribe mechanism through which an actor can publish changes to all subscribed clients. This push-based model lets an actor notify clients in a way that is natural for each client’s UI stack. For example, when an actor representing a data structure changes, the updates can easily be shared with all interested clients; we’ve built an ObservableCloudList<T> type that works with data binding in .NET GUI applications.

Our intention is that actors run in an Actor Runtime, a cluster of machines that hosts each actor on multiple replicas. An actor’s state is duplicated across those replicas. A primary replica services all incoming requests and replicates any state changes to the secondary replicas. If the primary replica crashes, a secondary is promoted and takes over; clients reconnect to the new primary and continue executing. The figure below shows high-level usage of actors, focusing on usage from client apps.[1]

[Figure: high-level usage of actors from client apps]

Actors also process only one message/request at a time, so the logic for each message/request has exclusive access to the actor’s state. Our actors add a further guarantee: for any one client request, either all state changes (i.e., writes) associated with that request happen or none of them do (in which case we report an error to the client). This relieves the actor author of some of the burden of maintaining consistency.

ActorFx Basics

The essence of the ActorFx model is captured in two interfaces: IActor and IActorState.

IActorState is the interface through which actor logic accesses the persistent data associated with an actor.

    public interface IActorState
    {
        void Set(string key, object value);
        object Get(string key);
        bool TryGet(string key, out object value);
        void Remove(string key);
        Task Flush(); // "Commit"
    }

By design, the interface is an abstract key-value store. The Set, Get, TryGet and Remove methods are all similar to what you might find in any Dictionary-type class. The Flush() method allows for transaction-like semantics in the actor logic; by convention, all side-effecting IActorState operations (i.e., Set and Remove) are stored in a local side-effect buffer until Flush() is called, at which time they are committed to the durable store.
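
To make the buffering convention concrete, here is a minimal sketch of how an implementation could hold writes back until Flush() is called. BufferedActorState, its two dictionaries and the IsCommitted helper are hypothetical names invented for this illustration, and the assumption that reads see the caller's own unflushed writes is ours; this is not the ActorFx implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Same shape as the IActorState interface shown above.
public interface IActorState
{
    void Set(string key, object value);
    object Get(string key);
    bool TryGet(string key, out object value);
    void Remove(string key);
    Task Flush();
}

// Hypothetical illustration only; not the ActorFx implementation.
public class BufferedActorState : IActorState
{
    // Stand-in for the durable (replicated) store.
    private readonly Dictionary<string, object> _committed = new Dictionary<string, object>();
    // Side-effect buffer: key -> new value, or the Removed marker.
    private readonly Dictionary<string, object> _pending = new Dictionary<string, object>();
    private static readonly object Removed = new object();

    public void Set(string key, object value) { _pending[key] = value; }
    public void Remove(string key) { _pending[key] = Removed; }

    public bool TryGet(string key, out object value)
    {
        // Assumption: reads see the caller's own unflushed writes first.
        if (_pending.TryGetValue(key, out value))
        {
            if (ReferenceEquals(value, Removed)) { value = null; return false; }
            return true;
        }
        return _committed.TryGetValue(key, out value);
    }

    public object Get(string key)
    {
        object value;
        if (!TryGet(key, out value)) throw new KeyNotFoundException(key);
        return value;
    }

    public Task Flush()
    {
        // Apply every buffered change to the committed store, then clear the buffer.
        foreach (var change in _pending)
        {
            if (ReferenceEquals(change.Value, Removed)) _committed.Remove(change.Key);
            else _committed[change.Key] = change.Value;
        }
        _pending.Clear();
        return Task.CompletedTask;
    }

    // Helper for the demo: is the key present in the committed store?
    public bool IsCommitted(string key) { return _committed.ContainsKey(key); }
}

public static class Program
{
    public static void Main()
    {
        var state = new BufferedActorState();
        state.Set("greeting", "hello");
        Console.WriteLine(state.IsCommitted("greeting")); // False: still buffered
        state.Flush().Wait();
        Console.WriteLine(state.IsCommitted("greeting")); // True: committed by Flush
    }
}
```

Discarding the buffer instead of applying it would give the "none of the writes happen" half of the transaction-like behavior.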

The IActor interface

An ActorFx actor can be thought of as a highly available service, and IActor serves as the interface for that service. In its purest form, IActor would have a single method:

    public interface IActor
    {
        object Eval(Func<IActorState, object[], object> function, object[] parameters);
    }

That is, the caller requests that the actor evaluate a delegate, accompanied by caller-specified parameters represented as .NET objects, against an IActorState object representing a persistent data store. The Eval call eventually returns an object representing the result of the evaluation.

Those familiar with object-oriented programming should be able to see a parallel here. In OOP, an instance method call is equivalent to a static method call into which you pass the “this” pointer. In the C# sample below, for example, Method1 and Method2 are equivalent in terms of functionality:

    class SomeClass
    {
        int _someMemberField;

        public void Method1(int num)
        {
            _someMemberField += num;
        }

        public static void Method2(SomeClass thisPtr, int num)
        {
            thisPtr._someMemberField += num;
        }
    }

Similarly, the function passed to the IActor.Eval method takes an IActorState argument that can conceptually be thought of as the “this” pointer for the actor. So actor methods (described below) can be thought of as instance methods for the actor. This transformation from instance fields to storing state via an interface abstracts away the storage mechanism for an object’s state, allowing implementations of that interface to replicate state across multiple machines. High availability is achieved through redundant replicas of actors, each with its own copy of the IActorState. Currently we are providing an IActorState implementation that stores state in memory replicated across multiple machines. We intend to provide an IActorState implementation with persistence support in a future release.

Actor Methods

In practice, passing delegates to actors can be tedious and error-prone, and would require every client to have the most recent version of those delegates. Therefore, the IActor interface was changed to look like this[2]:

    public interface IActor
    {
        string CallMethod(string methodName, string[] parameters);
        bool AddAssembly(string assemblyName, byte[] assemblyBytes);
    }

Though the Eval method is still an integral part of the actor implementation, it is no longer part of the actor interface (at least for our initial release). Instead, it has been replaced in the interface by two methods:
  • The CallMethod method allows the user to call an actor method; it is translated internally to an Eval() call that looks up the method in the actor’s state, calls it with the given parameters, and then returns the result.
  • The AddAssembly method allows the user to transport an assembly containing actor methods to the actor.

Note that by splitting Eval into these two different methods, an actor’s implementation can be patched independently of the client application.
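
The translation from CallMethod to Eval described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the ActorFx source: SketchActor and DictionaryState are hypothetical names, IActorState is trimmed to the two members the sketch needs, and parameters stay as object[] rather than the JSON strings the real interface uses.

```csharp
using System;
using System.Collections.Generic;

// Trimmed to the two members this sketch needs.
public interface IActorState
{
    void Set(string key, object value);
    bool TryGet(string key, out object value);
}

// Hypothetical dictionary-backed state, for illustration only.
public class DictionaryState : IActorState
{
    private readonly Dictionary<string, object> _items = new Dictionary<string, object>();
    public void Set(string key, object value) { _items[key] = value; }
    public bool TryGet(string key, out object value) { return _items.TryGetValue(key, out value); }
}

// Hypothetical actor shell showing CallMethod expressed in terms of Eval.
public class SketchActor
{
    private readonly IActorState _state;
    public SketchActor(IActorState state) { _state = state; }

    // The "pure" entry point: run a delegate against the actor's state.
    public object Eval(Func<IActorState, object[], object> function, object[] parameters)
    {
        return function(_state, parameters);
    }

    // Name-based entry point: look the method up in the state, then invoke it.
    public object CallMethod(string methodName, object[] parameters)
    {
        return Eval((state, args) =>
        {
            object stored;
            if (!state.TryGet(methodName, out stored))
                throw new InvalidOperationException("Unknown actor method: " + methodName);
            var method = (Func<IActorState, object[], object>)stored;
            return method(state, args);
        }, parameters);
    }
}

public static class Program
{
    public static void Main()
    {
        var state = new DictionaryState();
        // Store an actor method under its name, as a delegate value in the state.
        state.Set("SayHello", (Func<IActorState, object[], object>)((s, args) => "Hello!"));

        var actor = new SketchActor(state);
        Console.WriteLine(actor.CallMethod("SayHello", new object[0])); // prints Hello!
    }
}
```

Because the method body lives in the actor's state rather than in the caller, replacing the stored delegate changes the actor's behavior without touching any client.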

There are two ways to define actor methods:

(1) Define the methods directly in the actor service, “baking them in” to the service.

    myStateProvider.Set(
        "SayHello",
        (Func<IActorState, object[], object>)
            delegate(IActorState astate, object[] parameters)
            {
                return "Hello!";
            });

(2) Define the methods on the client side.

        [ActorMethod]
        public static object SayHello(IActorState state, object[] parameters)
        {
            return "Hello!";
        }

You would then transport them to the actor “on-the-fly” via the actor’s AddAssembly call.

All actor methods must have identical signatures (except for the method name):

  • They must return an object.
  • They must take two parameters:
    • An IActorState object to represent the “this” pointer for the actor, and
    • An object[] array representing the parameters passed into the method.

Additionally, actor methods defined on the client side and transported to the actor via AddAssembly must be decorated with the ActorMethodAttribute, and must be declared as public and static.

Auto-Idempotence Support

In any client-server scenario, there exists the possibility of some confusion between the client and the server as to the final disposition of a request in the face of a server or network failure. For example, consider the following two similar situations:

  1. Client C1 sends request R1 to the actor. The actor services R1, but the actor’s primary replica crashes before sending the response to C1. C1 then resends request R1 to the new primary for the actor.
  2. Client C1 sends request R1 to the actor. The actor services R1, and sends the response back to C1. Unfortunately, the response gets lost on the way back to C1. C1 then assumes that R1 was lost and resends it.

In both of the above situations, R1 has successfully been processed before it is resent by the client. If R1 has side-effects, then processing R1 again could result in inconsistent data. Consider, for example, an Increment request; you would not want to re-process such a request in these situations because the internally maintained counter would be inaccurate.

In order to support correctness in the face of such failures, we added these parameters to the IActor.CallMethod method:

    public interface IActor
    {
        string CallMethod(string clientId, int clientSequenceNumber,
                          string methodName, string[] parameters);
        bool AddAssembly(string assemblyName, byte[] assemblyBytes);
    }

Each time that a request is completed for a client, the result, clientId and clientSequenceNumber are stored in the durable store of the actor. The next time that a request is received from the client, the actor compares the clientId/clientSequenceNumber with those stored in the durable store. If a duplicate request is detected, then the result from the previous request will be returned to the client.

In this fashion, the actor avoids re-processing identical requests that can occur in response to network or service failures. We call this “Auto-Idempotence” because it makes all CallMethod requests essentially idempotent[3].
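
The duplicate check described above might look roughly like the following. This is a sketch under stated assumptions rather than the actual runtime code: DedupingActor is a hypothetical name, an in-memory dictionary stands in for the actor's durable store, and a single hard-coded "Increment" method stands in for real actor-method dispatch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of auto-idempotence; not the ActorFx implementation.
public class DedupingActor
{
    // Stand-in for the durable store: last completed (sequence number, result) per client.
    private readonly Dictionary<string, KeyValuePair<int, string>> _lastCompleted =
        new Dictionary<string, KeyValuePair<int, string>>();

    private int _counter;

    public string CallMethod(string clientId, int clientSequenceNumber,
                             string methodName, string[] parameters)
    {
        KeyValuePair<int, string> last;
        if (_lastCompleted.TryGetValue(clientId, out last) && last.Key == clientSequenceNumber)
        {
            // Duplicate of the last completed request: replay the stored result.
            return last.Value;
        }

        string result = Execute(methodName, parameters);

        // Remember the outcome so that a resend can be answered without re-executing.
        _lastCompleted[clientId] = new KeyValuePair<int, string>(clientSequenceNumber, result);
        return result;
    }

    // Stand-in for real method dispatch: supports only "Increment".
    private string Execute(string methodName, string[] parameters)
    {
        if (methodName != "Increment")
            throw new ArgumentException("Unknown method: " + methodName);
        _counter += int.Parse(parameters[0]);
        return _counter.ToString();
    }
}

public static class Program
{
    public static void Main()
    {
        var actor = new DedupingActor();
        Console.WriteLine(actor.CallMethod("C1", 1, "Increment", new[] { "5" })); // 5
        // C1 never saw the response and resends request 1: the counter is not touched again.
        Console.WriteLine(actor.CallMethod("C1", 1, "Increment", new[] { "5" })); // 5
        // A new sequence number is treated as a new request.
        Console.WriteLine(actor.CallMethod("C1", 2, "Increment", new[] { "5" })); // 10
    }
}
```

Keeping only the most recent request per client is what limits this scheme to clients with at most one outstanding request.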

Publish/Subscribe Support

We wanted to be able to provide subscription and publication support for actors, so we added these methods to the IActor interface:

    public interface IActor
    {
        string CallMethod(string clientId, int clientSequenceNumber, string methodName, string[] parameters);
        bool AddAssembly(string assemblyName, byte[] assemblyBytes);
        void Subscribe(string eventType);
        void Unsubscribe(string eventType);
        void UnsubscribeAll();
    }

As can be seen, event types are coded as strings. An event type might be something like “Collection.ElementAdded” or “Service.Shutdown”. Event notifications are received through the FabricActorClient.

Each actor can define its own events, event names and event payload formats. And note that the pub/sub feature is opt-in; it is perfectly fine for an actor to not support any events, or for clients to choose to not listen to updates.

An actor publishes notifications to its subscribers through two Publish methods added to IActorState, shown here along with the rest of the interface:

    public interface IActorState
    {
        // Pub/sub support
        void Subscribe(IPublicationCallbackContract callerProxy, string eventType);
        void Unsubscribe(IPublicationCallbackContract callerProxy, string eventType);
        void UnsubscribeAll(IPublicationCallbackContract callerProxy);
        void Publish(string eventType, string eventValue);
        void Publish(string eventType, params object[] eventValues);

        void Set(string key, object value);
        object Get(string key);
        bool TryGet(string key, out object value);
        void Remove(string key);
        Task Flush(); // "Commit"

        bool ChangesPendingForCurrentOperation();
        void ClearChangesForCurrentOperation(); // "Rollback"

        IEnumerable<string> GetAllKeysWithPrefix(string keyPrefix);
        object CallMethod(string methodName, object[] args);
    }

Example: Counter

If you wanted your actor to support counter semantics, you could implement an actor method as follows:

        [ActorMethod]
        public static object IncrementCounter(IActorState state, object[] parameters)
        {
            // Grab the parameter
            var amountToIncrement = (int)parameters[0];

            // Grab the current counter value
            int count = 0; // default on first call
            object temp;
            if (state.TryGet("_count", out temp)) count = (int)temp;

            // Increment the counter
            count += amountToIncrement;

            // Store the new value
            state.Set("_count", count);

            // Publish new value to all subscribers
            state.Publish("Counter.Value", count);
            return count;
        }

Initially, the state for the actor would be empty. After an IncrementCounter call with a parameter of 5, the actor’s state would look like this:

    Key         Value
    “_count”    5

After another IncrementCounter call with a parameter of -2, the actor’s state would look like this:

    Key         Value
    “_count”    3

Note that the result of incrementing the counter is also published to all clients that subscribed to an event named “Counter.Value” on this instance of the actor. Any applications that subscribed to this event will be pushed a notification that the counter value has changed, and can update themselves appropriately.

Pretty simple, right? Let’s try something a little more complicated.

Example: Stack

For a slightly more complicated example, let’s consider how we would implement a stack in terms of actor methods. The code would be as follows:

        [ActorMethod]
        public static object Push(IActorState state, object[] parameters)
        {
            // Grab the object to push
            var pushObj = parameters[0];
 
            // Grab the current size of the stack
            int stackSize = 0; // default on first call
            object temp;
            if (state.TryGet("_stackSize", out temp)) stackSize = (int)temp;

            // Store the newly pushed value
            var newKeyName = "_item" + stackSize;
            var newStackSize = stackSize + 1;
            state.Set(newKeyName, pushObj);
            state.Set("_stackSize", newStackSize);

            // Return the new stack size
            return newStackSize;
        }

        [ActorMethod]
        public static object Pop(IActorState state, object[] parameters)
        {
            // No parameters to grab

            // Grab the current size of the stack
            int stackSize = 0; // default on first call
            object temp;
            if (state.TryGet("_stackSize", out temp)) stackSize = (int)temp;

            // Throw on attempt to pop from empty stack
            if (stackSize == 0) throw new InvalidOperationException("Attempted to pop from an empty stack");

            // Remove the popped value, update the stack size
            int newStackSize = stackSize - 1;
            var targetKeyName = "_item" + newStackSize;
            var retrievedObject = state.Get(targetKeyName);
            state.Remove(targetKeyName);
            state.Set("_stackSize", newStackSize);

            // Return the popped object
            return retrievedObject;
        }

        [ActorMethod]
        public static object Size(IActorState state, object[] parameters)
        {
            // Grab the current size of the stack, return it
            int stackSize = 0; // default on first call
            object temp;
            if (state.TryGet("_stackSize", out temp)) stackSize = (int)temp;

            return stackSize;
        }

[Remember when examining these actor methods that each actor message is processed sequentially, so all actor methods are effectively called under lock. Thus there is no need to worry about thread-safety in the method logic.]

To summarize, the actor would contain the following items in its state:

  • The key “_stackSize” whose value is the current size of the stack.
  • One key “_itemXXX” corresponding to each value pushed onto the stack.

After the items “foo”, “bar” and “spam” had been pushed onto the stack, in that order, the actor’s state would look like this:

    Key             Value
    “_stackSize”    3
    “_item0”        “foo”
    “_item1”        “bar”
    “_item2”        “spam”

A pop operation would yield the string “spam”, and leave the actor’s state looking like this:

    Key             Value
    “_stackSize”    2
    “_item0”        “foo”
    “_item1”        “bar”

The Actor Runtime Client

Once you have actors up and running in the Actor Runtime, you can connect to those actors and manipulate them using the FabricActorClient. This is the FabricActorClient’s interface[5]:

    public class FabricActorClient
    {
        public FabricActorClient(Uri fabricUri, Uri actorUri, bool useGateway);
        public bool AddAssembly(string assemblyName, byte[] assemblyBytes, bool replaceAllVersions = true); // see note [6]
        public Object CallMethod(string methodName, object[] parameters);
        public IDisposable Subscribe(string eventType, IObserver<string> eventObserver);
    }

When constructing a FabricActorClient, you need to provide three parameters:

  • fabricUri: This is the URI associated with the Actor Runtime cluster on which your actor is running. When in a local development environment, this is typically “net.tcp://127.0.0.1:9000”. When in an Azure environment, this would be something like “net.tcp://<yourRuntimeDeployment>.cloudapp.net:9000”.
  • actorUri: This is the URI, within the Actor Runtime, that is associated with your actor. This would be something like “fabric:/actor/list/list1” or “fabric:/actor/adhoc/myFirstActor”.
  • useGateway: Set this to false when connecting to an actor in a local development environment, true when connecting to an Azure-hosted actor.

The AddAssembly method allows you to transport an assembly to the actor. Typically that assembly would contain actor methods, effectively adding behavior to the actor or changing its existing behavior.

The CallMethod method allows you to call the method of that name on the actor. It returns the result returned by the actor.

The Subscribe method allows you to subscribe to actor events. They will be published to the provided IObserver<string> object.
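
As an illustration of what could be passed to Subscribe, here is a minimal IObserver<string> sketch. CounterValueObserver is a hypothetical name; the event payload is treated as an opaque string because its format is defined by each actor, and whether the client ever calls OnError or OnCompleted is an assumption. The notifications are simulated by hand, so the sketch runs without an Actor Runtime.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer; only IObserver<string> itself comes from .NET.
public class CounterValueObserver : IObserver<string>
{
    private readonly List<string> _received = new List<string>();

    // Every payload seen so far, in arrival order.
    public IList<string> Received { get { return _received; } }

    public void OnNext(string eventValue)
    {
        _received.Add(eventValue);
        Console.WriteLine("Counter changed: " + eventValue);
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Subscription failed: " + error.Message);
    }

    public void OnCompleted()
    {
        Console.WriteLine("No more notifications.");
    }
}

public static class Program
{
    public static void Main()
    {
        var observer = new CounterValueObserver();

        // With a real client this would be, per the signature shown above:
        //   IDisposable subscription = client.Subscribe("Counter.Value", observer);
        // Here two notifications are simulated instead.
        observer.OnNext("5");
        observer.OnNext("3");

        Console.WriteLine(observer.Received.Count); // 2
    }
}
```

Since Subscribe returns an IDisposable, disposing the returned object is presumably how a client ends the subscription.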


[1] While our goal is to target the cloud, our initial Actor Runtime release only runs on development machines or in an Azure development cluster.

[2] Note that we also changed the parameters from .NET Objects to JSON strings, to accommodate non-.NET clients. However, we still have more work to do in this area; our actor logic still expects .NET-specific decorations inside of the JSON string in order to deserialize the passed parameters. We will work to remove this .NET dependency.

[3] Of course this simple scheme only works for synchronous requests from clients, where each client has at most one request outstanding. We would need to beef up the logic on the actor side if we were to support asynchronous requests where each client could have several outstanding requests.

[5] There’s actually more to it, but these are the important parts that one needs to know in order to begin experimenting with actors.

[6] Note that the replaceAllVersions parameter is ignored for this release, and will probably be removed in a future release.

Last edited Dec 7, 2012 at 5:56 AM by claudioc, version 18
