-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathtableau_repository.py
executable file
·270 lines (243 loc) · 9.76 KB
/
tableau_repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# -*- coding: utf-8 -*-
from .tableau_exceptions import *
import psycopg2
import psycopg2.extensions
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
from typing import Union, Any, Optional, List, Dict, Tuple
class TableauRepository:
def __init__(self, tableau_server_url: str, repository_password: str, repository_username: str = 'readonly'):
if repository_username not in ['tableau', 'readonly', 'tblwgadmin']:
raise InvalidOptionException('Must use one of the three valid usernames')
# Remove the http:// or https:// to log in to the repository. (Do we need things if this is SSL?)
colon_slash_slash = tableau_server_url.find('://')
if colon_slash_slash != -1:
self.repository_server = tableau_server_url[colon_slash_slash+2:]
else:
self.repository_server = tableau_server_url
self.repository_port = 8060
self.repository_db = 'workgroup'
# user 'tableau' does not have enough rights
self.repository_user = repository_username
self.repository_pw = repository_password
self.db_conn = psycopg2.connect(host=self.repository_server, database=self.repository_db,
user=self.repository_user, password=self.repository_pw,
port=self.repository_port)
self.db_conn.set_session(autocommit=True)
def __del__(self):
self.db_conn.close()
# Base method for querying
def query(self, sql: str, sql_parameter_list: Optional[List] = None):
cur = self.db_conn.cursor()
if sql_parameter_list is not None:
cur.execute(sql, sql_parameter_list)
else:
cur.execute(sql)
return cur
def query_sessions(self, username: Optional[str] = None):
# Trusted tickets sessions do not have anything in the 'data' column
# The auth token is contained within the shared_wg_write column, stored as JSON
sessions_sql = """
SELECT
sessions.session_id,
sessions.data,
sessions.updated_at,
sessions.user_id,
sessions.shared_wg_write,
sessions.shared_vizql_write,
system_users.name AS user_name,
users.system_user_id
FROM sessions
JOIN users ON sessions.user_id = users.id
JOIN system_users ON users.system_user_id = system_users.id
"""
if username is not None:
sessions_sql += "WHERE system_users.name = %s\n"
sessions_sql += "ORDER BY sessions.updated_at DESC;"
if username is not None:
cur = self.query(sessions_sql, [username, ])
else:
cur = self.query(sessions_sql)
return cur
def query_subscriptions(self, schedule_name: Optional[str] = None, views_only: bool = True):
subscriptions_sql = """
SELECT
s.id,
s.subject,
s.user_name,
s.site_name,
COALESCE(cv.repository_url, s.view_url) as view_url,
sch.name,
su.email
FROM _subscriptions s
LEFT JOIN _customized_views cv ON s.customized_view_id = cv.id
JOIN _schedules sch ON sch.name = s.schedule_name
JOIN system_users su ON su.name = s.user_name
"""
if schedule_name is not None:
subscriptions_sql += 'WHERE sch.name = %s\n'
if views_only is True:
subscriptions_sql += 'AND s.view_url IS NOT NULL -- Export command in tabcmd requires a View not a Workbook'
else:
if views_only is True:
subscriptions_sql += 'WHERE s.view_url IS NOT NULL -- Export command in tabcmd requires a View not a Workbook'
if schedule_name is not None:
cur = self.query(subscriptions_sql, [schedule_name, ])
else:
cur = self.query(subscriptions_sql)
return cur
# Set extract refresh schedules
def query_extract_schedules(self, schedule_name: Optional[str] = None):
schedules_sql = """
SELECT *
FROM _schedules
WHERE scheduled_action_type = 'Refresh Extracts'
AND hidden = false
"""
if schedule_name is not None:
schedules_sql += 'AND name = %s\n'
cur = self.query(schedules_sql, [schedule_name, ])
else:
cur = self.query(schedules_sql)
return cur
def get_extract_schedule_id_by_name(self, schedule_name: str):
cur = self.query_extract_schedules(schedule_name=schedule_name)
if cur.rowcount == 0:
raise NoMatchFoundException('No schedule found with name "{}"'.format(schedule_name))
sched_id = None
# Should only be one row
for row in cur:
sched_id = row[0]
return sched_id
def query_sites(self, site_content_url: Optional[str] = None, site_pretty_name: Optional[str] = None):
if site_content_url is None and site_pretty_name is None:
raise InvalidOptionException('Must pass one of either the site_content_url or site_pretty_name')
sites_sql = """
SELECT *
FROM _sites
"""
if site_content_url is not None and site_pretty_name is None:
sites_sql += 'WHERE url_namespace = %s\n'
cur = self.query(sites_sql, [site_content_url, ])
elif site_content_url is None and site_pretty_name is not None:
sites_sql += 'WHERE name = %s\n'
cur = self.query(sites_sql, [site_pretty_name, ])
else:
sites_sql += 'WHERE url_namesspace = %s AND name = %s\n'
cur = self.query(sites_sql, [site_content_url, site_pretty_name])
return cur
def get_site_id_by_site_content_url(self, site_content_url: str):
cur = self.query_sites(site_content_url=site_content_url)
if cur.rowcount == 0:
raise NoMatchFoundException('No site found with content url "{}"'.format(site_content_url))
site_id = None
# Should only be one row
for row in cur:
site_id = row[0]
return site_id
def get_site_id_by_site_pretty_name(self, site_pretty_name: str):
cur = self.query_sites(site_pretty_name=site_pretty_name)
if cur.rowcount == 0:
raise NoMatchFoundException('No site found with pretty name "{}"'.format(site_pretty_name))
site_id = None
# Should only be one row
for row in cur:
site_id = row[0]
return site_id
def query_project_id_on_site_by_name(self, project_name: str, site_id: str):
project_sql = """
SELECT *
FROM _projects
WHERE project_name = %s
AND site_id = %s
"""
cur = self.query(project_sql, [project_name, site_id])
if cur.rowcount == 0:
raise NoMatchFoundException('No project named {} found on the site'.format(project_name))
project_id = None
for row in cur:
project_id = row[0]
return project_id
def query_datasource_id_on_site_in_project(self, datasource_name: str, site_id: str, project_id: str):
datasource_query = """
SELECT id
FROM _datasources
WHERE name = %s
AND site_id = %s
AND project_id = %s
"""
cur = self.query(datasource_query, [datasource_name, site_id, project_id])
if cur.rowcount == 0:
raise NoMatchFoundException('No data source found with name "{}"'.format(datasource_name))
datasource_id = None
for row in cur:
datasource_id = row[0]
return datasource_id
def query_workbook_id_on_site_in_project(self, workbook_name: str, site_id: str, project_id: str):
workbook_query = """
SELECT id
FROM _workbooks
WHERE name = %s
AND site_id = %s
AND project_id = %s
"""
cur = self.query(workbook_query, [workbook_name, site_id, project_id])
if cur.rowcount == 0:
raise NoMatchFoundException('No workbook found with name "{}"'.format(workbook_name))
workbook_id = None
for row in cur:
workbook_id = row[0]
return workbook_id
def query_workbook_id_from_luid(self, workbook_luid: str):
workbook_query = """
SELECT id
FROM workbooks
WHERE luid = %s
"""
cur = self.query(workbook_query, [workbook_luid, ])
if cur.rowcount == 0:
raise NoMatchFoundException('No workbook found with luid "{}"'.format(workbook_luid))
workbook_id = None
for row in cur:
workbook_id = row[0]
return workbook_id
def query_site_id_from_workbook_luid(self, workbook_luid: str):
workbook_query = """
SELECT site_id
FROM workbooks
WHERE luid = %s
"""
cur = self.query(workbook_query, [workbook_luid, ])
if cur.rowcount == 0:
raise NoMatchFoundException('No workbook found with luid "{}"'.format(workbook_luid))
workbook_id = None
for row in cur:
workbook_id = row[0]
return workbook_id
def query_datasource_id_from_luid(self, datasource_luid: str):
datasource_query = """
SELECT id
FROM datasources
WHERE luid = %s
"""
cur = self.query(datasource_query, [datasource_luid, ])
if cur.rowcount == 0:
raise NoMatchFoundException('No data source found with luid "{}"'.format(datasource_luid))
datasource_id = None
for row in cur:
datasource_id = row[0]
return datasource_id
def query_site_id_from_datasource_luid(self, datasource_luid: str):
datasource_query = """
SELECT site_id
FROM datasources
WHERE luid = %s
"""
cur = self.query(datasource_query, [datasource_luid, ])
if cur.rowcount == 0:
raise NoMatchFoundException('No data source found with luid "{}"'.format(datasource_luid))
datasource_id = None
for row in cur:
datasource_id = row[0]
return datasource_id
# Need to add in some classes to find Custom View LUIDs