# Justin Pombrio

# EC2 Map

import shlex
import sys
import time
from optparse import OptionParser
from subprocess import Popen, PIPE

# __all__ must name the public objects as strings; the objects themselves are
# not defined yet at this point, so listing them directly raised NameError.
__all__ = ['main', 'Workforce']

'''
Use Amazon EC2 instances to quickly run a program that reads its input as
lines from stdin.

Must be run on an EC2 controller machine (e.g. ecmm.chitika.net).
Justin Pombrio, 2008
'''

# Command-line interface; parsed by main().
parser = OptionParser()
parser.add_option('-n', '--workers', type='int', dest='workers',
                  help='Start this many ec2 instances')
parser.add_option('-e', '--executable', dest='executable',
                  help='The program to run')
parser.add_option('-a', '--auxilary', dest='auxilary',
                  help='Extra files required by the program')
parser.add_option('-s', '--syncronize', dest='syncronize',
                  help='Syncronize these files up and down')
parser.add_option('-b', '--batch_size', type='int', dest='batch_size', default=10,
                  help='Number of lines to be uploaded at once')
parser.add_option('-i', '--ami_id', dest='ami_id', default='ami-d206e3bb',
                  help='The id of the ami image to use for the workers')
parser.add_option('-q', '--quiet', action='store_const',
                  const=0, dest='verbosity',
                  help='Suppress progress messages')
parser.add_option('-v', '--verbose', action='store_const',
                  const=1, dest='verbosity', default=1,
                  help='Report progress (the default)')
# optparse only accepts single-letter short options ("-x"); registering '-vv'
# raised OptionError as soon as this module was imported.
parser.add_option('--very_verbose', action='store_const',
                  const=2, dest='verbosity',
                  help='Report progress in more detail')


# Column names, in order, of the tab-separated rows printed by the ec2 tools.
image_cols = ['type', 'ami_id', 'name', 'key', 'status', 'ownership']

instance_cols = ['type', 'instance_id', 'ami_id', 'description', 'ip',
                 'status', '???', 'number', 'size', 'starttime']

# Rows printed by ec2-run-instances are parsed with the same columns.
start_cols = instance_cols

terminate_cols = ['type', 'instance_id', 'status']

auxilary_rsync_opts = '-rz'  # recurse into dirs, compress during transfer

syncronize_rsync_opts = '-urz'  # recurse into dirs, compress, update only
                                # (do not overwrite a newer version of a file)

# The remote command is quoted so that the ';' is interpreted by the remote
# shell; unquoted, a local shell would run everything after it on this machine.
command_template = "ssh -p %s root@%s 'cd ~/; %s'"  # % (port, ip, command)

# rsync's -p means "preserve permissions", not "port": the ssh port has to be
# handed to the remote shell given with -e.
sync_up_template = "rsync %s -e 'ssh -p %s' %s root@%s:~/%s"  # % (opts, port, file, ip, file)

sync_down_template = "rsync %s -e 'ssh -p %s' root@%s:~/%s %s"  # % (opts, port, ip, file, file)

delay = 5  # seconds to sleep between status polls


def group_by(iterable, length):
    """Yield successive lists of ``length`` items taken from ``iterable``.

    The last list may be shorter; nothing is yielded for an empty iterable.

    Raises ValueError (on first iteration) if ``length`` is less than 1,
    which would otherwise yield empty lists forever.
    """
    if length < 1:
        raise ValueError('length must be at least 1, got %r' % (length,))
    iterator = iter(iterable)
    while True:
        group = []
        for _ in range(length):
            try:
                # next() works on Python 2.6+ and 3; iterator.next() is 2-only.
                group.append(next(iterator))
            except StopIteration:
                if group:
                    yield group
                return
        yield group

def read_ec2_output(fileobj, cols):
    """Parse tab-separated ec2 tool output into a list with one dict per line.

    The fields of each line (trailing newline removed) are paired, in order,
    with the names in ``cols`` and stripped of surrounding whitespace. Extra
    fields or extra column names are dropped, since zip stops at the shorter.
    """
    return [dict((name, field.strip())
                 for name, field in zip(cols, raw.rstrip('\n').split('\t')))
            for raw in fileobj]

def run_ec2_command(args, cols):
    """Run the command ``args`` (an argv list) and parse its stdout.

    Returns read_ec2_output's list of row dicts keyed by ``cols``. The exit
    status is not checked.
    """
    # Text mode so the parser receives str lines on Python 2 and 3 alike.
    process = Popen(args, stdout=PIPE, universal_newlines=True)
    # communicate() drains stdout while waiting; calling wait() first can
    # deadlock once the output exceeds the OS pipe buffer.
    output, _ = process.communicate()
    return read_ec2_output(output.splitlines(True), cols)

def read_info(info, requirements, key):
    """Index the rows of ``info`` that satisfy ``requirements`` by ``row[key]``.

    :param info: iterable of row dicts (as produced by read_ec2_output).
    :param requirements: {column: required value}; a row is kept only if every
        column whose required value is not None holds exactly that value.
    :param key: column whose value becomes the dict key.
    :returns: {row[key]: row}. Rows lacking ``key`` are skipped.

    Callers use the result as a mapping (``.get``, ``.keys()``, ``[id]``).
    Previously a list of one-entry dicts was returned, and the filter's
    ``continue`` only skipped an inner loop, so nothing was ever filtered.
    """
    result = {}
    for row in info:
        if any(wanted is not None and row.get(col) != wanted
               for col, wanted in requirements.items()):
            continue
        if key not in row:
            continue
        result[row[key]] = row
    return result

def get_ip(internal_ip):
    """Get the ip address out of a string such as ip-10-251-42-5.ec2.internal.

    Only the first dot-separated label is used: an optional 'ip-' prefix is
    dropped and the dashes become dots.

    Raises ValueError unless the result is four decimal octets, each 0-255.
    """
    host = internal_ip.split('.')[0]
    if host.startswith('ip-'):
        host = host[3:]
    octets = host.split('-')
    # Dotted IPv4 addresses are 7 to 15 characters long, so validate the
    # octets themselves rather than requiring one fixed length.
    if len(octets) != 4 or not all(octet.isdigit() and int(octet) <= 255
                                   for octet in octets):
        raise ValueError("Unable to get the ip of %s." % internal_ip)
    return '.'.join(octets)


class Workforce(object):
    """A set of EC2 instances started by this program, together with the
    remote processes and rsync transfers launched on them.

    Every remote command and transfer reaches the instances as root over ssh
    on ``self.port``.
    """

    def __init__(self, port=9999, verbosity=1):
        # ssh port used for every remote command and rsync transfer.
        self.port = port
        # 0: quiet; 1: report instances as they change state;
        # 2: also report the still-pending instances on every poll.
        self.verbosity = verbosity

        # Only the instances started by this program:
        # {instance_id: {attribute: value}}, attributes from instance_cols.
        self.instances = {}
        # Only available images:
        # {ami_id: {attribute: value}}, attributes from image_cols.
        self.images = {}

        # Popen handles of the remote executables started by start_process.
        self.processes = []
        # Popen handles of the rsync transfers started by sync_up / sync_down.
        self.upload = []
        self.download = []

    def update_images(self):
        """Refresh ``self.images`` with the images that are available."""
        image_info = run_ec2_command(['ec2-describe-images'], image_cols)
        # Image rows are tagged 'IMAGE'; filtering on 'INSTANCE' matches none.
        self.images = read_info(image_info, {'type': 'IMAGE', 'status': 'available'},
                                'ami_id')

    def update_instances(self):
        """Refresh the attributes of the instances started by this program."""
        if not self.instances:
            # Without ids, ec2-describe-instances lists every instance on the
            # account, and this object would then adopt (and later stop) them.
            return
        instance_info = run_ec2_command(['ec2-describe-instances'] + list(self.instances),
                                        instance_cols)
        self.instances = read_info(instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def get_instances(self, status, boolean):
        """Return the ids of instances whose status equals ``status`` when
        ``boolean`` is true, or differs from it when ``boolean`` is false."""
        wanted = bool(boolean)
        return [instance for instance, info in self.instances.items()
                if (info['status'] == status) == wanted]

    def _start_instances(self, command):
        """Run ``command`` (an ec2-run-instances argv) and record its instances."""
        instance_info = run_ec2_command(command, instance_cols)
        self.instances = read_info(instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def show_status(self, instances, urgent_instances=()):
        """Write the status of instances to stderr.

        ``instances`` are reported only when verbosity >= 2;
        ``urgent_instances`` whenever verbosity >= 1.
        """
        shown = []
        if self.verbosity >= 2:
            shown.extend(instances)
        if self.verbosity >= 1:
            shown.extend(urgent_instances)
        for instance in shown:
            sys.stderr.write('Instance %s is %s.\n'
                             % (instance, self.instances[instance]['status']))

    def _wait_until(self, status):
        """Poll every ``delay`` seconds until every instance has ``status``,
        reporting each instance once as it gets there."""
        reported = set()
        while True:
            self.update_instances()
            pending = self.get_instances(status, False)
            arrived = [instance for instance in self.instances
                       if instance not in pending and instance not in reported]
            reported.update(arrived)
            self.show_status(pending, arrived)
            if not pending:
                return
            time.sleep(delay)

    def start(self, num, ami_id, keypair=None):
        """Start ``num`` instances of image ``ami_id`` and block until all run.

        Raises ValueError if the image is unknown or unavailable, and Exception
        if ec2-run-instances does not report exactly ``num`` instances.
        """
        self.update_images()
        if self.images.get(ami_id) is None:
            raise ValueError('The ami identifier %s does not exist or is not available!' % ami_id)

        # Each option and its value must be separate argv entries.
        args = ['ec2-run-instances', ami_id, '-n', '%i' % num]
        if keypair:
            args += ['-k', keypair]

        self._start_instances(args)

        if len(self.instances) != num:
            raise Exception('We cannot find all the instances! '
                            'We recommend that you find and shut them down manually. '
                            'The known instances are: %s.' % ' '.join(self.instances))

        self._wait_until('running')

    def _stop_instances(self, command):
        """Run ``command`` (an ec2-terminate-instances argv), discarding its output."""
        process = Popen(command, stdout=PIPE)
        # communicate() drains the pipe; wait() alone can deadlock on large output.
        process.communicate()

    def stop(self):
        """Terminate every instance started by this program and block until
        all are reported terminated. Does nothing if none were started."""
        if not self.instances:
            return
        self._stop_instances(['ec2-terminate-instances'] + list(self.instances))
        self._wait_until('terminated')

        if self.verbosity >= 1:
            sys.stderr.write('All instances have been terminated.\n')

    def _instance_ips(self, instances):
        """Yield the ip of each instance id in ``instances`` (None: all of ours)."""
        if instances is None:
            instances = self.instances
        for instance in instances:
            yield get_ip(self.instances[instance]['ip'])

    def start_process(self, command, instances=None):
        """Run ``bash command`` in root's home directory on each instance.

        The remote stdin is a pipe reachable through ``self.processes``; remote
        stdout and stderr go to this program's.
        """
        remote_command = 'bash %s' % command
        for ip in self._instance_ips(instances):
            args = shlex.split(command_template % (self.port, ip, remote_command))
            process = Popen(args, stdin=PIPE, stdout=sys.stdout, stderr=sys.stderr)
            self.processes.append(process)

    def sync_up(self, files, opts='-rz', instances=None):
        """Start an rsync of ``files`` to ``~/files`` on each instance.

        Transfers run in the background; their Popen handles are appended to
        ``self.upload`` so the caller can wait for them.
        """
        for ip in self._instance_ips(instances):
            args = shlex.split(sync_up_template % (opts, self.port, files, ip, files))
            # rsync chatter goes to stderr: stdout carries the workers' output,
            # and a PIPE nobody reads could fill up and stall the transfer.
            self.upload.append(Popen(args, stdout=sys.stderr, stderr=sys.stderr))

    def sync_down(self, files, opts='-rz', instances=None):
        """Start an rsync of ``~/files`` on each instance back to ``files`` here.

        Transfers run in the background; their Popen handles are appended to
        ``self.download`` so the caller can wait for them.
        """
        for ip in self._instance_ips(instances):
            args = shlex.split(sync_down_template % (opts, self.port, ip, files, files))
            self.download.append(Popen(args, stdout=sys.stderr, stderr=sys.stderr))


def _wait_all(processes, what):
    """Wait for every Popen in ``processes``; raise if any exited non-zero.

    ``what`` names the processes in the error message (e.g. 'Upload').
    """
    for process in processes:
        if process.wait() != 0:
            raise Exception('%s failed with exit status %s.' % (what, process.returncode))


def _feed_input(processes, lines, batch_size):
    """Send ``lines`` to the still-running ``processes`` in batches of
    ``batch_size``, round-robin, then close every stdin to signal end of input.

    Raises Exception if every process has exited while input remains.
    """
    turn = 0
    for batch in group_by(lines, batch_size):
        live = [process for process in processes if process.poll() is None]
        if not live:
            raise Exception('Every worker process exited before the input was consumed.')
        process = live[turn % len(live)]
        turn += 1
        process.stdin.writelines(batch)
        # Flush so each batch reaches its worker now, not when a buffer fills.
        process.stdin.flush()
    for process in processes:
        process.stdin.close()


def main():
    """Command-line entry point.

    Starts the workers, uploads the files, streams stdin to the remote
    executable, downloads the syncronized files and shuts the instances down.
    On any error (or Ctrl-C) it tries to terminate the instances so they are
    not left running, then exits with status 1.
    """
    options, _ = parser.parse_args()
    if options.workers is None or options.workers < 1:
        parser.error('--workers must be a positive number of instances')
    if not options.executable:
        parser.error('--executable is required')
    if options.batch_size < 1:
        parser.error('--batch_size must be at least 1')

    workers = options.workers
    executable = options.executable
    auxilary = options.auxilary
    syncronized = options.syncronize
    batch_size = options.batch_size
    ami_id = options.ami_id
    verbosity = options.verbosity

    workforce = Workforce()
    # Let the workforce's own progress reports follow the command-line setting.
    workforce.verbosity = verbosity

    try:
        # Start
        workforce.start(workers, ami_id)
        # Presumably gives the fresh instances time to accept ssh connections.
        time.sleep(delay)

        if verbosity >= 1:
            sys.stderr.write('Uploading auxilary files, syncronized files, and executable.\n')
        # Optional files that were not given on the command line are skipped
        # rather than rsynced under the name "None".
        if auxilary:
            workforce.sync_up(auxilary, auxilary_rsync_opts)
        if syncronized:
            workforce.sync_up(syncronized, syncronize_rsync_opts)
        workforce.sync_up(executable, auxilary_rsync_opts)
        # The uploads run in the background; the executable must not start
        # before they have landed.
        _wait_all(workforce.upload, 'Upload')

        if verbosity >= 1:
            sys.stderr.write('Starting the executable.\n')
        workforce.start_process(executable)

        if verbosity >= 1:
            sys.stderr.write('Feeding input to the executable.\n')
        _feed_input(workforce.processes, sys.stdin, batch_size)

        # Wait until all the processes are done. poll() is needed to refresh
        # returncode, which otherwise stays None forever.
        if verbosity >= 1:
            sys.stderr.write('Waiting until all the processes are finished.\n')
        while any(process.poll() is None for process in workforce.processes):
            if verbosity >= 2:
                sys.stderr.write('waiting...\n')
            time.sleep(delay)

        # Sync_down the syncronized files, and let the transfers finish
        # before the instances are terminated.
        if syncronized:
            if verbosity >= 1:
                sys.stderr.write('Downloading syncronized files.\n')
            workforce.sync_down(syncronized, syncronize_rsync_opts)
            _wait_all(workforce.download, 'Download')

        # Shut down the instances
        if verbosity >= 1:
            sys.stderr.write('Shutting down instances.\n')
        workforce.stop()

    except (Exception, KeyboardInterrupt) as e:
        # KeyboardInterrupt is not an Exception subclass; catch it too so an
        # interrupted run still tries to terminate the instances it started.
        sys.stderr.write('ERROR: %s\nTrying to shut down ec2 instances.\n'
                         % (str(e) or e.__class__.__name__))
        try:
            workforce.stop()
        except KeyboardInterrupt:
            sys.stderr.write("We are trying to close the instances! "
                             "Please don't interrupt.\n")
            try:
                workforce.stop()
            except KeyboardInterrupt:
                sys.stderr.write('If you interrupt one more time, '
                                 'you must close the instances manually!\n')
                workforce.stop()
        except Exception:
            sys.stderr.write('FAILED TO SHUT DOWN INSTANCES!\n')
        sys.exit(1)

if __name__ == '__main__':
    main()