forked from Linuxfabrik/lib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrocket.py
82 lines (66 loc) · 2.37 KB
/
rocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""This library collects some Rocket.Chat related functions that are
needed by more than one Rocket.Chat plugin."""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2024031501'
from . import url
def get_token(rc_url, user, password, insecure=False, no_proxy=False, timeout=3):
    """Gets an API token from Rocket.Chat, using your credentials.

    Equivalent to:

    $ curl -X "POST" \\
    $      -d "user=admin&password=mypassword" \\
    $      http://localhost:3000/api/v1/login

    Parameters:
        rc_url: URL of the Rocket.Chat API. `/login` is appended unless the
            URL already ends with it.
        user, password: The credentials, sent as the `user` and `password`
            fields of the request data.
        insecure, no_proxy, timeout: Passed through unchanged to
            `url.fetch_json()`.

    Returns:
        A tuple `(success, result)`. On success `result` is the string
        `'<authToken>:<userId>'`. On failure `result` is whatever
        `url.fetch_json()` reported, or an error message if the response was
        empty or did not contain the expected login data.
    """
    if not rc_url.endswith('/login'):
        rc_url += '/login'
    data = {
        'user': user,
        'password': password,
    }
    success, result = url.fetch_json(
        rc_url,
        data=data,
        insecure=insecure,
        no_proxy=no_proxy,
        timeout=timeout,
    )
    if not success:
        return (success, result)
    if not result:
        return (False, 'There was no result from {}.'.format(rc_url))
    # The reply is only usable if it carries both `authToken` and `userId`
    # below `data`. Anything else (missing keys, `data` not being a dict,
    # non-string values) is reported as a failure instead of raising.
    try:
        login = result['data']
        token = login['authToken'] + ':' + login['userId']
    except (KeyError, TypeError):
        return (False, 'Something went wrong, maybe user is unauthorized.')
    return (True, token)
def get_stats(rc_url, auth_token, user_id, insecure=False, no_proxy=False, timeout=3):
    """Fetches the result of api/v1/statistics from Rocket.Chat. An auth
    token and user id are required; obtain them with `get_token()` first.

    See https://rocket.chat/docs/developer-guides/rest-api/miscellaneous/statistics/

    Equivalent to:

    $ curl -H "X-Auth-Token: 8h2mKAwxB3AQrFSjLVKMooJyjdCFaA7W45sWlHP8IzO" \\
    $      -H "X-User-Id: ew28DpvKw3R" \\
    $      http://localhost:3000/api/v1/statistics

    Parameters:
        rc_url: URL of the Rocket.Chat API. `/statistics` is appended unless
            the URL already ends with it.
        auth_token, user_id: Sent as the `X-Auth-Token` and `X-User-Id`
            request headers.
        insecure, no_proxy, timeout: Passed through unchanged to
            `url.fetch_json()`.

    Returns:
        A tuple `(success, result)`. On success `result` is the non-empty
        value delivered by `url.fetch_json()`. On failure it is either what
        `url.fetch_json()` reported or a message that there was no result.
    """
    endpoint = rc_url if rc_url.endswith('/statistics') else rc_url + '/statistics'
    ok, stats = url.fetch_json(
        endpoint,
        header={
            'X-Auth-Token': auth_token,
            'X-User-Id': user_id,
        },
        insecure=insecure,
        no_proxy=no_proxy,
        timeout=timeout,
    )
    if not ok:
        # hand the failure reported by fetch_json straight back to the caller
        return (ok, stats)
    if stats:
        return (True, stats)
    return (False, 'There was no result from {}.'.format(endpoint))