"""Use Amazon ec2 instances to quickly run a program that takes lines from stdin.

Must be run on an ec2 controller (think ecmm.chitika.net).

Justin Pombrio, 2008
"""

from itertools import islice
from optparse import OptionParser
from subprocess import Popen, PIPE
import shlex
import sys
import time

__all__ = ['main', 'Workforce']


def _raise_verbosity(option, opt_str, value, parser):
    """optparse callback for -v: the first use selects level 1, a repeat level 2.

    optparse does not accept a multi-character short option such as '-vv',
    but it expands a '-vv' cluster on the command line into two '-v' flags,
    so counting repeats here keeps '-vv' working.
    """
    values = parser.values
    values.verbosity = 2 if getattr(values, 'verbose_seen', False) else 1
    values.verbose_seen = True


parser = OptionParser()
parser.set_defaults(verbosity=1)
parser.add_option('-n', '--workers', type='int', dest='workers',
                  help='Start this many ec2 instances')
parser.add_option('-e', '--executable', dest='executable',
                  help='The program to run')
parser.add_option('-a', '--auxilary', dest='auxilary',
                  help='Extra files required by the program')
parser.add_option('-s', '--syncronize', dest='syncronize',
                  help='Syncronize these files up and down')
parser.add_option('-b', '--batch_size', type='int', dest='batch_size', default=10,
                  help='Number of lines to be uploaded at once')
parser.add_option('-i', '--ami_id', dest='ami_id', default='ami-d206e3bb',
                  help='The id of the ami image to use for the workers')
parser.add_option('-q', '--quiet', action='store_const', const=0, dest='verbosity')
parser.add_option('-v', '--verbose', action='callback', callback=_raise_verbosity)
parser.add_option('--very_verbose', action='store_const', const=2, dest='verbosity')

# Column names for the tab-separated output of the ec2 command line tools.
image_cols = ['type', 'ami_id', 'name', 'key', 'status', 'ownership']
instance_cols = ['type', 'instance_id', 'ami_id', 'description', 'ip', 'status',
                 '???', 'number', 'size', 'starttime']
start_cols = instance_cols
terminate_cols = ['type', 'instance_id', 'status']

auxilary_rsync_opts = '-rz'  # recurse into dirs, gzip files
syncronize_rsync_opts = '-urz'  # recurse into dirs, gzip files, update only
# (do not overwrite a newer version of a file)

# The templates are filled in and then split with shlex (no shell is involved),
# so paths containing whitespace are not supported.
command_template = 'ssh -p %s root@%s cd ~/; %s'  # % (port, ip, command)
# rsync's own -p means "preserve permissions"; the ssh port has to be passed
# through -e instead.
sync_up_template = 'rsync %s -e "ssh -p %s" %s root@%s:~/%s'  # % (opts, port, file, ip, file)
sync_down_template = 'rsync %s -e "ssh -p %s" root@%s:~/%s %s'  # % (opts, port, ip, file, file)

delay = 5  # seconds


def _split_command(template, values):
    """Fill in *template* with *values* and split it into a Popen argument list."""
    return shlex.split(template % values)


def group_by(iterable, length):
    """Yield successive lists of up to *length* items taken from *iterable*.

    The last list may be shorter; nothing is yielded for an empty iterable.
    Raises ValueError (on first iteration) if *length* is less than 1.
    """
    if length < 1:
        raise ValueError('length must be at least 1, not %r' % (length,))
    iterator = iter(iterable)
    while True:
        group = list(islice(iterator, length))
        if not group:
            return
        yield group


def read_ec2_output(fileobj, cols):
    """Parse tab-separated lines into a list of {column : value} dicts.

    Each line is split on tabs and paired with *cols* in order; surplus
    columns or surplus fields are dropped, and values are stripped.
    """
    result = []
    for line in fileobj:
        fields = line.rstrip('\n').split('\t')
        result.append(dict(zip(cols, (field.strip() for field in fields))))
    return result


def run_ec2_command(args, cols):
    """Run the command *args* and parse its stdout with read_ec2_output."""
    process = Popen(args, stdout=PIPE, universal_newlines=True)
    # communicate() drains the pipe while waiting; wait() followed by a read
    # can deadlock once the output exceeds the pipe buffer.
    output, _ = process.communicate()
    return read_ec2_output(output.splitlines(), cols)


def read_info(info, requirements, key):
    """Index the rows of *info* that satisfy *requirements* by their *key* column.

    info         -- list of row dicts, as returned by read_ec2_output
    requirements -- {column : required value}; a required value of None is ignored
    key          -- the column whose value becomes the key of the result

    Returns {row[key] : row}. Rows without a *key* column are skipped.
    """
    result = {}
    for row in info:
        if key not in row:
            continue
        satisfied = all(row.get(col) == required
                        for col, required in requirements.items()
                        if required is not None)
        if satisfied:
            result[row[key]] = row
    return result


def get_ip(internal_ip):
    """Get the ip address out of a string such as ip-00-000-000-00.ec2.internal

    Raises ValueError if the first dot-separated label does not hold four
    dash-separated numbers in the range 0-255.
    """
    ip = internal_ip.split('.')[0]
    if ip.startswith('ip-'):
        ip = ip[3:]
    ip = ip.replace('-', '.')
    octets = ip.split('.')
    valid = len(octets) == 4 and all(octet.isdigit() and int(octet) <= 255
                                     for octet in octets)
    if not valid:
        raise ValueError("Unable to get the ip of %s." % internal_ip)
    return ip


class Workforce(object):
    """A set of ec2 instances started, fed and terminated by this program."""

    def __init__(self, verbosity=1):
        """verbosity -- 0 is silent, 1 reports state changes, 2 reports every poll."""
        self.verbosity = verbosity
        self.port = 9999  # passed to ssh -p when contacting an instance
        # Only the instances started by this program
        # {instance_id : {attribute : value}} where attributes are from instance_cols
        self.instances = {}
        # Only available images
        # {ami_id : {attribute : value}} where attributes are from image_cols
        self.images = {}
        self.processes = []  # ssh processes running the executable
        self.upload = []  # rsync processes started by sync_up
        self.download = []  # rsync processes started by sync_down

    def update_images(self):
        """Refresh self.images from ec2-describe-images."""
        image_info = run_ec2_command(['ec2-describe-images'], image_cols)
        # NOTE(review): image rows probably have type IMAGE rather than
        # INSTANCE -- confirm against real ec2-describe-images output.
        self.images = read_info(image_info,
                                {'type': 'INSTANCE', 'status': 'available'},
                                'ami_id')

    def update_instances(self):
        """Refresh the attributes of the instances this program started."""
        if not self.instances:
            # Without ids ec2-describe-instances would be asked about every
            # instance, and this object would adopt instances it never started.
            return
        instance_info = run_ec2_command(
            ['ec2-describe-instances'] + list(self.instances), instance_cols)
        self.instances = read_info(instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def get_instances(self, status, boolean):
        """Return the ids of instances whose status is (boolean true) or is not
        (boolean false) equal to *status*."""
        return [instance for instance, attributes in self.instances.items()
                if (attributes.get('status') == status) == bool(boolean)]

    def _start_instances(self, command):
        """Run *command* and record the instances it reports."""
        instance_info = run_ec2_command(command, instance_cols)
        self.instances = read_info(instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def show_status(self, instances, urgent_instances=()):
        """Write instance statuses to stderr.

        instances        -- reported at verbosity 2 and above
        urgent_instances -- reported at verbosity 1 and above
        """
        if self.verbosity >= 2:
            for instance in instances:
                self._write_status(instance)
        if self.verbosity >= 1:
            for instance in urgent_instances:
                self._write_status(instance)

    def _write_status(self, instance):
        """Write one 'Instance <id> is <status>.' line to stderr."""
        status = self.instances[instance].get('status')
        sys.stderr.write('Instance %s is %s.\n' % (instance, status))

    def _wait_for_status(self, status):
        """Poll every `delay` seconds until all instances report *status*."""
        reached = []
        while True:
            self.update_instances()
            waiting = self.get_instances(status, False)
            just_reached = [instance for instance in self.instances
                            if instance not in waiting and instance not in reached]
            reached += just_reached
            self.show_status(waiting, just_reached)
            if not waiting:
                return
            time.sleep(delay)

    def start(self, num, ami_id, keypair=None):
        """Start *num* instances of image *ami_id* and wait until they are running.

        Raises ValueError if the image is not known to be available, and
        Exception if the number of instances reported differs from *num*.
        """
        self.update_images()
        if self.images.get(ami_id) is None:
            raise ValueError('The ami identifier %s does not exist or is not available!'
                             % ami_id)
        # Flags and their values are separate argv elements; no shell splits them.
        args = ['ec2-run-instances', ami_id, '-n', '%i' % num]
        if keypair:
            args += ['-k', '%s' % keypair]
        self._start_instances(args)
        if len(self.instances) != num:
            raise Exception('We cannot find all the instances! '
                            'We recommend that you find and shut them down manually. '
                            'The known instances are: %s.'
                            % ' '.join(self.instances.keys()))
        self._wait_for_status('running')

    def _stop_instances(self, command):
        """Run the terminate *command* and wait for it to finish."""
        process = Popen(command, stdout=PIPE)
        process.communicate()  # drain stdout so the child cannot block on it

    def stop(self):
        """Terminate every instance started by this program and wait for them."""
        if not self.instances:
            return  # nothing was started, so there is nothing to terminate
        args = ['ec2-terminate-instances'] + list(self.instances)
        self._stop_instances(args)
        self._wait_for_status('terminated')
        if self.verbosity >= 1:
            sys.stderr.write('All instances have been terminated.\n')

    def _targets(self, instances):
        """Return *instances*, or all known instances when it is None."""
        return self.instances if instances is None else instances

    def start_process(self, command, instances=None):
        """Run `bash <command>` over ssh on each instance (default: all).

        The processes are appended to self.processes; their stdin is a pipe
        and their stdout and stderr are this program's own.
        """
        remote_command = 'bash %s' % command
        for instance in self._targets(instances):
            ip = get_ip(self.instances[instance]['ip'])
            args = _split_command(command_template, (self.port, ip, remote_command))
            process = Popen(args, stdin=PIPE, stdout=sys.stdout, stderr=sys.stderr,
                            universal_newlines=True)
            self.processes.append(process)

    def sync_up(self, files, opts='-rz', instances=None):
        """Start an rsync of *files* to each instance (default: all).

        Does nothing when *files* is empty. The transfers run in the
        background; call wait_for_transfers() before relying on them.
        """
        if not files:
            return
        for instance in self._targets(instances):
            ip = get_ip(self.instances[instance]['ip'])
            args = _split_command(sync_up_template,
                                  (opts, self.port, files, ip, files))
            self.upload.append(Popen(args, stdout=PIPE, stderr=sys.stderr))

    def sync_down(self, files, opts='-rz', instances=None):
        """Start an rsync of *files* from each instance (default: all).

        Does nothing when *files* is empty. The transfers run in the
        background; call wait_for_transfers() before relying on them.
        """
        if not files:
            return
        for instance in self._targets(instances):
            ip = get_ip(self.instances[instance]['ip'])
            args = _split_command(sync_down_template,
                                  (opts, self.port, ip, files, files))
            self.download.append(Popen(args, stdout=PIPE, stderr=sys.stderr))

    def wait_for_transfers(self):
        """Wait for all pending sync_up/sync_down transfers to finish.

        Raises RuntimeError if any of them exited with a non-zero status.
        """
        transfers = self.upload + self.download
        self.upload = []
        self.download = []
        failures = 0
        for transfer in transfers:
            transfer.communicate()
            if transfer.returncode != 0:
                failures += 1
        if failures:
            raise RuntimeError('%i file transfer(s) failed.' % failures)


def _feed_input(processes, lines, batch_size):
    """Deal *lines* out to *processes* round-robin, *batch_size* lines at a time.

    Processes that have exited are skipped. Raises RuntimeError if every
    process has exited while input remains.
    """
    available = []
    for batch in group_by(lines, batch_size):
        if not available:
            # Start a new round with whichever processes are still alive.
            available = [process for process in processes if process.poll() is None]
            if not available:
                raise RuntimeError('All processes exited before the input was consumed.')
        process = available.pop(0)
        process.stdin.writelines(batch)
        process.stdin.flush()


def _emergency_stop(workforce):
    """Try hard to terminate the instances, even if the user keeps interrupting."""
    warnings = (
        "We are trying to close the instances! Please don't interrupt.\n",
        "If you interrupt one more time, you must close the instances manually!\n",
    )
    for warning in warnings:
        try:
            workforce.stop()
            return
        except KeyboardInterrupt:
            sys.stderr.write(warning)
        except Exception:
            sys.stderr.write("FAILED TO SHUT DOWN INSTANCES!\n")
            return
    workforce.stop()


def main():
    """Start the workers, feed them stdin, collect the results and shut down."""
    options, _ = parser.parse_args()
    workers = options.workers
    executable = options.executable
    auxilary = options.auxilary
    syncronized = options.syncronize
    batch_size = options.batch_size
    ami_id = options.ami_id
    verbosity = options.verbosity

    if workers is None or workers < 1:
        parser.error('--workers must be given and be at least 1')
    if not executable:
        parser.error('--executable must be given')
    if batch_size < 1:
        parser.error('--batch_size must be at least 1')

    def report(message):
        if verbosity >= 1:
            sys.stderr.write(message)

    workforce = Workforce(verbosity)
    try:
        # Start
        workforce.start(workers, ami_id)
        time.sleep(delay)

        report('Uploading auxilary files, syncronized files, and executable.\n')
        workforce.sync_up(auxilary, auxilary_rsync_opts)
        workforce.sync_up(syncronized, syncronize_rsync_opts)
        workforce.sync_up(executable, auxilary_rsync_opts)
        # The executable must be fully uploaded before it is started.
        workforce.wait_for_transfers()

        report('Starting the executable.\n')
        workforce.start_process(executable)

        report('Feeding input to the executable.\n')
        _feed_input(workforce.processes, sys.stdin, batch_size)
        # Closing stdin is how the workers learn that the input has ended.
        for process in workforce.processes:
            process.stdin.close()

        # Wait until all the processes are done
        report('Waiting until all the processes are finished.\n')
        # poll() refreshes the exit status; returncode alone never changes.
        while any(process.poll() is None for process in workforce.processes):
            if verbosity >= 2:
                sys.stderr.write('waiting...\n')
            time.sleep(delay)

        # Sync_down the syncronized files
        report('Downloading syncronized files.\n')
        workforce.sync_down(syncronized, syncronize_rsync_opts)
        # The instances must stay up until the downloads are complete.
        workforce.wait_for_transfers()

        # Shut down the instances
        report('Shutting down instances.\n')
        workforce.stop()
    except (Exception, KeyboardInterrupt) as error:
        # An interrupt must not leave instances running either.
        reason = str(error) or error.__class__.__name__
        sys.stderr.write('ERROR: %s\nTrying to shut down ec2 instances.\n' % reason)
        _emergency_stop(workforce)


if __name__ == '__main__':
    main()