Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSH tunneling #44

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
enable ssh tunneling
aroder committed Jun 18, 2020
commit 9969d22b59e9517047bb697f8468d1cd9265f51a
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -98,3 +98,6 @@ ENV/
*.txt

/venv--*
*config.json
*catalog.json
.vscode/
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,8 @@
install_requires=[
'boto3>=1.9.205,<1.10.0',
'singer-target-postgres==0.2.4',
'urllib3==1.25.9'
'urllib3==1.25.9',
'sshtunnel==0.1.5'
],
setup_requires=[
"pytest-runner"
107 changes: 72 additions & 35 deletions target_redshift/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import psycopg2
import singer
from singer import utils
import sshtunnel
from target_postgres import target_tools
from target_postgres.postgres import MillisLoggingConnection

@@ -9,47 +10,83 @@

LOGGER = singer.get_logger()

# Config keys that must always be supplied: the Redshift connection settings
# plus the 'target_s3' section. cli() hands this list to utils.parse_args.
REQUIRED_CONFIG_KEYS = [
'redshift_host',
'redshift_database',
'redshift_username',
'redshift_password',
'target_s3'
]
def required_config_keys(use_ssh_tunnel=False):
    """Return the list of config keys that must be present.

    The Redshift connection keys and 'target_s3' are always required. When
    ``use_ssh_tunnel`` is truthy, the SSH jump-server keys are required too.
    A new list is returned on every call.
    """
    always_required = [
        'redshift_host',
        'redshift_database',
        'redshift_username',
        'redshift_password',
        'target_s3',
    ]
    ssh_only = [
        'ssh_jump_server',
        'ssh_jump_server_port',
        'ssh_private_key_path',
        'ssh_username',
    ]
    return always_required + ssh_only if use_ssh_tunnel else always_required



def main(config, input_stream=None):
    """Run the Redshift target, optionally connecting through an SSH tunnel.

    Args:
        config: Mapping of settings. Reads the ``redshift_*`` connection keys
            (``redshift_port`` defaults to 5439, ``redshift_schema`` to
            'public'), the ``target_s3`` section, and, when
            ``use_ssh_tunnel`` is truthy, the ``ssh_*`` jump-server keys
            (``ssh_private_key_password`` is optional). The caller's mapping
            is not modified.
        input_stream: When truthy, it is passed to
            ``target_tools.stream_to_target``; otherwise
            ``target_tools.main`` is called with the target.

    Raises:
        KeyError: If ``use_ssh_tunnel`` is truthy and a required ``ssh_*``
            key is missing from ``config``.
    """
    # Resolve the endpoint once so the optional port gets its default
    # everywhere (log messages, tunnel binding and the connection itself).
    redshift_host = config.get('redshift_host')
    redshift_port = config.get('redshift_port', 5439)

    # NOTE: the config is deliberately not logged wholesale; it carries the
    # Redshift password and the S3 credentials.
    tunnel = None
    try:
        if config.get('use_ssh_tunnel'):
            LOGGER.info(f"use_ssh_tunnel is set to true; connecting to {redshift_host}:{redshift_port} via {config['ssh_jump_server']}:{config['ssh_jump_server_port']}")
            tunnel = sshtunnel.open_tunnel(
                (config['ssh_jump_server'], int(config['ssh_jump_server_port'])),
                ssh_username=config['ssh_username'],
                ssh_pkey=config['ssh_private_key_path'],
                ssh_private_key_password=config.get('ssh_private_key_password'),
                remote_bind_address=(redshift_host, int(redshift_port))
            )
            tunnel.start()
            # Route the connection through the local end of the tunnel. A
            # copy of the config is rewritten so downstream code sees the
            # effective endpoint while the caller's mapping stays untouched.
            redshift_host = '127.0.0.1'
            redshift_port = tunnel.local_bind_port
            config = {
                **config,
                'redshift_host': redshift_host,
                'redshift_port': redshift_port,
            }
        else:
            LOGGER.debug(f"use_ssh_tunnel is not set or is false; connecting directly to {redshift_host}:{redshift_port}")

        with psycopg2.connect(
                connection_factory=MillisLoggingConnection,
                host=redshift_host,
                port=redshift_port,
                dbname=config.get('redshift_database'),
                user=config.get('redshift_username'),
                password=config.get('redshift_password')
        ) as connection:
            s3_config = config.get('target_s3')
            s3 = S3(s3_config.get('aws_access_key_id'),
                    s3_config.get('aws_secret_access_key'),
                    s3_config.get('bucket'),
                    s3_config.get('key_prefix'),
                    aws_session_token=s3_config.get('aws_session_token'))

            redshift_target = RedshiftTarget(
                connection,
                s3,
                redshift_schema=config.get('redshift_schema', 'public'),
                logging_level=config.get('logging_level'),
                default_column_length=config.get('default_column_length', 1000),
                persist_empty_tables=config.get('persist_empty_tables')
            )

            if input_stream:
                target_tools.stream_to_target(input_stream, redshift_target, config=config)
            else:
                target_tools.main(redshift_target)
    finally:
        # Always tear the tunnel down, even if connecting or loading failed.
        if tunnel is not None:
            tunnel.stop()



def cli():
    """Command-line entry point: parse arguments, validate the config, run main().

    The arguments are parsed once against the always-required keys. If the
    parsed config enables ``use_ssh_tunnel``, the SSH keys are then validated
    against the already-loaded config rather than by parsing the command line
    a second time.
    """
    args = utils.parse_args(required_config_keys())
    if args.config.get('use_ssh_tunnel'):
        # Validate the extra SSH keys on the config we already have; this
        # avoids re-reading argv and the config/state files.
        utils.check_config(args.config, required_config_keys(True))

    main(args.config)