Skip to content

Commit

Permalink
add save points (#2)
Browse files Browse the repository at this point in the history
* file_system: limit thread_io target os (tikv#11715)

* limit thread_io target os

Signed-off-by: MuZhou233 <[email protected]>

* limit thread_io target os. typo. close tikv#11698

Signed-off-by: MuZhou233 <[email protected]>

Co-authored-by: Yilin Chen <[email protected]>

* backup: return api-version to BR when backup (tikv#11704)

* Update kvproto

Signed-off-by: Peng Guanwen <[email protected]>

* Return api-version for br

ref tikv#10938

Signed-off-by: Peng Guanwen <[email protected]>

* Reformat code

Signed-off-by: Peng Guanwen <[email protected]>

* Update components/external_storage/export/src/export.rs

Signed-off-by: Peng Guanwen <[email protected]>

Co-authored-by: Andy Lok <[email protected]>

* format code

Signed-off-by: Peng Guanwen <[email protected]>

Co-authored-by: Andy Lok <[email protected]>

* add savepoints

Signed-off-by: disksing <[email protected]>

Co-authored-by: 沐 <[email protected]>
Co-authored-by: Yilin Chen <[email protected]>
Co-authored-by: Peng Guanwen <[email protected]>
Co-authored-by: Andy Lok <[email protected]>
  • Loading branch information
5 people authored Jan 6, 2022
1 parent 27ebbf4 commit ff005bb
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 155 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7
# When you modify TiKV cooperatively with kvproto, this will be useful to submit the PR to TiKV and the PR to
# kvproto at the same time.
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
# [patch.'https://github.com/pingcap/kvproto']
# kvproto = {git = "https://github.com/your_github_id/kvproto", branch="your_branch"}
[patch.'https://github.com/pingcap/kvproto']
kvproto = {path = "../kvproto"}

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
Expand Down
9 changes: 8 additions & 1 deletion components/backup/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::channel::mpsc::*;
use futures::Future;
use kvproto::brpb::*;
use kvproto::encryptionpb::EncryptionMethod;
use kvproto::kvrpcpb::{Context, IsolationLevel};
use kvproto::kvrpcpb::{ApiVersion, Context, IsolationLevel};
use kvproto::metapb::*;
use online_config::OnlineConfig;

Expand Down Expand Up @@ -197,6 +197,7 @@ async fn save_backup_file_worker(
rx: async_channel::Receiver<InMemBackupFiles>,
tx: UnboundedSender<BackupResponse>,
storage: Arc<dyn ExternalStorage>,
api_version: ApiVersion,
) {
while let Ok(msg) = rx.recv().await {
let files = if msg.files.need_flush_keys() {
Expand Down Expand Up @@ -234,6 +235,7 @@ async fn save_backup_file_worker(
}
response.set_start_key(msg.start_key.clone());
response.set_end_key(msg.end_key.clone());
response.set_api_version(api_version);
if let Err(e) = tx.unbounded_send(response) {
error_unknown!(?e; "backup failed to send response"; "region" => ?msg.region,
"start_key" => &log_wrappers::Value::key(&msg.start_key),
Expand Down Expand Up @@ -611,6 +613,7 @@ pub struct Endpoint<E: Engine, R: RegionInfoProvider + Clone + 'static> {
config_manager: ConfigManager,
concurrency_manager: ConcurrencyManager,
softlimit: SoftLimitKeeper,
api_version: ApiVersion,

pub(crate) engine: E,
pub(crate) region_info: R,
Expand Down Expand Up @@ -795,6 +798,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
db: Arc<DB>,
config: BackupConfig,
concurrency_manager: ConcurrencyManager,
api_version: ApiVersion,
) -> Endpoint<E, R> {
let pool = ControlThreadPool::new();
let rt = create_tokio_runtime(config.io_thread_size, "backup-io").unwrap();
Expand All @@ -811,6 +815,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
softlimit,
config_manager,
concurrency_manager,
api_version,
}
}

Expand Down Expand Up @@ -1025,6 +1030,7 @@ impl<E: Engine, R: RegionInfoProvider + Clone + 'static> Endpoint<E, R> {
rx.clone(),
resp.clone(),
backend.clone(),
self.api_version,
));
}
}
Expand Down Expand Up @@ -1236,6 +1242,7 @@ pub mod tests {
..Default::default()
},
concurrency_manager,
ApiVersion::V1,
),
)
}
Expand Down
2 changes: 1 addition & 1 deletion components/external_storage/export/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ fn create_backend_inner(
return Err(bad_backend(Backend::CloudDynamic(dyn_backend.clone())));
}
},
#[cfg(not(any(feature = "cloud-gcp", feature = "cloud-aws")))]
#[allow(unreachable_patterns)]
_ => return Err(bad_backend(backend.clone())),
};
record_storage_create(start, &*storage);
Expand Down
Loading

0 comments on commit ff005bb

Please sign in to comment.