Keep spark(-tools) and mesos(-tools) in sync
hannes-ucsc committed Feb 4, 2016
1 parent 3b4a1eb commit 490c525
Showing 2 changed files with 40 additions and 31 deletions.
28 changes: 16 additions & 12 deletions spark-tools/src/cgcloud/spark_tools/__init__.py
@@ -1,24 +1,24 @@
import logging
import re
import os
import errno
import fcntl
from grp import getgrnam
from pwd import getpwnam
import itertools
import logging
import os
import re
import socket
import stat
from urllib2 import urlopen
from subprocess import check_call, check_output, CalledProcessError, STDOUT
import time
import itertools
from grp import getgrnam
from pwd import getpwnam
from subprocess import check_call, check_output, CalledProcessError, STDOUT
from urllib2 import urlopen

import boto.ec2
from bd2k.util import memoize, less_strict_bool
from bd2k.util.files import mkdir_p
from boto.ec2.instance import Instance

from cgcloud.lib.util import volume_label_hash
from cgcloud.lib.ec2 import EC2VolumeHelper
from cgcloud.lib.util import volume_label_hash

initctl = '/sbin/initctl'

@@ -213,8 +213,8 @@ def master_id( self ):
master_id = self.instance_tag( 'leader_instance_id' )
if not master_id:
raise RuntimeError( "Instance not tagged with master's instance ID" )
log.info( "Master's instance ID is '%s'", master_id )
return master_id
log.info( "Master's instance ID is '%s'", master_id )
return master_id

@property
@memoize
@@ -365,7 +365,7 @@ def __wait_for_master_ssh( self ):

def _copy_dir_from_master( self, path ):
log.info( "Copying %s from master" % path )
if not path.endswith('/'):
if not path.endswith( '/' ):
path += '/'
for tries in range( 5 ):
try:
@@ -404,6 +404,10 @@ def __create_lazy_dirs( self ):
log.info( "Bind-mounting directory structure" )
for (parent, name, persistent) in self.lazy_dirs:
assert parent[ 0 ] == os.path.sep
logical_path = os.path.join( parent, name )
if persistent is None:
tag = 'persist' + logical_path.replace( os.path.sep, '_' )
persistent = less_strict_bool( self.instance_tag( tag ) )
location = self.persistent_dir if persistent else self.ephemeral_dir
physical_path = os.path.join( location, parent[ 1: ], name )
mkdir_p( physical_path )
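Illustrative aside (not part of this commit's diff): the lines added to __create_lazy_dirs above let a lazy directory's persistence be decided at start-up from an EC2 instance tag when it was registered with persistent=None. A minimal standalone sketch of that lookup, with get_instance_tag as a hypothetical stand-in for self.instance_tag and only a rough approximation of bd2k.util's less_strict_bool:

import os

def resolve_persistence( parent, name, persistent, get_instance_tag ):
    """Sketch of the tag lookup added to __create_lazy_dirs above."""
    logical_path = os.path.join( parent, name )  # e.g. '/foo' + 'dir' -> '/foo/dir'
    if persistent is None:
        # '/foo/dir' maps to the instance tag 'persist_foo_dir'
        tag = 'persist' + logical_path.replace( os.path.sep, '_' )
        # rough stand-in for less_strict_bool: a missing or unset tag counts as False
        persistent = str( get_instance_tag( tag ) ).lower( ) in ( 'true', '1', 'yes' )
    return persistent

# resolve_persistence( '/foo', 'dir', None, { 'persist_foo_dir': 'True' }.get )  ->  True
# resolve_persistence( '/foo', 'dir', None, { }.get )                            ->  False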
43 changes: 24 additions & 19 deletions spark/src/cgcloud/spark/spark_box.py
@@ -1,43 +1,42 @@
import logging
from collections import namedtuple
import json
import logging
import re
from StringIO import StringIO
from collections import namedtuple

from bd2k.util.iterables import concat
from bd2k.util.strings import interpolate as fmt
from fabric.context_managers import settings
from fabric.operations import run, put, os

from bd2k.util.strings import interpolate as fmt

from cgcloud.core.box import fabric_task
from cgcloud.core.cluster import ClusterBox, ClusterLeader, ClusterWorker
from cgcloud.core.ubuntu_box import Python27UpdateUbuntuBox
from cgcloud.fabric.operations import sudo, remote_open, pip, sudov
from cgcloud.core.common_iam_policies import ec2_read_only_policy
from cgcloud.core.generic_boxes import GenericUbuntuTrustyBox
from cgcloud.core.ubuntu_box import Python27UpdateUbuntuBox
from cgcloud.fabric.operations import sudo, remote_open, pip, sudov
from cgcloud.lib.util import abreviated_snake_case_class_name, heredoc

log = logging.getLogger( __name__ )

hadoop_version = '2.6.0'

spark_version = '1.2.1'

user = 'sparkbox'

install_dir = '/opt/sparkbox'

var_dir = '/var/lib/sparkbox'

log_dir = "/var/log/sparkbox"

ephemeral_dir = '/mnt/ephemeral'

persistent_dir = '/mnt/persistent'

var_dir = '/var/lib/sparkbox'

hdfs_replication = 1

hadoop_version = '2.6.0'

spark_version = '1.2.1'

Service = namedtuple( 'Service', [
'init_name',
'description',
@@ -64,17 +63,17 @@ def spark_service( name, script_suffix=None ):
stop_script=fmt( script, action='stop' ) )


hadoop_services = {
'master': [ hdfs_service( 'namenode' ), hdfs_service( 'secondarynamenode' ) ],
'slave': [ hdfs_service( 'datanode' ) ] }
hadoop_services = dict(
master=[ hdfs_service( 'namenode' ), hdfs_service( 'secondarynamenode' ) ],
slave=[ hdfs_service( 'datanode' ) ] )

spark_services = {
'master': [ spark_service( 'master' ) ],
spark_services = dict(
master=[ spark_service( 'master' ) ],
# FIXME: The start-slaves.sh script actually does ssh localhost on a slave so I am not sure
# this is the right thing to do. OTOH, it is the only script that starts Tachyon and sets up the
# spark:// URL pointing at the master. We would need to duplicate some of its functionality
# if we wanted to eliminate the ssh call.
'slave': [ spark_service( 'slave', 'slaves' ) ] }
slave=[ spark_service( 'slave', 'slaves' ) ] )


class SparkBox( GenericUbuntuTrustyBox, Python27UpdateUbuntuBox, ClusterBox ):
@@ -341,10 +340,11 @@ def __install_tools( self ):
"""
tools_dir = install_dir + '/tools'
admin = self.admin_account( )
sudo( fmt( 'mkdir -p {tools_dir} {persistent_dir} {ephemeral_dir}' ) )
sudo( fmt( 'mkdir -p {tools_dir}' ) )
sudo( fmt( 'chown {admin}:{admin} {tools_dir}' ) )
run( fmt( 'virtualenv --no-pip {tools_dir}' ) )
run( fmt( '{tools_dir}/bin/easy_install pip==1.5.2' ) )

with settings( forward_agent=True ):
with self._project_artifacts( 'spark-tools' ) as artifacts:
pip( use_sudo=True,
@@ -409,8 +409,13 @@ def _lazy_mkdir( self, parent, name, persistent=False ):
/mnt/persistent/foo/dir is created and bind-mounted into /foo/dir when the box starts.
Likewise, _lazy_mkdir( '/foo', 'dir', False ) creates /foo/dir now and ensures that
/mnt/ephemeral/foo/dir is created and bind-mounted into /foo/dir when the box starts.
Note that at start-up time, /mnt/persistent may be reassigned to /mnt/ephemeral if no
EBS volume is mounted at /mnt/persistent.
_lazy_mkdir( '/foo', 'dir', None ) will look up an instance tag named 'persist_foo_dir'
when the box starts and then behave like _lazy_mkdir( '/foo', 'dir', True ) if that tag's
value is 'True', or _lazy_mkdir( '/foo', 'dir', False ) if that tag's value is False.
"""
assert self.lazy_dirs is not None
assert '/' not in name
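Illustrative aside (not part of this commit's diff): the docstring above boils down to choosing a physical backing directory and bind-mounting it onto the logical path when the box starts. A hedged sketch of that mapping, assuming the /mnt/persistent and /mnt/ephemeral locations named in this module and taking persistent as an already-resolved bool (for persistent=None it comes from the 'persist_...' instance tag, as in the spark-tools change above; the start-up fallback from /mnt/persistent to /mnt/ephemeral when no EBS volume is attached is not modeled here):

import os

def physical_path( parent, name, persistent,
                   persistent_dir='/mnt/persistent', ephemeral_dir='/mnt/ephemeral' ):
    """Pick where a lazily created directory is physically backed, per the docstring above."""
    location = persistent_dir if persistent else ephemeral_dir
    return os.path.join( location, parent.lstrip( os.path.sep ), name )

# physical_path( '/foo', 'dir', True )   ->  '/mnt/persistent/foo/dir', bind-mounted on /foo/dir
# physical_path( '/foo', 'dir', False )  ->  '/mnt/ephemeral/foo/dir',  bind-mounted on /foo/dir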
