Advanced Concepts
The advanced concepts implemented by Fantasm are based on the Brett Slatkin talk from Google I/O 2010, Building high-throughput data pipelines with Google App Engine.
Continuation is a technique whereby a process is forked for each subsequent iteration of a dataset. After the fork is created, the original process continues on to operate over its portion of the dataset. This allows a large number of processes to be fanned out to operate over the dataset in parallel.
In the App Engine world, this corresponds to kicking off a new named task for each portion of a dataset (typically a portion of a datastore query) and then operating over the portion. (Naming the task is vital to prevent fork bombs - be sure to watch the talk mentioned above.)
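To illustrate why task naming matters, here is a minimal in-memory sketch. The NamedQueue class and the continuation_task_name format are hypothetical stand-ins, not the App Engine task queue or Fantasm's actual naming scheme; the point is only that a deterministic name lets a retried fork be rejected instead of queued a second time.

```python
class NamedQueue(object):
    """Hypothetical in-memory queue that rejects duplicate task names."""

    def __init__(self):
        self.seen = set()
        self.tasks = []

    def add(self, name, payload):
        # A task name can only ever be used once, so a retried request that
        # tries to fork the same continuation again is silently dropped.
        if name in self.seen:
            return False
        self.seen.add(name)
        self.tasks.append((name, payload))
        return True


def continuation_task_name(machine_id, state, step):
    # Assumed naming scheme: deterministic for a given machine, state and step.
    return '%s--%s--step-%d' % (machine_id, state, step)


queue = NamedQueue()
name = continuation_task_name('machine-1', 'MyFanOut', 1)
first = queue.add(name, {'cursor': 'abc'})
retry = queue.add(name, {'cursor': 'abc'})  # e.g. the original task was retried
```

Here the retry is refused, so only one fork exists for that step no matter how many times the parent task runs.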
In Fantasm, a state can be demarcated as a continuation.
states:
  - name: MyFanOut
    action: MyFanOutActionClass # will have a continuation method
    continuation: True
    ...
This means that when this state is reached, the machine may spawn copies of itself with each copy operating over a portion of the dataset. Fantasm hides much of the complexity of this operation, requiring you only to write a method that returns the next continuation token (e.g., a datastore cursor). See FSM Actions - Continuation Interface for the specific interface details.
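The fork-then-process pattern can be sketched in plain Python. This is a simulation under stated assumptions, not Fantasm's continuation interface: fetch_page and run_with_continuation are hypothetical names, a list index stands in for a datastore cursor, and a simple list stands in for the task queue.

```python
def fetch_page(dataset, token, page_size):
    """Return (items, next_token) for the page starting at token."""
    start = token or 0
    items = dataset[start:start + page_size]
    next_start = start + page_size
    # None signals that there is no further portion to fork for.
    next_token = next_start if next_start < len(dataset) else None
    return items, next_token


def run_with_continuation(dataset, page_size, process):
    pending = [None]  # stand-in for the task queue; None means "start"
    results = []
    while pending:
        token = pending.pop(0)
        items, next_token = fetch_page(dataset, token, page_size)
        if next_token is not None:
            # Fork first: queue the next portion before doing our own work.
            pending.append(next_token)
        results.append(process(items))
    return results


page_sums = run_with_continuation(list(range(10)), 4, sum)
```

In a real deployment each queued token would run as its own task, so the portions are processed in parallel rather than in this sequential loop.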
Fan-in is a process by which a number of tasks can be brought together into a single process so that they can be operated over in batch. Fan-in can be thought of as a series of boxcars that ship on a regular interval (e.g., every 15 seconds). When a task comes in, it is placed on a boxcar and waits to be shipped. Up to 15 seconds later, the boxcar is shipped loaded with tasks that can all be processed together.
This is a very powerful technique, yet it is tricky to implement.
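The boxcar analogy can be made concrete with a small sketch. It assumes that a boxcar is identified by dividing the arrival time by the shipping interval; boxcar_index and load_boxcars are hypothetical helpers and do not reflect how Fantasm actually stores or ships its work.

```python
from collections import defaultdict


def boxcar_index(timestamp, fan_in):
    # Every arrival within the same fan_in-second window gets the same index.
    return int(timestamp // fan_in)


def load_boxcars(arrivals, fan_in):
    """Group (timestamp, context) pairs into boxcars by time window."""
    boxcars = defaultdict(list)
    for timestamp, context in arrivals:
        boxcars[boxcar_index(timestamp, fan_in)].append(context)
    return dict(boxcars)


arrivals = [(0.5, {'n': 1}), (14.9, {'n': 2}), (15.0, {'n': 3})]
boxcars = load_boxcars(arrivals, 15)
```

The first two arrivals share a boxcar and are processed together; the third arrives just after the window closes and waits for the next shipment.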
In Fantasm, a state can have a fan_in attribute. It is an integer representing how often a boxcar will ship (in seconds).
states:
  - name: MyFanIn
    action: MyFanInActionClass
    fan_in: 15
The interface for a fan-in action is exactly the same, except instead of operating over a single context, you will receive a list of contexts, one for each machine being fanned in.
class MyFanInActionClass(object):
    def execute(self, contexts, obj):
        # contexts is a list of dictionaries, one for each machine being fanned in
        pass
Note that in this action method, you are "joining" the forks back together. If you return an event (i.e., if this is a non-final state), only one of the machines will continue forward. TODO - which context is carried forward?
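As an illustration of batch processing in a fan-in action, the sketch below sums a value across all fanned-in contexts. It uses the execute(self, contexts, obj) signature shown above; the 'count' key, the total_count helper and the SumCountsFanIn class are hypothetical, and the contexts are treated as plain dictionaries.

```python
import logging


def total_count(contexts):
    # Contexts without a 'count' key contribute nothing to the total.
    return sum(context.get('count', 0) for context in contexts)


class SumCountsFanIn(object):
    def execute(self, contexts, obj):
        # One call handles the whole boxcar, e.g. a single batched write
        # instead of one write per machine.
        logging.info('fanned in %d machines, total=%d',
                     len(contexts), total_count(contexts))
        # No event is returned, which assumes this is a final state.


batch = [{'count': 2}, {'count': 3}, {}]
total = total_count(batch)
event = SumCountsFanIn().execute(batch, {})
```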
When using a fan_in state, you will notice extra tasks being queued. These tasks are the tasks that fire to "ship the boxcar" - notice that they are set to run in the future. While the machine contexts are being collected, they are temporarily stored in datastore.
IMPORTANT: When fanning in high-rate processes, do not choose a fan_in setting that is too high. A fan_in of 1 or 2 is appropriate for high-rate processes.
Normally, all equivalent states of a given machine instance are grouped together when fanning in. This can be further refined by specifying a fan_in_group in the state configuration. The fan_in_group is the name of the attribute on the machine context to use for grouping. When grouping, you will see an extra "ship the boxcar" task queued for each group. If a particular context does not contain the grouping attribute, it is collected into a "no grouping" boxcar; i.e., the machine instance is not lost.
For example, imagine you have a machine like the following:
states:
  - name: SpecifyGroup
    action: SpecifyGroup
    transitions:
      - event: next
        to: ComputeGroupValue
  - name: ComputeGroupValue
    action: ComputeGroupValue
    fan_in: 15
    fan_in_group: report_date
combined with the following code:
class SpecifyGroup(object):
    def execute(self, context, obj):
        context['report_date'] = 'some computation to select date here'
        return 'next'

class ComputeGroupValue(object):
    def execute(self, contexts, obj):
        # here, all the contexts will have the same 'report_date' value
        pass
Note that if, for whatever reason, SpecifyGroup does not place a report_date on the context, the machine instances without a report_date are grouped together, fanned in, and passed to ComputeGroupValue as a single batch.
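The grouping behaviour described above, including the "no grouping" boxcar, can be sketched as follows. This is a plain-Python illustration rather than Fantasm's implementation; group_contexts and the NO_GROUP sentinel are hypothetical names.

```python
from collections import defaultdict

# Hypothetical sentinel key for contexts that lack the grouping attribute.
NO_GROUP = object()


def group_contexts(contexts, fan_in_group):
    """Split contexts into one boxcar per value of the grouping attribute."""
    groups = defaultdict(list)
    for context in contexts:
        groups[context.get(fan_in_group, NO_GROUP)].append(context)
    return dict(groups)


contexts = [
    {'report_date': '2010-06-01', 'n': 1},
    {'n': 2},  # no report_date: lands in the "no grouping" boxcar
    {'report_date': '2010-06-01', 'n': 3},
    {'report_date': '2010-06-02', 'n': 4},
]
groups = group_contexts(contexts, 'report_date')
```

Each key in groups corresponds to one "ship the boxcar" task, and each list is what a single call to the fan-in action would receive.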
A given machine can spawn a set of new machines - that is, not replicas like a continuation, but brand-new machines in their initial state.
On the context object, there is a method called spawn:
context.spawn(machineName, contexts, countdown=0, method='POST')
machineName - the name of the machine to spawn (string)
contexts - a list of context dictionaries to create the new machine with, one for each desired machine
countdown - the number of seconds to wait before spawning the machines
method - the method (i.e., GET or POST) to initialize the machine with. This method is used to execute all queued tasks for that machine and any replicas it creates (via continuation).
Calling this method will queue a task for each requested machine (using the batch Queue.add() method).
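A possible use of spawn from inside an action is sketched below. The spawn signature is the one given above; everything else is assumed for illustration: StubContext is a hypothetical stand-in that only records calls so the example runs outside App Engine, and the 'ReportMachine' name and 'report_dates' key are invented.

```python
class StubContext(dict):
    """Hypothetical stand-in for a Fantasm context; records spawn calls."""

    def __init__(self, *args, **kwargs):
        super(StubContext, self).__init__(*args, **kwargs)
        self.spawned = []

    def spawn(self, machineName, contexts, countdown=0, method='POST'):
        # The real method queues tasks; the stub just remembers the request.
        self.spawned.append((machineName, contexts, countdown, method))


class SpawnReports(object):
    def execute(self, context, obj):
        # One new machine per date, each starting in its initial state.
        new_contexts = [{'report_date': d} for d in context['report_dates']]
        context.spawn('ReportMachine', new_contexts, countdown=30)
        return 'next'


ctx = StubContext(report_dates=['2010-06-01', '2010-06-02'])
event = SpawnReports().execute(ctx, {})
```

Building the full list of contexts first and calling spawn once keeps the request to a single batch rather than one call per machine.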