Skip to content

Commit

Permalink
Merge pull request #7461 from Turbo87/lazy-repo
Browse files Browse the repository at this point in the history
worker: Clone index repository in the background
  • Loading branch information
Turbo87 authored Nov 7, 2023
2 parents 3cbd5ee + 7ed4297 commit a8fb349
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 21 deletions.
22 changes: 12 additions & 10 deletions src/bin/background-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use crates_io::worker::swirl::Runner;
use crates_io::worker::{Environment, RunnerExt};
use crates_io::{db, ssh};
use crates_io_env_vars::{var, var_parsed};
use crates_io_index::{Repository, RepositoryConfig};
use crates_io_index::RepositoryConfig;
use diesel::r2d2;
use diesel::r2d2::ConnectionManager;
use reqwest::blocking::Client;
use secrecy::ExposeSecret;
use std::sync::Arc;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::time::Duration;

fn main() -> anyhow::Result<()> {
let _sentry = crates_io::sentry::init();
Expand Down Expand Up @@ -59,18 +59,11 @@ fn main() -> anyhow::Result<()> {

let job_start_timeout = var_parsed("BACKGROUND_JOB_TIMEOUT")?.unwrap_or(30);

info!("Cloning index");

if var("HEROKU")?.is_some() {
ssh::write_known_hosts_file().unwrap();
}

let clone_start = Instant::now();
let repository_config = RepositoryConfig::from_environment()?;
let repository = Repository::open(&repository_config).expect("Failed to clone index");

let clone_duration = clone_start.elapsed();
info!(duration = ?clone_duration, "Index cloned");

let cloudfront = CloudFront::from_environment();
let fastly = Fastly::from_environment();
Expand All @@ -81,10 +74,19 @@ fn main() -> anyhow::Result<()> {
.build()
.expect("Couldn't build client");

let environment = Environment::new(repository, client, cloudfront, fastly, storage);
let environment = Environment::new(repository_config, client, cloudfront, fastly, storage);

let environment = Arc::new(environment);

std::thread::spawn({
let environment = environment.clone();
move || {
if let Err(err) = environment.lock_index() {
warn!(%err, "Failed to clone index");
};
}
});

let connection_pool = r2d2::Pool::builder()
.max_size(10)
.min_idle(Some(0))
Expand Down
5 changes: 2 additions & 3 deletions src/tests/util/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crates_io::worker::{Environment, RunnerExt};
use crates_io::{App, Emails, Env};
use crates_io_env_vars::required_var;
use crates_io_index::testing::UpstreamIndex;
use crates_io_index::{Credentials, Repository as WorkerRepository, RepositoryConfig};
use crates_io_index::{Credentials, RepositoryConfig};
use crates_io_test_db::TestDatabase;
use diesel::PgConnection;
use futures_util::TryStreamExt;
Expand Down Expand Up @@ -252,9 +252,8 @@ impl TestAppBuilder {
index_location: UpstreamIndex::url(),
credentials: Credentials::Missing,
};
let index = WorkerRepository::open(&repository_config).expect("Could not clone index");
let environment = Environment::new(
index,
repository_config,
app.http_client().clone(),
None,
None,
Expand Down
53 changes: 45 additions & 8 deletions src/worker/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use crate::cloudfront::CloudFront;
use crate::fastly::Fastly;
use crate::storage::Storage;
use crate::worker::swirl::PerformError;
use crates_io_index::Repository;
use crates_io_index::{Repository, RepositoryConfig};
use reqwest::blocking::Client;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::Instant;

pub struct Environment {
index: Mutex<Repository>,
repository_config: RepositoryConfig,
repository: Mutex<Option<Repository>>,
http_client: Client,
cloudfront: Option<CloudFront>,
fastly: Option<Fastly>,
Expand All @@ -16,14 +19,15 @@ pub struct Environment {

impl Environment {
pub fn new(
index: Repository,
repository_config: RepositoryConfig,
http_client: Client,
cloudfront: Option<CloudFront>,
fastly: Option<Fastly>,
storage: Arc<Storage>,
) -> Self {
Self {
index: Mutex::new(index),
repository_config,
repository: Mutex::new(None),
http_client,
cloudfront,
fastly,
Expand All @@ -32,10 +36,25 @@ impl Environment {
}

#[instrument(skip_all)]
pub fn lock_index(&self) -> Result<MutexGuard<'_, Repository>, PerformError> {
let repo = self.index.lock().unwrap_or_else(PoisonError::into_inner);
repo.reset_head()?;
Ok(repo)
pub fn lock_index(&self) -> Result<RepositoryLock<'_>, PerformError> {
let mut repo = self
.repository
.lock()
.unwrap_or_else(PoisonError::into_inner);

if repo.is_none() {
info!("Cloning index");
let clone_start = Instant::now();

*repo = Some(Repository::open(&self.repository_config)?);

let clone_duration = clone_start.elapsed();
info!(duration = ?clone_duration, "Index cloned");
}

let repo_lock = RepositoryLock { repo };
repo_lock.reset_head()?;
Ok(repo_lock)
}

/// Returns a client for making HTTP requests to upload crate files.
Expand All @@ -51,3 +70,21 @@ impl Environment {
self.fastly.as_ref()
}
}

pub struct RepositoryLock<'a> {
repo: MutexGuard<'a, Option<Repository>>,
}

impl Deref for RepositoryLock<'_> {
type Target = Repository;

fn deref(&self) -> &Self::Target {
self.repo.as_ref().unwrap()
}
}

impl DerefMut for RepositoryLock<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.repo.as_mut().unwrap()
}
}

0 comments on commit a8fb349

Please sign in to comment.