Skip to content

Commit

Permalink
working on #27
Browse files Browse the repository at this point in the history
  • Loading branch information
timkpaine committed Nov 13, 2018
1 parent 9ad25f7 commit 9b6d0da
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 97 deletions.
3 changes: 0 additions & 3 deletions paperboy/config/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
from .storage import SQLAStorageConfig
from .output import LocalOutputConfig

# dummy
from ..scheduler import DummyScheduler

# no auth
from ..middleware import NoUserMiddleware, NoAuthRequiredMiddleware

Expand Down
4 changes: 2 additions & 2 deletions paperboy/config/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import os.path
from traitlets import HasTraits, Unicode
from ..scheduler import DummyScheduler
from ..scheduler import AirflowScheduler


class SchedulerConfig(HasTraits):
Expand All @@ -12,4 +12,4 @@ class AirflowSchedulerConfig(SchedulerConfig):
type = 'airflow'
dagbag = Unicode(default_value=os.path.expanduser('~/airflow/dags'))
config = Unicode(default_value=os.path.expanduser('~/airflow/airflow.cfg'))
clazz = DummyScheduler
clazz = AirflowScheduler
2 changes: 1 addition & 1 deletion paperboy/scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .dummy import DummyScheduler
from .airflow import AirflowScheduler
46 changes: 29 additions & 17 deletions paperboy/scheduler/dummy.py → paperboy/scheduler/airflow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# -*- coding: utf-8 -*-
import configparser
import json
import os
import os.path
import jinja2
from sqlalchemy import create_engine
from base64 import b64encode
from random import randint, choice
from random import choice
from sqlalchemy import create_engine
from .base import BaseScheduler, TIMING_MAP

with open(os.path.abspath(os.path.join(os.path.dirname(__file__), 'paperboy.airflow.py')), 'r') as fp:
Expand All @@ -19,10 +18,15 @@
LIMIT 20;
'''

#######################################
# FIXME merge with dummy when #
# airflow has better python3 support #
#######################################


class DummyScheduler(BaseScheduler):
class AirflowScheduler(BaseScheduler):
def __init__(self, *args, **kwargs):
super(DummyScheduler, self).__init__(*args, **kwargs)
super(AirflowScheduler, self).__init__(*args, **kwargs)
cp = configparser.ConfigParser()
cp.read(self.config.scheduler.config)
try:
Expand All @@ -36,22 +40,25 @@ def __init__(self, *args, **kwargs):
def status(self, user, params, session, *args, **kwargs):
type = params.get('type', '')
if not self.sql_conn:
gen = AirflowScheduler.fakequery(self.engine)
if type == 'jobs':
return self.statusgeneralfake()['jobs']
return gen['jobs']
elif type == 'reports':
return self.statusgeneralfake()['reports']
return gen['reports']
else:
return self.statusgeneralfake()
return gen
gen = AirflowScheduler.query(self.engine)
if type == 'jobs':
return self.statusgeneral()['jobs']
return gen['jobs']
elif type == 'reports':
return self.statusgeneral()['reports']
return gen['reports']
else:
return self.statusgeneral()
return gen

def statusgeneral(self):
@staticmethod
def query(engine):
ret = {'jobs': [], 'reports': []}
with self.engine.begin() as conn:
with engine.begin() as conn:
res = conn.execute(QUERY)
for i, item in enumerate(res):
ret['jobs'].append(
Expand Down Expand Up @@ -86,7 +93,8 @@ def statusgeneral(self):
)
return ret

def statusgeneralfake(self):
@staticmethod
def fakequery(engine):
ret = {'jobs': [], 'reports': []}
for i in range(10):
ret['jobs'].append(
Expand All @@ -110,7 +118,8 @@ def statusgeneralfake(self):
)
return ret

def schedule(self, user, notebook, job, reports, *args, **kwargs):
@staticmethod
def schedule_airflow(config, user, notebook, job, reports, *args, **kwargs):
owner = user.name
start_date = job.meta.start_time.strftime('%m/%d/%Y %H:%M:%S')
email = '[email protected]'
Expand All @@ -125,8 +134,11 @@ def schedule(self, user, notebook, job, reports, *args, **kwargs):
email=email,
job_json=job_json,
report_json=report_json,
output_config=json.dumps(self.config.output.to_json())
output_config=json.dumps(config.output.to_json())
)
with open(os.path.join(self.config.scheduler.dagbag, job.id + '.py'), 'w') as fp:
with open(os.path.join(config.scheduler.dagbag, job.id + '.py'), 'w') as fp:
fp.write(tpl)
return tpl

def schedule(self, user, notebook, job, reports, *args, **kwargs):
    """Schedule *job* by rendering an airflow DAG file into the dagbag.

    Delegates to the static ``schedule_airflow`` so remote workers can
    reuse the same logic with an explicit config.
    """
    # Propagate the rendered DAG source back to the caller:
    # schedule_airflow returns the template text, but the original
    # wrapper dropped the return value.
    return AirflowScheduler.schedule_airflow(self.config, user, notebook, job, reports, *args, **kwargs)
1 change: 1 addition & 0 deletions paperboy/scheduler/airflow_operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .operators import JobOperator, JobCleanupOperator, ReportOperator, PapermillOperator, NBConvertOperator, ReportPostOperator
Original file line number Diff line number Diff line change
@@ -1,55 +1,6 @@
import json
import os
import os.path
import jinja2
from base64 import b64encode
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from paperboy.utils import name_to_class
from .base import BaseScheduler, TIMING_MAP

with open(os.path.abspath(os.path.join(os.path.dirname(__file__), 'paperboy.airflow.py')), 'r') as fp:
TEMPLATE = fp.read()

#######################################
# FIXME merge with dummy when #
# airflow has better python3 support #
#######################################


class AirflowScheduler(BaseScheduler):
    """Scheduler backend that emits airflow DAG files for paperboy jobs."""

    def status(self, user, params, session, *args, **kwargs):
        """Return status listings filtered by ``params['type']``.

        NOTE(review): all branches return empty collections -- presumably a
        stub until real airflow status querying is wired up; confirm.
        """
        type = params.get('type', '')
        if type == 'notebooks':
            return []
        elif type == 'jobs':
            return []
        elif type == 'reports':
            return []
        else:
            return {'notebook': [], 'jobs': [], 'reports': []}

    def schedule(self, user, notebook, job, reports, *args, **kwargs):
        """Render the airflow DAG template for *job* and write it to the dagbag.

        Returns the rendered DAG source text.
        """
        owner = user.name
        start_date = job.meta.start_time.strftime('%m/%d/%Y %H:%M:%S')
        # assumes notifications are not yet configurable -- TODO confirm
        email = '[email protected]'
        # job/report payloads are base64-encoded JSON so they can be safely
        # embedded as literals inside the generated DAG file
        job_json = b64encode(json.dumps(job.to_json(True)).encode('utf-8'))
        report_json = b64encode(json.dumps([r.to_json() for r in reports]).encode('utf-8'))
        # Map paperboy interval names to airflow schedule expressions
        interval = TIMING_MAP.get(job.meta.interval)

        tpl = jinja2.Template(TEMPLATE).render(
            owner=owner,
            start_date=start_date,
            interval=interval,
            email=email,
            job_json=job_json,
            report_json=report_json,
            output_type=self.config.output_type,
            output_dir=self.config.output_dir,
        )
        # One generated DAG file per job, named by job id
        with open(os.path.join(self.config.airflow_dagbag, job.id + '.py'), 'w') as fp:
            fp.write(tpl)
        return tpl


class JobOperator(BaseOperator):
Expand Down
6 changes: 3 additions & 3 deletions paperboy/scheduler/paperboy.airflow.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
from base64 import b64decode
from paperboy.scheduler._airflow import JobOperator, JobCleanupOperator
from paperboy.scheduler._airflow import ReportOperator, ReportPostOperator
from paperboy.scheduler._airflow import PapermillOperator, NBConvertOperator
from paperboy.scheduler.airflow_operators import JobOperator, JobCleanupOperator
from paperboy.scheduler.airflow_operators import ReportOperator, ReportPostOperator
from paperboy.scheduler.airflow_operators import PapermillOperator, NBConvertOperator
from airflow import DAG
from datetime import timedelta, datetime

Expand Down
16 changes: 16 additions & 0 deletions paperboy/scheduler/remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import requests
from .base import BaseScheduler


class RemoteScheduler(BaseScheduler):
    """Scheduler that proxies status/schedule calls to a remote paperboy worker over HTTP."""

    def __init__(self, *args, **kwargs):
        super(RemoteScheduler, self).__init__(*args, **kwargs)

    def status(self, user, params, session, *args, **kwargs):
        # FIXME async/celery
        response = requests.get(self.config.scheduler.status_url, params=params)
        return response.json()

    def schedule(self, user, notebook, job, reports, *args, **kwargs):
        # FIXME async/celery
        payload = {
            'user': user.to_json(),
            'notebook': notebook.to_json(),
            'job': job.to_json(),
            'reports': [r.to_json() for r in reports],
        }
        response = requests.post(self.config.scheduler.schedule_url, params=payload)
        return response.json()
107 changes: 86 additions & 21 deletions paperboy/worker/remote_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,85 @@
import falcon
import json
import logging
import os
from paperboy.server.deploy import FalconDeploy
from paperboy.config.scheduler import AirflowSchedulerConfig
from paperboy.scheduler.airflow import AirflowScheduler
from six.moves.urllib_parse import urljoin
from traitlets.config.application import Application
from traitlets import Int, Unicode


class RemoteAirflow(Application):
    """Standalone worker application exposing airflow scheduling over HTTP.

    Serves two falcon routes ('remote' and 'status') under ``baseurl`` via
    gunicorn (FalconDeploy).
    """
    name = 'remoteairflow'
    description = 'remoteairflow'

    ############
    # Gunicorn #
    ############
    workers = Int(default_value=1, help="Number of gunicorn workers").tag(config=True)
    port = Unicode(default_value='8080', help="Port to run on").tag(config=True)
    # baseurl was referenced by from_base() and the 'baseurl' alias but never
    # declared as a trait, which made both fail -- declare it here.
    baseurl = Unicode(default_value='/', help="Base url to mount routes under").tag(config=True)
    ############

    #############
    # Scheduler #
    #############
    # FIXME doesnt allow default_value yet
    scheduler = AirflowSchedulerConfig()
    #############

    def start(self):
        """Start the whole thing"""
        # Allow deployment platforms to override the port via $PORT
        self.port = os.environ.get('PORT', self.port)
        options = {
            'bind': '0.0.0.0:{}'.format(self.port),
            'workers': self.workers
        }

        def from_base(url):
            # Join a route name onto the configured base url
            return urljoin(self.baseurl, url)

        api = falcon.API()

        remote = RemoteAirflowResource()
        status = RemoteAirflowStatusResource()
        api.add_route(from_base('remote'), remote)
        api.add_route(from_base('status'), status)

        # Previously port/options were rebuilt here with hard-coded values
        # (port=8081, workers=1), silently ignoring the configured traits.
        logging.debug('Running on port:{}'.format(self.port))
        FalconDeploy(api, options).run()

    @classmethod
    def launch_instance(cls, argv=None, **kwargs):
        """Launch an instance of a Paperboy Application"""
        return super(RemoteAirflow, cls).launch_instance(argv=argv, **kwargs)

    # Command-line aliases for the configurable traits above
    aliases = {
        'workers': 'RemoteAirflow.workers',
        'port': 'RemoteAirflow.port',
        'baseurl': 'RemoteAirflow.baseurl',
    }


class RemoteAirflowResource(object):
    """Falcon resource that accepts remote scheduling requests for airflow."""

    def __init__(self, config=None):
        # schedule() reads self.config, but the original __init__ never set
        # it, so any schedule() call raised AttributeError. Defaulting to
        # None keeps the existing no-arg construction working.
        self.config = config

    def schedule(self, user, notebook, job, reports, *args, **kwargs):
        """Write an airflow DAG for *job* via AirflowScheduler; returns the DAG source."""
        return AirflowScheduler.schedule_airflow(self.config, user, notebook, job, reports, *args, **kwargs)

    def on_get(self, req, resp):
        # Health-check style response
        resp.content_type = 'application/json'
        resp.body = json.dumps({'test': 'ok'})

    def on_post(self, req, resp):
        # TODO pull schedule args out of request
        resp.content_type = 'application/json'
        resp.body = json.dumps({'test': 'ok'})

Expand All @@ -27,7 +93,26 @@ class RemoteAirflowStatusResource(object):
def __init__(self):
pass

def status(self, user, params, session, *args, **kwargs):
    """Return airflow status info, filtered by ``params['type']``.

    NOTE(review): self.sql_conn and self.engine are read here but are never
    set in this class's visible __init__ -- confirm they are injected
    elsewhere before this is called.
    """
    type = params.get('type', '')
    if not self.sql_conn:
        # No SQL connection available: serve generated fake data instead
        gen = AirflowScheduler.fakequery(self.engine)
        if type == 'jobs':
            return gen['jobs']
        elif type == 'reports':
            return gen['reports']
        else:
            return gen
    gen = AirflowScheduler.query(self.engine)
    if type == 'jobs':
        return gen['jobs']
    elif type == 'reports':
        return gen['reports']
    else:
        return gen

def on_get(self, req, resp):
# TODO pull status args out of request
resp.content_type = 'application/json'
resp.body = json.dumps({'test': 'ok'})

Expand All @@ -36,25 +121,5 @@ def on_post(self, req, resp):
resp.body = json.dumps({'test': 'ok'})


def main(baseurl='/'):
    """Stand up the remote airflow falcon API under *baseurl* and serve it.

    NOTE(review): the port and worker count are hard-coded below (8081 / 1);
    presumably meant to become configurable -- confirm.
    """
    def from_base(url):
        # Join a route name onto the base url
        return urljoin(baseurl, url)

    api = falcon.API()

    remote = RemoteAirflowResource()
    status = RemoteAirflowStatusResource()
    api.add_route(from_base('remote'), remote)
    api.add_route(from_base('status'), status)

    ##########
    port = 8081
    options = {
        'bind': '0.0.0.0:{}'.format(port),
        'workers': 1
    }
    logging.debug('Running on port:{}'.format(port))
    FalconDeploy(api, options).run()

if __name__ == '__main__':
main()
RemoteAirflow.launch_instance()
2 changes: 1 addition & 1 deletion tests/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def test_import5(self):
from paperboy.resources import AutocompleteResource, ConfigResource, HTMLResource, JobResource, JobDetailResource, LoginResource, LogoutResource, NotebookResource, NotebookDetailResource, RegisterResource, ReportResource, ReportDetailResource, StaticResource, StatusResource

def test_import6(self):
from paperboy.scheduler import DummyScheduler
from paperboy.scheduler import AirflowScheduler

def test_import7(self):
from paperboy.server.api import FalconAPI
Expand Down

0 comments on commit 9b6d0da

Please sign in to comment.