Distributed locking with AWS SWF

One of the first questions asked by anyone who has a system running on more than one computer is how to make sure that a task is processed by one and only one server in the fleet. In fact, this is a classic problem in the distributed systems ecosystem, and its answer has 2 distinct parts: assigning a processor to the task (leader election), and detecting a processor failure (failure detection) so that the task can be reassigned to another worker.

A solution for the first part is a locking mechanism: a set of processes compete to acquire a lock, and whoever is lucky enough to take it gets the right to work on the task. Most implementations of this solution rely on a strongly consistent (and, of course, durable and highly available) datastore, but today we want to show you another way to do it, using AWS SWF and a key property of this service: within an SWF domain, we cannot have more than one running workflow execution with a given workflow ID.
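To make that property concrete before diving into the SWF code, here is a minimal, self-contained sketch in plain Java (no SWF dependency) of the pattern the service gives us for free: a registry that accepts at most one running "workflow" per ID, so acquiring the lock is simply being the first to start the workflow under an agreed-upon ID. The `WorkflowRegistry` class and its method names are hypothetical, for illustration only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal local stand-in for the property we rely on: SWF rejects a second
// running workflow execution with the same ID in a domain. All names here
// are hypothetical, for illustration only.
public class WorkflowRegistry {

    private final ConcurrentMap<String, String> running = new ConcurrentHashMap<>();

    // Returns true if the caller "started the workflow" (acquired the lock);
    // false if an execution with that ID is already running.
    public boolean startWorkflow(String workflowId, String owner) {
        return running.putIfAbsent(workflowId, owner) == null;
    }

    // Completing the execution releases the ID for a later run.
    public void completeWorkflow(String workflowId) {
        running.remove(workflowId);
    }

    public static void main(String[] args) {
        WorkflowRegistry registry = new WorkflowRegistry();
        System.out.println(registry.startWorkflow("someID", "Worker 1")); // true  (lock acquired)
        System.out.println(registry.startWorkflow("someID", "Worker 2")); // false (already running)
        registry.completeWorkflow("someID");
        System.out.println(registry.startWorkflow("someID", "Worker 2")); // true  (ID free again)
    }
}
```

The atomic `putIfAbsent` plays the role of SWF's server-side uniqueness check: exactly one competitor observes success, everyone else gets the equivalent of a WorkflowExecutionAlreadyStartedException.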

To prove our solution, we used the workflow described in a previous post and tried to start 2 simultaneous executions, synchronized with a Java CountDownLatch. Each thread signals that it has started by counting down the latch (the countDown() call at the beginning of run()), then waits until the main thread decrements the count to 0. At that point, both threads move forward and try to start the workflow. The code we used is listed below.

public static void main(String[] args) throws Exception {

        AmazonSimpleWorkflowClientBuilder standard = AmazonSimpleWorkflowClientBuilder.standard();
        standard.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("https://swf.eu-west-1.amazonaws.com", "eu-west-1"));
        AmazonSimpleWorkflow service = standard.build();
        String domain = "HelloWorldDomain";
        String taskListToPoll = "HelloWorldDomain";
        ActivityWorker aw = new ActivityWorker(service, domain, taskListToPoll);
        aw.addActivitiesImplementation(new ComputerActivitiesImpl());
        aw.start();

        ComputerActivitiesClient activitiesClient = new ComputerActivitiesClientImpl();
        DecisionContextProvider provider = new DecisionContextProviderImpl();
        ComputerWorkflowSelfClient selfClient = new ComputerWorkflowSelfClientImpl();
        ComputerWorkflow computerWorkflow = new ComputerWorkflowImpl(activitiesClient, provider, selfClient);
        SpringWorkflowWorker wfw = new SpringWorkflowWorker(service, domain, taskListToPoll);
        wfw.addWorkflowImplementation(computerWorkflow);
        wfw.start();

        ComputerWorkflowClientExternalFactory factory = new ComputerWorkflowClientExternalFactoryImpl(service, domain);
        ComputerWorkflowClientExternal workflow = factory.getClient("someID");
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new ServiceRunner("Worker 1", latch, workflow));
        executorService.submit(new ServiceRunner("Worker 2", latch, workflow));

        while (latch.getCount() != 1) {
            LOG.info("Waiting a while until threads started...");
            Thread.sleep(2000);
        }
        latch.countDown();
    }
private static class ServiceRunner implements Runnable {
        private final String name;
        private final CountDownLatch latch;
        private final ComputerWorkflowClientExternal workflow;

        public ServiceRunner(String name, CountDownLatch latch, ComputerWorkflowClientExternal workflow) {
            this.name = name;
            this.latch = latch;
            this.workflow = workflow;
        }

        @Override
        public void run() {
            LOG.info("Service {} started.", name);
            latch.countDown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                workflow.compute();
                LOG.info(" Workflow started by thread {}", name);
            } catch (Exception e) {
                LOG.info(" Exception on thread {}. Details: {} ", name, e);
            }
        }
    }

And the outcome is:

AA:BB:CC.425 [pool-4-thread-1] INFO Service Worker 1 started. 
AA:BB:CC.425 [pool-4-thread-2] INFO Service Worker 2 started. 
AA:BB:CC.917 [pool-4-thread-2] INFO Workflow started by thread Worker 2 
AA:BB:CC.298 [pool-4-thread-1] INFO Exception on thread Worker 1. Details: com.amazonaws.services.simpleworkflow.model.WorkflowExecutionAlreadyStartedException: null (Service: AmazonSimpleWorkflow; Status Code: 400; Error Code: WorkflowExecutionAlreadyStartedFault;

Basically, these log lines show that only one thread was able to launch the workflow, while the other one received an exception indicating that another execution with the same ID is already running in the same domain.

To address the failure detection aspect, you have to implement a mechanism that keeps trying to start the workflow until a completion condition is met – that is, until it is recorded somewhere that the workflow job finished successfully.
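A hedged sketch of that retry loop, again using a local registry in place of the real SWF client (the real thing would call StartWorkflowExecution and read the done flag from a durable store; every name below is hypothetical):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: the "registry" map stands in for SWF's
// start-workflow call, and the done flag stands in for a durable record
// that the job finished. All names here are hypothetical.
public class RetryingWorker {

    final ConcurrentMap<String, String> running = new ConcurrentHashMap<>();
    final AtomicBoolean taskDone = new AtomicBoolean(false);

    boolean tryStart(String workflowId, String owner) {
        return running.putIfAbsent(workflowId, owner) == null;
    }

    // Keep trying to start the workflow until the job is recorded as done,
    // so the task survives the death of whichever worker held the lock.
    void processUntilDone(String workflowId, String owner) throws InterruptedException {
        while (!taskDone.get()) {
            if (tryStart(workflowId, owner)) {
                // Lock acquired: do the work, record completion, release the ID.
                taskDone.set(true);
                running.remove(workflowId);
            } else {
                // Lock held elsewhere: back off, then re-check completion.
                Thread.sleep(100);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RetryingWorker worker = new RetryingWorker();
        // Simulate a rival that holds the lock and then disappears (with real
        // SWF, this would be the execution timing out after a failure).
        worker.running.put("someID", "rival");
        new Thread(() -> {
            try { Thread.sleep(300); } catch (InterruptedException ignored) { }
            worker.running.remove("someID");
        }).start();
        worker.processUntilDone("someID", "Worker 1");
        System.out.println("task done: " + worker.taskDone.get()); // task done: true
    }
}
```

The important design point is that the loop exits on the completion record, not on lock acquisition: a worker that loses the race keeps watching, and takes over only if the ID becomes free while the job is still unfinished.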

Our advice is to ask yourself whether you really need a locking mechanism, and to try to design your architecture to support an at-least-once approach, because in that case other, simpler solutions could solve your problem. But if you really need such a mechanism, keep in mind that SWF is a possible candidate that works fine without costing a lot of money, although it brings some overhead in terms of dependencies and testing.

We hope you’ve found these tips handy, but the best is yet to come – in the next post of this series we’ll show you how to solve a frequent exception seen while using SWF. Thanks again for being our reader!