Insanely simple distributed clusters using Node.js, Beanstalkd and Ansible.

I've been playing around with distributed computing for a while now. Earlier this year, I had a bitcoin mining operation running in my garage. Though I am not a bitcoin millionaire, I did manage to recover my investment. But more importantly, I gathered a lot of experience with the tools available to manage, develop and deploy code on compute clusters.

What is a cluster and what does it give me?

You have a job that requires some compute resources - CPU, storage, network, etc. You want your job to process larger inputs in a shorter time. At first you scale up: you buy a beefier computer and run your job on that. But you will eventually hit the limit of what a single machine can do. At that point, you scale out: you spread your job across several machines.

This group of machines is collectively operating as a cluster. Clusters are a way for you to scale your jobs beyond the limit of a single machine.

Whatever the specific job is, any compute cluster operates in a simple manner. You set up a bunch of machines with compute resources. You deploy code to them that knows how to process a chunk of the input data independently. Then you build some logic that breaks the input into chunks and distributes them across all the machines in the cluster.

The machines execute their individual subtasks in parallel and report back with their partial results. Typically, there is some additional logic to combine these partial results back into a full result. This should sound very familiar if you know what Map-Reduce is.

In this article, I will explain how you can build your own distributed computing platform. I am going to demonstrate a specific example of a distributed web crawler using the following:

  • Amazon EC2 as the hardware provisioning service.
  • Ansible over SSH as the distributed deployment tool.
  • Beanstalkd as the central datastore for assigning jobs and receiving results.
  • PhantomJS and WebPageReplay to crawl websites.
  • Node.js as the glue that holds everything together.

Why a web crawler?

I've been recently fascinated with the idea of an offline internet. Projects like Outernet aim to bring downstream-only 'internet' access to the masses with their Lantern project on Indiegogo. Essentially, 10MB/day of curated content is streamed down to a satellite receiver - which doubles as a WiFi hotspot. You access the content with your smartphone.

10MB/day is a bit limiting, and I don't even get to control what content shows up. Here is a different idea: why not site-rip something like Wikipedia and put that on a WiFi router instead? Going on a trip and don't want to pay for roaming? Site-rip Google Maps, Yelp and TripAdvisor for the specific cities you're visiting. Choose whatever content you want.

For this we will primarily use WebPageReplay - a tool that lets you take a snapshot of a subset of the internet. WebPageReplay has two modes of operation. The first is record mode, where it records every HTTP/HTTPS request and response passing through your computer: you start it up in record mode and browse some websites, and Replay captures the traffic and stores it to disk.

The second mode - playback - is pretty self-explanatory. Replay spins up its own DNS and HTTP servers and waits for requests from your browser. When you type a URL into the browser, Replay intercepts the request and serves the site from its recorded file. So you never hit your data connection.
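For reference, driving WebPageReplay from the shell looks roughly like this (a sketch based on the web-page-replay docs; it needs root because it binds the DNS and HTTP ports, and your paths may differ):

```
# Record mode: capture all HTTP/HTTPS traffic into an archive file
sudo ./replay.py --record archive.wpr

# Playback mode: serve everything back out of the archive
sudo ./replay.py archive.wpr
```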

So that's what we're going to do with our cluster. We're going to crawl through Wikipedia's top 5000 articles.

But I want to do something else with my cluster

Feel free to use this guide to spin up your own cluster with your own tasks. I am talking about clusters in the context of web crawling because:

  • Web crawling is an embarrassingly parallel task.
  • No obvious issues with legality or morality.

How exactly do we crawl the web?

To record the pages, we're going to run Replay in record mode and visit a list of URLs in a browser running on that machine.

Why not just curl or wget all the URLs?

Because wget only gives you the main document of a page, and we want to capture all of a page's content. Many pages fetch resources through JavaScript, and we want to crawl those as well. For pages like these, you need something that executes the page's JavaScript - you need a browser. I could spin up Chrome on each of my cluster instances to visit the pages, but there is a better alternative.

PhantomJS is a fully scriptable headless WebKit browser. You script the browser in JavaScript. For our case, all we're going to do is load a page and wait for it to finish. Here's someone who scripted PhantomJS to order pizza.
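A minimal PhantomJS script for our case might look something like this (a sketch; the filename and the 2-second settle timeout are my own choices):

```
// visit.js - load a single URL with PhantomJS and exit once the page has finished
var page = require('webpage').create();
var system = require('system');
var url = system.args[1];

page.open(url, function (status) {
    console.log('Loaded ' + url + ' with status: ' + status);
    // Give late-loading resources a moment to arrive before exiting.
    setTimeout(function () {
        phantom.exit(status === 'success' ? 0 : 1);
    }, 2000);
});
```

Run it with `phantomjs visit.js http://example.com`.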

Architecture

For our distributed cluster, we will use the following simple architecture.

A simple cluster

Beanstalkd is a work queue running on the Master node. The workers connect to it and listen for jobs.

Your desktop then connects to the master and starts injecting the list of URLs you want to crawl.

The workers then pick up these URLs and start crawling. If they find any links in the page, they add them back into the queue, until they hit a certain maximum crawl depth.

Think of it as the classic interview problem of breadth-first search mixed with a producer-consumer problem.

Before we go all distributed up in this blog, let's test this architecture locally: spin up beanstalkd and simulate a few workers and a master, all running on our local box.

Spin up beanstalkd:
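On Ubuntu, something like this should do it (assuming the apt package and the default port 11300):

```
# Install beanstalkd and start it on the default port.
# (The Ubuntu package may also start it as a service automatically.)
sudo apt-get install -y beanstalkd
beanstalkd -l 127.0.0.1 -p 11300 &
```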

We're going to use the following local worker task. All it does is reserve a job and wait a random interval of up to 2 seconds before marking it as done.
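Here's a sketch of what that worker looks like, assuming the fivebeans beanstalkd client for Node and a tube named 'jobs' (both are my choices; the original may use a different client library):

```
// local-worker.js - reserve jobs and pretend to work on each for up to 2 seconds
var fivebeans = require('fivebeans');
var client = new fivebeans.client('127.0.0.1', 11300);

client.on('connect', function () {
    client.watch('jobs', function (err) {
        if (err) { throw err; }
        reserveNext();
    });
}).on('error', function (err) {
    console.error('beanstalkd connection error:', err);
}).connect();

function reserveNext() {
    client.reserve(function (err, jobid, payload) {
        if (err) { return console.error(err); }
        console.log('working on job', jobid, payload.toString());
        // Simulate work: wait a random interval of up to 2 seconds, then mark it done.
        setTimeout(function () {
            client.destroy(jobid, function () {
                console.log('finished job', jobid);
                reserveNext();
            });
        }, Math.random() * 2000);
    });
}
```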

Next, the local job injector - a simple script that injects some random job payloads into our work queue.
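A sketch of the injector, again using fivebeans and the same 'jobs' tube:

```
// local-inject.js - push a handful of dummy payloads into the 'jobs' tube
var fivebeans = require('fivebeans');
var client = new fivebeans.client('127.0.0.1', 11300);

client.on('connect', function () {
    client.use('jobs', function (err) {
        if (err) { throw err; }
        var remaining = 10;
        for (var i = 0; i < 10; i++) {
            // put(priority, delay, ttr, payload, callback)
            client.put(0, 0, 60, JSON.stringify({ id: i, value: Math.random() }), function (err, jobid) {
                console.log('queued job', jobid);
                if (--remaining === 0) { process.exit(0); }
            });
        }
    });
}).connect();
```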

Run the injector alongside a couple of workers and you'll see the jobs being picked up and finished. Now to distribute this across machines.

Provision hardware

At this point, we need some cluster hardware. If you have a cluster of machines sitting in your garage, go ahead and use those.

For the rest of you, I would recommend creating an AWS account and spinning up some instances. AWS offers a free tier - up to 750 hours per month of EC2 t2.micro usage. We're going to spin up five for our demonstration.

t2.micros cost ~1 penny/hr. They have a single core and low network bandwidth. There is also an instance limit of 20 per region, so you can't really go crazy with your spending, even if you screw something up.

Spin them up with the following settings in the region closest to you:

  • AMI: Ubuntu 14.04 Server
  • Storage: 8GB SSD (default)
  • Security groups: Expose 22 (for SSH) and 443 (for work queues)

Create and use a keypair - you will need this to ssh into the hosts. Once the hosts have spun up, note down their public IPs.
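If you prefer the command line to the AWS console, the equivalent with the AWS CLI looks roughly like this (the AMI ID, key name and security group ID below are placeholders for your own values):

```
# Launch 5 Ubuntu 14.04 t2.micro instances (the IDs below are placeholders)
aws ec2 run-instances \
    --image-id ami-xxxxxxxx \
    --count 5 \
    --instance-type t2.micro \
    --key-name my-cluster-key \
    --security-group-ids sg-xxxxxxxx
```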

Development and Deployment

Now it is important to note how we develop and deploy the pieces of code that will run on the cluster hosts.

In a typical local development project the feedback loop dev > deploy > test > dev is so short we never even notice it. If you're writing code in Node, tools like mocha make the cycle as short as a few milliseconds - so you know immediately when you've broken something.

Check out the ultra-fast feedback loop below. Here I am running a Node.js Express app with mocha unit tests. A grunt task monitors the source directory for file changes. When I hit 'save' in my editor, grunt triggers a live-reload in my browser window and kicks off my unit tests.
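The Gruntfile behind a loop like that is only a few lines. Here's a sketch using the grunt-contrib-watch and grunt-mocha-test plugins (not necessarily the exact setup in the demo; the paths are placeholders):

```
// Gruntfile.js - rerun the mocha tests and live-reload on every save
module.exports = function (grunt) {
    grunt.initConfig({
        mochaTest: {
            test: { src: ['test/**/*.js'] }
        },
        watch: {
            scripts: {
                files: ['app/**/*.js', 'test/**/*.js'],
                tasks: ['mochaTest'],
                options: { livereload: true }
            }
        }
    });

    grunt.loadNpmTasks('grunt-mocha-test');
    grunt.loadNpmTasks('grunt-contrib-watch');
    grunt.registerTask('default', ['watch']);
};
```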

We want to use a super-fast feedback loop for developing our cluster scripts.

This is easy if you're doing development on a local box. But with clusters it gets complex, simply because there are a lot of hosts. If we work in a way that requires restarting EC2 hosts on every change, the cycle time becomes unacceptably high. So we bias towards optimizing the feedback loop as much as possible.

Alright enough talk. Let's get down to it.

Workers

Each worker will use PhantomJS to visit a particular URL. PhantomJS is a headless, scriptable WebKit browser.

On the same machine, an instance of WebPageReplay will run in record mode. WebPageReplay is a tool that records every HTTP/HTTPS request and response going through a machine.

Why use this instead of something like wget? This setup seems overkill, no?

It probably is, depending on the URLs you wish to crawl. If the sites you want to crawl have completely static resources, then you can use wget.

However, most modern sites delay-load most of their content. Gmail, as a classic example, retrieves all mail items via JavaScript. So you need to run a page's JavaScript to get at the resources that contain your actual emails.

So here, PhantomJS handles loading the site and running its JavaScript, while WebPageReplay runs in the background recording everything going through the pipe.

PhantomJS can also be scripted to tackle more complex interactions. Say you want to automate something in your page - like logging in or clicking on items etc - you can do all of this easily with PhantomJS.

PhantomJS will also parse the fully loaded page and extract links - which will then get added back into the work queue depending on the crawl depth halt condition.
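Extracting the links happens inside the PhantomJS script with a page.evaluate call - something along these lines:

```
// Inside the PhantomJS script, once the page has loaded:
var links = page.evaluate(function () {
    // This runs in the page's own context; collect the href of every anchor.
    return Array.prototype.map.call(document.querySelectorAll('a[href]'), function (a) {
        return a.href;
    });
});
console.log(JSON.stringify(links));
```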

All of this logic will be glued together with a node script running on the worker instances.
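Here's a sketch of that glue script. It assumes the fivebeans client, 'jobs' and 'results' tubes, JSON payloads of the form {url, depth}, a crawl.js PhantomJS script that prints discovered links as a JSON array, and a placeholder master address - all of these names are my own, not necessarily the project's:

```
// crawl-worker.js - reserve URL jobs, crawl them with PhantomJS, post results,
// and queue any discovered links until MAX_DEPTH is reached.
var execFile = require('child_process').execFile;
var fivebeans = require('fivebeans');

var MASTER = 'MASTER_IP';   // placeholder - the master's address
var PORT = 443;             // the port we opened for the work queue (beanstalkd's default is 11300)
var MAX_DEPTH = 2;

var client = new fivebeans.client(MASTER, PORT);

client.on('connect', function () {
    client.watch('jobs', function () { reserveNext(); });
}).connect();

function reserveNext() {
    client.reserve(function (err, jobid, payload) {
        if (err) { return setTimeout(reserveNext, 1000); }
        var job = JSON.parse(payload.toString());   // payloads look like {url, depth}

        // crawl.js loads the URL (with WebPageReplay recording in the background)
        // and prints the links it found as a JSON array on stdout.
        execFile('phantomjs', ['crawl.js', job.url], function (err, stdout) {
            var links = [];
            if (!err) { try { links = JSON.parse(stdout); } catch (e) { /* ignore bad output */ } }

            // Report the result, then re-queue the discovered links if we're not too deep.
            client.use('results', function () {
                client.put(0, 0, 120, JSON.stringify({ url: job.url, links: links }), function () {
                    client.use('jobs', function () {
                        if (job.depth < MAX_DEPTH) {
                            links.forEach(function (link) {
                                client.put(0, 0, 120, JSON.stringify({ url: link, depth: job.depth + 1 }), function () {});
                            });
                        }
                        client.destroy(jobid, function () { reserveNext(); });
                    });
                });
            });
        });
    });
}
```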

Master

The master node simply hosts a beanstalkd instance. Beanstalkd is a fast work queue. In this context, we will create two queues in beanstalkd (it calls them 'tubes') - one for jobs and another for results. All the worker nodes will then connect to these queues to take jobs and post results.

We will also directly connect our desktop to the master to push in the initial list of URLs we want to crawl.

Setup and Deploy

Now that your instances are running, the first thing we're going to do is to deploy code to them and start them up.

But our code has some dependencies. We're writing everything in Node.js, so we first need to make sure Node is installed. We also need to set up PhantomJS and check out WebPageReplay from GitHub.

The simplest way to do this is to SSH into each of the hosts and run the appropriate setup tasks. After that is done, we SCP (copy over SSH) our code onto each box and then start it up, again via SSH.

Ansible is a tool that essentially automates this whole process. It is a full-fledged deployment, task-execution and configuration-management platform, highly optimized for fast, idempotent configuration.

We are going to first tell Ansible what hardware is available to us. We do this by listing the IPs of our hosts in an inventory file.
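With placeholder IPs and the login details an Ubuntu EC2 instance gives you, the inventory looks something like this:

```
# hosts - Ansible inventory (the IPs are placeholders for your EC2 public IPs)
[master]
54.0.0.1

[workers]
54.0.0.2
54.0.0.3
54.0.0.4
54.0.0.5

[all:vars]
ansible_ssh_user=ubuntu
ansible_ssh_private_key_file=~/.ssh/my-cluster-key.pem
```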

Here, we've dedicated one host to be the master and the rest of them to be workers. Pretty straightforward.

Next, we specify the configuration we want for these roles. We do this using a playbook. A playbook allows very extensive configuration, but for our purposes we simply specify a list of tasks to perform on each role (workers and master) to deploy our code and start everything up; a sketch of such a playbook appears after the list of steps below.

We follow three steps:

  • Install dependencies using apt.
  • Copy over the scripts from our desktop to the cluster hosts.
  • Start the scripts on each of the cluster hosts.
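A stripped-down sketch of that playbook is below. The package names, paths, repository URL and start commands are illustrative placeholders, not the project's exact ones:

```
# site.yml - a minimal sketch of the playbook
- hosts: master
  sudo: yes          # 'become: yes' on newer Ansible
  tasks:
    - name: Install beanstalkd
      apt: name=beanstalkd state=present update_cache=yes
    - name: Start beanstalkd on the port we exposed for the work queue
      shell: nohup beanstalkd -l 0.0.0.0 -p 443 &

- hosts: workers
  sudo: yes
  tasks:
    - name: Install dependencies
      apt: name={{ item }} state=present update_cache=yes
      with_items:
        - nodejs
        - npm
        - phantomjs
        - git
    - name: Check out WebPageReplay
      git: repo=https://github.com/chromium/web-page-replay.git dest=/home/ubuntu/web-page-replay
    - name: Copy the worker scripts over
      copy: src=worker/ dest=/home/ubuntu/worker/
    - name: Start the worker script
      shell: nohup node /home/ubuntu/worker/crawl-worker.js &
```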

Now we run the playbook:
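Assuming the inventory and playbook are saved as hosts and site.yml (the placeholder names from the sketches above), that's a single command:

```
ansible-playbook -i hosts site.yml
```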


Just like that, we've set up dependencies on all of our hosts, copied over our scripts and started running them. That's our feedback loop. Not quite short enough to run tests on every line change, but this is as close as we're going to get with our cluster.

How is it so much faster than normal SSH?

Ansible is smart enough not to do anything if it determines an action is unnecessary. In the playbook output, several log lines simply say 'ok' - these are emitted when we ask to install dependencies that are already installed. The lines that say 'changed' are the ones where an action was actually performed.

Plus, Ansible spins up concurrent SSH connections to the hosts in your inventory, so the tasks run across all the hosts in parallel.

Run the cluster

Let's do a quick check that our queues are working correctly.
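Beanstalkd speaks a simple text protocol, so one quick way to check is to telnet to the master on the work-queue port (443 in our security group setup; the tube name 'jobs' is the one assumed throughout these sketches):

```
telnet <master-public-ip> 443    # the port we opened for the work queue
stats-tube jobs                  # beanstalkd replies with YAML stats for the tube
```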

Now that our cluster is up and running, let's connect to it and start crawling the web.

For this, we need a simple script that inserts jobs into the work queue running on the master.
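Here's a sketch, again using fivebeans, a placeholder master address, and the same {url, depth} payload convention as the workers:

```
// seed.js - run from your desktop: push the initial URL list into the 'jobs' tube
var fivebeans = require('fivebeans');

var MASTER = 'MASTER_PUBLIC_IP';   // placeholder - your master's public IP
var PORT = 443;                    // the port we opened for the work queue
var urls = [
    'https://en.wikipedia.org/wiki/Main_Page'
    // ...the rest of your seed URLs
];

var client = new fivebeans.client(MASTER, PORT);

client.on('connect', function () {
    client.use('jobs', function (err) {
        if (err) { throw err; }
        var remaining = urls.length;
        urls.forEach(function (url) {
            // put(priority, delay, ttr, payload, callback)
            client.put(0, 0, 120, JSON.stringify({ url: url, depth: 0 }), function (err, jobid) {
                console.log('queued', url, 'as job', jobid);
                if (--remaining === 0) { process.exit(0); }
            });
        });
    });
}).connect();
```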