Skip to content

Commit

Permalink
code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Dec 25, 2024
1 parent 5f779c1 commit 5f490d0
Showing 1 changed file with 85 additions and 99 deletions.
184 changes: 85 additions & 99 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,82 +668,57 @@ where

if need_commitment_signed {
if !state.tlc_state.waiting_ack {
eprintln!("sending commitment_signed after first ack");
self.handle_commitment_signed_command(state)?;
} else {
eprintln!("waiting ack for commitment_signed .............");
}
}

// flush remove tlc for received tlcs after replying ack for peer
let mut settled_tlcs = vec![];
for tlc in state.tlc_state.received_tlcs.tlcs.iter_mut() {
if let Some((_removed_at, reason)) = &tlc.removed_at {
if matches!(
tlc.status,
TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed)
) {
settled_tlcs.push((tlc.tlc_id, reason.clone()));
}
}
}
for (tlc_id, reason) in settled_tlcs.iter() {
eprintln!("apply received tlc remove: {:?}", tlc_id);
let _ = self
.apply_remove_tlc_operation(myself, state, *tlc_id, reason.clone())
.await;
}

self.apply_settled_remvoe_tlcs(myself, state, true).await;
Ok(())
}

async fn flush_tlc_operations(
async fn apply_settled_remvoe_tlcs(
&self,
myself: &ActorRef<ChannelActorMessage>,
state: &mut ChannelActorState,
inbound: bool,
) {
state.tlc_state.debug();

let mut settled_tlcs = vec![];
for tlc in state.tlc_state.received_tlcs.tlcs.iter_mut() {
if let Some((_removed_at, reason)) = &tlc.removed_at {
if matches!(
tlc.status,
TlcStatus::Inbound(InboundTlctatus::LocalRemoved)
) {
settled_tlcs.push((tlc.tlc_id, reason.clone()));
let pending_tlcs = if inbound {
state.tlc_state.received_tlcs.tlcs.iter_mut()
} else {
state.tlc_state.offered_tlcs.tlcs.iter_mut()
};
let settled_tlcs: Vec<_> = pending_tlcs
.filter_map(|tlc| {
if let Some((_removed_at, reason)) = &tlc.removed_at {
if matches!(
tlc.status,
TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed)
| TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed)
) {
return Some((tlc.tlc_id, reason.clone()));
}
}
}
}
for (tlc_id, _reason) in settled_tlcs.iter() {
let tlc = state.tlc_state.get_mut(tlc_id).expect("tlc");
tlc.status = TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed);
}
None
})
.collect();

// flush outbound tlcs
let mut settled_tlcs = vec![];
for tlc in state.tlc_state.offered_tlcs.tlcs.iter_mut() {
if let Some((_removed_at, reason)) = &tlc.removed_at {
if matches!(
tlc.status,
TlcStatus::Outbound(OutboundTlcStatus::RemoveAckConfirmed)
) {
settled_tlcs.push((tlc.tlc_id, reason.clone()));
}
}
}
for (tlc_id, reason) in settled_tlcs.iter() {
eprintln!("apply offered tlc remove: {:?}", tlc_id);
let _ = self
.apply_remove_tlc_operation(myself, state, *tlc_id, reason.clone())
.await;
for (tlc_id, reason) in settled_tlcs {
self.apply_remove_tlc_operation(myself, state, tlc_id, reason)
.await
.expect("expect remove tlc success");
}
}

async fn flush_tlc_operations(
&self,
myself: &ActorRef<ChannelActorMessage>,
state: &mut ChannelActorState,
) {
let apply_tlcs = state.tlc_state.get_committed_received_tlcs();
eprintln!("flushing {} tlcs", apply_tlcs.len());
for add_tlc in apply_tlcs {
if add_tlc.removed_at.is_some() {
eprintln!("tlc already removed, skip");
continue;
}

Expand Down Expand Up @@ -783,6 +758,8 @@ where
}
}

// flush outbound tlcs
self.apply_settled_remvoe_tlcs(myself, state, false).await;
eprintln!("end flusing now ...................");
}

Expand Down Expand Up @@ -815,7 +792,7 @@ where
&self,
myself: &ActorRef<ChannelActorMessage>,
state: &mut ChannelActorState,
tlc_id: u64,
tlc_id: TLCId,
) {
let tlc_info = state.get_received_tlc(tlc_id).expect("expect tlc");
let preimage = tlc_info
Expand Down Expand Up @@ -906,7 +883,7 @@ where
// we don't need to settle down the tlc if it is not the last hop here,
// some e2e tests are calling AddTlc manually, so we can not use onion packet to
// check whether it's the last hop here, maybe need to revisit in future.
self.try_to_settle_down_tlc(myself, state, add_tlc.tlc_id.into())
self.try_to_settle_down_tlc(myself, state, add_tlc.tlc_id)
.await;

warn!("finished check tlc for peer message: {:?}", &add_tlc.tlc_id);
Expand Down Expand Up @@ -1072,7 +1049,6 @@ where
) -> Result<(), ProcessingChannelError> {
let channel_id = state.get_id();
let remove_reason = reason.clone();
eprintln!("apply remove tlc operation: {:?}", tlc_id);

let tlc_info = state
.remove_tlc_with_reason(tlc_id, &remove_reason)
Expand All @@ -1097,7 +1073,6 @@ where
// only the original sender of the TLC should send `TlcRemoveReceived` event
// because only the original sender cares about the TLC event to settle the payment
if tlc_info.is_offered() {
eprintln!("sending TlcRemoveReceived ...: {:?}", tlc_info.payment_hash);
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::TlcRemoveReceived(
Expand All @@ -1108,10 +1083,6 @@ where
.expect("myself alive");
}
} else {
eprintln!(
"now we need to relay remove tlc ...: {:?} tlc_id: {:?}",
tlc_info.payment_hash, tlc_info.tlc_id
);
// relay RemoveTlc to previous channel if needed
self.try_to_relay_remove_tlc(myself, state, &tlc_info, remove_reason)
.await;
Expand Down Expand Up @@ -1448,9 +1419,10 @@ where
tlc_id: TLCId,
reason: RemoveTlcReason,
) {
state.tlc_state.set_tlc_pending_remove(tlc_id, reason);
self.check_and_apply_retryable_remove_tlcs(myself, state)
.await;
state.tlc_state.insert_pending_remove_tlc(tlc_id, reason);
myself
.send_message(ChannelActorMessage::Event(ChannelEvent::CheckTlcSetdown))
.expect("myself alive");
}

pub async fn register_retryable_relay_tlc_remove(
Expand All @@ -1464,8 +1436,9 @@ where
state
.tlc_state
.insert_relay_tlc_remove(channel_id, tlc_id, reason);
self.check_and_apply_retryable_remove_tlcs(myself, state)
.await;
myself
.send_message(ChannelActorMessage::Event(ChannelEvent::CheckTlcSetdown))
.expect("myself alive");
}

pub async fn check_and_apply_retryable_remove_tlcs(
Expand Down Expand Up @@ -2455,16 +2428,17 @@ impl TlcState {
}

pub fn get(&self, tlc_id: &TLCId) -> Option<&TlcInfo> {
self.offered_tlcs
.tlcs
.iter()
.find(|tlc| tlc.tlc_id == *tlc_id)
.or_else(|| {
self.received_tlcs
.tlcs
.iter()
.find(|tlc| tlc.tlc_id == *tlc_id)
})
if tlc_id.is_offered() {
self.offered_tlcs
.tlcs
.iter()
.find(|tlc| tlc.tlc_id == *tlc_id)
} else {
self.received_tlcs
.tlcs
.iter()
.find(|tlc| tlc.tlc_id == *tlc_id)
}
}

pub fn get_committed_received_tlcs(&self) -> Vec<TlcInfo> {
Expand Down Expand Up @@ -2492,7 +2466,7 @@ impl TlcState {
self.waiting_ack = waiting_ack;
}

pub fn set_tlc_pending_remove(&mut self, tlc_id: TLCId, reason: RemoveTlcReason) {
pub fn insert_pending_remove_tlc(&mut self, tlc_id: TLCId, reason: RemoveTlcReason) {
self.retryable_remove_tlcs
.push(RetryableRemoveTlc::RemoveTlc(tlc_id, reason));
}
Expand Down Expand Up @@ -2684,6 +2658,9 @@ impl TlcState {
InboundTlctatus::AnnounceWaitAck => {
tlc.status = TlcStatus::Inbound(InboundTlctatus::Committed);
}
InboundTlctatus::LocalRemoved => {
tlc.status = TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed);
}
_ => {}
}
}
Expand Down Expand Up @@ -4272,12 +4249,12 @@ impl ChannelActorState {
self.get_current_commitment_number(for_remote) + 1
}

pub fn get_next_offering_tlc_id(&self) -> u64 {
self.tlc_state.get_next_offering()
pub fn get_next_offering_tlc_id(&self) -> TLCId {
TLCId::Offered(self.tlc_state.get_next_offering())
}

pub fn get_next_received_tlc_id(&self) -> u64 {
self.tlc_state.get_next_received()
pub fn get_next_received_tlc_id(&self) -> TLCId {
TLCId::Received(self.tlc_state.get_next_received())
}

pub fn increment_next_offered_tlc_id(&mut self) {
Expand All @@ -4288,12 +4265,12 @@ impl ChannelActorState {
self.tlc_state.increment_received();
}

pub fn get_offered_tlc(&self, tlc_id: u64) -> Option<&TlcInfo> {
self.tlc_state.get(&TLCId::Offered(tlc_id))
pub fn get_offered_tlc(&self, tlc_id: TLCId) -> Option<&TlcInfo> {
self.tlc_state.get(&tlc_id)
}

pub fn get_received_tlc(&self, tlc_id: u64) -> Option<&TlcInfo> {
self.tlc_state.get(&TLCId::Received(tlc_id))
pub fn get_received_tlc(&self, tlc_id: TLCId) -> Option<&TlcInfo> {
self.tlc_state.get(&tlc_id)
}

pub(crate) fn set_received_tlc_preimage(&mut self, tlc_id: u64, preimage: Option<Hash256>) {
Expand All @@ -4303,6 +4280,21 @@ impl ChannelActorState {
}

pub fn check_insert_tlc(&mut self, tlc: &TlcInfo) -> Result<(), ProcessingChannelError> {
let next_tlc_id = if tlc.is_offered() {
self.get_next_offering_tlc_id()
} else {
self.get_next_received_tlc_id()
};
if tlc.tlc_id != next_tlc_id {
eprintln!(
"Received tlc id {:?} is not the expected next id {:?}",
tlc.tlc_id, next_tlc_id
);
return Err(ProcessingChannelError::InvalidParameter(format!(
"Received tlc id {:?} is not the expected next id {:?}",
tlc.tlc_id, next_tlc_id
)));
}
let payment_hash = tlc.payment_hash;
if let Some(tlc) = self
.tlc_state
Expand Down Expand Up @@ -4555,7 +4547,8 @@ impl ChannelActorState {
if let Some(remove_at) = &tlc.removed_at {
let mut include = false;
match tlc.status {
TlcStatus::Inbound(InboundTlctatus::LocalRemoved) => {
TlcStatus::Inbound(InboundTlctatus::LocalRemoved)
| TlcStatus::Inbound(InboundTlctatus::RemoveAckConfirmed) => {
include = for_remote;
}
TlcStatus::Outbound(OutboundTlcStatus::RemoveWaitPrevAck)
Expand Down Expand Up @@ -4845,16 +4838,16 @@ impl ChannelActorState {
}

fn create_outbounding_tlc(&self, command: AddTlcCommand) -> TlcInfo {
let id = self.get_next_offering_tlc_id();
let tlc_id = self.get_next_offering_tlc_id();
assert!(
self.get_offered_tlc(id).is_none(),
self.get_offered_tlc(tlc_id).is_none(),
"Must not have the same id in pending offered tlcs"
);

TlcInfo {
channel_id: self.get_id(),
status: TlcStatus::Outbound(OutboundTlcStatus::LocalAnnounced),
tlc_id: TLCId::Offered(id),
tlc_id,
amount: command.amount,
payment_hash: command.payment_hash,
expiry: command.expiry,
Expand Down Expand Up @@ -5701,13 +5694,6 @@ impl ChannelActorState {

eprintln!("handle revoke and ack peer message: {:?}", &revocation_data);
let need_commitment_signed = self.tlc_state.update_for_revoke_and_ack();
// if need_commitment_signed {
// myself
// .send_message(ChannelActorMessage::Command(
// ChannelCommand::CommitmentSigned(),
// ))
// .expect(ASSUME_NETWORK_ACTOR_ALIVE);
// }
network
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RevokeAndAckReceived(
Expand Down

0 comments on commit 5f490d0

Please sign in to comment.