Skip to content

Commit

Permalink
bench: update for python3, boto3, and defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
martingrambow authored and mreiferson committed Jun 16, 2020
1 parent 460bf6a commit 7c4e954
Showing 1 changed file with 75 additions and 80 deletions.
155 changes: 75 additions & 80 deletions bench/bench.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
#!/usr/bin/env python
#!/usr/bin/env python3

#
# This script bootstraps an NSQ cluster in EC2 and runs benchmarks.
#
# Requires python3 and the following packages:
# - boto3
# - paramiko
# - tornado
#
# AWS authentication is delegated entirely to the boto3 environment, see:
#
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
#
# EC2 instances are launched into EC2 Classic, expecting a 'default' security group
# that allows allows SSH (port 22) from 0.0.0.0/0 and an EC2 key pair created
# (named 'default', but configurable via --ssh-key-name).
#

import sys
import logging
import time
Expand All @@ -7,7 +25,7 @@
import warnings
import hashlib

import boto.ec2
import boto3
import paramiko.client
import paramiko.ssh_exception
import tornado.options
Expand Down Expand Up @@ -40,8 +58,8 @@ def ssh_cmd(ssh_client, cmd, timeout=2):
chan.settimeout(timeout)
chan.exec_command(cmd)

stdout = ''
stderr = ''
stdout = b''
stderr = b''
while True:
if chan.recv_ready():
stdout += chan.recv(4096)
Expand All @@ -60,11 +78,8 @@ def ssh_cmd(ssh_client, cmd, timeout=2):
return stdout, stderr


def connect_to_ec2():
return boto.ec2.connect_to_region(
tornado.options.options.region,
aws_access_key_id=tornado.options.options.access_key,
aws_secret_access_key=tornado.options.options.secret_key)
def get_session():
return boto3.session.Session(region_name=tornado.options.options.region)


def _bootstrap(addr):
Expand All @@ -79,83 +94,75 @@ def _bootstrap(addr):
'mkdir -p go/src/github.com/nsqio',
'cd go/src/github.com/nsqio && git clone https://github.com/nsqio/nsq',
'cd go/src/github.com/nsqio/nsq && git checkout %s' % commit,
'sudo -S curl -s -o /usr/local/bin/gpm \
https://raw.githubusercontent.com/pote/gpm/v1.2.3/bin/gpm',
'sudo -S chmod +x /usr/local/bin/gpm',
'cd go/src/github.com/nsqio/nsq && \
GOPATH=/home/ubuntu/go PATH=$PATH:/usr/local/go/bin gpm install',
'cd go/src/github.com/nsqio/nsq/apps/nsqd && \
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/bench/bench_writer && \
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/bench/bench_reader && \
GOPATH=/home/ubuntu/go /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/apps/nsqd && GO111MODULE=on /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/bench/bench_writer && GO111MODULE=on /usr/local/go/bin/go build',
'cd go/src/github.com/nsqio/nsq/bench/bench_reader && GO111MODULE=on /usr/local/go/bin/go build',
'sudo -S mkdir -p /mnt/nsq',
'sudo -S chmod 777 /mnt/nsq']:
ssh_cmd(ssh_client, cmd, timeout=10)


def bootstrap():
conn = connect_to_ec2()
session = get_session()

ec2 = session.resource('ec2')

total_count = tornado.options.options.nsqd_count + tornado.options.options.worker_count
logging.info('launching %d instances', total_count)
run = conn.run_instances(
tornado.options.options.ami,
min_count=total_count,
max_count=total_count,
key_name=tornado.options.options.ssh_key_name,
instance_type=tornado.options.options.instance_type,
security_groups=['default'])
instances = ec2.create_instances(
ImageId=tornado.options.options.ami,
MinCount=total_count,
MaxCount=total_count,
KeyName=tornado.options.options.ssh_key_name,
InstanceType=tornado.options.options.instance_type,
SecurityGroups=['default'])

logging.info('waiting for instances to launch...')

while all(i.state != 'running' for i in run.instances):
waiting_for = [i.id for i in run.instances if i.state != 'running']
while any(i.state['Name'] != 'running' for i in instances):
waiting_for = [i.id for i in instances if i.state['Name'] != 'running']
logging.info('... sleeping for 5s (waiting for %s)', ', '.join(waiting_for))
time.sleep(5)
for instance in run.instances:
instance.update()
for instance in instances:
instance.load()

for instance in run.instances:
for instance in instances:
if not instance.tags:
conn.create_tags([instance.id], {'nsq_bench': '1'})

hosts = [(i.id, i.public_dns_name) for i in run.instances]
instance.create_tags(Tags=[{'Key': 'nsq_bench', 'Value': '1'}])

try:
c = 0
for id, addr in hosts:
for i in instances:
c += 1
logging.info('(%d) bootstrapping %s (%s)', c, addr, id)
_bootstrap(addr)
logging.info('(%d) bootstrapping %s (%s)', c, i.public_dns_name, i.id)
_bootstrap(i.public_dns_name)
except Exception:
logging.exception('bootstrap failed')
decomm()


def run():
hosts = _find_hosts()
instances = _find_instances()

logging.info('launching nsqd on %d host(s)', tornado.options.options.nsqd_count)

nsqd_chans = []
nsqd_hosts = hosts[:tornado.options.options.nsqd_count]
for id, addr in nsqd_hosts:
nsqd_hosts = instances[:tornado.options.options.nsqd_count]
for instance in nsqd_hosts:
try:
ssh_client = ssh_connect_with_retries(addr)
ssh_client = ssh_connect_with_retries(instance.public_dns_name)
for cmd in [
'sudo -S pkill -f nsqd',
'sudo -S rm -f /mnt/nsq/*.dat',
'GOMAXPROCS=32 ./go/src/github.com/nsqio/nsq/apps/nsqd/nsqd \
--data-path=/mnt/nsq --mem-queue-size=10000000 --max-rdy-count=%s' % (
tornado.options.options.rdy
)]:
--data-path=/mnt/nsq \
--mem-queue-size=10000000 \
--max-rdy-count=%s' % (tornado.options.options.rdy)]:
nsqd_chans.append((ssh_client, ssh_cmd_async(ssh_client, cmd)))
except Exception:
logging.exception('failed')

nsqd_tcp_addrs = [h[1] for h in nsqd_hosts]
nsqd_tcp_addrs = [i.public_dns_name for i in nsqd_hosts]

dt = datetime.datetime.utcnow()
deadline = dt + datetime.timedelta(seconds=30)
Expand All @@ -166,12 +173,12 @@ def run():

worker_chans = []

producer_hosts = hosts[tornado.options.options.nsqd_count:]
for id, addr in producer_hosts:
producer_hosts = instances[tornado.options.options.nsqd_count:]
for instance in producer_hosts:
for nsqd_tcp_addr in nsqd_tcp_addrs:
topic = hashlib.md5(addr).hexdigest()
topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).hexdigest()
try:
ssh_client = ssh_connect_with_retries(addr)
ssh_client = ssh_connect_with_retries(instance.public_dns_name)
for cmd in [
'GOMAXPROCS=2 \
./go/src/github.com/nsqio/nsq/bench/bench_writer/bench_writer \
Expand All @@ -187,12 +194,12 @@ def run():
tornado.options.options.nsqd_count * tornado.options.options.worker_count,
tornado.options.options.worker_count)

consumer_hosts = hosts[tornado.options.options.nsqd_count:]
for id, addr in consumer_hosts:
consumer_hosts = instances[tornado.options.options.nsqd_count:]
for instance in consumer_hosts:
for nsqd_tcp_addr in nsqd_tcp_addrs:
topic = hashlib.md5(addr).hexdigest()
topic = hashlib.md5(instance.public_dns_name.encode('utf-8')).hexdigest()
try:
ssh_client = ssh_connect_with_retries(addr)
ssh_client = ssh_connect_with_retries(instance.public_dns_name)
for cmd in [
'GOMAXPROCS=8 \
./go/src/github.com/nsqio/nsq/bench/bench_reader/bench_reader \
Expand Down Expand Up @@ -223,7 +230,7 @@ def run():
sys.stdout.flush()
continue
if chan.recv_stderr_ready():
line = chan.recv_stderr(4096)
line = chan.recv_stderr(4096).decode('utf-8')
if 'duration:' in line:
kind = line.split(' ')[0][1:-1]
parts = line.rsplit('duration:')[1].split('-')
Expand Down Expand Up @@ -253,27 +260,18 @@ def run():
chan.close()


def _find_hosts():
conn = connect_to_ec2()
reservations = conn.get_all_instances()
instances = [inst for res in reservations for inst in res.instances]

hosts = []
for instance in instances:
if not instance.tags or instance.state != 'running':
continue
if 'nsq_bench' in instance.tags:
hosts.append((instance.id, instance.public_dns_name))

return hosts
def _find_instances():
session = get_session()
ec2 = session.resource('ec2')
return [i for i in ec2.instances.all() if
i.state['Name'] == 'running' and any(t['Key'] == 'nsq_bench' for t in i.tags)]


def decomm():
conn = connect_to_ec2()
hosts = _find_hosts()
host_ids = [h[0] for h in hosts]
logging.info('terminating instances %s' % ','.join(host_ids))
conn.terminate_instances(host_ids)
instances = _find_instances()
logging.info('terminating instances %s' % ','.join(i.id for i in instances))
for instance in instances:
instance.terminate()


if __name__ == '__main__':
Expand All @@ -283,11 +281,8 @@ def decomm():
help='how many nsqd instances to launch')
tornado.options.define('worker_count', type=int, default=3,
help='how many worker instances to launch')
tornado.options.define('access_key', type=str, default='',
help='AWS account access key')
tornado.options.define('secret_key', type=str, default='',
help='AWS account secret key')
tornado.options.define('ami', type=str, default='ami-48fd2120',
# ubuntu 18.04 HVM instance store us-east-1
tornado.options.define('ami', type=str, default='ami-0938f2289b3fa3f5b',
help='AMI ID')
tornado.options.define('ssh_key_name', type=str, default='default',
help='SSH key name')
Expand All @@ -301,7 +296,7 @@ def decomm():
help='the benchmark mode (pub, pubsub)')
tornado.options.define('commit', type=str, default='master',
help='the git commit')
tornado.options.define('golang_version', type=str, default='1.5.1',
tornado.options.define('golang_version', type=str, default='1.14.3',
help='the go version')
tornado.options.parse_command_line()

Expand Down

0 comments on commit 7c4e954

Please sign in to comment.