-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathjenkins.py
105 lines (84 loc) · 3.23 KB
/
jenkins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
Jenkins methods.
(c) 2018 Balloon Inc. VOF
Wouter Devriendt
"""
import requests
import os
from fuzzywuzzy import process
class Jenkins:
    """Small client for the HTTP API of a Jenkins server.

    Stores the server URL and the credentials used for HTTP basic auth, and
    keeps a cached list of job names that is filled on first use.
    """

    # A fuzzy match is accepted only when its score is strictly above this.
    MIN_MATCH_SCORE = 86

    def __init__(self, jenkins_url=None, username=None, auth_token=None, jobs=None):
        """Create a client.

        Args:
            jenkins_url: Base URL of the Jenkins server.
            username: User name sent with every request.
            auth_token: Token sent together with ``username``.
            jobs: Optional pre-filled list of job names. When omitted, each
                instance gets its own empty list.
        """
        self.jenkins_url = jenkins_url
        self.username = username
        self.auth_token = auth_token
        # A literal `[]` default would be one list shared by every instance,
        # so job names cached by one client would show up in all the others.
        self.jobs = [] if jobs is None else jobs

    def getAuth(self):
        """Return the ``(username, auth_token)`` pair passed as ``auth=``."""
        return (self.username, self.auth_token)

    def startJob(self, jobname):
        """POST to the ``/build`` endpoint of ``jobname``; return the response."""
        url = self.jenkins_url + "/job/" + jobname + "/build"
        return requests.post(url, auth=self.getAuth())

    def abortJob(self, jobname):
        """POST to ``/lastBuild/stop`` of ``jobname``; return the response."""
        url = self.jenkins_url + "/job/" + jobname + "/lastBuild/stop"
        return requests.post(url, auth=self.getAuth())

    def getAllJobs(self):
        """Return the cached job names, fetching them first if the cache is empty."""
        if not self.jobs:
            self.refetchAllJobs()
        return self.jobs

    def refetchAllJobs(self):
        """Replace the cached job names with the list reported by the server."""
        url = self.jenkins_url + "/api/json?tree=jobs[name]"
        r = requests.get(url, auth=self.getAuth())
        self.jobs = [job["name"] for job in r.json()['jobs']]

    def searchJobByName(self, jobname):
        """Return the known job name that best fuzzy-matches ``jobname``.

        Returns:
            The best-matching job name, or ``None`` when no job scores above
            ``MIN_MATCH_SCORE`` or when there are no jobs to match against.
        """
        res = process.extractOne(jobname, self.getAllJobs())
        # extractOne returns None instead of a (name, score) pair when it has
        # no candidates; indexing that would raise TypeError.
        if res is None:
            return None
        return res[0] if res[1] > self.MIN_MATCH_SCORE else None

    def getJobStatus(self, jobname):
        """GET selected fields of the last build of ``jobname``; return the response."""
        url = self.jenkins_url + "/job/" + jobname + \
            "/lastBuild/api/json?tree=result,timestamp,estimatedDuration,number,building,duration"
        return requests.get(url, auth=self.getAuth())

    def readSettingsFromEnvironment(self):
        """Load URL and credentials from environment variables.

        Reads ``jenkins_url``, ``jenkins_username`` and ``jenkins_auth_token``.

        Raises:
            KeyError: If one of the variables is not set.
        """
        self.jenkins_url = os.environ["jenkins_url"]
        self.username = os.environ["jenkins_username"]
        self.auth_token = os.environ["jenkins_auth_token"]

    def readSettingsFromFile(self):
        """Load URL and credentials from ``settings.yaml`` in the working directory.

        On failure (file cannot be opened, invalid YAML, or a required key is
        missing) a help message is printed and the process exits.

        Raises:
            SystemExit: With status 1 when the settings cannot be read.
        """
        import yaml

        try:
            with open("settings.yaml", 'r') as stream:
                # safe_load only builds plain Python data. A bare
                # yaml.load(stream) can construct arbitrary objects and is
                # rejected outright by PyYAML >= 6 (Loader is mandatory).
                settings = yaml.safe_load(stream)
            # Read every key before assigning so a missing one cannot leave
            # the client half-configured. An empty file yields None, hence
            # TypeError is handled below as well.
            jenkins_url = settings["jenkins_url"]
            username = settings["jenkins_username"]
            auth_token = settings["jenkins_auth_token"]
        except (OSError, yaml.YAMLError, KeyError, TypeError) as exc:
            print("Could not read settings.yaml")
            print(
                "Verify if the file exists and has a correct syntax and has all necessary values filled in.")
            print("See settings.yaml.sample for what is needed.")
            print(exc)
            # Non-zero status so calling scripts can detect the failure; the
            # builtin exit() is a site helper and reported success (0).
            raise SystemExit(1) from exc

        self.jenkins_url = jenkins_url
        self.username = username
        self.auth_token = auth_token
# --------------- Tests for job interaction and fuzzy search ------------------
def testJobInteraction(jenkins, jobname="sleep-test", wait_seconds=10):
    """Manual smoke test: list jobs, then start, poll and abort one job.

    Every response is printed; nothing is asserted or returned.

    Args:
        jenkins: Object providing ``getAllJobs``, ``getJobStatus``,
            ``startJob`` and ``abortJob``.
        jobname: Name of the job to exercise.
        wait_seconds: Pause between the second and third status query.
    """
    # Imported locally: the module only imports `time` inside its
    # `__main__` guard, so relying on the global breaks when this function is
    # called from an importing module.
    import time

    def show_status():
        print(jenkins.getJobStatus(jobname).json())

    print(jenkins.getAllJobs())
    show_status()

    # Start the job
    print(jenkins.startJob(jobname).text)

    # Get the job status immediately
    show_status()

    # Get it again after the pause
    time.sleep(wait_seconds)
    show_status()

    # Abort the job
    print(jenkins.abortJob(jobname).text)
def testFuzzySearch(jenkins):
    """Print the result of ``searchJobByName`` for three sample queries."""
    sample_queries = ("website build", "sleeping test", "sleep test")
    for query in sample_queries:
        print(jenkins.searchJobByName(query))
if __name__ == "__main__":
    # Script entry point: configure a client from settings.yaml and run the
    # manual smoke tests in order.
    # `time` is bound at module scope here, making it visible as a global to
    # the functions defined above.
    import time

    jenkins = Jenkins()
    jenkins.readSettingsFromFile()
    for smoke_test in (testJobInteraction, testFuzzySearch):
        smoke_test(jenkins)