import shlex
import sys
import time
from optparse import OptionParser
from subprocess import Popen, PIPE
# Public names exported by ``from <module> import *``. The entries must be
# strings: main and Workforce are defined further down the file, so naming the
# objects themselves here raises NameError as soon as the module is imported.
__all__ = ['main', 'Workforce']
'''
Use Amazon ec2 instances to quickly run a program that takes lines from stdin.
Must be run on an ec2 controller (think ecmm.chitika.net).
Justin Pombrio, 2008
'''
def _verbose_callback(option, opt_str, value, parser):
    """optparse callback for -v/--verbose.

    One occurrence selects verbosity 1; a repeated occurrence (``-vv`` or
    ``-v -v``) selects verbosity 2. optparse only accepts single-character
    short options, so ``-vv`` cannot be registered as an option of its own;
    it is parsed as two clustered ``-v`` flags and counted here.
    """
    seen = getattr(parser.values, '_verbose_flags', 0) + 1
    parser.values._verbose_flags = seen
    parser.values.verbosity = min(seen, 2)


# Command-line interface, built at import time; main() calls
# parser.parse_args() on it. Option names and dests are part of the public
# interface and are kept exactly as they were (including their spelling).
parser = OptionParser()
parser.add_option('-n', '--workers', type='int', dest='workers',
                  help='Start this many ec2 instances')
parser.add_option('-e', '--executable', dest='executable',
                  help='The program to run')
parser.add_option('-a', '--auxilary', dest='auxilary',
                  help='Extra files required by the program')
parser.add_option('-s', '--syncronize', dest='syncronize',
                  help='Syncronize these files up and down')
parser.add_option('-b', '--batch_size', type='int', dest='batch_size', default=10,
                  help='Number of lines to be uploaded at once')
parser.add_option('-i', '--ami_id', dest='ami_id', default='ami-d206e3bb',
                  help='The id of the ami image to use for the workers')
# Verbosity levels: 0 = quiet, 1 = normal (the default), 2 = very verbose.
parser.add_option('-q', '--quiet', action='store_const',
                  const=0, dest='verbosity',
                  help='Print no progress messages')
parser.add_option('-v', '--verbose', action='callback',
                  callback=_verbose_callback,
                  help='Print progress messages; repeat (-vv) for more detail')
parser.add_option('--very_verbose', action='store_const',
                  const=2, dest='verbosity',
                  help='Print detailed progress messages')
parser.set_defaults(verbosity=1)
# Column names, in order, for the tab-separated rows printed by the ec2
# command line tools. read_ec2_output() pairs them with each row's fields.
image_cols = ['type', 'ami_id', 'name', 'key', 'status', 'ownership']
# NOTE(review): '???' names a column whose meaning is not identified anywhere
# in this file; it is kept so the columns after it stay aligned.
instance_cols = ['type', 'instance_id', 'ami_id', 'description', 'ip',
                 'status', '???', 'number', 'size', 'starttime']
start_cols = instance_cols
terminate_cols = ['type', 'instance_id', 'status']

auxilary_rsync_opts = '-rz'  # recurse into dirs, gzip files
syncronize_rsync_opts = '-urz'  # recurse into dirs, gzip files, update only
# (do not overwrite a newer version of a file)

# Command templates, filled in with the % operator using the tuples noted on
# each line.
command_template = 'ssh -p %s root@%s cd ~/; %s'  # % (port, ip, command)
# rsync's own -p flag means --perms and takes no argument, so the ssh port has
# to be passed through the remote-shell option instead: -e 'ssh -p PORT'.
sync_up_template = "rsync %s -e 'ssh -p %s' %s root@%s:~/%s"  # % (opts, port, file, ip, file)
sync_down_template = "rsync %s -e 'ssh -p %s' root@%s:~/%s %s"  # % (opts, port, ip, file, file)

delay = 5  # seconds to sleep between status polls
def group_by(iterable, length):
    """Yield successive lists of up to ``length`` items from ``iterable``.

    Every yielded list holds exactly ``length`` items except possibly the
    last one, which holds whatever is left over. Nothing is yielded for an
    empty iterable. The input is consumed lazily, one group at a time.

    Raises ValueError (when iteration starts) if ``length`` is less than 1,
    because such a length could only ever produce an endless stream of empty
    groups.
    """
    if length < 1:
        raise ValueError('group length must be at least 1, got %r' % (length,))
    group = []
    for item in iterable:
        group.append(item)
        if len(group) >= length:
            yield group
            group = []
    # Emit the final, shorter group if the input did not divide evenly.
    if group:
        yield group
def read_ec2_output(fileobj, cols):
    """Turn tab-separated lines into a list of ``{column: field}`` dicts.

    ``fileobj`` is any iterable of text lines and ``cols`` the column names,
    in order. Each line has its trailing newline removed, is split on tabs,
    and its fields (stripped of surrounding whitespace) are paired with
    ``cols``. Pairing stops at the shorter of the two, so extra fields are
    dropped and missing ones are simply absent from that row's dict.
    """
    def parse(line):
        fields = line.rstrip('\n').split('\t')
        return dict((name, field.strip()) for name, field in zip(cols, fields))

    return [parse(line) for line in fileobj]
def run_ec2_command(args, cols):
    """Run a command and parse its tab-separated standard output.

    ``args`` is the argument list handed to Popen; ``cols`` names the output
    columns. Returns the list of row dicts produced by read_ec2_output().
    """
    # universal_newlines makes the pipe yield text rather than bytes.
    process = Popen(args, stdout=PIPE, universal_newlines=True)
    # communicate() reads the pipe while it waits. Calling wait() first can
    # deadlock once the child has filled the OS pipe buffer, because nobody
    # is draining it.
    output, _ = process.communicate()
    return read_ec2_output(output.splitlines(True), cols)
def read_info(info, requirements, key):
    """Select rows that meet ``requirements`` and index them by ``row[key]``.

    ``info`` is a list of row dicts (as built by read_ec2_output()),
    ``requirements`` maps column names to the value a row must have in that
    column, and ``key`` is the column whose value becomes the dict key.

    Returns ``{row[key]: row}`` for every row whose columns match all of the
    requirements. Rows that lack a required column, or lack ``key`` itself,
    are skipped. A mapping is returned because the callers in this file look
    rows up by id (``.get(ami_id)``, ``.keys()``, ``[instance]['status']``).
    """
    result = {}
    for row in info:
        if key not in row:
            continue
        if all(row.get(col) == wanted for col, wanted in requirements.items()):
            result[row[key]] = row
    return result
def get_ip(internal_ip):
    """Get the ip address out of a string such as ip-00-000-000-00.ec2.internal

    Only the first dot-separated label is used; an optional ``ip-`` prefix is
    removed and the remaining dashes become dots.

    Raises ValueError if that label is not four dash-separated numbers in the
    range 0-255. The address is validated by structure rather than by string
    length, since dotted addresses vary in length (``10.1.2.3`` is as valid
    as ``10.251.150.112``).
    """
    label = internal_ip.split('.')[0]
    if label.startswith('ip-'):
        label = label[3:]
    octets = label.split('-')
    well_formed = len(octets) == 4 and all(
        octet.isdigit() and int(octet) <= 255 for octet in octets)
    if not well_formed:
        raise ValueError("Unable to get the ip of %s." % internal_ip)
    return '.'.join(octets)
class Workforce(object):
    """A set of ec2 worker instances started and driven by this program.

    Instances are started and stopped with the ec2 command line tools; files
    are copied with rsync and commands are run over ssh, both on ``self.port``.

    Attributes:
        port:       ssh port used to reach the workers.
        verbosity:  0 = silent, 1 = report state changes, 2 = report every poll.
        instances:  {instance_id: {attribute: value}}, attributes taken from
                    instance_cols. Only the instances started by this object.
        images:     {ami_id: {attribute: value}}, attributes taken from
                    image_cols. Only available images.
        processes:  Popen objects created by start_process().
        upload:     Popen objects created by sync_up().
        download:   Popen objects created by sync_down().
    """

    def __init__(self, verbosity=1):
        self.port = 9999
        self.verbosity = verbosity
        self.instances = {}
        self.images = {}
        self.processes = []
        self.upload = []
        self.download = []

    def update_images(self):
        """Refresh self.images from ``ec2-describe-images``."""
        image_info = run_ec2_command(['ec2-describe-images'], image_cols)
        # Image rows are tagged IMAGE in the first column; INSTANCE is the tag
        # used on instance rows and would filter every image out.
        self.images = read_info(
            image_info, {'type': 'IMAGE', 'status': 'available'}, 'ami_id')

    def update_instances(self):
        """Refresh self.instances from ``ec2-describe-instances``."""
        if not self.instances:
            # With no ids on the command line the tool reports on the whole
            # account, which would pull in instances we did not start.
            return
        instance_info = run_ec2_command(
            ['ec2-describe-instances'] + list(self.instances), instance_cols)
        self.instances = read_info(
            instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def get_instances(self, status, boolean):
        """Return the ids of instances whose status matches ``status``.

        With a true ``boolean`` the instances that have that status are
        returned; with a false one, the instances that do not.
        """
        wanted = bool(boolean)
        return [instance for instance in self.instances
                if (self.instances[instance].get('status') == status) == wanted]

    def _start_instances(self, command):
        """Run the start command and record the instances it reports."""
        instance_info = run_ec2_command(command, instance_cols)
        self.instances = read_info(
            instance_info, {'type': 'INSTANCE'}, 'instance_id')

    def show_status(self, instances, urgent_instances=()):
        """Write instance states to stderr according to self.verbosity.

        ``instances`` are reported at verbosity 2 and above;
        ``urgent_instances`` at verbosity 1 and above.
        """
        to_report = []
        if self.verbosity >= 2:
            to_report.extend(instances)
        if self.verbosity >= 1:
            to_report.extend(urgent_instances)
        for instance in to_report:
            # Newline-terminated so consecutive reports do not run together.
            sys.stderr.write('Instance %s is %s.\n'
                             % (instance, self.instances[instance]['status']))

    def start(self, num, ami_id, keypair=None):
        """Start ``num`` instances of image ``ami_id`` and wait until they run.

        Raises ValueError if the image is not among the available images, and
        Exception if the number of instances reported back differs from
        ``num``. Polls every ``delay`` seconds with no upper time limit.
        """
        self.update_images()
        if self.images.get(ami_id) is None:
            raise ValueError('The ami identifier %s does not exist or is not available!' % ami_id)
        # Each flag and its value are separate argv entries; '-n 3' as one
        # string would reach the tool as a single unparseable argument.
        args = ['ec2-run-instances', ami_id, '-n', '%i' % num]
        if keypair:
            args += ['-k', keypair]
        self._start_instances(args)
        if len(self.instances) != num:
            raise Exception('We cannot find all the instances! '
                            'We recommend that you find and shut them down manually. '
                            'The known instances are: %s.' % ' '.join(self.instances.keys()))
        started = []
        while True:
            self.update_instances()
            unstarted = self.get_instances('running', False)
            just_started = [instance for instance in self.instances
                            if instance not in unstarted and instance not in started]
            started += just_started
            # Report before testing for completion so the last instances to
            # come up are announced too.
            self.show_status(unstarted, just_started)
            if not unstarted:
                break
            time.sleep(delay)

    def _stop_instances(self, command):
        """Run the terminate command and discard its output."""
        process = Popen(command, stdout=PIPE)
        # communicate() drains the pipe while waiting; wait() alone can block
        # forever if the child fills the pipe buffer.
        process.communicate()

    def stop(self):
        """Terminate every known instance and wait until all are terminated.

        Does nothing when no instances are known. Polls every ``delay``
        seconds with no upper time limit.
        """
        if not self.instances:
            # Nothing was started, so there is nothing to terminate or poll.
            return
        self._stop_instances(['ec2-terminate-instances'] + list(self.instances))
        terminated = []
        while True:
            self.update_instances()
            unterminated = self.get_instances('terminated', False)
            just_terminated = [instance for instance in self.instances
                               if instance not in unterminated and instance not in terminated]
            # Remember what was already announced so it is reported once.
            terminated += just_terminated
            self.show_status(unterminated, just_terminated)
            if not unterminated:
                break
            time.sleep(delay)
        if self.verbosity >= 1:
            sys.stderr.write('All instances have been terminated.\n')

    def _targets(self, instances):
        """Return the instance ids to act on (all known ones by default)."""
        if instances is None:
            return list(self.instances)
        return list(instances)

    def start_process(self, command, instances=None):
        """Run ``bash <command>`` over ssh on each instance.

        ``instances`` is an iterable of instance ids and defaults to every
        known instance. Each process is created with a stdin pipe and shares
        this program's stdout and stderr; it is appended to self.processes.
        """
        # Built once: wrapping inside the loop would prepend another 'bash'
        # for every instance.
        remote_command = 'bash %s' % command
        for instance in self._targets(instances):
            ip = get_ip(self.instances[instance]['ip'])
            argv = shlex.split(command_template % (self.port, ip, remote_command))
            process = Popen(argv, stdin=PIPE, stdout=sys.stdout,
                            stderr=sys.stderr, universal_newlines=True)
            self.processes.append(process)

    def sync_up(self, files, opts='-rz', instances=None):
        """Start an rsync of ``files`` to each instance's home directory.

        Does nothing when ``files`` is empty or None. The rsync processes are
        appended to self.upload and are not waited for here.
        """
        if not files:
            return
        for instance in self._targets(instances):
            ip = get_ip(self.instances[instance]['ip'])
            command = sync_up_template % (opts, self.port, files, ip, files)
            self.upload.append(
                Popen(shlex.split(command), stdout=PIPE, stderr=sys.stderr))

    def sync_down(self, files, opts='-rz', instances=None):
        """Start an rsync of ``files`` from each instance's home directory.

        Does nothing when ``files`` is empty or None. The rsync processes are
        appended to self.download and are not waited for here.
        """
        if not files:
            return
        for instance in self._targets(instances):
            ip = get_ip(self.instances[instance]['ip'])
            command = sync_down_template % (opts, self.port, ip, files, files)
            self.download.append(
                Popen(shlex.split(command), stdout=PIPE, stderr=sys.stderr))
def _wait_for_transfers(transfers, description):
    """Wait for rsync child processes; raise if any of them failed."""
    for transfer in transfers:
        # communicate() drains the stdout pipe while waiting for the exit.
        transfer.communicate()
        if transfer.returncode != 0:
            raise Exception('%s failed (rsync exit status %s).'
                            % (description, transfer.returncode))


def _feed_input(workforce, lines, batch_size):
    """Hand out ``lines`` in batches, round-robin, then close every stdin."""
    running = []
    for batch in group_by(lines, batch_size):
        if not running:
            # Start a new round with the processes that are still alive.
            # poll() is needed: returncode is only refreshed by poll()/wait().
            running = [process for process in workforce.processes
                       if process.poll() is None]
            if not running:
                raise Exception('All worker processes exited before the '
                                'input was consumed.')
        process = running.pop()
        process.stdin.writelines(batch)
        process.stdin.flush()
    # Closing stdin signals end-of-input so the workers can finish.
    for process in workforce.processes:
        process.stdin.close()


def _wait_for_processes(workforce, verbosity):
    """Poll every ``delay`` seconds until all worker processes have exited."""
    while True:
        if verbosity >= 2:
            # Written to stderr like every other progress message; stdout is
            # shared with the workers' own output.
            sys.stderr.write('waiting...\n')
        if all(process.poll() is not None for process in workforce.processes):
            break
        time.sleep(delay)


def _shutdown(workforce):
    """Stop the instances, retrying if the user interrupts the attempt."""
    warnings = [
        "We are trying to close the instances!\nPlease don't interrupt.\n",
        "If you interrupt one more time, you must close the instances manually!\n",
    ]
    for warning in warnings:
        try:
            workforce.stop()
            return
        except KeyboardInterrupt:
            sys.stderr.write(warning)
    # Last attempt: a further interrupt propagates to the caller.
    workforce.stop()


def main():
    """Command-line entry point.

    Starts the requested ec2 workers, uploads the auxilary files, the
    syncronized files and the executable, runs the executable on every worker
    while feeding it stdin in batches, downloads the syncronized files and
    terminates the workers. On any failure, or on Ctrl-C, it still tries to
    terminate the workers before returning.
    """
    options, args = parser.parse_args()
    workers = options.workers
    executable = options.executable
    auxilary = options.auxilary
    syncronized = options.syncronize  # the option's dest is 'syncronize'
    batch_size = options.batch_size
    ami_id = options.ami_id
    verbosity = options.verbosity
    # Fail early on missing arguments, before any instance is started.
    if workers is None or workers < 1:
        parser.error('the number of workers (-n) must be at least 1')
    if not executable:
        parser.error('an executable (-e) is required')
    if batch_size < 1:
        parser.error('the batch size (-b) must be at least 1')

    workforce = Workforce()
    # Workforce reports instance states according to this attribute.
    workforce.verbosity = verbosity
    try:
        # Start
        workforce.start(workers, ami_id)
        time.sleep(delay)
        if verbosity >= 1:
            sys.stderr.write('Uploading auxilary files, syncronized files, and executable.\n')
        if auxilary:
            workforce.sync_up(auxilary, auxilary_rsync_opts)
        if syncronized:
            workforce.sync_up(syncronized, syncronize_rsync_opts)
        workforce.sync_up(executable, auxilary_rsync_opts)
        # The executable must be fully uploaded before it can be started.
        _wait_for_transfers(workforce.upload, 'Upload')
        if verbosity >= 1:
            sys.stderr.write('Starting the executable.\n')
        workforce.start_process(executable)
        if verbosity >= 1:
            sys.stderr.write('Feeding input to the executable.\n')
        _feed_input(workforce, sys.stdin, batch_size)
        # Wait until all the processes are done
        if verbosity >= 1:
            sys.stderr.write('Waiting until all the processes are finished.\n')
        _wait_for_processes(workforce, verbosity)
        # Sync_down the syncronized files
        if syncronized:
            if verbosity >= 1:
                sys.stderr.write('Downloading syncronized files.\n')
            workforce.sync_down(syncronized, syncronize_rsync_opts)
            # Finish downloading before the instances are terminated.
            _wait_for_transfers(workforce.download, 'Download')
        # Shut down the instances
        if verbosity >= 1:
            sys.stderr.write('Shutting down instances.\n')
        workforce.stop()
    except (Exception, KeyboardInterrupt) as e:
        # KeyboardInterrupt is not an Exception subclass; it is caught as well
        # so that Ctrl-C does not leave instances running.
        reason = str(e) or e.__class__.__name__
        sys.stderr.write('ERROR: %s\nTrying to shut down ec2 instances.\n' % reason)
        try:
            _shutdown(workforce)
        except BaseException:
            # Last resort: nothing more can be done from here, so tell the
            # user the instances may still be running.
            sys.stderr.write('FAILED TO SHUT DOWN INSTANCES!\n')


if __name__ == '__main__':
    main()