Skip to content

Commit

Permalink
Move PyPI bridge call to worker from importer.
Browse files Browse the repository at this point in the history
Workers add additional information to vulnerabilities (e.g. by expanding
ranges). With GHSA advisories, this enrichment step needed to happen
before the data is propagated to warehouse.

Also make the source repo processing faster when loading it from
scratch, rather than enumerating through every single commit.

This will help pypa/pip-audit#274.
  • Loading branch information
oliverchang committed May 18, 2022
1 parent 1dc8682 commit 68dd34a
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 101 deletions.
90 changes: 39 additions & 51 deletions docker/importer/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
_EXPORT_WORKERS = 32
_NO_UPDATE_MARKER = 'OSV-NO-UPDATE'

_ECOSYSTEM_PUSH_TOPICS = {
'PyPI': 'projects/oss-vdb/topics/pypi-bridge',
}


def _is_vulnerability_file(source_repo, file_path):
"""Return whether or not the file is a Vulnerability entry."""
Expand Down Expand Up @@ -99,8 +95,7 @@ def _request_analysis_external(self,
source_repo,
original_sha256,
path,
deleted=False,
vulnerability=None):
deleted=False):
"""Request analysis."""
self._publisher.publish(
_TASKS_TOPIC,
Expand All @@ -111,22 +106,6 @@ def _request_analysis_external(self,
original_sha256=original_sha256,
deleted=str(deleted).lower())

if not vulnerability:
return

ecosystems = set()
for affected in vulnerability.affected:
if affected.package.ecosystem in ecosystems:
continue

ecosystems.add(affected.package.ecosystem)
ecosystem_push_topic = _ECOSYSTEM_PUSH_TOPICS.get(
affected.package.ecosystem)
if ecosystem_push_topic:
self._publisher.publish(
ecosystem_push_topic,
data=json.dumps(osv.vulnerability_to_dict(vulnerability)).encode())

def _request_internal_analysis(self, bug):
"""Request internal analysis."""
self._publisher.publish(
Expand Down Expand Up @@ -227,38 +206,50 @@ def _process_updates_git(self, source_repo):
"""Process updates for a git source_repo."""
repo = self.checkout(source_repo)

walker = repo.walk(repo.head.target, pygit2.GIT_SORT_TOPOLOGICAL)
if source_repo.last_synced_hash:
walker.hide(source_repo.last_synced_hash)

# Get list of changed files since last sync.
changed_entries = set()
deleted_entries = set()
for commit in walker:
if commit.author.email == osv.AUTHOR_EMAIL:
continue

if _NO_UPDATE_MARKER in commit.message:
logging.info('Skipping commit %s as no update marker found.', commit.id)
continue
if source_repo.last_synced_hash:
# Syncing from a previous commit.
walker = repo.walk(repo.head.target, pygit2.GIT_SORT_TOPOLOGICAL)
walker.hide(source_repo.last_synced_hash)

logging.info('Processing commit %s from %s', commit.id,
commit.author.email)
for commit in walker:
if commit.author.email == osv.AUTHOR_EMAIL:
continue

for parent in commit.parents:
diff = repo.diff(parent, commit)
for delta in diff.deltas:
if delta.old_file and _is_vulnerability_file(source_repo,
delta.old_file.path):
if delta.status == pygit2.GIT_DELTA_DELETED:
deleted_entries.add(delta.old_file.path)
continue
if _NO_UPDATE_MARKER in commit.message:
logging.info('Skipping commit %s as no update marker found.',
commit.id)
continue

changed_entries.add(delta.old_file.path)
logging.info('Processing commit %s from %s', commit.id,
commit.author.email)

if delta.new_file and _is_vulnerability_file(source_repo,
delta.new_file.path):
changed_entries.add(delta.new_file.path)
for parent in commit.parents:
diff = repo.diff(parent, commit)
for delta in diff.deltas:
if delta.old_file and _is_vulnerability_file(
source_repo, delta.old_file.path):
if delta.status == pygit2.GIT_DELTA_DELETED:
deleted_entries.add(delta.old_file.path)
continue

changed_entries.add(delta.old_file.path)

if delta.new_file and _is_vulnerability_file(
source_repo, delta.new_file.path):
changed_entries.add(delta.new_file.path)
else:
# First sync from scratch.
logging.info('Syncing repo from scratch')
for root, _, filenames in os.walk(osv.repo_path(repo)):
for filename in filenames:
path = os.path.join(root, filename)
rel_path = os.path.relpath(path, osv.repo_path(repo))
if _is_vulnerability_file(source_repo, rel_path):
changed_entries.add(rel_path)

# Create tasks for changed files.
for changed_entry in changed_entries:
Expand All @@ -276,11 +267,8 @@ def _process_updates_git(self, source_repo):

logging.info('Re-analysis triggered for %s', changed_entry)
original_sha256 = osv.sha256(path)
self._request_analysis_external(
source_repo,
original_sha256,
changed_entry,
vulnerability=vulnerability)
self._request_analysis_external(source_repo, original_sha256,
changed_entry)

# Mark deleted entries as invalid.
for deleted_entry in deleted_entries:
Expand Down
52 changes: 3 additions & 49 deletions docker/importer/importer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ def test_scheduled_updates(self, mock_publish):
source_repo = osv.SourceRepository.get_by_id('oss-fuzz')
self.assertEqual(datetime.date(2021, 1, 1), source_repo.last_update_date)

@mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
def test_scheduled_updates_already_done(self, mock_publish):
def test_scheduled_updates_already_done(self):
"""Scheduled updates already done."""
source_repo = osv.SourceRepository.get_by_id('oss-fuzz')
source_repo.last_update_date = importer.utcnow().date()
Expand All @@ -317,68 +316,23 @@ def test_scheduled_updates_already_done(self, mock_publish):
'bucket')
imp.run()

self.assertEqual(0, mock_publish.call_count)

@mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
def test_no_updates(self, mock_publish):
def test_no_updates(self):
"""Test no update marker."""
self.mock_repo.add_file('2021-111.yaml', _EMPTY_VULNERABILITY)
self.mock_repo.commit('User', 'user@email', 'message. OSV-NO-UPDATE')

imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir,
'bucket')
imp.run()
mock_publish.assert_not_called()

@mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
def test_ignore(self, mock_publish):
def test_ignore(self):
"""Test ignoring."""
self.mock_repo.add_file('2021-111IGNORE.yaml', _EMPTY_VULNERABILITY)
self.mock_repo.commit('User', 'user@email', 'message.')

imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir,
'bucket')
imp.run()
mock_publish.assert_not_called()

@mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
def test_ecosystem_bridge(self, mock_publish):
"""Test ecosystem pub/sub publishing."""
self.source_repo.key.delete()
self.source_repo = osv.SourceRepository(
type=osv.SourceRepositoryType.GIT,
id='PyPI',
name='PyPI',
repo_url='file://' + self.remote_source_repo_path,
repo_username='')
self.source_repo.put()
self.mock_repo.add_file(
'PYSEC-2021-1.yaml', 'id: PYSEC-2021-1\n'
'affected:\n'
'- package:\n'
' name: pkg\n'
' ecosystem: PyPI\n')
self.mock_repo.commit('User', 'user@email')

imp = importer.Importer('fake_public_key', 'fake_private_key', self.tmp_dir,
'bucket')
imp.run()
mock_publish.assert_has_calls([
mock.call(
'projects/oss-vdb/topics/tasks',
data=b'',
type='update',
source='PyPI',
path='PYSEC-2021-1.yaml',
original_sha256=('875811e67e3e9bb50f3442dc262583c2'
'99b2d8b571e80a53af837b8f3787fa20'),
deleted='false'),
mock.call(
'projects/oss-vdb/topics/pypi-bridge',
data=b'{"id": "PYSEC-2021-1", "affected": '
b'[{"package": {"name": "pkg", "ecosystem": "PyPI"}, '
b'"versions": []}]}')
])


@mock.patch('importer.utcnow', lambda: datetime.datetime(2021, 1, 1))
Expand Down
21 changes: 21 additions & 0 deletions docker/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@
'https://github.com/google/AFL.git',
}

# Maps an ecosystem name to the Pub/Sub topic used to notify that
# ecosystem's external bridge of vulnerability updates (presumably the
# PyPI bridge forwards these to warehouse — see commit message).
_ECOSYSTEM_PUSH_TOPICS = {
    'PyPI': 'projects/oss-vdb/topics/pypi-bridge',
}

_state = threading.local()


Expand Down Expand Up @@ -467,6 +471,23 @@ def _do_update(self, source_repo, repo, vulnerability, relative_path,

bug.put()
osv.update_affected_commits(bug.key.id(), result.commits, bug.public)
self._notify_ecosystem_bridge(vulnerability)

def _notify_ecosystem_bridge(self, vulnerability):
  """Notify per-ecosystem bridge topics about a vulnerability.

  Publishes the vulnerability (as JSON) once per distinct ecosystem that
  appears in its `affected` entries, but only for ecosystems with a topic
  configured in _ECOSYSTEM_PUSH_TOPICS.

  Args:
    vulnerability: the Vulnerability proto to publish.
  """
  # Create the Pub/Sub client lazily and at most once: the original code
  # constructed a new PublisherClient for every matching ecosystem.
  publisher = None
  seen_ecosystems = set()
  for affected in vulnerability.affected:
    ecosystem = affected.package.ecosystem
    if ecosystem in seen_ecosystems:
      # Each ecosystem is notified at most once per vulnerability.
      continue

    seen_ecosystems.add(ecosystem)
    ecosystem_push_topic = _ECOSYSTEM_PUSH_TOPICS.get(ecosystem)
    if not ecosystem_push_topic:
      continue

    if publisher is None:
      publisher = pubsub_v1.PublisherClient()
    publisher.publish(
        ecosystem_push_topic,
        data=json.dumps(osv.vulnerability_to_dict(vulnerability)).encode())

def _do_process_task(self, subscriber, subscription, ack_id, message,
done_event):
Expand Down
10 changes: 10 additions & 0 deletions docker/worker/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,10 @@ def setUp(self):
source_id='source:BLAH-127.yaml',
source_of_truth=osv.SourceOfTruth.SOURCE_REPO).put()

mock_publish = mock.patch('google.cloud.pubsub_v1.PublisherClient.publish')
self.mock_publish = mock_publish.start()
self.addCleanup(mock_publish.stop)

def tearDown(self):
self.tmp_dir.cleanup()

Expand Down Expand Up @@ -644,6 +648,8 @@ def test_update(self):
'ff8cc32ba60ad9cbb3b23f0a82aad96ebe9ff76b',
], [commit.commit for commit in affected_commits])

self.mock_publish.assert_not_called()

def test_update_limit(self):
"""Test basic update with limit events."""
task_runner = worker.TaskRunner(ndb_client, None, self.tmp_dir.name, None,
Expand Down Expand Up @@ -919,6 +925,8 @@ def test_update_pypi(self):
'eefe8ec3f1f90d0e684890e810f3f21e8500a4cd',
], [a.commit for a in affected_commits])

self.expect_equal('pypi_pubsub_calls', self.mock_publish.mock_calls)

def test_update_maven(self):
"""Test updating maven."""
self.source_repo.ignore_git = False
Expand Down Expand Up @@ -955,6 +963,8 @@ def test_update_maven(self):
'update_maven',
ndb.Key(osv.Bug, 'source:GHSA-838r-hvwh-24h8').get()._to_dict())

self.mock_publish.assert_not_called()

def test_update_bucket(self):
"""Test bucket entries."""
self.source_repo.type = osv.SourceRepositoryType.BUCKET
Expand Down
5 changes: 4 additions & 1 deletion lib/osv/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ def _load_expected(self, expected_name, actual):
f.write(pp.pformat(actual))

with open(expected_path) as f:
return eval(f.read()) # pylint: disable=eval-used
eval_globals = globals()
eval_globals['call'] = mock.call

return eval(f.read(), eval_globals) # pylint: disable=eval-used

def expect_dict_equal(self, expected_name, actual):
"""Check if the output dict is equal to the expected value."""
Expand Down

0 comments on commit 68dd34a

Please sign in to comment.