Real-time Sync: Add base methods and traits #509

Open · wants to merge 34 commits into main from yse-rt-sync-methods

Conversation

@hydra-yse (Collaborator) commented Sep 30, 2024

Closes #501, closes #502, closes #503, closes #504, closes #505;

TODO: Add signer methods from #495

TODO: Add hooks to sync service across the SDK

@roeierez (Member) left a comment:

Nice start!
My main comments are:

  1. Removing methods from the sync trait that are not used externally (not sure we need an interface).
  2. A more atomic/consistent tracking mechanism for unsynced records.
  3. A mechanism to associate the server record id with a swap.
  4. Possibly separating the synced data into different tables.

@@ -111,6 +115,10 @@ impl SendSwapHandler {
// Boltz has detected the lockup in the mempool, we can speed up
// the claim by doing so cooperatively
Ok(SubSwapStates::TransactionClaimPending) => {
    if !swap.is_local {
@roeierez (Member):

True we don't want to claim but we still want to update the state somehow.

@hydra-yse (Collaborator, Author):

Right, I'll add a condition at the top of the stream listener that still updates the state when the swap is non-local.
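
For illustration, that early check could look roughly like the sketch below. `to_payment_state` and `update_swap_state` are assumed helper names, not part of this PR:

```rust
// Hypothetical sketch: short-circuit at the top of the status-stream handler
// so swaps created on another device only get their state mirrored locally,
// and are never claimed/refunded from here.
async fn on_new_status(&self, swap: &SendSwap, status: SubSwapStates) -> Result<()> {
    if !swap.is_local {
        // Assumed helper mapping Boltz subswap states to our payment states.
        let new_state = Self::to_payment_state(&status);
        self.update_swap_state(&swap.id, new_state)?;
        return Ok(());
    }
    // ...existing handling for locally created swaps (cooperative claim, etc.)
    Ok(())
}
```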

@@ -121,6 +129,10 @@ impl SendSwapHandler {

// Boltz announced they successfully broadcast the (cooperative or non-cooperative) claim tx
Ok(SubSwapStates::TransactionClaimed) => {
    if !swap.is_local {
@roeierez (Member):

Also here, we need to update the state.

@hydra-yse (Collaborator, Author) commented Oct 17, 2024

while let Some(message) = stream.next().await {
    match message {
        Ok(record) => {
            if let Err(err) = cloned.apply_changes(&[record]) {
@roeierez (Member):

Do we need the apply_changes in the interface if it is only used internally by the sync?

@hydra-yse (Collaborator, Author) commented Oct 17, 2024:

It's going to be needed in the SDK init hook when we call get_latest_changes (see #509 (comment)). I tried making the ops as atomic as possible by having each do exactly what it says and nothing else, so the caller has to call get_latest_changes to pull and then apply.

This is more of a model decision and kind of unnecessary for the SDK application, so I can just turn get_latest_changes into something like pull_latest_changes and ensure it also calls apply internally, so we don't have to expose it. Both are fine by me 😄
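
As a rough sketch of that pull_latest_changes idea (the persister setter and the `id` field on `Record` are assumptions):

```rust
// Hypothetical sketch: pull everything newer than the locally stored tip and
// apply it in one call, so `apply_changes` never needs to be part of the trait.
pub(crate) async fn pull_latest_changes(&self) -> Result<()> {
    let local_tip = self.persister.get_latest_record_id()?;
    let records = self.get_changes_since(local_tip).await?;
    if let Some(last) = records.last() {
        let new_tip = last.id;
        self.apply_changes(&records)?;
        // Assumed setter: advance the local tip to the last applied record.
        self.persister.set_latest_record_id(new_tip)?;
    }
    Ok(())
}
```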

Comment on lines 93 to 101
pub(crate) fn apply_record(&self, record: DecryptedRecord) -> Result<()> {
    match record.data {
        SyncData::Chain(chain_data) => self.insert_chain_swap(&chain_data.into()),
        SyncData::Send(send_data) => self.insert_send_swap(&send_data.into()),
        SyncData::Receive(receive_data) => self.insert_receive_swap(&receive_data.into()),
    }
}
@roeierez (Member):

This function's implementation looks like business logic; it seems like it belongs inside the sync service itself?

@hydra-yse (Collaborator, Author):

Yes, initially I thought the logic would be longer, though since it's just a match I agree it can be moved into the service 👍

Ok(())
}

async fn get_changes_since(&self, from_id: i64) -> Result<Vec<Record>> {
@roeierez (Member):

Who should call get_changes_since? If it is internal to the sync service, perhaps we don't need it in the trait?

@hydra-yse (Collaborator, Author) commented

return Err(anyhow!("Cannot run `set_record`: client not connected"));
};

let id = self.persister.get_latest_record_id()? + 1;
@roeierez (Member):

We should have something unique across devices here, since this is the id propagated to the server.

@hydra-yse (Collaborator, Author):

If the client is synced, we are sure that the new tip is always unique. There is an issue with concurrency though when, for example, two clients try to send a (different) payment and each calls set_record before the change gets caught by the listening stream. I think a trivial fix to this is to simply retry X number of times until it succeeds (perhaps doing this in a separate thread so that it doesn't block the payment response).
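
For illustration, the retry idea could look roughly like the sketch below (names and error handling are assumptions; as discussed further down, dropping the client-proposed id ends up making this loop unnecessary):

```rust
// Hypothetical sketch of the retry idea: if the server rejects the record
// because our tip is stale, re-sync and try again a bounded number of times,
// off the hot path of the payment response.
const MAX_SET_RECORD_ATTEMPTS: usize = 5;

async fn push_record_with_retry(&self, data: Vec<u8>) -> Result<i64> {
    for _ in 0..MAX_SET_RECORD_ATTEMPTS {
        // Re-sync first so the proposed id (local_tip + 1) is based on the
        // latest tip this client knows about.
        self.pull_latest_changes().await?;
        let proposed_id = self.persister.get_latest_record_id()? + 1;
        match self.set_record(proposed_id, data.clone()).await {
            Ok(new_id) => return Ok(new_id),
            // Assumed failure mode: the server rejected an id that doesn't
            // extend its current tip, so loop and try again.
            Err(_) => continue,
        }
    }
    Err(anyhow!("set_record kept racing other clients; giving up"))
}
```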

@roeierez (Member):

There are two identifiers that have different purposes:

  1. record_id - created and set by the client. It shouldn't be synced, it only needs to be unique (we could use a GUID, for example). It is used to identify the record to update/sync.
  2. version - set by the server and used to detect changes.

Do you think we can use a GUID and not sync this id?

@hydra-yse (Collaborator, Author):

> version - which is set by the server and is used to detect changes.

This is set by the client, to show which schema version it is currently using. That was the change we settled on, correct?

> which is created and set by the client, it shouldn't be synced, it only needs to be unique and we can use a GUID for example. It is used to identify the record to update/sync.

Why do we need a separate id/GUID at all? The implementation is currently pretty simple:

  • The user sends a record with an id (local_tip + 1)
  • The server checks whether the given id can be added to the tip (local_tip + 1 == current_tip + 1). If not, it returns an error (the client then has to re-sync and try again)
  • If it is added, the server returns a response with the new_id (the same as the one in the request payload, so we update local_tip to local_tip + 1)

This ensures that the client has to be synced, and also that records cannot be added at random.

@roeierez (Member):

But we don't need to ensure the client is synced in order to add a record. Also, what do you mean by adding records at random? Is there a requirement here?
I see a retry mechanism on the client that could be avoided by using client-generated IDs. Do you see any disadvantage in the suggested approach, or any advantage in the current one?

@hydra-yse (Collaborator, Author) commented Oct 21, 2024:

The whole reason behind numbered IDs is that we can fetch from a given height. If we don't use a numbered ID, then we have to create a record timestamp on the server's db and then use that to index.

I agree though that the id in the set_record payload is pretty useless and can be avoided. I've re-thought the use cases and there seems to be no issue if a client sets a record even when it's behind.

So I'll change SetRecordRequest so it only passes data and version, and keep the new_id in SetRecordResponse which can then be used by the client to update the local tip.

Edit: Done in c520aba. Logic now looks simpler and neater.
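
As a rough picture of the simplified shapes described above (field types here are placeholders; the actual definitions are the ones introduced in c520aba):

```rust
// Hypothetical sketch: the client no longer proposes an id, it only sends the
// payload and its schema version, and the server answers with the id it
// assigned so the client can advance its local tip.
pub struct SetRecordRequest {
    /// Encrypted record payload.
    pub data: Vec<u8>,
    /// Schema version the client is currently using.
    pub version: u32,
}

pub struct SetRecordReply {
    /// Server-assigned record id; the client stores it as its new local tip.
    pub new_id: i64,
}
```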

Comment on lines +37 to +54
swap_id: value.id,
preimage: value.preimage,
create_response_json: value.create_response_json,
direction: value.direction,
lockup_address: value.lockup_address,
claim_address: value.claim_address,
claim_fees_sat: value.claim_fees_sat,
claim_private_key: value.claim_private_key,
refund_private_key: value.refund_private_key,
timeout_block_height: value.timeout_block_height,
payer_amount_sat: value.payer_amount_sat,
receiver_amount_sat: value.receiver_amount_sat,
accept_zero_conf: value.accept_zero_conf,
created_at: value.created_at,
description: value.description,
@roeierez (Member):

What do you think about separating the synced data into a different table for each swap?
We will need to add a join to each query, but perhaps it makes it clearer and more separated which persistent data we sync/update and which data is the client's responsibility to update locally?

@hydra-yse (Collaborator, Author):

To your point, I think it definitely helps the database structure make more sense (we can look at the schema and get a better idea of what the implementation is going to look like).

Is this necessary from a code standpoint, though? Taking a look at the sync models is IMO enough to do what you said: distinguish which fields will be synced and which won't. Since these models are only needed for transport (meaning that, in the end, we still have to convert to a swap to insert into the tables), I don't see the need to reflect these abstractions in the database.

};
self.persister.insert_send_swap(&swap)?;
self.sync_service
@roeierez (Member):

This is not atomic, so we can end up with some data that is not synced upstream.
Perhaps we can have an updated_timestamp and a synced_timestamp, and always query for the records whose updated_timestamp is larger than their synced_timestamp as candidates to sync?

Also, where is the server record id saved on the client to associate a swap with a record id?
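
As a minimal sketch of that timestamp idea, assuming a rusqlite-backed persister and illustrative table/column names:

```rust
// Hypothetical sketch: any row whose updated_timestamp is newer than its
// synced_timestamp is still pending sync, so a write that wasn't pushed
// upstream is simply picked up on the next pass.
fn get_unsynced_send_swap_ids(conn: &rusqlite::Connection) -> rusqlite::Result<Vec<String>> {
    let mut stmt = conn.prepare(
        "SELECT id FROM send_swaps
         WHERE updated_timestamp > COALESCE(synced_timestamp, 0)",
    )?;
    stmt.query_map([], |row| row.get::<_, String>(0))?
        .collect()
}
```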

@hydra-yse (Collaborator, Author):

There's a new method that needs to be added to the interface. As you said in the comment above, get_changes_since is an internal method. I will add a get_latest_changes, which uses the client's latestRecordId.

For the first question, that is what get_latest_changes will be for. I haven't added it to the SDK init yet, though the idea is that on startup the sync service will send its latestRecordId (stored in a new settings table in the DB; it can be renamed afterwards) and try to pull all the upstream changes.

This mechanism, plus listening for new updates at startup, ensures that when we later call set_record we are always synced with upstream and can actually push changes. If we are not, set_record returns an error (and we probably want to trigger get_latest_changes again).
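
A rough sketch of that startup flow, assuming a pull_latest_changes helper like the one sketched earlier (the method name here is illustrative):

```rust
// Hypothetical sketch of the init hook: catch up with the remote tip first,
// then keep listening for live updates in the background.
pub(crate) async fn start(self: Arc<Self>) -> Result<()> {
    // Pull and apply everything newer than the stored latestRecordId.
    self.pull_latest_changes().await?;
    // Keep the stream listener running for updates pushed by other devices.
    tokio::spawn(self.clone().listen());
    Ok(())
}
```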

@hydra-yse force-pushed the yse-rt-sync-methods branch 2 times, most recently from b96c4b6 to d47fc3d on October 17, 2024 06:40
@hydra-yse changed the base branch from yse-rt-sync-base to signer on October 17, 2024 06:41
@hydra-yse changed the title from "Real-time Sync: Add base methods (get/set records)" to "Real-time Sync: Add base methods and traits" on October 21, 2024
@hydra-yse changed the base branch from signer to main on October 21, 2024 13:49
revision: Option<i64>,
signer: Arc<Box<dyn Signer>>,
) -> Result<Self, anyhow::Error> {
let id = uuid::Uuid::new_v4().to_string();
@roeierez (Member):

It seems that we always create a new id and never save it in the db (associating it with the swap). The whole point of this client-generated id is so we can later associate updated data with the right record in the db.
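
For illustration, persisting that association could be as small as an extra mapping table (table and column names below are assumptions):

```rust
// Hypothetical sketch: store the client-generated record id next to the swap
// so later updates are routed to the same upstream record.
fn set_swap_record_id(
    conn: &rusqlite::Connection,
    swap_id: &str,
    record_id: &str,
) -> rusqlite::Result<()> {
    conn.execute(
        "INSERT OR REPLACE INTO sync_record_ids (swap_id, record_id) VALUES (?1, ?2)",
        rusqlite::params![swap_id, record_id],
    )?;
    Ok(())
}
```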

}

async fn set_record(&self, req: SetRecordRequest) -> Result<SetRecordReply> {
let Some(mut client) = self.inner.lock().await.clone() else {
@roeierez (Member):

This will keep the client locked until the end of the invocation. I think it is better to extract getting the client into its own function, get_client(), so the lock is released right after the get.
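
A minimal sketch of that helper (the client type name is an assumption):

```rust
// Hypothetical sketch of the suggested get_client() helper: take the lock,
// clone the connected client out, and let the guard drop before any RPC runs.
async fn get_client(&self) -> Result<SyncerClient> {
    self.inner
        .lock()
        .await
        .clone()
        .ok_or_else(|| anyhow!("Cannot run request: client not connected"))
}
```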

self.client.connect(self.connect_url.clone()).await
}

pub(crate) async fn listen(self: Arc<Self>) -> Result<()> {
@roeierez (Member):

NIT: from the client's perspective it makes sense to have listen_changes, but from the Syncer's perhaps just start? It doesn't only listen, it also applies the changes and ensures we are up to sync.
