"""Use Amazon ec2 instances to quickly run a program that takes lines from stdin.

Must be run on an ec2 controller (think ecmm.chitika.net).

Justin Pombrio, 2008
"""

from itertools import islice
from optparse import OptionParser
from subprocess import Popen, PIPE
import sys
import time

__all__ = ['main', 'Workforce']


def _increase_verbosity(option, opt_str, value, parser):
    """optparse callback for -v: one flag means verbose (1), two (-vv) very verbose (2).

    optparse only accepts single-character short options, so '-vv' cannot be
    registered as an option of its own; it is parsed as '-v' given twice.
    """
    seen = getattr(parser.values, 'verbose_flags', 0) + 1
    parser.values.verbose_flags = seen
    parser.values.verbosity = min(seen, 2)


parser = OptionParser()
parser.set_defaults(verbosity=1)
parser.add_option('-n', '--workers', type='int', dest='workers',
                  help='Start this many ec2 instances')
parser.add_option('-e', '--executable', dest='executable',
                  help='The program to run')
parser.add_option('-a', '--auxilary', dest='auxilary',
                  help='Extra files required by the program')
parser.add_option('-s', '--syncronize', dest='syncronize',
                  help='Syncronize these files up and down')
parser.add_option('-b', '--batch_size', type='int', dest='batch_size', default=10,
                  help='Number of lines to be uploaded at once')
parser.add_option('-i', '--ami_id', dest='ami_id', default='ami-d206e3bb',
                  help='The id of the ami image to use for the workers')
parser.add_option('-q', '--quiet', action='store_const', const=0, dest='verbosity')
parser.add_option('-v', '--verbose', action='callback', callback=_increase_verbosity,
                  help='Report progress on stderr; repeat (-vv) for more detail')
parser.add_option('--very_verbose', action='store_const', const=2, dest='verbosity')

# Column names, in order, of the tab-separated output of the ec2 command line tools.
image_cols = ['type', 'ami_id', 'name', 'key', 'status', 'ownership']
instance_cols = ['type', 'instance_id', 'ami_id', 'description', 'ip', 'status',
                 '???', 'number', 'size', 'starttime']
start_cols = instance_cols
terminate_cols = ['type', 'instance_id', 'status']

auxilary_rsync_opts = '-rz'     # recurse into dirs, gzip files
syncronize_rsync_opts = '-urz'  # recurse into dirs, gzip files, update only
                                # (do not overwrite a newer version of a file)

delay = 5  # seconds between polls of the ec2 tools / worker processes


def group_by(iterable, length):
    """Yield successive lists of at most `length` items taken from `iterable`.

    The final list may be shorter; nothing is yielded for an empty iterable.
    Raises ValueError (on first iteration) if `length` is smaller than 1.
    """
    if length < 1:
        raise ValueError('length must be at least 1, got %r' % (length,))
    iterator = iter(iterable)
    while True:
        group = list(islice(iterator, length))
        if not group:
            return
        yield group


def read_ec2_output(fileobj, cols):
    """Parse tab-separated lines into a list of {column name: stripped value} dicts.

    `fileobj` is any iterable of text lines.  Fields beyond `cols` are dropped,
    missing trailing fields are simply absent, and blank lines are skipped.
    """
    rows = []
    for line in fileobj:
        fields = line.rstrip('\r\n').split('\t')
        if fields == ['']:
            continue
        rows.append(dict((col, field.strip()) for col, field in zip(cols, fields)))
    return rows


def run_ec2_command(args, cols):
    """Run the command `args` and return its output parsed by read_ec2_output.

    Raises RuntimeError if the command exits with a non-zero status.
    """
    process = Popen(args, stdout=PIPE, universal_newlines=True)
    # communicate() drains the pipe while waiting; wait() followed by a read
    # can deadlock once the output exceeds the pipe buffer.
    output, _ = process.communicate()
    if process.returncode != 0:
        raise RuntimeError('%s exited with status %s.' % (args[0], process.returncode))
    return read_ec2_output(output.splitlines(), cols)


def read_info(info, requirements, key):
    """Index the rows of `info` by their `key` column.

    Only rows that have a `key` column and whose columns equal every non-None
    value in `requirements` are kept.  Returns {row[key]: row}.
    """
    wanted = [(col, value) for col, value in requirements.items() if value is not None]
    result = {}
    for row in info:
        if key not in row:
            continue
        if all(row.get(col) == value for col, value in wanted):
            result[row[key]] = row
    return result


def get_ip(internal_ip):
    """Get the ip address out of a string such as ip-00-000-000-00.ec2.internal

    Raises ValueError if the host part is not four dash-separated octets.
    """
    host = internal_ip.split('.')[0]
    if host.startswith('ip-'):
        host = host[3:]
    octets = host.split('-')
    valid = len(octets) == 4 and all(
        octet.isdigit() and int(octet) <= 255 for octet in octets)
    if not valid:
        raise ValueError("Unable to get the ip of %s." % internal_ip)
    return '.'.join(octets)


class Workforce(object):
    """A group of ec2 instances started, driven and terminated by this program."""

    def __init__(self, verbosity=1):
        self.port = 9999
        # 0 = quiet, 1 = report state changes, 2 = report every poll
        self.verbosity = verbosity
        # Only the instances started by this program
        # {instance_id : {attribute : value}} where attributes are from instance_cols
        self.instances = {}
        # Only available images
        # {ami_id : {attribute : value}} where attributes are from image_cols
        self.images = {}
        self.processes = []  # ssh processes running the executable
        self.upload = []     # rsync processes started by sync_up
        self.download = []   # rsync processes started by sync_down

    def update_images(self):
        """Refresh self.images from ec2-describe-images."""
        image_info = run_ec2_command(['ec2-describe-images'], image_cols)
        # NOTE(review): image rows may be tagged 'IMAGE' rather than 'INSTANCE'
        # in the first column -- confirm against real ec2-describe-images output.
        self.images = read_info(image_info,
                                {'type': 'INSTANCE', 'status': 'available'},
                                'ami_id')

    def update_instances(self):
        """Refresh the attributes of the instances this object knows about."""
        if not self.instances:
            # With no ids ec2-describe-instances would list every instance of
            # the account, and we would adopt (and later terminate) them.
            return
        instance_info = run_ec2_command(
            ['ec2-describe-instances'] + list(self.instances), instance_cols)
        self.instances = read_info(instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def get_instances(self, status, boolean):
        """Return the ids of instances whose status is (boolean true) or is not `status`."""
        wanted = bool(boolean)
        return [instance for instance in self.instances
                if (self.instances[instance]['status'] == status) == wanted]

    def _start_instances(self, command):
        """Run the launch `command` and record the instances it reports."""
        instance_info = run_ec2_command(command, instance_cols)
        self.instances = read_info(instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def show_status(self, instances, urgent_instances=()):
        """Write instance statuses to stderr.

        `instances` are shown only at verbosity 2; `urgent_instances` (those
        that just changed state) are shown from verbosity 1.
        """
        shown = []
        if self.verbosity >= 2:
            shown += list(instances)
        if self.verbosity >= 1:
            shown += list(urgent_instances)
        for instance in shown:
            sys.stderr.write('Instance %s is %s.\n'
                             % (instance, self.instances[instance]['status']))

    def start(self, num, ami_id, keypair=None):
        """Launch `num` instances of image `ami_id` and block until all are running.

        Raises ValueError if the image is not available and Exception if the
        number of instances reported by the launch differs from `num`.
        """
        self.update_images()
        if self.images.get(ami_id) is None:
            raise ValueError('The ami identifier %s does not exist or is not available!'
                             % ami_id)
        args = ['ec2-run-instances', ami_id, '-n', str(num)]
        if keypair:
            args += ['-k', keypair]
        self._start_instances(args)
        if len(self.instances) != num:
            raise Exception('We cannot find all the instances! '
                            'We recommend that you find and shut them down manually. '
                            'The known instances are: %s.' % ' '.join(self.instances))
        started = []
        while True:
            self.update_instances()
            unstarted = self.get_instances('running', False)
            just_started = [instance for instance in self.instances
                            if instance not in unstarted and instance not in started]
            started += just_started
            self.show_status(unstarted, just_started)
            if not unstarted:
                break
            time.sleep(delay)

    def _stop_instances(self, command):
        """Run the terminate `command`; raise RuntimeError if it fails."""
        process = Popen(command, stdout=PIPE, universal_newlines=True)
        process.communicate()  # drain the pipe so the tool cannot block on it
        if process.returncode != 0:
            raise RuntimeError('%s exited with status %s.'
                               % (command[0], process.returncode))

    def stop(self):
        """Terminate every known instance and block until all report 'terminated'."""
        if not self.instances:
            return
        self._stop_instances(['ec2-terminate-instances'] + list(self.instances))
        terminated = []
        while True:
            self.update_instances()
            unterminated = self.get_instances('terminated', False)
            just_terminated = [instance for instance in self.instances
                               if instance not in unterminated
                               and instance not in terminated]
            terminated += just_terminated
            self.show_status(unterminated, just_terminated)
            if not unterminated:
                break
            time.sleep(delay)
        if self.verbosity >= 1:
            sys.stderr.write('All instances have been terminated.\n')

    def _targets(self, instances):
        """Return the instance ids to act on: `instances`, or all known ones if None."""
        if instances is None:
            return list(self.instances)
        return list(instances)

    def _address(self, instance):
        """Return the root@ip login string for `instance`."""
        return 'root@%s' % get_ip(self.instances[instance]['ip'])

    def _ssh_command(self, address, command):
        """Build the argument list that runs `command` in the remote home directory."""
        return ['ssh', '-p', str(self.port), address, 'cd ~/; %s' % command]

    def _rsync_command(self, opts, source, destination):
        """Build the rsync argument list for one transfer.

        The ssh port goes through -e; rsync's own -p means "preserve
        permissions" and does not select a port.
        """
        return (['rsync'] + opts.split()
                + ['-e', 'ssh -p %s' % self.port, source, destination])

    def start_process(self, command, instances=None):
        """Start `bash command` over ssh on each instance (all known ones by default).

        The processes are appended to self.processes; their stdin is a pipe and
        their stdout/stderr are this program's own.
        """
        remote_command = 'bash %s' % command
        for instance in self._targets(instances):
            args = self._ssh_command(self._address(instance), remote_command)
            process = Popen(args, stdin=PIPE, stdout=sys.stdout, stderr=sys.stderr,
                            universal_newlines=True)
            self.processes.append(process)

    def sync_up(self, files, opts='-rz', instances=None):
        """Start an rsync of local `files` to ~/`files` on each instance.

        Does nothing when `files` is empty or None.  The transfers run in the
        background; call finish_transfers() to wait for them.
        """
        if not files:
            return
        for instance in self._targets(instances):
            destination = '%s:~/%s' % (self._address(instance), files)
            command = self._rsync_command(opts, files, destination)
            # rsync chatter goes to stderr: stdout carries the workers' output.
            self.upload.append(Popen(command, stdout=sys.stderr, stderr=sys.stderr))

    def sync_down(self, files, opts='-rz', instances=None):
        """Start an rsync of ~/`files` on each instance to local `files`.

        Does nothing when `files` is empty or None.  The transfers run in the
        background; call finish_transfers() to wait for them.
        """
        if not files:
            return
        for instance in self._targets(instances):
            source = '%s:~/%s' % (self._address(instance), files)
            command = self._rsync_command(opts, source, files)
            self.download.append(Popen(command, stdout=sys.stderr, stderr=sys.stderr))

    def finish_transfers(self):
        """Wait for all pending sync_up/sync_down transfers.

        Raises RuntimeError if any rsync exited with a non-zero status.
        """
        transfers = self.upload + self.download
        self.upload = []
        self.download = []
        failures = 0
        for transfer in transfers:
            if transfer.wait() != 0:
                failures += 1
        if failures:
            raise RuntimeError('%i rsync transfer(s) failed.' % failures)


def _say(verbosity, level, message):
    """Write `message` to stderr when `verbosity` is at least `level`."""
    if verbosity >= level:
        sys.stderr.write(message)


def _feed_input(processes, lines, batch_size):
    """Hand `lines` to the worker processes in round-robin batches, then close their stdin.

    Raises RuntimeError if every worker exits while input remains.
    """
    workers = list(processes)
    turn = 0
    for batch in group_by(lines, batch_size):
        workers = [worker for worker in workers if worker.poll() is None]
        if not workers:
            raise RuntimeError('All worker processes exited before the input was consumed.')
        worker = workers[turn % len(workers)]
        turn += 1
        worker.stdin.writelines(batch)
        worker.stdin.flush()
    # EOF on stdin is what tells each worker that its input is complete.
    for worker in processes:
        worker.stdin.close()


def _run(workforce, options):
    """Start the instances, run the executable over stdin, fetch results, shut down."""
    verbosity = options.verbosity

    workforce.start(options.workers, options.ami_id)
    time.sleep(delay)

    _say(verbosity, 1, 'Uploading auxilary files, syncronized files, and executable.\n')
    workforce.sync_up(options.auxilary, auxilary_rsync_opts)
    workforce.sync_up(options.syncronize, syncronize_rsync_opts)
    workforce.sync_up(options.executable, auxilary_rsync_opts)
    workforce.finish_transfers()  # the executable must be in place before it is run

    _say(verbosity, 1, 'Starting the executable.\n')
    workforce.start_process(options.executable)

    _say(verbosity, 1, 'Feeding input to the executable.\n')
    _feed_input(workforce.processes, sys.stdin, options.batch_size)

    _say(verbosity, 1, 'Waiting until all the processes are finished.\n')
    while True:
        _say(verbosity, 2, 'waiting...\n')
        # poll() refreshes returncode; reading the attribute alone never changes.
        if all(process.poll() is not None for process in workforce.processes):
            break
        time.sleep(delay)

    _say(verbosity, 1, 'Downloading syncronized files.\n')
    workforce.sync_down(options.syncronize, syncronize_rsync_opts)
    workforce.finish_transfers()

    _say(verbosity, 1, 'Shutting down instances.\n')
    workforce.stop()


def _shut_down(workforce):
    """Try to terminate the instances, tolerating up to two keyboard interrupts.

    Returns True on success; otherwise reports the failure on stderr and
    returns False.
    """
    warnings = [
        "We are trying to close the instances!\nPlease don't interrupt.\n",
        "If you interrupt one more time, you must close the instances manually!\n",
        None,  # third attempt: no more retries after this one
    ]
    for warning in warnings:
        try:
            workforce.stop()
            return True
        except KeyboardInterrupt:
            if warning is None:
                break
            sys.stderr.write(warning)
        except Exception as error:
            sys.stderr.write('ERROR: %s\n' % error)
            break
    sys.stderr.write('FAILED TO SHUT DOWN INSTANCES!\n')
    if workforce.instances:
        sys.stderr.write('The known instances are: %s.\n' % ' '.join(workforce.instances))
    return False


def main():
    """Command line entry point.  Returns 0 on success and 1 on failure."""
    options, args = parser.parse_args()
    if options.workers is None or options.workers < 1:
        parser.error('--workers must be given and be at least 1')
    if not options.executable:
        parser.error('--executable must be given')
    if options.batch_size < 1:
        parser.error('--batch_size must be at least 1')

    workforce = Workforce(options.verbosity)
    try:
        _run(workforce, options)
    except (Exception, KeyboardInterrupt) as error:
        # KeyboardInterrupt is not an Exception; without catching it Ctrl-C
        # would leave the (billed) instances running.
        sys.stderr.write('ERROR: %s\nTrying to shut down ec2 instances.\n' % error)
        _shut_down(workforce)
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())