Real-time Sync: Add base methods and traits #509
base: main
Conversation
Nice start!
My main comments are:
- Removing methods from the sync trait that are not used externally (not sure we need an interface).
- A more atomic/consistent tracking mechanism for unsynced records.
- A mechanism for associating the server record id with a swap.
- Possibly separating the synced data into different tables.
@@ -111,6 +115,10 @@ impl SendSwapHandler {
            // Boltz has detected the lockup in the mempool, we can speed up
            // the claim by doing so cooperatively
            Ok(SubSwapStates::TransactionClaimPending) => {
                if !swap.is_local {
True, we don't want to claim, but we still want to update the state somehow.
Right, I'll add a condition at the top of the stream listener that still updates the state (only when the swap is non-local).
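That condition could be sketched as a small decision helper. The enum and function names here are hypothetical, not the SDK's actual API:

```rust
// Hypothetical sketch: decide what the stream listener should do with an
// incoming TransactionClaimPending update. Names are illustrative only.
#[derive(Debug, PartialEq)]
enum SwapAction {
    CooperativeClaim, // local swap: we hold the keys, claim cooperatively
    UpdateStateOnly,  // non-local swap: just persist the new state
}

fn on_claim_pending(is_local: bool) -> SwapAction {
    if is_local {
        SwapAction::CooperativeClaim
    } else {
        // We don't want to claim, but we still want to update the state.
        SwapAction::UpdateStateOnly
    }
}
```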
@@ -121,6 +129,10 @@ impl SendSwapHandler {
            // Boltz announced they successfully broadcast the (cooperative or non-cooperative) claim tx
            Ok(SubSwapStates::TransactionClaimed) => {
                if !swap.is_local {
Also here, we need to update the state.
See #509 (comment)
while let Some(message) = stream.next().await {
    match message {
        Ok(record) => {
            if let Err(err) = cloned.apply_changes(&[record]) {
Do we need the apply_changes in the interface if it is only used internally by the sync?
It's going to be needed in the SDK init hook when we call get_latest_changes (see #509 (comment)). I tried making the ops as atomic as possible by doing exactly what they say and nothing else, so the caller will have to call get_latest_changes to pull, and then apply.
This is more of a model decision and kind of unnecessary for the SDK application, so I can just rename get_latest_changes to something like pull_latest_changes and ensure that it also calls apply internally, so we don't have to expose it. Both are fine by me 😄
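A minimal in-memory sketch of that pull_latest_changes variant, where pulling and applying happen in one call so the apply step need not be exposed. All names, types, and the record representation here are stand-ins, not the SDK's real ones:

```rust
// Hypothetical sketch: `pull_latest_changes` fetches everything past the
// local tip and applies it internally, so callers never see `apply_changes`.
struct SyncService {
    latest_record_id: i64,
    applied: Vec<String>, // stand-in for persisted swap data
}

impl SyncService {
    // Stand-in for fetching records after `from_id` from the remote service.
    fn get_changes_since(&self, from_id: i64) -> Vec<(i64, String)> {
        vec![
            (from_id + 1, "record-a".into()),
            (from_id + 2, "record-b".into()),
        ]
    }

    fn apply_changes(&mut self, records: &[(i64, String)]) {
        for (id, data) in records {
            self.applied.push(data.clone());
            self.latest_record_id = *id; // advance the local tip
        }
    }

    // Pull and apply in one step.
    fn pull_latest_changes(&mut self) {
        let records = self.get_changes_since(self.latest_record_id);
        self.apply_changes(&records);
    }
}
```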
lib/core/src/persist/sync.rs
pub(crate) fn apply_record(&self, record: DecryptedRecord) -> Result<()> {
    match record.data {
        SyncData::Chain(chain_data) => self.insert_chain_swap(&chain_data.into()),
        SyncData::Send(send_data) => self.insert_send_swap(&send_data.into()),
        SyncData::Receive(receive_data) => self.insert_receive_swap(&receive_data.into()),
    }
}
This function's implementation looks like business logic; doesn't it belong inside the sync service itself?
Yes, initially I thought the logic would be longer, though since it's just a match I agree it can be moved into the service 👍
    Ok(())
}

async fn get_changes_since(&self, from_id: i64) -> Result<Vec<Record>> {
Who should call get_changes_since? If it is internal to the sync service, perhaps we don't need it in the trait?
See #509 (comment)
lib/core/src/sync/mod.rs
    return Err(anyhow!("Cannot run `set_record`: client not connected"));
};

let id = self.persister.get_latest_record_id()? + 1;
We should have something unique across devices here, since this id is propagated to the server.
If the client is synced, we are sure that the new tip is always unique. There is an issue with concurrency, though: for example, two clients each try to send a (different) payment and each calls set_record before the change gets caught by the listening stream. I think a trivial fix is to simply retry X number of times until it succeeds (perhaps in a separate thread so that it doesn't block the payment response).
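The retry idea could be sketched generically, with the actual set_record call abstracted as a closure. This is an illustration, not the SDK's actual API:

```rust
// Hypothetical sketch: try `set_record` up to `max_attempts` times. In the
// real flow, the caller would re-sync (pull latest changes) between attempts
// whenever the server reports an id conflict.
fn set_record_with_retry<F>(mut set_record: F, max_attempts: u32) -> Result<i64, String>
where
    F: FnMut() -> Result<i64, String>,
{
    let mut last_err = String::from("no attempts made");
    for _ in 0..max_attempts {
        match set_record() {
            Ok(new_id) => return Ok(new_id), // success: new tip id
            Err(e) => last_err = e,          // conflict: re-sync, then retry
        }
    }
    Err(last_err)
}
```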
There are two identifiers that have different purposes:
- record_id, which is created and set by the client. It shouldn't be synced; it only has to be unique, so we can use a GUID, for example. It is used to identify the record to update/sync.
- version, which is set by the server and is used to detect changes.
Do you think we can use a GUID and not sync this id?
> version - which is set by the server and is used to detect changes.

This is set by the client, to show which schema version it is currently using. That was the change we settled on, correct?

> which is created and set by the client, it shouldn't be synced, it only should be unique and we can use GUID for example. It is used to identify the record to update/sync.

Why do we need a separate id/GUID even? The implementation is currently pretty simple:
- The user sends a record with an id (local_tip + 1)
- The server checks whether the given id can be added to the tip (local_tip + 1 == current_tip + 1). If not, it returns an error (the client then has to re-sync and try again)
- If the record is added, the server returns a response with the new_id (the same as the one in the request payload, so we update local_tip to local_tip + 1)

This ensures that the client has to be synced, and also that records cannot be added at random.
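The tip check described in those steps could be sketched as a pure function. The names here are illustrative, not the actual server code:

```rust
// Hypothetical sketch of the server-side check: accept an incoming record
// only if its id is exactly one past the server's current tip.
fn check_incoming_record(current_tip: i64, record_id: i64) -> Result<i64, String> {
    if record_id == current_tip + 1 {
        Ok(record_id) // becomes `new_id` in the response
    } else {
        // Client is out of sync: it must re-sync and try again.
        Err(format!(
            "out of sync: expected id {}, got {}",
            current_tip + 1,
            record_id
        ))
    }
}
```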
But we don't need to ensure the client is synced in order to add a record. Also, what do you mean by records being "added at random"? Is there any requirement here?
I see a retry mechanism on the client that can be avoided by using client-generated IDs. Do you see any disadvantage in the suggested approach, or any advantage in the current one?
The whole reason behind numbered IDs is that we can fetch from a given height. If we don't use a numbered ID, then we have to create a record timestamp in the server's db and use that to index.
I agree, though, that the id in the set record payload is pretty useless and can be avoided. I've re-thought the use-cases, and there seems to be no issue if a client sets a record even when they're behind.
So I'll change SetRecordRequest so it only passes data and version, and keep the new_id in SetRecordResponse, which the client can then use to update the local tip.

Edit: Done in c520aba. The logic now looks simpler and neater.
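The reshaped messages could look roughly like this. Field names and types are assumptions for illustration; the real prost-generated types may differ:

```rust
// Hypothetical sketch of the reshaped messages: the request no longer
// carries an id, and the server assigns `new_id` in the reply.
struct SetRecordRequest {
    data: Vec<u8>, // encrypted record payload
    version: u32,  // client schema version
}

struct SetRecordResponse {
    new_id: i64, // server-assigned id; client sets local_tip = new_id
}

// Stand-in for the server handler: assigns the next id regardless of the
// client's tip, so clients can push even when they are behind.
fn handle_set_record(current_tip: i64, _req: &SetRecordRequest) -> SetRecordResponse {
    SetRecordResponse { new_id: current_tip + 1 }
}
```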
swap_id: value.id,
preimage: value.preimage,
create_response_json: value.create_response_json,
direction: value.direction,
lockup_address: value.lockup_address,
claim_address: value.claim_address,
claim_fees_sat: value.claim_fees_sat,
claim_private_key: value.claim_private_key,
refund_private_key: value.refund_private_key,
timeout_block_height: value.timeout_block_height,
payer_amount_sat: value.payer_amount_sat,
receiver_amount_sat: value.receiver_amount_sat,
accept_zero_conf: value.accept_zero_conf,
created_at: value.created_at,
description: value.description,
What do you think about separating the synced data into a different table for each swap?
We will need to add a join for each query, but perhaps it makes it clearer and more separated which persistent data we sync/update and which data the client is responsible for updating locally?
To your point, I think it definitely helps the database structure make more sense (we can look at the schema and get a better idea of how the implementation is going to look).
Is this necessary from a code standpoint, though? Looking at the sync models is IMO enough to do what you said: distinguish which fields will be synced and which won't. Since these models are only needed for transport (meaning that, in the end, we still have to convert to a swap to insert into the tables), I don't see the need to reflect these abstractions in the database.
};
self.persister.insert_send_swap(&swap)?;
self.sync_service
This is not atomic, so we can end up with some data that is not synced upstream.
Perhaps we can have an updated_timestamp and a synced_timestamp, and always query for the records whose updated_timestamp is larger than their synced_timestamp as candidates to sync?
Also, where is the server record id saved in the client to associate a swap with a record id?
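The timestamp idea from this comment could be sketched like so; the struct, field names, and in-memory filter are hypothetical stand-ins for the real table and SQL query:

```rust
// Hypothetical sketch: a record is a sync candidate whenever it was updated
// after it was last synced (or never synced at all).
struct SwapRow {
    swap_id: String,
    updated_at: u64,        // set on every local write
    synced_at: Option<u64>, // set when the record is pushed upstream
}

fn unsynced_candidates(rows: &[SwapRow]) -> Vec<&SwapRow> {
    rows.iter()
        .filter(|r| match r.synced_at {
            None => true, // never synced
            Some(synced) => r.updated_at > synced,
        })
        .collect()
}
```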
There's a new method that needs to be added in the interface. As you said in the comment above, get_changes_since is an internal method. I will add a get_latest_changes, which uses the client's latestRecordId.
For the first question, that is what get_latest_changes will be for. I haven't added it to the SDK init yet, though the idea is that on startup the sync service will send its latestRecordId (stored in a new settings table in the DB; it can be renamed afterwards) and try to pull all the upstream changes.
This mechanism, plus listening for new updates at startup, ensures that when we later call set_record we are always synced with upstream and can actually push changes. If we are not, set_record returns an error (and we probably want to trigger get_latest_changes again).
Force-pushed b96c4b6 to d47fc3d:
We ensure that the prost-generated files match those which have been committed
We ensure the client is cloned as it is a lightweight clone and enables concurrency. See hyperium/tonic#33
Force-pushed d47fc3d to 1c8a909.
Force-pushed e1ec475 to c520aba.
    revision: Option<i64>,
    signer: Arc<Box<dyn Signer>>,
) -> Result<Self, anyhow::Error> {
    let id = uuid::Uuid::new_v4().to_string();
It seems that we always create a new id and never save it in the db (associating it with the swap).
The whole point of this client-generated id is so we can later associate updated data with the right record in the db.
}

async fn set_record(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
    let Some(mut client) = self.inner.lock().await.clone() else {
This will lock the client until the end of the invocation. I think it is better to extract getting the client into its own function, get_client(), so the lock is released right after the get.
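A simplified sketch of that pattern, using std's blocking Mutex instead of tokio's async one for brevity (the real code would keep `lock().await`):

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the tonic client, which is cheap to clone.
#[derive(Clone, Debug, PartialEq)]
struct Client {
    url: String,
}

struct SyncService {
    inner: Arc<Mutex<Option<Client>>>,
}

impl SyncService {
    // Clone the client inside `get_client` so the lock guard is dropped as
    // soon as the function returns; callers never hold the lock.
    fn get_client(&self) -> Option<Client> {
        self.inner.lock().unwrap().clone()
    }
}
```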
    self.client.connect(self.connect_url.clone()).await
}

pub(crate) async fn listen(self: Arc<Self>) -> Result<()> {
NIT: from the client's perspective it makes sense to have listen_changes, but for the Syncer perhaps just start? It does not only listen, but also applies the changes and ensures we are up to sync.
Closes #501, closes #502, closes #503, closes #504, closes #505

TODO: Add signer methods from #495
TODO: Add hooks to sync service across the SDK