diff --git a/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/main.rs b/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/main.rs
index 506da68c34075..f01cece1a52b2 100644
--- a/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/main.rs
+++ b/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/main.rs
@@ -5,407 +5,24 @@
  * GNU General Public License version 2.
  */

-use std::fs::File;
-use std::io::BufRead;
-use std::io::BufReader;
-use std::str::FromStr;
+mod run;
+mod sharding;
+
 use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::sync::OnceLock;
-use std::time::Duration;

-use anyhow::bail;
-use anyhow::format_err;
-use anyhow::Context;
 use anyhow::Error;
-use async_trait::async_trait;
-use backsyncer::backsync_latest;
-use backsyncer::format_counter;
-use backsyncer::open_backsyncer_dbs;
-use backsyncer::BacksyncLimit;
-use backsyncer::Repo;
-use blobrepo_hg::BlobRepoHg;
-use bookmarks::BookmarkUpdateLogRef;
-use bookmarks::Freshness;
-use clap::Arg;
-use clap::SubCommand;
-use clientinfo::ClientEntryPoint;
-use clientinfo::ClientInfo;
-use cloned::cloned;
 use cmdlib::args;
-use cmdlib::args::MononokeMatches;
 use cmdlib::helpers;
-use cmdlib_x_repo::create_commit_syncer_from_matches;
-use context::CoreContext;
-use context::SessionContainer;
-use cross_repo_sync::CandidateSelectionHint;
-use cross_repo_sync::CommitSyncContext;
-use cross_repo_sync::CommitSyncOutcome;
-use cross_repo_sync::CommitSyncer;
-use executor_lib::RepoShardedProcess;
-use executor_lib::RepoShardedProcessExecutor;
 use executor_lib::ShardedProcessExecutor;
 use fbinit::FacebookInit;
-use futures::future;
-use futures::future::FutureExt;
-use futures::stream;
-use futures::stream::StreamExt;
-use futures::stream::TryStreamExt;
-use futures::try_join;
-use live_commit_sync_config::LiveCommitSyncConfig;
-use mercurial_derivation::DeriveHgChangeset;
-use mercurial_types::HgChangesetId;
-use metadata::Metadata;
-use mononoke_types::ChangesetId;
-use repo_identity::RepoIdentityRef;
-use scuba_ext::MononokeScubaSampleBuilder;
-use sharding_ext::RepoShard;
-use slog::debug;
-use slog::error;
-use slog::info;
-use stats::prelude::*;
-use tokio::runtime::Runtime;
-use wireproto_handler::TargetRepoDbs;
-
-const ARG_MODE_BACKSYNC_FOREVER: &str = "backsync-forever";
-const ARG_MODE_BACKSYNC_ALL: &str = "backsync-all";
-const ARG_MODE_BACKSYNC_COMMITS: &str = "backsync-commits";
-const ARG_BATCH_SIZE: &str = "batch-size";
-const ARG_INPUT_FILE: &str = "INPUT_FILE";
-const SCUBA_TABLE: &str = "mononoke_xrepo_backsync";
-
-define_stats! {
-    prefix = "mononoke.backsyncer";
-    remaining_entries: dynamic_singleton_counter(
-        "{}.{}.remaining_entries",
-        (source_repo_name: String, target_repo_name: String)
-    ),
-    delay_secs: dynamic_singleton_counter(
-        "{}.{}.delay_secs",
-        (source_repo_name: String, target_repo_name: String)
-    ),
-}
-
-const DEFAULT_SHARDED_SCOPE_NAME: &str = "global";
-const SM_CLEANUP_TIMEOUT_SECS: u64 = 120;
-const APP_NAME: &str = "backsyncer cmd-line tool";
-
-/// Struct representing the Back Syncer BP.
-pub struct BacksyncProcess {
-    matches: Arc<MononokeMatches<'static>>,
-    fb: FacebookInit,
-    _runtime: Runtime,
-}
-
-impl BacksyncProcess {
-    fn new(fb: FacebookInit) -> anyhow::Result<Self> {
-        let app = args::MononokeAppBuilder::new(APP_NAME)
-            .with_fb303_args()
-            .with_source_and_target_repos()
-            .with_dynamic_repos()
-            .with_scribe_args()
-            .with_default_scuba_dataset(SCUBA_TABLE)
-            .with_scuba_logging_args()
-            .build();
-        let backsync_forever_subcommand = SubCommand::with_name(ARG_MODE_BACKSYNC_FOREVER)
-            .about("Backsyncs all new bookmark moves");
-
-        let sync_loop = SubCommand::with_name(ARG_MODE_BACKSYNC_COMMITS)
-            .about("Syncs all commits from the file")
-            .arg(
-                Arg::with_name(ARG_INPUT_FILE)
-                    .takes_value(true)
-                    .required(true)
-                    .help("list of hg commits to backsync"),
-            )
-            .arg(
-                Arg::with_name(ARG_BATCH_SIZE)
-                    .long(ARG_BATCH_SIZE)
-                    .takes_value(true)
-                    .required(false)
-                    .help("how many commits to backsync at once"),
-            );
-
-        let backsync_all_subcommand = SubCommand::with_name(ARG_MODE_BACKSYNC_ALL)
-            .about("Backsyncs all new bookmark moves once");
-        let app = app
-            .subcommand(backsync_all_subcommand)
-            .subcommand(backsync_forever_subcommand)
-            .subcommand(sync_loop);
-        let (matches, _runtime) = app.get_matches(fb)?;
-        let matches = Arc::new(matches);
-        Ok(Self {
-            matches,
-            fb,
-            _runtime,
-        })
-    }
-}
-
-#[async_trait]
-impl RepoShardedProcess for BacksyncProcess {
-    async fn setup(&self, repo: &RepoShard) -> anyhow::Result<Arc<dyn RepoShardedProcessExecutor>> {
-        let logger = self.matches.logger();
-        // For backsyncer, two repos (i.e. source and target) are required as input
-        let source_repo_name = repo.repo_name.clone();
-        let target_repo_name = match repo.target_repo_name.clone() {
-            Some(repo_name) => repo_name,
-            None => {
-                let details = format!(
-                    "Only source repo name {} provided, target repo name missing in {}",
-                    source_repo_name, repo
-                );
-                error!(logger, "{}", details);
-                bail!("{}", details)
-            }
-        };
-        info!(
-            logger,
-            "Setting up back syncer command from repo {} to repo {}",
-            source_repo_name,
-            target_repo_name,
-        );
-        let details = format!(
-            "Completed back syncer command setup from repo {} to repo {}",
-            source_repo_name, target_repo_name
-        );
-        let executor = BacksyncProcessExecutor::new(
-            self.fb,
-            Arc::clone(&self.matches),
-            source_repo_name,
-            target_repo_name,
-        );
-        info!(logger, "{}", details);
-        Ok(Arc::new(executor))
-    }
-}
-
-/// Struct representing the execution of the Back Syncer
-/// BP over the context of a provided repos.
-pub struct BacksyncProcessExecutor {
-    fb: FacebookInit,
-    matches: Arc<MononokeMatches<'static>>,
-    source_repo_name: String,
-    target_repo_name: String,
-    cancellation_requested: Arc<AtomicBool>,
-}
-
-impl BacksyncProcessExecutor {
-    fn new(
-        fb: FacebookInit,
-        matches: Arc<MononokeMatches<'static>>,
-        source_repo_name: String,
-        target_repo_name: String,
-    ) -> Self {
-        Self {
-            fb,
-            matches,
-            source_repo_name,
-            target_repo_name,
-            cancellation_requested: Arc::new(AtomicBool::new(false)),
-        }
-    }
-}
-
-#[async_trait]
-impl RepoShardedProcessExecutor for BacksyncProcessExecutor {
-    async fn execute(&self) -> anyhow::Result<()> {
-        info!(
-            self.matches.logger(),
-            "Initiating back syncer command execution for repo pair {}-{}",
-            &self.source_repo_name,
-            &self.target_repo_name,
-        );
-        run(
-            self.fb,
-            Arc::clone(&self.matches),
-            self.source_repo_name.clone(),
-            self.target_repo_name.clone(),
-            Arc::clone(&self.cancellation_requested),
-        )
-        .await
-        .with_context(|| {
-            format!(
-                "Error during back syncer command execution for repo pair {}-{}",
-                &self.source_repo_name, &self.target_repo_name,
-            )
-        })?;
-        info!(
-            self.matches.logger(),
-            "Finished back syncer command execution for repo pair {}-{}",
-            &self.source_repo_name,
-            self.target_repo_name
-        );
-        Ok(())
-    }
-
-    async fn stop(&self) -> anyhow::Result<()> {
-        info!(
-            self.matches.logger(),
-            "Terminating back syncer command execution for repo pair {}-{}",
-            &self.source_repo_name,
-            self.target_repo_name,
-        );
-        self.cancellation_requested.store(true, Ordering::Relaxed);
-        Ok(())
-    }
-}
-
-fn extract_cs_id_from_sync_outcome(
-    source_cs_id: ChangesetId,
-    maybe_sync_outcome: Option<CommitSyncOutcome>,
-) -> Result<Option<ChangesetId>, Error> {
-    use CommitSyncOutcome::*;
-
-    match maybe_sync_outcome {
-        Some(RewrittenAs(cs_id, _)) => Ok(Some(cs_id)),
-        Some(NotSyncCandidate(_)) => Ok(None),
-        Some(EquivalentWorkingCopyAncestor(cs_id, _)) => Ok(Some(cs_id)),
-        None => Err(format_err!(
-            "sync outcome is not available for {}",
-            source_cs_id
-        )),
-    }
-}
-async fn derive_target_hg_changesets(
-    ctx: &CoreContext,
-    maybe_target_cs_id: Option<ChangesetId>,
-    commit_syncer: &CommitSyncer<Repo>,
-) -> Result<(), Error> {
-    match maybe_target_cs_id {
-        Some(target_cs_id) => {
-            let hg_cs_id = commit_syncer
-                .get_target_repo()
-                .derive_hg_changeset(ctx, target_cs_id)
-                .await?;
-            info!(
-                ctx.logger(),
-                "Hg cs id {} derived for {}", hg_cs_id, target_cs_id
-            );
-            Ok(())
-        }
-        None => Ok(()),
-    }
-}
-
-pub async fn backsync_forever(
-    ctx: CoreContext,
-    commit_syncer: CommitSyncer<Repo>,
-    target_repo_dbs: Arc<TargetRepoDbs>,
-    source_repo_name: String,
-    target_repo_name: String,
-    live_commit_sync_config: Arc<dyn LiveCommitSyncConfig>,
-    cancellation_requested: Arc<AtomicBool>,
-) -> Result<(), Error> {
-    let target_repo_id = commit_syncer.get_target_repo_id();
-    let mut commit_only_backsync_future: Box<dyn futures::Future<Output = ()> + Send + Unpin> =
-        Box::new(future::ready(()));
-
-    loop {
-        // Before initiating loop, check if cancellation has been
-        // requested. If yes, then exit early.
-        if cancellation_requested.load(Ordering::Relaxed) {
-            info!(ctx.logger(), "sync stopping due to cancellation request");
-            return Ok(());
-        }
-        // We only care about public pushes because draft pushes are not in the bookmark
-        // update log at all.
-        let enabled = live_commit_sync_config
-            .push_redirector_enabled_for_public(&ctx, target_repo_id)
-            .await?;
-
-        if enabled {
-            let delay = calculate_delay(&ctx, &commit_syncer, &target_repo_dbs).await?;
-            log_delay(&ctx, &delay, &source_repo_name, &target_repo_name);
-            if delay.remaining_entries == 0 {
-                debug!(ctx.logger(), "no entries remained");
-                tokio::time::sleep(Duration::new(1, 0)).await;
-            } else {
-                debug!(ctx.logger(), "backsyncing...");
-
-                commit_only_backsync_future = backsync_latest(
-                    ctx.clone(),
-                    commit_syncer.clone(),
-                    target_repo_dbs.clone(),
-                    BacksyncLimit::NoLimit,
-                    Arc::clone(&cancellation_requested),
-                    CommitSyncContext::Backsyncer,
-                    false,
-                    commit_only_backsync_future,
-                )
-                .await?
-            }
-        } else {
-            debug!(ctx.logger(), "push redirector is disabled");
-            let delay = Delay::no_delay();
-            log_delay(&ctx, &delay, &source_repo_name, &target_repo_name);
-            tokio::time::sleep(Duration::new(1, 0)).await;
-        }
-    }
-}
-
-struct Delay {
-    delay_secs: i64,
-    remaining_entries: u64,
-}
-
-impl Delay {
-    fn no_delay() -> Self {
-        Self {
-            delay_secs: 0,
-            remaining_entries: 0,
-        }
-    }
-}
-
-// Returns logs delay and returns the number of remaining bookmark update log entries
-async fn calculate_delay(
-    ctx: &CoreContext,
-    commit_syncer: &CommitSyncer<Repo>,
-    target_repo_dbs: &TargetRepoDbs,
-) -> Result<Delay, Error> {
-    let TargetRepoDbs { ref counters, .. } = target_repo_dbs;
-    let source_repo_id = commit_syncer.get_source_repo().repo_identity().id();
-
-    let counter_name = format_counter(&source_repo_id);
-    let maybe_counter = counters.get_counter(ctx, &counter_name).await?;
-    let counter = maybe_counter
-        .ok_or_else(|| format_err!("{} counter not found", counter_name))?
-        .try_into()?;
-    let source_repo = commit_syncer.get_source_repo();
-    let next_entry = source_repo
-        .bookmark_update_log()
-        .read_next_bookmark_log_entries(ctx.clone(), counter, 1, Freshness::MostRecent)
-        .try_collect::<Vec<_>>();
-    let remaining_entries = source_repo
-        .bookmark_update_log()
-        .count_further_bookmark_log_entries(ctx.clone(), counter, None);
-
-    let (next_entry, remaining_entries) = try_join!(next_entry, remaining_entries)?;
-    let delay_secs = next_entry
-        .first()
-        .map_or(0, |entry| entry.timestamp.since_seconds());
-
-    Ok(Delay {
-        delay_secs,
-        remaining_entries,
-    })
-}
-
-fn log_delay(ctx: &CoreContext, delay: &Delay, source_repo_name: &str, target_repo_name: &str) {
-    STATS::remaining_entries.set_value(
-        ctx.fb,
-        delay.remaining_entries as i64,
-        (source_repo_name.to_owned(), target_repo_name.to_owned()),
-    );
-    STATS::delay_secs.set_value(
-        ctx.fb,
-        delay.delay_secs,
-        (source_repo_name.to_owned(), target_repo_name.to_owned()),
-    );
-}
+
+use crate::run::run_backsyncer;
+use crate::sharding::BacksyncProcess;
+use crate::sharding::APP_NAME;
+use crate::sharding::DEFAULT_SHARDED_SCOPE_NAME;
+use crate::sharding::SM_CLEANUP_TIMEOUT_SECS;

 #[fbinit::main]
 fn main(fb: FacebookInit) -> Result<(), Error> {
@@ -455,7 +72,7 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
         args::get_config_by_repoid(config_store, &matches, source_repo_id)?;
     let (target_repo_name, _) =
         args::get_config_by_repoid(config_store, &matches, target_repo_id)?;
-    let fut = run(
+    let fut = run_backsyncer(
         fb,
         matches.clone(),
         source_repo_name,
@@ -473,197 +90,3 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
         }
     }
 }
-
-async fn run(
-    fb: FacebookInit,
-    matches: Arc<MononokeMatches<'static>>,
-    source_repo_name: String,
-    target_repo_name: String,
-    cancellation_requested: Arc<AtomicBool>,
-) -> Result<(), Error> {
-    let config_store = matches.config_store();
-
-    let source_repo = args::resolve_repo_by_name(config_store, &matches, &source_repo_name)?;
-    let target_repo = args::resolve_repo_by_name(config_store, &matches, &target_repo_name)?;
-    let repo_tag = format!("{}=>{}", &source_repo_name, &target_repo_name);
-    let mut metadata = Metadata::default();
-    metadata.add_client_info(ClientInfo::default_with_entry_point(
-        ClientEntryPoint::MegarepoBacksyncer,
-    ));
-    let session_container = SessionContainer::builder(fb)
-        .metadata(Arc::new(metadata))
-        .build();
-    let scribe = args::get_scribe(fb, &matches)?;
-    let ctx = session_container
-        .new_context_with_scribe(
-            matches.logger().clone(),
-            MononokeScubaSampleBuilder::with_discard(),
-            scribe.clone(),
-        )
-        .clone_with_repo_name(&repo_tag);
-    let commit_syncer = create_commit_syncer_from_matches::<Repo>(
-        &ctx,
-        &matches,
-        Some((source_repo.id, target_repo.id)),
-    )
-    .await?;
-    let logger = ctx.logger();
-
-    info!(
-        logger,
-        "syncing from repoid {:?} into repoid {:?}", source_repo.id, target_repo.id,
-    );
-
-    let live_commit_sync_config = commit_syncer.live_commit_sync_config.clone();
-
-    match matches.subcommand() {
-        (ARG_MODE_BACKSYNC_ALL, _) => {
-            let scuba_sample = MononokeScubaSampleBuilder::with_discard();
-            let ctx =
-                session_container.new_context_with_scribe(logger.clone(), scuba_sample, scribe);
-            let target_repo_dbs = Arc::new(
-                open_backsyncer_dbs(commit_syncer.get_target_repo())
-                    .boxed()
-                    .await?,
-            );
-
-            // TODO(ikostia): why do we use discarding ScubaSample for BACKSYNC_ALL?
-            backsync_latest(
-                ctx,
-                commit_syncer,
-                target_repo_dbs,
-                BacksyncLimit::NoLimit,
-                cancellation_requested,
-                CommitSyncContext::Backsyncer,
-                false,
-                Box::new(future::ready(())),
-            )
-            .boxed()
-            .await?
-            .await;
-        }
-        (ARG_MODE_BACKSYNC_FOREVER, _) => {
-            let target_repo_dbs = Arc::new(
-                open_backsyncer_dbs(commit_syncer.get_target_repo())
-                    .boxed()
-                    .await?,
-            );
-
-            let mut scuba_sample = matches.scuba_sample_builder();
-            scuba_sample.add("source_repo", source_repo.id.id());
-            scuba_sample.add("source_repo_name", source_repo.name.clone());
-            scuba_sample.add("target_repo", target_repo.id.id());
-            scuba_sample.add("target_repo_name", target_repo.name.clone());
-            scuba_sample.add_common_server_data();
-
-            let ctx =
-                session_container.new_context_with_scribe(logger.clone(), scuba_sample, scribe);
-            let f = backsync_forever(
-                ctx,
-                commit_syncer,
-                target_repo_dbs,
-                source_repo.name,
-                target_repo.name,
-                live_commit_sync_config,
-                cancellation_requested,
-            )
-            .boxed();
-            f.await?;
-        }
-        (ARG_MODE_BACKSYNC_COMMITS, Some(sub_m)) => {
-            let ctx = session_container.new_context_with_scribe(
-                logger.clone(),
-                MononokeScubaSampleBuilder::with_discard(),
-                scribe,
-            );
-            let inputfile = sub_m
-                .value_of(ARG_INPUT_FILE)
-                .expect("input file is not set");
-            let inputfile = File::open(inputfile)?;
-            let file = BufReader::new(&inputfile);
-            let batch_size = args::get_usize(matches.as_ref(), ARG_BATCH_SIZE, 100);
-
-            let source_repo = commit_syncer.get_source_repo().clone();
-
-            let mut hg_cs_ids = vec![];
-            for line in file.lines() {
-                hg_cs_ids.push(HgChangesetId::from_str(&line?)?);
-            }
-            let total_to_backsync = hg_cs_ids.len();
-            info!(ctx.logger(), "backsyncing {} commits", total_to_backsync);
-
-            let ctx = &ctx;
-            let commit_syncer = &commit_syncer;
-
-            // Before processing each commit, check if cancellation has
-            // been requested and exit if that's the case.
-            if cancellation_requested.load(Ordering::Relaxed) {
-                info!(ctx.logger(), "sync stopping due to cancellation request");
-                return Ok(());
-            }
-            let f = stream::iter(hg_cs_ids.clone())
-                .chunks(batch_size)
-                .map(Result::<_, Error>::Ok)
-                .and_then({
-                    cloned!(ctx);
-                    move |chunk| {
-                        cloned!(ctx, source_repo);
-                        async move { source_repo.get_hg_bonsai_mapping(ctx.clone(), chunk).await }
-                    }
-                })
-                .try_fold(0, move |backsynced_so_far, hg_bonsai_mapping| {
-                    hg_bonsai_mapping
-                        .into_iter()
-                        .map({
-                            move |(_, bonsai)| async move {
-                                // Backsyncer is always used in the large-to-small direction,
-                                // therefore there can be at most one remapped candidate,
-                                // so `CandidateSelectionHint::Only` is a safe choice
-                                commit_syncer
-                                    .sync_commit(
-                                        ctx,
-                                        bonsai.clone(),
-                                        CandidateSelectionHint::Only,
-                                        CommitSyncContext::Backsyncer,
-                                        false,
-                                    )
-                                    .await?;
-
-                                let maybe_sync_outcome =
-                                    commit_syncer.get_commit_sync_outcome(ctx, bonsai).await?;
-
-                                info!(
-                                    ctx.logger(),
-                                    "{} backsynced as {:?}", bonsai, maybe_sync_outcome
-                                );
-
-                                let maybe_target_cs_id =
-                                    extract_cs_id_from_sync_outcome(bonsai, maybe_sync_outcome)?;
-
-                                derive_target_hg_changesets(ctx, maybe_target_cs_id, commit_syncer)
-                                    .await
-                            }
-                        })
-                        .collect::<stream::FuturesUnordered<_>>()
-                        .try_fold(backsynced_so_far, {
-                            move |backsynced_so_far, _| async move {
-                                info!(
-                                    ctx.logger(),
-                                    "backsynced so far {} out of {}",
-                                    backsynced_so_far + 1,
-                                    total_to_backsync
-                                );
-                                Ok::<_, Error>(backsynced_so_far + 1)
-                            }
-                        })
-                });
-
-            f.await?;
-        }
-        _ => {
-            bail!("unknown subcommand");
-        }
-    }
-
-    Ok(())
-}
diff --git a/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/run.rs b/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/run.rs
new file mode 100644
index 0000000000000..66bac8db7b46e
--- /dev/null
+++ b/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/run.rs
@@ -0,0 +1,425 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+use std::fs::File;
+use std::io::BufRead;
+use std::io::BufReader;
+use std::str::FromStr;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::bail;
+use anyhow::format_err;
+use anyhow::Error;
+use backsyncer::backsync_latest;
+use backsyncer::format_counter;
+use backsyncer::open_backsyncer_dbs;
+use backsyncer::BacksyncLimit;
+use backsyncer::Repo;
+use blobrepo_hg::BlobRepoHg;
+use bookmarks::BookmarkUpdateLogRef;
+use bookmarks::Freshness;
+use clientinfo::ClientEntryPoint;
+use clientinfo::ClientInfo;
+use cloned::cloned;
+use cmdlib::args;
+use cmdlib::args::MononokeMatches;
+use cmdlib_x_repo::create_commit_syncer_from_matches;
+use context::CoreContext;
+use context::SessionContainer;
+use cross_repo_sync::CandidateSelectionHint;
+use cross_repo_sync::CommitSyncContext;
+use cross_repo_sync::CommitSyncOutcome;
+use cross_repo_sync::CommitSyncer;
+use fbinit::FacebookInit;
+use futures::future;
+use futures::future::FutureExt;
+use futures::stream;
+use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
+use futures::try_join;
+use live_commit_sync_config::LiveCommitSyncConfig;
+use mercurial_derivation::DeriveHgChangeset;
+use mercurial_types::HgChangesetId;
+use metadata::Metadata;
+use mononoke_types::ChangesetId;
+use repo_identity::RepoIdentityRef;
+use scuba_ext::MononokeScubaSampleBuilder;
+use slog::debug;
+use slog::info;
+use stats::prelude::*;
+use wireproto_handler::TargetRepoDbs;
+
+use crate::sharding::ARG_BATCH_SIZE;
+use crate::sharding::ARG_INPUT_FILE;
+use crate::sharding::ARG_MODE_BACKSYNC_ALL;
+use crate::sharding::ARG_MODE_BACKSYNC_COMMITS;
+use crate::sharding::ARG_MODE_BACKSYNC_FOREVER;
+
+define_stats! {
+    prefix = "mononoke.backsyncer";
+    remaining_entries: dynamic_singleton_counter(
+        "{}.{}.remaining_entries",
+        (source_repo_name: String, target_repo_name: String)
+    ),
+    delay_secs: dynamic_singleton_counter(
+        "{}.{}.delay_secs",
+        (source_repo_name: String, target_repo_name: String)
+    ),
+}
+
+pub(crate) async fn run_backsyncer(
+    fb: FacebookInit,
+    matches: Arc<MononokeMatches<'static>>,
+    source_repo_name: String,
+    target_repo_name: String,
+    cancellation_requested: Arc<AtomicBool>,
+) -> Result<(), Error> {
+    let config_store = matches.config_store();
+
+    let source_repo = args::resolve_repo_by_name(config_store, &matches, &source_repo_name)?;
+    let target_repo = args::resolve_repo_by_name(config_store, &matches, &target_repo_name)?;
+    let repo_tag = format!("{}=>{}", &source_repo_name, &target_repo_name);
+    let mut metadata = Metadata::default();
+    metadata.add_client_info(ClientInfo::default_with_entry_point(
+        ClientEntryPoint::MegarepoBacksyncer,
+    ));
+    let session_container = SessionContainer::builder(fb)
+        .metadata(Arc::new(metadata))
+        .build();
+    let scribe = args::get_scribe(fb, &matches)?;
+    let ctx = session_container
+        .new_context_with_scribe(
+            matches.logger().clone(),
+            MononokeScubaSampleBuilder::with_discard(),
+            scribe.clone(),
+        )
+        .clone_with_repo_name(&repo_tag);
+    let commit_syncer = create_commit_syncer_from_matches::<Repo>(
+        &ctx,
+        &matches,
+        Some((source_repo.id, target_repo.id)),
+    )
+    .await?;
+    let logger = ctx.logger();
+
+    info!(
+        logger,
+        "syncing from repoid {:?} into repoid {:?}", source_repo.id, target_repo.id,
+    );
+
+    let live_commit_sync_config = commit_syncer.live_commit_sync_config.clone();
+
+    match matches.subcommand() {
+        (ARG_MODE_BACKSYNC_ALL, _) => {
+            let scuba_sample = MononokeScubaSampleBuilder::with_discard();
+            let ctx =
+                session_container.new_context_with_scribe(logger.clone(), scuba_sample, scribe);
+            let target_repo_dbs = Arc::new(
+                open_backsyncer_dbs(commit_syncer.get_target_repo())
+                    .boxed()
+                    .await?,
+            );
+
+            // TODO(ikostia): why do we use discarding ScubaSample for BACKSYNC_ALL?
+            backsync_latest(
+                ctx,
+                commit_syncer,
+                target_repo_dbs,
+                BacksyncLimit::NoLimit,
+                cancellation_requested,
+                CommitSyncContext::Backsyncer,
+                false,
+                Box::new(future::ready(())),
+            )
+            .boxed()
+            .await?
+            .await;
+        }
+        (ARG_MODE_BACKSYNC_FOREVER, _) => {
+            let target_repo_dbs = Arc::new(
+                open_backsyncer_dbs(commit_syncer.get_target_repo())
+                    .boxed()
+                    .await?,
+            );
+
+            let mut scuba_sample = matches.scuba_sample_builder();
+            scuba_sample.add("source_repo", source_repo.id.id());
+            scuba_sample.add("source_repo_name", source_repo.name.clone());
+            scuba_sample.add("target_repo", target_repo.id.id());
+            scuba_sample.add("target_repo_name", target_repo.name.clone());
+            scuba_sample.add_common_server_data();
+
+            let ctx =
+                session_container.new_context_with_scribe(logger.clone(), scuba_sample, scribe);
+            let f = backsync_forever(
+                ctx,
+                commit_syncer,
+                target_repo_dbs,
+                source_repo.name,
+                target_repo.name,
+                live_commit_sync_config,
+                cancellation_requested,
+            )
+            .boxed();
+            f.await?;
+        }
+        (ARG_MODE_BACKSYNC_COMMITS, Some(sub_m)) => {
+            let ctx = session_container.new_context_with_scribe(
+                logger.clone(),
+                MononokeScubaSampleBuilder::with_discard(),
+                scribe,
+            );
+            let inputfile = sub_m
+                .value_of(ARG_INPUT_FILE)
+                .expect("input file is not set");
+            let inputfile = File::open(inputfile)?;
+            let file = BufReader::new(&inputfile);
+            let batch_size = args::get_usize(matches.as_ref(), ARG_BATCH_SIZE, 100);
+
+            let source_repo = commit_syncer.get_source_repo().clone();
+
+            let mut hg_cs_ids = vec![];
+            for line in file.lines() {
+                hg_cs_ids.push(HgChangesetId::from_str(&line?)?);
+            }
+            let total_to_backsync = hg_cs_ids.len();
+            info!(ctx.logger(), "backsyncing {} commits", total_to_backsync);
+
+            let ctx = &ctx;
+            let commit_syncer = &commit_syncer;
+
+            // Before processing each commit, check if cancellation has
+            // been requested and exit if that's the case.
+            if cancellation_requested.load(Ordering::Relaxed) {
+                info!(ctx.logger(), "sync stopping due to cancellation request");
+                return Ok(());
+            }
+            let f = stream::iter(hg_cs_ids.clone())
+                .chunks(batch_size)
+                .map(Result::<_, Error>::Ok)
+                .and_then({
+                    cloned!(ctx);
+                    move |chunk| {
+                        cloned!(ctx, source_repo);
+                        async move { source_repo.get_hg_bonsai_mapping(ctx.clone(), chunk).await }
+                    }
+                })
+                .try_fold(0, move |backsynced_so_far, hg_bonsai_mapping| {
+                    hg_bonsai_mapping
+                        .into_iter()
+                        .map({
+                            move |(_, bonsai)| async move {
+                                // Backsyncer is always used in the large-to-small direction,
+                                // therefore there can be at most one remapped candidate,
+                                // so `CandidateSelectionHint::Only` is a safe choice
+                                commit_syncer
+                                    .sync_commit(
+                                        ctx,
+                                        bonsai.clone(),
+                                        CandidateSelectionHint::Only,
+                                        CommitSyncContext::Backsyncer,
+                                        false,
+                                    )
+                                    .await?;
+
+                                let maybe_sync_outcome =
+                                    commit_syncer.get_commit_sync_outcome(ctx, bonsai).await?;
+
+                                info!(
+                                    ctx.logger(),
+                                    "{} backsynced as {:?}", bonsai, maybe_sync_outcome
+                                );
+
+                                let maybe_target_cs_id =
+                                    extract_cs_id_from_sync_outcome(bonsai, maybe_sync_outcome)?;
+
+                                derive_target_hg_changesets(ctx, maybe_target_cs_id, commit_syncer)
+                                    .await
+                            }
+                        })
+                        .collect::<stream::FuturesUnordered<_>>()
+                        .try_fold(backsynced_so_far, {
+                            move |backsynced_so_far, _| async move {
+                                info!(
+                                    ctx.logger(),
+                                    "backsynced so far {} out of {}",
+                                    backsynced_so_far + 1,
+                                    total_to_backsync
+                                );
+                                Ok::<_, Error>(backsynced_so_far + 1)
+                            }
+                        })
+                });
+
+            f.await?;
+        }
+        _ => {
+            bail!("unknown subcommand");
+        }
+    }
+
+    Ok(())
+}
+
+// TODO2: make private
+pub async fn backsync_forever(
+    ctx: CoreContext,
+    commit_syncer: CommitSyncer<Repo>,
+    target_repo_dbs: Arc<TargetRepoDbs>,
+    source_repo_name: String,
+    target_repo_name: String,
+    live_commit_sync_config: Arc<dyn LiveCommitSyncConfig>,
+    cancellation_requested: Arc<AtomicBool>,
+) -> Result<(), Error> {
+    let target_repo_id = commit_syncer.get_target_repo_id();
+    let mut commit_only_backsync_future: Box<dyn futures::Future<Output = ()> + Send + Unpin> =
+        Box::new(future::ready(()));
+
+    loop {
+        // Before initiating the loop body, check if cancellation has been
+        // requested. If yes, then exit early.
+        if cancellation_requested.load(Ordering::Relaxed) {
+            info!(ctx.logger(), "sync stopping due to cancellation request");
+            return Ok(());
+        }
+        // We only care about public pushes because draft pushes are not in the bookmark
+        // update log at all.
+        let enabled = live_commit_sync_config
+            .push_redirector_enabled_for_public(&ctx, target_repo_id)
+            .await?;
+
+        if enabled {
+            let delay = calculate_delay(&ctx, &commit_syncer, &target_repo_dbs).await?;
+            log_delay(&ctx, &delay, &source_repo_name, &target_repo_name);
+            if delay.remaining_entries == 0 {
+                debug!(ctx.logger(), "no entries remained");
+                tokio::time::sleep(Duration::new(1, 0)).await;
+            } else {
+                debug!(ctx.logger(), "backsyncing...");
+
+                commit_only_backsync_future = backsync_latest(
+                    ctx.clone(),
+                    commit_syncer.clone(),
+                    target_repo_dbs.clone(),
+                    BacksyncLimit::NoLimit,
+                    Arc::clone(&cancellation_requested),
+                    CommitSyncContext::Backsyncer,
+                    false,
+                    commit_only_backsync_future,
+                )
+                .await?
+            }
+        } else {
+            debug!(ctx.logger(), "push redirector is disabled");
+            let delay = Delay::no_delay();
+            log_delay(&ctx, &delay, &source_repo_name, &target_repo_name);
+            tokio::time::sleep(Duration::new(1, 0)).await;
+        }
+    }
+}
+
+fn extract_cs_id_from_sync_outcome(
+    source_cs_id: ChangesetId,
+    maybe_sync_outcome: Option<CommitSyncOutcome>,
+) -> Result<Option<ChangesetId>, Error> {
+    use CommitSyncOutcome::*;
+
+    match maybe_sync_outcome {
+        Some(RewrittenAs(cs_id, _)) => Ok(Some(cs_id)),
+        Some(NotSyncCandidate(_)) => Ok(None),
+        Some(EquivalentWorkingCopyAncestor(cs_id, _)) => Ok(Some(cs_id)),
+        None => Err(format_err!(
+            "sync outcome is not available for {}",
+            source_cs_id
+        )),
+    }
+}
+
+async fn derive_target_hg_changesets(
+    ctx: &CoreContext,
+    maybe_target_cs_id: Option<ChangesetId>,
+    commit_syncer: &CommitSyncer<Repo>,
+) -> Result<(), Error> {
+    match maybe_target_cs_id {
+        Some(target_cs_id) => {
+            let hg_cs_id = commit_syncer
+                .get_target_repo()
+                .derive_hg_changeset(ctx, target_cs_id)
+                .await?;
+            info!(
+                ctx.logger(),
+                "Hg cs id {} derived for {}", hg_cs_id, target_cs_id
+            );
+            Ok(())
+        }
+        None => Ok(()),
+    }
+}
+
+struct Delay {
+    delay_secs: i64,
+    remaining_entries: u64,
+}
+
+impl Delay {
+    fn no_delay() -> Self {
+        Self {
+            delay_secs: 0,
+            remaining_entries: 0,
+        }
+    }
+}
+
+// Logs the delay and returns the number of remaining bookmark update log entries
+async fn calculate_delay(
+    ctx: &CoreContext,
+    commit_syncer: &CommitSyncer<Repo>,
+    target_repo_dbs: &TargetRepoDbs,
+) -> Result<Delay, Error> {
+    let TargetRepoDbs { ref counters, .. } = target_repo_dbs;
+    let source_repo_id = commit_syncer.get_source_repo().repo_identity().id();
+
+    let counter_name = format_counter(&source_repo_id);
+    let maybe_counter = counters.get_counter(ctx, &counter_name).await?;
+    let counter = maybe_counter
+        .ok_or_else(|| format_err!("{} counter not found", counter_name))?
+        .try_into()?;
+    let source_repo = commit_syncer.get_source_repo();
+    let next_entry = source_repo
+        .bookmark_update_log()
+        .read_next_bookmark_log_entries(ctx.clone(), counter, 1, Freshness::MostRecent)
+        .try_collect::<Vec<_>>();
+    let remaining_entries = source_repo
+        .bookmark_update_log()
+        .count_further_bookmark_log_entries(ctx.clone(), counter, None);
+
+    let (next_entry, remaining_entries) = try_join!(next_entry, remaining_entries)?;
+    let delay_secs = next_entry
+        .first()
+        .map_or(0, |entry| entry.timestamp.since_seconds());
+
+    Ok(Delay {
+        delay_secs,
+        remaining_entries,
+    })
+}
+
+fn log_delay(ctx: &CoreContext, delay: &Delay, source_repo_name: &str, target_repo_name: &str) {
+    STATS::remaining_entries.set_value(
+        ctx.fb,
+        delay.remaining_entries as i64,
+        (source_repo_name.to_owned(), target_repo_name.to_owned()),
+    );
+    STATS::delay_secs.set_value(
+        ctx.fb,
+        delay.delay_secs,
+        (source_repo_name.to_owned(), target_repo_name.to_owned()),
+    );
+}
diff --git a/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/sharding.rs b/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/sharding.rs
new file mode 100644
index 0000000000000..50a24ea902b84
--- /dev/null
+++ b/eden/mononoke/commit_rewriting/backsyncer/backsyncer_cmd/sharding.rs
@@ -0,0 +1,203 @@
+/*
+ * Copyright (c) Meta Platforms, Inc. and affiliates.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use anyhow::bail;
+use anyhow::Context;
+use async_trait::async_trait;
+use clap::Arg;
+use clap::SubCommand;
+use cmdlib::args;
+use cmdlib::args::MononokeMatches;
+use executor_lib::RepoShardedProcess;
+use executor_lib::RepoShardedProcessExecutor;
+use fbinit::FacebookInit;
+use sharding_ext::RepoShard;
+use slog::error;
+use slog::info;
+use tokio::runtime::Runtime;
+
+use crate::run::run_backsyncer;
+
+pub(crate) const DEFAULT_SHARDED_SCOPE_NAME: &str = "global";
+pub(crate) const SM_CLEANUP_TIMEOUT_SECS: u64 = 120;
+pub(crate) const APP_NAME: &str = "backsyncer cmd-line tool";
+
+pub(crate) const ARG_MODE_BACKSYNC_FOREVER: &str = "backsync-forever";
+pub(crate) const ARG_MODE_BACKSYNC_ALL: &str = "backsync-all";
+pub(crate) const ARG_MODE_BACKSYNC_COMMITS: &str = "backsync-commits";
+pub(crate) const ARG_BATCH_SIZE: &str = "batch-size";
+pub(crate) const ARG_INPUT_FILE: &str = "INPUT_FILE";
+
+pub(crate) const SCUBA_TABLE: &str = "mononoke_xrepo_backsync";
+
+/// Struct representing the Back Syncer BP.
+pub struct BacksyncProcess {
+    pub(crate) matches: Arc<MononokeMatches<'static>>,
+    pub(crate) fb: FacebookInit,
+    _runtime: Runtime,
+}
+
+impl BacksyncProcess {
+    pub(crate) fn new(fb: FacebookInit) -> anyhow::Result<Self> {
+        let app = args::MononokeAppBuilder::new(APP_NAME)
+            .with_fb303_args()
+            .with_source_and_target_repos()
+            .with_dynamic_repos()
+            .with_scribe_args()
+            .with_default_scuba_dataset(SCUBA_TABLE)
+            .with_scuba_logging_args()
+            .build();
+        let backsync_forever_subcommand = SubCommand::with_name(ARG_MODE_BACKSYNC_FOREVER)
+            .about("Backsyncs all new bookmark moves");
+
+        let sync_loop = SubCommand::with_name(ARG_MODE_BACKSYNC_COMMITS)
+            .about("Syncs all commits from the file")
+            .arg(
+                Arg::with_name(ARG_INPUT_FILE)
+                    .takes_value(true)
+                    .required(true)
+                    .help("list of hg commits to backsync"),
+            )
+            .arg(
+                Arg::with_name(ARG_BATCH_SIZE)
+                    .long(ARG_BATCH_SIZE)
+                    .takes_value(true)
+                    .required(false)
+                    .help("how many commits to backsync at once"),
+            );
+
+        let backsync_all_subcommand = SubCommand::with_name(ARG_MODE_BACKSYNC_ALL)
+            .about("Backsyncs all new bookmark moves once");
+        let app = app
+            .subcommand(backsync_all_subcommand)
+            .subcommand(backsync_forever_subcommand)
+            .subcommand(sync_loop);
+        let (matches, _runtime) = app.get_matches(fb)?;
+        let matches = Arc::new(matches);
+        Ok(Self {
+            matches,
+            fb,
+            _runtime,
+        })
+    }
+}
+
+#[async_trait]
+impl RepoShardedProcess for BacksyncProcess {
+    async fn setup(&self, repo: &RepoShard) -> anyhow::Result<Arc<dyn RepoShardedProcessExecutor>> {
+        let logger = self.matches.logger();
+        // For backsyncer, two repos (i.e. source and target) are required as input
+        let source_repo_name = repo.repo_name.clone();
+        let target_repo_name = match repo.target_repo_name.clone() {
+            Some(repo_name) => repo_name,
+            None => {
+                let details = format!(
+                    "Only source repo name {} provided, target repo name missing in {}",
+                    source_repo_name, repo
+                );
+                error!(logger, "{}", details);
+                bail!("{}", details)
+            }
+        };
+        info!(
+            logger,
+            "Setting up back syncer command from repo {} to repo {}",
+            source_repo_name,
+            target_repo_name,
+        );
+        let details = format!(
+            "Completed back syncer command setup from repo {} to repo {}",
+            source_repo_name, target_repo_name
+        );
+        let executor = BacksyncProcessExecutor::new(
+            self.fb,
+            Arc::clone(&self.matches),
+            source_repo_name,
+            target_repo_name,
+        );
+        info!(logger, "{}", details);
+        Ok(Arc::new(executor))
+    }
+}
+
+/// Struct representing the execution of the Back Syncer
+/// BP over the context of the provided repos.
+pub struct BacksyncProcessExecutor {
+    fb: FacebookInit,
+    matches: Arc<MononokeMatches<'static>>,
+    source_repo_name: String,
+    target_repo_name: String,
+    cancellation_requested: Arc<AtomicBool>,
+}
+
+impl BacksyncProcessExecutor {
+    pub(crate) fn new(
+        fb: FacebookInit,
+        matches: Arc<MononokeMatches<'static>>,
+        source_repo_name: String,
+        target_repo_name: String,
+        // ctx: Arc<CoreContext>,
+        // app: Arc<MononokeApp>,
+        // repo_args: SourceAndTargetRepoArgs,
+    ) -> Self {
+        Self {
+            fb,
+            matches,
+            source_repo_name,
+            target_repo_name,
+            cancellation_requested: Arc::new(AtomicBool::new(false)),
+        }
+    }
+}
+
+#[async_trait]
+impl RepoShardedProcessExecutor for BacksyncProcessExecutor {
+    async fn execute(&self) -> anyhow::Result<()> {
+        info!(
+            self.matches.logger(),
+            "Initiating back syncer command execution for repo pair {}-{}",
+            &self.source_repo_name,
+            &self.target_repo_name,
+        );
+        run_backsyncer(
+            self.fb,
+            Arc::clone(&self.matches),
+            self.source_repo_name.clone(),
+            self.target_repo_name.clone(),
+            Arc::clone(&self.cancellation_requested),
+        )
+        .await
+        .with_context(|| {
+            format!(
+                "Error during back syncer command execution for repo pair {}-{}",
+                &self.source_repo_name, &self.target_repo_name,
+            )
+        })?;
+        info!(
+            self.matches.logger(),
+            "Finished back syncer command execution for repo pair {}-{}",
+            &self.source_repo_name,
+            self.target_repo_name
+        );
+        Ok(())
+    }
+
+    async fn stop(&self) -> anyhow::Result<()> {
+        info!(
+            self.matches.logger(),
+            "Terminating back syncer command execution for repo pair {}-{}",
+            &self.source_repo_name,
+            self.target_repo_name,
+        );
+        self.cancellation_requested.store(true, Ordering::Relaxed);
+        Ok(())
+    }
+}
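
For reviewers tracing how the re-exported pieces fit together: main.rs (unchanged here apart from the run -> run_backsyncer rename) is what chooses between sharded and non-sharded execution, which is why it still imports ShardedProcessExecutor along with APP_NAME, DEFAULT_SHARDED_SCOPE_NAME, and SM_CLEANUP_TIMEOUT_SECS from the new sharding module. Below is a minimal sketch of that wiring, modeled on other Mononoke sharded binaries rather than shown by this diff; the "sharded-service-name" flag, the MononokeMatches::runtime() accessor, and the exact ShardedProcessExecutor::new/block_and_execute signatures are assumptions.

use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use anyhow::Error;
use executor_lib::RepoShardedProcess;
use executor_lib::ShardedProcessExecutor;
use fbinit::FacebookInit;

use crate::sharding::BacksyncProcess;
use crate::sharding::DEFAULT_SHARDED_SCOPE_NAME;
use crate::sharding::SM_CLEANUP_TIMEOUT_SECS;

// Hypothetical wiring; assumed signatures are flagged inline.
fn drive(fb: FacebookInit) -> Result<(), Error> {
    let process = Arc::new(BacksyncProcess::new(fb)?);
    let logger = process.matches.logger().clone();
    match process.matches.value_of("sharded-service-name") {
        Some(service_name) => {
            // Sharded mode: the Shard Manager hands out source/target repo
            // pairs, and BacksyncProcess::setup() builds one executor per pair.
            let mut executor = ShardedProcessExecutor::new(
                process.fb,
                process.matches.runtime().clone(), // assumed accessor
                &logger,
                service_name,
                DEFAULT_SHARDED_SCOPE_NAME,
                SM_CLEANUP_TIMEOUT_SECS,
                Arc::clone(&process) as Arc<dyn RepoShardedProcess>,
                true, /* shard (repo pair) level healing */
            )?;
            executor.block_and_execute(&logger, Arc::new(AtomicBool::new(false)))
        }
        None => {
            // Non-sharded mode: the source/target repos come straight from the
            // CLI and main drives run_backsyncer() on the runtime directly;
            // elided here because this diff leaves that path unchanged.
            Ok(())
        }
    }
}

The diff's decision to mark BacksyncProcess::matches and BacksyncProcess::fb as pub(crate) is what lets main.rs-level wiring like the above reach into the process struct without extra accessors.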