diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 6e894f301..6a703249c 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -668,82 +668,57 @@ where if need_commitment_signed { if !state.tlc_state.waiting_ack { - eprintln!("sending commitment_signed after first ack"); self.handle_commitment_signed_command(state)?; - } else { - eprintln!("waiting ack for commitment_signed ............."); } } // flush remove tlc for received tlcs after replying ack for peer - let mut settled_tlcs = vec![]; - for tlc in state.tlc_state.received_tlcs.tlcs.iter_mut() { - if let Some((_removed_at, reason)) = &tlc.removed_at { - if matches!( - tlc.status, - TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed) - ) { - settled_tlcs.push((tlc.tlc_id, reason.clone())); - } - } - } - for (tlc_id, reason) in settled_tlcs.iter() { - eprintln!("apply received tlc remove: {:?}", tlc_id); - let _ = self - .apply_remove_tlc_operation(myself, state, *tlc_id, reason.clone()) - .await; - } - + self.apply_settled_remvoe_tlcs(myself, state, true).await; Ok(()) } - async fn flush_tlc_operations( + async fn apply_settled_remvoe_tlcs( &self, myself: &ActorRef, state: &mut ChannelActorState, + inbound: bool, ) { - state.tlc_state.debug(); - - let mut settled_tlcs = vec![]; - for tlc in state.tlc_state.received_tlcs.tlcs.iter_mut() { - if let Some((_removed_at, reason)) = &tlc.removed_at { - if matches!( - tlc.status, - TlcStatus::Inbound(InboundTlctatus::LocalRemoved) - ) { - settled_tlcs.push((tlc.tlc_id, reason.clone())); + let pending_tlcs = if inbound { + state.tlc_state.received_tlcs.tlcs.iter_mut() + } else { + state.tlc_state.offered_tlcs.tlcs.iter_mut() + }; + let settled_tlcs: Vec<_> = pending_tlcs + .filter_map(|tlc| { + if let Some((_removed_at, reason)) = &tlc.removed_at { + if matches!( + tlc.status, + TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed) + | TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) + ) { + return Some((tlc.tlc_id, reason.clone())); + } } - } - } - for (tlc_id, _reason) in settled_tlcs.iter() { - let tlc = state.tlc_state.get_mut(tlc_id).expect("tlc"); - tlc.status = TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed); - } + None + }) + .collect(); - // flush outbound tlcs - let mut settled_tlcs = vec![]; - for tlc in state.tlc_state.offered_tlcs.tlcs.iter_mut() { - if let Some((_removed_at, reason)) = &tlc.removed_at { - if matches!( - tlc.status, - TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed) - ) { - settled_tlcs.push((tlc.tlc_id, reason.clone())); - } - } - } - for (tlc_id, reason) in settled_tlcs.iter() { - eprintln!("apply offered tlc remove: {:?}", tlc_id); - let _ = self - .apply_remove_tlc_operation(myself, state, *tlc_id, reason.clone()) - .await; + for (tlc_id, reason) in settled_tlcs { + self.apply_remove_tlc_operation(myself, state, tlc_id, reason) + .await + .expect("expect remove tlc success"); } + } + async fn flush_tlc_operations( + &self, + myself: &ActorRef, + state: &mut ChannelActorState, + ) { let apply_tlcs = state.tlc_state.get_committed_received_tlcs(); eprintln!("flushing {} tlcs", apply_tlcs.len()); for add_tlc in apply_tlcs { if add_tlc.removed_at.is_some() { - eprintln!("tlc already removed, skip"); continue; } @@ -783,6 +758,8 @@ where } } + // flush outbound tlcs + self.apply_settled_remvoe_tlcs(myself, state, false).await; eprintln!("end flusing now ..................."); } @@ -815,7 +792,7 @@ where &self, myself: &ActorRef, state: &mut ChannelActorState, - tlc_id: u64, + tlc_id: TLCId, ) { let tlc_info = state.get_received_tlc(tlc_id).expect("expect tlc"); let preimage = tlc_info @@ -906,7 +883,7 @@ where // we don't need to settle down the tlc if it is not the last hop here, // some e2e tests are calling AddTlc manually, so we can not use onion packet to // check whether it's the last hop here, maybe need to revisit in future. - self.try_to_settle_down_tlc(myself, state, add_tlc.tlc_id.into()) + self.try_to_settle_down_tlc(myself, state, add_tlc.tlc_id) .await; warn!("finished check tlc for peer message: {:?}", &add_tlc.tlc_id); @@ -1072,7 +1049,6 @@ where ) -> Result<(), ProcessingChannelError> { let channel_id = state.get_id(); let remove_reason = reason.clone(); - eprintln!("apply remove tlc operation: {:?}", tlc_id); let tlc_info = state .remove_tlc_with_reason(tlc_id, &remove_reason) @@ -1097,7 +1073,6 @@ where // only the original sender of the TLC should send `TlcRemoveReceived` event // because only the original sender cares about the TLC event to settle the payment if tlc_info.is_offered() { - eprintln!("sending TlcRemoveReceived ...: {:?}", tlc_info.payment_hash); self.network .send_message(NetworkActorMessage::new_event( NetworkActorEvent::TlcRemoveReceived( @@ -1108,10 +1083,6 @@ where .expect("myself alive"); } } else { - eprintln!( - "now we need to relay remove tlc ...: {:?} tlc_id: {:?}", - tlc_info.payment_hash, tlc_info.tlc_id - ); // relay RemoveTlc to previous channel if needed self.try_to_relay_remove_tlc(myself, state, &tlc_info, remove_reason) .await; @@ -1448,9 +1419,10 @@ where tlc_id: TLCId, reason: RemoveTlcReason, ) { - state.tlc_state.set_tlc_pending_remove(tlc_id, reason); - self.check_and_apply_retryable_remove_tlcs(myself, state) - .await; + state.tlc_state.insert_pending_remove_tlc(tlc_id, reason); + myself + .send_message(ChannelActorMessage::Event(ChannelEvent::CheckTlcSetdown)) + .expect("myself alive"); } pub async fn register_retryable_relay_tlc_remove( @@ -1464,8 +1436,9 @@ where state .tlc_state .insert_relay_tlc_remove(channel_id, tlc_id, reason); - self.check_and_apply_retryable_remove_tlcs(myself, state) - .await; + myself + .send_message(ChannelActorMessage::Event(ChannelEvent::CheckTlcSetdown)) + .expect("myself alive"); } pub async fn check_and_apply_retryable_remove_tlcs( @@ -2455,16 +2428,17 @@ impl TlcState { } pub fn get(&self, tlc_id: &TLCId) -> Option<&TlcInfo> { - self.offered_tlcs - .tlcs - .iter() - .find(|tlc| tlc.tlc_id == *tlc_id) - .or_else(|| { - self.received_tlcs - .tlcs - .iter() - .find(|tlc| tlc.tlc_id == *tlc_id) - }) + if tlc_id.is_offered() { + self.offered_tlcs + .tlcs + .iter() + .find(|tlc| tlc.tlc_id == *tlc_id) + } else { + self.received_tlcs + .tlcs + .iter() + .find(|tlc| tlc.tlc_id == *tlc_id) + } } pub fn get_committed_received_tlcs(&self) -> Vec { @@ -2492,7 +2466,7 @@ impl TlcState { self.waiting_ack = waiting_ack; } - pub fn set_tlc_pending_remove(&mut self, tlc_id: TLCId, reason: RemoveTlcReason) { + pub fn insert_pending_remove_tlc(&mut self, tlc_id: TLCId, reason: RemoveTlcReason) { self.retryable_remove_tlcs .push(RetryableRemoveTlc::RemoveTlc(tlc_id, reason)); } @@ -2684,6 +2658,9 @@ impl TlcState { InboundTlctatus::AnnounceWaitAck => { tlc.status = TlcStatus::Inbound(InboundTlctatus::Committed); } + InboundTlctatus::LocalRemoved => { + tlc.status = TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed); + } _ => {} } } @@ -4272,12 +4249,12 @@ impl ChannelActorState { self.get_current_commitment_number(for_remote) + 1 } - pub fn get_next_offering_tlc_id(&self) -> u64 { - self.tlc_state.get_next_offering() + pub fn get_next_offering_tlc_id(&self) -> TLCId { + TLCId::Offered(self.tlc_state.get_next_offering()) } - pub fn get_next_received_tlc_id(&self) -> u64 { - self.tlc_state.get_next_received() + pub fn get_next_received_tlc_id(&self) -> TLCId { + TLCId::Received(self.tlc_state.get_next_received()) } pub fn increment_next_offered_tlc_id(&mut self) { @@ -4288,12 +4265,12 @@ impl ChannelActorState { self.tlc_state.increment_received(); } - pub fn get_offered_tlc(&self, tlc_id: u64) -> Option<&TlcInfo> { - self.tlc_state.get(&TLCId::Offered(tlc_id)) + pub fn get_offered_tlc(&self, tlc_id: TLCId) -> Option<&TlcInfo> { + self.tlc_state.get(&tlc_id) } - pub fn get_received_tlc(&self, tlc_id: u64) -> Option<&TlcInfo> { - self.tlc_state.get(&TLCId::Received(tlc_id)) + pub fn get_received_tlc(&self, tlc_id: TLCId) -> Option<&TlcInfo> { + self.tlc_state.get(&tlc_id) } pub(crate) fn set_received_tlc_preimage(&mut self, tlc_id: u64, preimage: Option) { @@ -4303,6 +4280,21 @@ impl ChannelActorState { } pub fn check_insert_tlc(&mut self, tlc: &TlcInfo) -> Result<(), ProcessingChannelError> { + let next_tlc_id = if tlc.is_offered() { + self.get_next_offering_tlc_id() + } else { + self.get_next_received_tlc_id() + }; + if tlc.tlc_id != next_tlc_id { + eprintln!( + "Received tlc id {:?} is not the expected next id {:?}", + tlc.tlc_id, next_tlc_id + ); + return Err(ProcessingChannelError::InvalidParameter(format!( + "Received tlc id {:?} is not the expected next id {:?}", + tlc.tlc_id, next_tlc_id + ))); + } let payment_hash = tlc.payment_hash; if let Some(tlc) = self .tlc_state @@ -4555,7 +4547,8 @@ impl ChannelActorState { if let Some(remove_at) = &tlc.removed_at { let mut include = false; match tlc.status { - TlcStatus::Inbound(InboundTlctatus::LocalRemoved) => { + TlcStatus::Inbound(InboundTlctatus::LocalRemoved) + | TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed) => { include = for_remote; } TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitPrevAck) @@ -4845,16 +4838,16 @@ impl ChannelActorState { } fn create_outbounding_tlc(&self, command: AddTlcCommand) -> TlcInfo { - let id = self.get_next_offering_tlc_id(); + let tlc_id = self.get_next_offering_tlc_id(); assert!( - self.get_offered_tlc(id).is_none(), + self.get_offered_tlc(tlc_id).is_none(), "Must not have the same id in pending offered tlcs" ); TlcInfo { channel_id: self.get_id(), status: TlcStatus::Outbound(OutboundTlcStatus::LocalAnnounced), - tlc_id: TLCId::Offered(id), + tlc_id, amount: command.amount, payment_hash: command.payment_hash, expiry: command.expiry, @@ -5701,13 +5694,6 @@ impl ChannelActorState { eprintln!("handle revoke and ack peer message: {:?}", &revocation_data); let need_commitment_signed = self.tlc_state.update_for_revoke_and_ack(); - // if need_commitment_signed { - // myself - // .send_message(ChannelActorMessage::Command( - // ChannelCommand::CommitmentSigned(), - // )) - // .expect(ASSUME_NETWORK_ACTOR_ALIVE); - // } network .send_message(NetworkActorMessage::new_notification( NetworkServiceEvent::RevokeAndAckReceived(