Skip to content

Commit

Permalink
Use the REST API to pull down all projects due to "list all projects …
Browse files Browse the repository at this point in the history
…v1" deprecation

Switches and uses REST API instead of v1 list all projects
Uses lazy loading for additional required API calls
  • Loading branch information
nathan-roys authored Jul 28, 2023
2 parents 6a2daec + e00ca63 commit c0a783e
Show file tree
Hide file tree
Showing 8 changed files with 261 additions and 672 deletions.
606 changes: 0 additions & 606 deletions poetry.lock

This file was deleted.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pysnyk"
version = "0.9.5"
version = "0.9.7"
description = "A Python client for the Snyk API"
authors = [
"Gareth Rushgrove <[email protected]>",
Expand Down
18 changes: 15 additions & 3 deletions snyk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

class SnykClient(object):
API_URL = "https://api.snyk.io/v1"
REST_API_URL = "https://api.snyk.io/rest"
USER_AGENT = "pysnyk/%s" % __version__

def __init__(
Expand Down Expand Up @@ -117,7 +118,11 @@ def put(self, path: str, body: Any, headers: dict = {}) -> requests.Response:
return resp

def get(
self, path: str, params: dict = None, version: str = None
self,
path: str,
params: dict = None,
version: str = None,
exclude_version: bool = False,
) -> requests.Response:
"""
Rest (formerly v3) Compatible Snyk Client, assumes the presence of Version, either set in the client
Expand All @@ -131,8 +136,15 @@ def get(
"""

path = cleanup_path(path)

url = f"{self.api_url}/{path}"
if version:
# When calling a "next page" link, it fails if a version parameter is appended on to the URL - this is a
# workaround to prevent that from happening...
if exclude_version:
url = f"{self.REST_API_URL}/{path}"
else:
url = f"{self.REST_API_URL}/{path}?version={version}"
else:
url = f"{self.api_url}/{path}"

if params or self.version:

Expand Down
85 changes: 71 additions & 14 deletions snyk/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,31 +138,88 @@ def delete(self, key, value) -> bool:


class ProjectManager(Manager):
def _query(self, tags: List[Dict[str, str]] = []):
def _rest_to_v1_response_format(self, project):
attributes = project.get("attributes", {})
settings = attributes.get("settings", {})
recurring_tests = settings.get("recurring_tests", {})
issue_counts = project.get("meta", {}).get("latest_issue_counts")

return {
"name": attributes.get("name"),
"id": project.get("id"),
"created": attributes.get("created"),
"origin": attributes.get("origin"),
"type": attributes.get("type"),
"readOnly": attributes.get("read_only"),
"testFrequency": recurring_tests.get("frequency"),
"isMonitored": True
if project.get("meta", {}).get("cli_monitored_at")
else False,
"issueCountsBySeverity": {
"low": issue_counts.get("low"),
"medium": issue_counts.get("medium"),
"high": issue_counts.get("high"),
"critical": issue_counts.get("critical"),
},
"targetReference": attributes.get("target_reference"),
"_tags": attributes.get("tags", []),
"importingUserId": project.get("relationships", {})
.get("importer", {})
.get("data", {})
.get("id"),
"owningUserId": project.get("relationships", {})
.get("owner", {})
.get("data", {})
.get("id"),
}

def _query(self, tags: List[Dict[str, str]] = [], next_url: str = None):
projects = []
params = {}
if self.instance:
path = "org/%s/projects" % self.instance.id
path = "/orgs/%s/projects" % self.instance.id if not next_url else next_url

# Append to params if we've got tags
if tags:
for tag in tags:
if "key" not in tag or "value" not in tag or len(tag.keys()) != 2:
raise SnykError("Each tag must contain only a key and a value")
data = {"filters": {"tags": {"includes": tags}}}
resp = self.client.post(path, data)
else:
resp = self.client.get(path)
if "projects" in resp.json():
for project_data in resp.json()["projects"]:
data = [f'{d["key"]}:{d["value"]}' for d in tags]
params["tags"] = ",".join(data)

# Append the issue count param to the params if this is the first page
if not next_url:
params["meta.latest_issue_counts"] = "true"

# And lastly, make the API call
resp = self.client.get(
path,
version="2023-06-19",
params=params,
exclude_version=True if next_url else False,
)

if "data" in resp.json():
# Process projects in current response
for response_data in resp.json()["data"]:
project_data = self._rest_to_v1_response_format(response_data)
project_data["organization"] = self.instance.to_dict()
# We move tags to _tags as a cache, to avoid the need for additional requests
# when working with tags. We want tags to be the manager
try:
project_data["_tags"] = project_data["tags"]
del project_data["tags"]
project_data["attributes"]["_tags"] = project_data[
"attributes"
]["tags"]
del project_data["attributes"]["tags"]
except KeyError:
pass
if project_data["totalDependencies"] is None:
if not project_data.get("totalDependencies"):
project_data["totalDependencies"] = 0
projects.append(self.klass.from_dict(project_data))

# If we have another page, then process this page too
if "next" in resp.json().get("links", {}):
next_url = resp.json().get("links", {})["next"]
projects.extend(self._query(tags, next_url))

for x in projects:
x.organization = self.instance
else:
Expand Down Expand Up @@ -192,7 +249,7 @@ def get(self, id: str):
del project_data["tags"]
except KeyError:
pass
if project_data["totalDependencies"] is None:
if project_data.get("totalDependencies") is None:
project_data["totalDependencies"] = 0
project_klass = self.klass.from_dict(project_data)
project_klass.organization = self.instance
Expand Down
95 changes: 84 additions & 11 deletions snyk/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -561,6 +562,14 @@ class Dependency(DataClassJSONMixin):
dependenciesWithIssues: Optional[List[Any]] = field(default_factory=list)


@dataclass
class User(DataClassJSONMixin):
id: str
name: str
username: str
email: str


@dataclass
class Project(DataClassJSONMixin):
name: str
Expand All @@ -571,24 +580,15 @@ class Project(DataClassJSONMixin):
type: str
readOnly: bool
testFrequency: str
totalDependencies: int
lastTestedDate: str
browseUrl: str
isMonitored: bool
issueCountsBySeverity: IssueCounts
imageTag: Optional[str] = None
imageId: Optional[str] = None
imageBaseImage: Optional[str] = None
imagePlatform: Optional[str] = None
imageCluster: Optional[str] = None
importingUserId: Optional[str] = None
owningUserId: Optional[str] = None
hostname: Optional[str] = None
remoteRepoUrl: Optional[str] = None
branch: Optional[str] = None
attributes: Optional[Dict[str, List[str]]] = None
_tags: Optional[List[Any]] = field(default_factory=list)
remediation: Optional[Dict[Any, Any]] = field(default_factory=dict)
owner: Optional[Dict[Any, Any]] = field(default_factory=dict)
importingUser: Optional[Dict[Any, Any]] = field(default_factory=dict)

def delete(self) -> bool:
path = "org/%s/project/%s" % (self.organization.id, self.id)
Expand Down Expand Up @@ -624,6 +624,79 @@ def move(self, new_org_id: str) -> bool:

return bool(self.organization.client.put(path, payload))

def _get_project_snapshot(self):
"""
Gets the latest project snapshot
"""
project_snapshot_result = self.organization.client.post(
f"org/{self.organization.id}/project/{self.id}/history?perPage=1&page=1",
{},
)
return project_snapshot_result.json().get("snapshots", [{}])[0]

def __getattr__(self, item):
"""
Will handle lazy loading of attributes which require further API calls or are computationally expensive. This
avoids having to load them when we retrieve the full list from the API.
"""
# These attributes require us to get the latest snapshot
if item in [
"totalDependencies",
"lastTestedDate",
"imageId",
"imageTag",
"imageBaseImage",
"imagePlatform",
]:
snapshot = self._get_project_snapshot()
if item == "totalDependencies":
return snapshot.get("totalDependencies", 0)
elif item == "lastTestedDate":
return snapshot.get("created")
elif item == "imageId":
return snapshot.get("imageId")
elif item == "imageTag":
return snapshot.get("imageTag")
elif item == "imageBaseImage":
return snapshot.get("baseImageName")
elif item == "imagePlatform":
return snapshot.get("imagePlatform")
# These attributes require us to call the user API to get a users details
elif item in ["importingUser", "owner"]:
if item == "importingUser":
selected_user = self.importingUserId
else:
selected_user = self.owningUserId
user_response = self.organization.client.get(
f"orgs/{self.organization.id}/users/{selected_user}",
version="2023-05-29~beta",
)
user = user_response.json()
user_data = user.get("data", {})
user_attributes = user_data.get("attributes", {})
return User(
id=self.importingUserId,
name=user_attributes.get("name"),
username=user_attributes.get("username"),
email=user_attributes.get("email"),
)
elif item == "browseUrl":
# Ensure that our browse URL matches the tenant the user is making a request to
tenant_matches = match = re.match(
r"^https://api\.(.*?)\.snyk\.io", self.organization.client.api_url
)
if tenant_matches:
# If a tenant is found, insert it into the URL
url_prefix = f"https://app.{match.group(1)}.snyk.io"
else:
# If no tenant is found, use a default URL
url_prefix = "https://app.snyk.io"
return f"{url_prefix}/org/{self.organization.slug}/project/{self.id}"
else:
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{item}'"
)

@property
def settings(self) -> Manager:
return Manager.factory("Setting", self.organization.client, self)
Expand Down
12 changes: 6 additions & 6 deletions snyk/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,29 +178,29 @@ def test_organization_load_group(self, requests_mock, client, organizations):

def test_empty_projects(self, requests_mock, client, organizations):
requests_mock.get("https://api.snyk.io/v1/orgs", json=organizations)
matcher = re.compile("projects$")
matcher = re.compile("projects.*$")
requests_mock.get(matcher, json={})
assert [] == client.projects.all()

def test_projects(self, requests_mock, client, organizations, projects):
requests_mock.get("https://api.snyk.io/v1/orgs", json=organizations)
matcher = re.compile("projects$")
matcher = re.compile("projects.*$")
requests_mock.get(matcher, json=projects)
assert len(client.projects.all()) == 2
assert all(type(x) is Project for x in client.projects.all())

def test_project(self, requests_mock, client, organizations, projects):
requests_mock.get("https://api.snyk.io/v1/orgs", json=organizations)
matcher = re.compile("projects$")
matcher = re.compile("projects.*$")
requests_mock.get(matcher, json=projects)
assert (
"atokeneduser/goof"
== client.projects.get("6d5813be-7e6d-4ab8-80c2-1e3e2a454545").name
"testing-new-name"
== client.projects.get("f9fec29a-d288-40d9-a019-cedf825e6efb").name
)

def test_non_existent_project(self, requests_mock, client, organizations, projects):
requests_mock.get("https://api.snyk.io/v1/orgs", json=organizations)
matcher = re.compile("projects$")
matcher = re.compile("projects.*$")
requests_mock.get(matcher, json=projects)
with pytest.raises(SnykNotFoundError):
client.projects.get("not-present")
Expand Down
Loading

0 comments on commit c0a783e

Please sign in to comment.