Skip to content

Commit

Permalink
client updates download: get key from dist-git if no file
Browse files Browse the repository at this point in the history
This extends the 'try and find the correct signing key ID and
get signed packages' mechanism to try and find the key file from
fedora-repos dist-git if we can't find it as a local file. This
will make it work on non-Fedora systems (and Fedora systems where
we can't find the key file for some reason), but makes it a bit
more complicated, and will need updating when dist-git moves.

Signed-off-by: Adam Williamson <[email protected]>
  • Loading branch information
AdamWill committed Mar 1, 2025
1 parent f8342a5 commit a41fc16
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 15 deletions.
45 changes: 38 additions & 7 deletions bodhi-client/bodhi/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import click
import munch
import requests

from bodhi.client import bindings, constants

Expand Down Expand Up @@ -843,6 +844,8 @@ def download(url: str, id_provider: str, client_id: str, **kwargs):
keyname = f'RPM-GPG-KEY-EPEL-{relnum}'
else:
keyname = f'RPM-GPG-KEY-fedora-{relnum}-primary'

# first try from a local file
keypath = f'/etc/pki/rpm-gpg/{keyname}'
if os.path.exists(keypath):
try:
Expand All @@ -854,14 +857,42 @@ def download(url: str, id_provider: str, client_id: str, **kwargs):
except FileNotFoundError:
click.echo('WARNING: could not run gpg')
ret = None
if ret and not ret.returncode:
for line in ret.stdout.splitlines():
if 'keyid: ' in line:
keyid = line.split("keyid: ")[-1][-8:].lower()
elif ret:
click.echo('WARNING: gpg failed')
else:
click.echo(f'WARNING: key file {keypath} does not exist')
# try and get key file from dist-git
if update['release']['id_prefix'] == 'FEDORA-EPEL':
url = 'https://src.fedoraproject.org/rpms/epel-release'
url += f'/raw/epel{relnum}/f/{keyname}'
else:
url = 'https://src.fedoraproject.org/rpms/fedora-repos'
url += f'/raw/rawhide/f/{keyname}'
try:
resp = requests.get(url)
except requests.exceptions.RequestException as err:
click.echo(f'WARNING: Tried {url} to get key, failed with {err}')
resp = None
ret = None
if resp and resp.status_code == 200:
try:
ret = subprocess.run(
('gpg', '--list-packets', '-'),
input=resp.text,
capture_output=True,
text=True
)
except FileNotFoundError:
click.echo('WARNING: could not run gpg')
ret = None
elif resp:
click.echo(f'WARNING: Tried {url} to get key, got {resp.status_code}')
ret = None

if ret and not ret.returncode:
for line in ret.stdout.splitlines():
if 'keyid: ' in line:
keyid = line.split("keyid: ")[-1][-8:].lower()
elif ret:
click.echo('WARNING: gpg failed')

if not keyid:
click.echo('WARNING: could not find GPG key, packages will be unsigned')
for build in update['builds']:
Expand Down
143 changes: 135 additions & 8 deletions bodhi-client/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from requests import HTTPError
import click
import munch
import requests
import pytest

from bodhi.client import bindings, cli, constants
Expand Down Expand Up @@ -338,6 +339,9 @@ def setup_method(self, method):
self.exists = self.ep.start()
self.cp = mock.patch('bodhi.client.cli.subprocess.call', return_value=0)
self.call = self.cp.start()
# this is to try and check we don't hit get when we shouldn't
self.gp = mock.patch('bodhi.client.cli.requests.get', side_effect=Exception("yikes!"))
self.get = self.gp.start()
self.rp = mock.patch('bodhi.client.cli.subprocess.run')
self.run = self.rp.start()
self.run.return_value.returncode = 0
Expand All @@ -355,6 +359,7 @@ def teardown_method(self, method):
"""Stop the patchers."""
self.ep.stop()
self.cp.stop()
self.gp.stop()
self.rp.stop()

def test_fedora_good(self, mocked_client_class):
Expand Down Expand Up @@ -433,35 +438,157 @@ def test_gpg_noexist(self, mocked_client_class):
'koji', 'download-build', '--arch=noarch',
'--arch={}'.format(platform.machine()), 'nodejs-grunt-wrap-0.3.0-2.fc25'])

def test_key_noexist(self, mocked_client_class):
"""Handle key file not existing."""
def test_no_key_id(self, mocked_client_class):
"""Handle key file existing but not showing a key ID."""
mocked_client_class.send_request.return_value = client_test_data.EXAMPLE_QUERY_MUNCH
runner = testing.CliRunner()
self.exists.return_value = False
self.run.return_value.stdout = "A fish!"
result = runner.invoke(
cli.download,
['--updateid', 'FEDORA-2017-c95b33872d', '--url', 'http://localhost:6543'])
assert result.exit_code == 0
assert result.output == '''Downloading packages from FEDORA-2017-c95b33872d
WARNING: key file /etc/pki/rpm-gpg/RPM-GPG-KEY-fedora-25-primary does not exist
WARNING: could not find GPG key, packages will be unsigned
'''
self.call.assert_called_once_with([
'koji', 'download-build', '--arch=noarch',
'--arch={}'.format(platform.machine()), 'nodejs-grunt-wrap-0.3.0-2.fc25'])

def test_no_key_id(self, mocked_client_class):
"""Handle key file existing but not showing a key ID."""

class TestDownloadGPGRemote:
"""
Test the signature handling features of download() when no local
key file is available, so we get the key from dist-git.
"""
def setup_method(self, method):
"""We always use these patchers."""
self.ep = mock.patch('bodhi.client.cli.os.path.exists', return_value=False)
self.exists = self.ep.start()
self.cp = mock.patch('bodhi.client.cli.subprocess.call', return_value=0)
self.call = self.cp.start()
self.gp = mock.patch('bodhi.client.cli.requests.get')
self.get = self.gp.start()
self.get.return_value.status_code = 200
self.get.return_value.text = '''-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1
mQINBFb9YzMBEACy1RmbMa6MNIpfHYxLwgCgBVnFYCdCHZqWfYYYK14potfJ9uI2
4Y4w+oHiLeZ/HoG1EBQiDfXHetGZECAKEYQlE7BbRBcd3An9GalKTkWzcshhHFx7
f5JIprL0uY8x2D9HmCfAjMxoh6usWjmAQ+DUYd48iYCkahyZa0/2CgX9HIcEz/M/
oDeQbTwzw9AQbQz382oOErfRaXE/DQrjlx2ln0iejidiOe7DzGZOH9/Foc2KN062
A9VnZ7tU1ACKT8NxZ78RaBL3qmvMGdb7kf7GywjpRNo4J7XCQUP+nP51eCur2wMS
4mY2idDL8Ojouta79pPrviVLmwzunJoFnBcnIhbndebdxPqgOA5XAOaTdLtgurMq
90V45DPyJpkdEyptovksH7zYNGEIGB8cFmrVgUwriB0TLNJTEcM4Knbh4imfTX42
vCE+rEHn3YVqubG7rggibKznJbflwQcqOYZHLlPGYCxO47aaFUo5qJN7QN3lxajb
SzL/SdoHrVL67unzmHyktx5uF8Fv6EDgUV6NCb/IBiEwhR8YHi86NQ8nsI3K8Zhv
EnIxghJQD+cn3ykthwqYmZwi2PJDBiZsOGf3iXbalAjU3JVqoA7mboRPR+IBXQxK
xvAEpyIGeSUN8yBn+JVDRwZ37kkUVs2AOeUwMlnfFSqYFfmqbeQ73A9ECwARAQAB
tDxGZWRvcmEgMjUgUHJpbWFyeSAoMjUpIDxmZWRvcmEtMjUtcHJpbWFyeUBmZWRv
cmFwcm9qZWN0Lm9yZz6JAjgEEwECACIFAlb9YzMCGw8GCwkIBwMCBhUIAgkKCwQW
AgMBAh4BAheAAAoJEECJ2PL9sZyY1TEP/0u/v4g8HEdl9gqlhV179vXCJJiGtzB0
7IGAu++mrsxBrDpqPZTEs6dG5MyzvhhHcmHYrZIiicPAeL9xlZ75oIqQuvjDncoM
kROSGvtfUnvocZhQIPvvkgWe3UAmmP3cSlVzu3KtbTpM+KL71incWo4Tentq9L/f
vsow7vvGbKUMoSSZbAMfjJkzlzSDNlFtaRkrCBQFJ76EKeggjnEZ8H0cowCdGuyv
uBoxQeeQM13b2T9c/uyrXCIcasaOTIKTcqTjbJUTIC2NIZ8OHjtlxZacEaN3ml1M
lNRtbIvqzbtv+sb+DsOVTyd1XIcxU9s+TDKvUm0OBNvj3Bm2BQbi8RHyLFbHWvhx
Gjzb8Wb/MnlcdTlk3M2iPv8dWHXjEM9n9TKyStdpBD9X3P/Gy2gUquHgkl8p+r8o
xmzNH534mKH47kPL/trKInKwv0fkBwxvuPgHG0n79eMHQenVA8gXzG4P6JkcyObA
6xGEEQ/wXFF0gLksmwFWuPm2GcnOI5KmGNgDP2PMhS8/cfJfW04a/tL2T2zr4CmE
LynbOvY/yOJk7/2W3Cb47+yhqo/htrpJDP6n6zQNNk3+e8EVgfhkQqFxom8yCmEP
pW0gBFeE83VoytYPXRkavwmFR+tplyZfOkXG9gysTn8SpRp5+B44O+VeaZumanQZ
kRFmBygMR6M/
=NrXo
-----END PGP PUBLIC KEY BLOCK-----
'''
self.rp = mock.patch('bodhi.client.cli.subprocess.run')
self.run = self.rp.start()
self.run.return_value.returncode = 0
self.run.return_value.stdout = """
# off=0 ctb=99 tag=6 hlen=3 plen=525
:public key packet:
version 4, algo 1, created 1459446579, expires 0
pkey[0]: [4096 bits]
pkey[1]: [17 bits]
keyid: 4089D8F2FDB19C98
# off=528 ctb=b4 tag=13 hlen=2 plen=60
"""

def teardown_method(self, method):
"""Stop the patchers."""
self.ep.stop()
self.cp.stop()
self.gp.stop()
self.rp.stop()

def test_fedora_good_remote(self, mocked_client_class):
"""Success path for Fedora update."""
mocked_client_class.send_request.return_value = client_test_data.EXAMPLE_QUERY_MUNCH
runner = testing.CliRunner()
self.run.return_value.stdout = "A fish!"
result = runner.invoke(
cli.download,
['--updateid', 'FEDORA-2017-c95b33872d', '--url', 'http://localhost:6543'])
assert result.exit_code == 0
assert result.output == 'Downloading packages from FEDORA-2017-c95b33872d\n'
url = 'https://src.fedoraproject.org/rpms/fedora-repos'
url += '/raw/rawhide/f/RPM-GPG-KEY-fedora-25-primary'
self.get.assert_called_once_with(url)
self.run.assert_called_once_with(
('gpg', '--list-packets', '-'),
input=self.get.return_value.text,
capture_output=True,
text=True
)
self.call.assert_called_once_with([
'koji', 'download-build', '--key=fdb19c98', '--arch=noarch',
'--arch={}'.format(platform.machine()), 'nodejs-grunt-wrap-0.3.0-2.fc25'])

def test_epel_good_remote(self, mocked_client_class):
"""Success path for EPEL update."""
# epel case
mocked_client_class.send_request.return_value = EXAMPLE_QUERY_MUNCH_EPEL
runner = testing.CliRunner()
result = runner.invoke(
cli.download,
['--updateid', 'FEDORA-2017-c95b33872d', '--url', 'http://localhost:6543'])
assert result.exit_code == 0
assert result.output == 'Downloading packages from FEDORA-2017-c95b33872d\n'
url = 'https://src.fedoraproject.org/rpms/epel-release'
url += '/raw/epel7/f/RPM-GPG-KEY-EPEL-7'
self.get.assert_called_once_with(url)
self.call.assert_called_once_with([
'koji', 'download-build', '--key=fdb19c98', '--arch=noarch',
'--arch={}'.format(platform.machine()), 'nodejs-grunt-wrap-0.3.0-2.fc25'])

def test_bad_status(self, mocked_client_class):
"""We get a bad status code from requests."""
mocked_client_class.send_request.return_value = client_test_data.EXAMPLE_QUERY_MUNCH
runner = testing.CliRunner()
self.get.return_value.status_code = 404
result = runner.invoke(
cli.download,
['--updateid', 'FEDORA-2017-c95b33872d', '--url', 'http://localhost:6543'])
assert result.exit_code == 0
assert result.output == '''Downloading packages from FEDORA-2017-c95b33872d
WARNING: Tried https://src.fedoraproject.org/rpms/fedora-repos/raw/rawhide/f/RPM-GPG-KEY-fedora-25-primary to get key, got 404
WARNING: could not find GPG key, packages will be unsigned
'''
''' # noqa: E501
self.call.assert_called_once_with([
'koji', 'download-build', '--arch=noarch',
'--arch={}'.format(platform.machine()), 'nodejs-grunt-wrap-0.3.0-2.fc25'])

def test_request_exception(self, mocked_client_class):
"""We get an exception from requests."""
mocked_client_class.send_request.return_value = client_test_data.EXAMPLE_QUERY_MUNCH
runner = testing.CliRunner()
self.get.side_effect = requests.ConnectionError("foo")
result = runner.invoke(
cli.download,
['--updateid', 'FEDORA-2017-c95b33872d', '--url', 'http://localhost:6543'])
assert result.exit_code == 0
assert result.output == '''Downloading packages from FEDORA-2017-c95b33872d
WARNING: Tried https://src.fedoraproject.org/rpms/fedora-repos/raw/rawhide/f/RPM-GPG-KEY-fedora-25-primary to get key, failed with foo
WARNING: could not find GPG key, packages will be unsigned
''' # noqa: E501
self.call.assert_called_once_with([
'koji', 'download-build', '--arch=noarch',
'--arch={}'.format(platform.machine()), 'nodejs-grunt-wrap-0.3.0-2.fc25'])
Expand Down

0 comments on commit a41fc16

Please sign in to comment.