From a526bb70ea27a401419acc4b2942577705229f99 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Sat, 30 Sep 2023 13:39:50 +0800 Subject: [PATCH] chore: fix bbr bugs --- go.mod | 4 +- go.sum | 8 +- transport/hysteria/congestion/brutal.go | 4 + transport/tuic/congestion/bbr_sender.go | 178 ++++++++-------------- transport/tuic/congestion/cubic_sender.go | 4 + 5 files changed, 79 insertions(+), 119 deletions(-) diff --git a/go.mod b/go.mod index 3c26f0ae..826eb0ba 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 github.com/mdlayher/netlink v1.7.2 github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759 - github.com/metacubex/quic-go v0.39.1-0.20230926003849-db956da2a731 - github.com/metacubex/sing-quic v0.0.0-20230926004739-7c7c534c2255 + github.com/metacubex/quic-go v0.39.1-0.20230930051114-b486c7799a55 + github.com/metacubex/sing-quic v0.0.0-20230930052455-ae588c275b9c github.com/metacubex/sing-shadowsocks v0.2.5 github.com/metacubex/sing-shadowsocks2 v0.1.4 github.com/metacubex/sing-tun v0.1.13-0.20230926010214-4e9d1add2aee diff --git a/go.sum b/go.sum index 584a797f..d6c96aaa 100644 --- a/go.sum +++ b/go.sum @@ -93,12 +93,12 @@ github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759 h1:cjd4biTvO github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759/go.mod h1:UHOv2xu+RIgLwpXca7TLrXleEd4oR3sPatW6IF8wU88= github.com/metacubex/gvisor v0.0.0-20230611153922-78842f086475 h1:qSEOvPPaMrWggFyFhFYGyMR8i1HKyhXjdi1QYUAa2ww= github.com/metacubex/gvisor v0.0.0-20230611153922-78842f086475/go.mod h1:wehEpqiogdeyncfhckJP5gD2LtBgJW0wnDC24mJ+8Jg= -github.com/metacubex/quic-go v0.39.1-0.20230926003849-db956da2a731 h1:F+xz1KCwUfCud5eWHKZr0QsWOeA+mqIFvn90r8hq+R8= -github.com/metacubex/quic-go v0.39.1-0.20230926003849-db956da2a731/go.mod h1:4pe6cY+nAMFU/Uxn1rfnxNIowsaJGDQ3uyy4VuiPkP4= +github.com/metacubex/quic-go v0.39.1-0.20230930051114-b486c7799a55 h1:cAqp0BFOTr/1TpFicH1dA1q/6fp7E/JkqHBORfohqr4= +github.com/metacubex/quic-go v0.39.1-0.20230930051114-b486c7799a55/go.mod h1:4pe6cY+nAMFU/Uxn1rfnxNIowsaJGDQ3uyy4VuiPkP4= github.com/metacubex/sing v0.0.0-20230926010351-b23b466642d1 h1:MkYAvDyhb7cwuqL4ZLKU3Oi6tYjFnz1sz5LS82JmtDo= github.com/metacubex/sing v0.0.0-20230926010351-b23b466642d1/go.mod h1:GQ673iPfUnkbK/dIPkfd1Xh1MjOGo36gkl/mkiHY7Jg= -github.com/metacubex/sing-quic v0.0.0-20230926004739-7c7c534c2255 h1:NfdM4hDFIhq9QxDStJ9Rz1h73sRUO/2L4pRZ6lGWRz8= -github.com/metacubex/sing-quic v0.0.0-20230926004739-7c7c534c2255/go.mod h1:asoMecRyaA6pLSLVR+qFdp/vD24m8KZ1O/QDxWa7RsM= +github.com/metacubex/sing-quic v0.0.0-20230930052455-ae588c275b9c h1:j7PKIUUhOAxJaLf/NmUKuIs9R06xNoYizwYgqf5HSrA= +github.com/metacubex/sing-quic v0.0.0-20230930052455-ae588c275b9c/go.mod h1:TPAXFCHCtzW9Dm+wq1l1R/p0v/S/xmuRU0qfPR7WlOA= github.com/metacubex/sing-shadowsocks v0.2.5 h1:O2RRSHlKGEpAVG/OHJQxyHqDy8uvvdCW/oW2TDBOIhc= github.com/metacubex/sing-shadowsocks v0.2.5/go.mod h1:Xz2uW9BEYGEoA8B4XEpoxt7ERHClFCwsMAvWaruoyMo= github.com/metacubex/sing-shadowsocks2 v0.1.4 h1:OOCf8lgsVcpTOJUeaFAMzyKVebaQOBnKirDdUdBoKIE= diff --git a/transport/hysteria/congestion/brutal.go b/transport/hysteria/congestion/brutal.go index 88bf6f34..67067917 100644 --- a/transport/hysteria/congestion/brutal.go +++ b/transport/hysteria/congestion/brutal.go @@ -100,6 +100,10 @@ func (b *BrutalSender) OnCongestionEvent(number congestion.PacketNumber, lostByt b.updateAckRate(currentTimestamp) } +func (b *BrutalSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets []congestion.AckedPacketInfo, lostPackets []congestion.LostPacketInfo) { + // Stub +} + func (b *BrutalSender) SetMaxDatagramSize(size congestion.ByteCount) { b.maxDatagramSize = size b.pacer.SetMaxDatagramSize(size) diff --git a/transport/tuic/congestion/bbr_sender.go b/transport/tuic/congestion/bbr_sender.go index 2c842300..8c18c616 100644 --- a/transport/tuic/congestion/bbr_sender.go +++ b/transport/tuic/congestion/bbr_sender.go @@ -347,15 +347,36 @@ func (b *bbrSender) MaybeExitSlowStart() { } func (b *bbrSender) OnPacketAcked(number congestion.PacketNumber, ackedBytes congestion.ByteCount, priorInFlight congestion.ByteCount, eventTime time.Time) { + // Stub +} + +func (b *bbrSender) OnCongestionEvent(number congestion.PacketNumber, lostBytes congestion.ByteCount, priorInFlight congestion.ByteCount) { + // Stub +} + +func (b *bbrSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets []congestion.AckedPacketInfo, lostPackets []congestion.LostPacketInfo) { totalBytesAckedBefore := b.sampler.totalBytesAcked isRoundStart, minRttExpired := false, false - lastAckedPacket := number - isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket) - minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, number, ackedBytes) - b.UpdateRecoveryState(false, isRoundStart) - bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore - excessAcked := b.UpdateAckAggregationBytes(eventTime, bytesAcked) + if lostPackets != nil { + b.DiscardLostPackets(lostPackets) + } + + // Input the new data into the BBR model of the connection. + var excessAcked congestion.ByteCount + if len(ackedPackets) > 0 { + lastAckedPacket := ackedPackets[len(ackedPackets)-1].PacketNumber + isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket) + minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, ackedPackets) + b.UpdateRecoveryState(len(lostPackets) > 0, isRoundStart) + bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore + excessAcked = b.UpdateAckAggregationBytes(eventTime, bytesAcked) + } + + // Handle logic specific to PROBE_BW mode. + if b.mode == PROBE_BW { + b.UpdateGainCyclePhase(eventTime, priorInFlight, len(lostPackets) > 0) + } // Handle logic specific to STARTUP and DRAIN modes. if isRoundStart && !b.isAtFullBandwidth { @@ -366,38 +387,12 @@ func (b *bbrSender) OnPacketAcked(number congestion.PacketNumber, ackedBytes con // Handle logic specific to PROBE_RTT. b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) - // After the model is updated, recalculate the pacing rate and congestion - // window. - b.CalculatePacingRate() - b.CalculateCongestionWindow(bytesAcked, excessAcked) - b.CalculateRecoveryWindow(bytesAcked, congestion.ByteCount(0)) - -} - -func (b *bbrSender) OnCongestionEvent(number congestion.PacketNumber, lostBytes congestion.ByteCount, priorInFlight congestion.ByteCount) { - eventTime := time.Now() - totalBytesAckedBefore := b.sampler.totalBytesAcked - isRoundStart, minRttExpired := false, false - - b.DiscardLostPackets(number, lostBytes) - - // Input the new data into the BBR model of the connection. - var excessAcked congestion.ByteCount - - // Handle logic specific to PROBE_BW mode. - if b.mode == PROBE_BW { - b.UpdateGainCyclePhase(time.Now(), priorInFlight, true) - } - - // Handle logic specific to STARTUP and DRAIN modes. - b.MaybeExitStartupOrDrain(eventTime) - - // Handle logic specific to PROBE_RTT. - b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) - // Calculate number of packets acked and lost. bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore - bytesLost := lostBytes + bytesLost := congestion.ByteCount(0) + for _, packet := range lostPackets { + bytesLost += packet.BytesLost + } // After the model is updated, recalculate the pacing rate and congestion // window. @@ -406,53 +401,6 @@ func (b *bbrSender) OnCongestionEvent(number congestion.PacketNumber, lostBytes b.CalculateRecoveryWindow(bytesAcked, bytesLost) } -//func (b *bbrSender) OnCongestionEvent(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets, lostPackets []*congestion.Packet) { -// totalBytesAckedBefore := b.sampler.totalBytesAcked -// isRoundStart, minRttExpired := false, false -// -// if lostPackets != nil { -// b.DiscardLostPackets(lostPackets) -// } -// -// // Input the new data into the BBR model of the connection. -// var excessAcked congestion.ByteCount -// if len(ackedPackets) > 0 { -// lastAckedPacket := ackedPackets[len(ackedPackets)-1].PacketNumber -// isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket) -// minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, ackedPackets) -// b.UpdateRecoveryState(lastAckedPacket, len(lostPackets) > 0, isRoundStart) -// bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore -// excessAcked = b.UpdateAckAggregationBytes(eventTime, bytesAcked) -// } -// -// // Handle logic specific to PROBE_BW mode. -// if b.mode == PROBE_BW { -// b.UpdateGainCyclePhase(eventTime, priorInFlight, len(lostPackets) > 0) -// } -// -// // Handle logic specific to STARTUP and DRAIN modes. -// if isRoundStart && !b.isAtFullBandwidth { -// b.CheckIfFullBandwidthReached() -// } -// b.MaybeExitStartupOrDrain(eventTime) -// -// // Handle logic specific to PROBE_RTT. -// b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) -// -// // Calculate number of packets acked and lost. -// bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore -// bytesLost := congestion.ByteCount(0) -// for _, packet := range lostPackets { -// bytesLost += packet.Length -// } -// -// // After the model is updated, recalculate the pacing rate and congestion -// // window. -// b.CalculatePacingRate() -// b.CalculateCongestionWindow(bytesAcked, excessAcked) -// b.CalculateRecoveryWindow(bytesAcked, bytesLost) -//} - //func (b *bbrSender) SetNumEmulatedConnections(n int) { // //} @@ -553,30 +501,32 @@ func (b *bbrSender) UpdateRoundTripCounter(lastAckedPacket congestion.PacketNumb return false } -func (b *bbrSender) UpdateBandwidthAndMinRtt(now time.Time, number congestion.PacketNumber, ackedBytes congestion.ByteCount) bool { +func (b *bbrSender) UpdateBandwidthAndMinRtt(now time.Time, ackedPackets []congestion.AckedPacketInfo) bool { sampleMinRtt := InfiniteRTT - if !b.alwaysGetBwSampleWhenAcked && ackedBytes == 0 { - // Skip acked packets with 0 in flight bytes when updating bandwidth. - return false - } - bandwidthSample := b.sampler.OnPacketAcked(now, number) - if b.alwaysGetBwSampleWhenAcked && !bandwidthSample.stateAtSend.isValid { - // From the sampler's perspective, the packet has never been sent, or the - // packet has been acked or marked as lost previously. - return false - } - b.lastSampleIsAppLimited = bandwidthSample.stateAtSend.isAppLimited - // has_non_app_limited_sample_ |= - // !bandwidth_sample.state_at_send.is_app_limited; - if !bandwidthSample.stateAtSend.isAppLimited { - b.hasNoAppLimitedSample = true - } - if bandwidthSample.rtt > 0 { - sampleMinRtt = minRtt(sampleMinRtt, bandwidthSample.rtt) - } - if !bandwidthSample.stateAtSend.isAppLimited || bandwidthSample.bandwidth > b.BandwidthEstimate() { - b.maxBandwidth.Update(int64(bandwidthSample.bandwidth), b.roundTripCount) + for _, packet := range ackedPackets { + if !b.alwaysGetBwSampleWhenAcked && packet.BytesAcked == 0 { + // Skip acked packets with 0 in flight bytes when updating bandwidth. + return false + } + bandwidthSample := b.sampler.OnPacketAcked(now, packet.PacketNumber) + if b.alwaysGetBwSampleWhenAcked && !bandwidthSample.stateAtSend.isValid { + // From the sampler's perspective, the packet has never been sent, or the + // packet has been acked or marked as lost previously. + return false + } + b.lastSampleIsAppLimited = bandwidthSample.stateAtSend.isAppLimited + // has_non_app_limited_sample_ |= + // !bandwidth_sample.state_at_send.is_app_limited; + if !bandwidthSample.stateAtSend.isAppLimited { + b.hasNoAppLimitedSample = true + } + if bandwidthSample.rtt > 0 { + sampleMinRtt = minRtt(sampleMinRtt, bandwidthSample.rtt) + } + if !bandwidthSample.stateAtSend.isAppLimited || bandwidthSample.bandwidth > b.BandwidthEstimate() { + b.maxBandwidth.Update(int64(bandwidthSample.bandwidth), b.roundTripCount) + } } // If none of the RTT samples are valid, return immediately. @@ -619,14 +569,16 @@ func (b *bbrSender) ShouldExtendMinRttExpiry() bool { return false } -func (b *bbrSender) DiscardLostPackets(number congestion.PacketNumber, lostBytes congestion.ByteCount) { - b.sampler.OnCongestionEvent(number) - if b.mode == STARTUP { - // if b.rttStats != nil { - // TODO: slow start. - // } - if b.startupRateReductionMultiplier != 0 { - b.startupBytesLost += lostBytes +func (b *bbrSender) DiscardLostPackets(lostPackets []congestion.LostPacketInfo) { + for _, packet := range lostPackets { + b.sampler.OnCongestionEvent(packet.PacketNumber) + if b.mode == STARTUP { + // if b.rttStats != nil { + // TODO: slow start. + // } + if b.startupRateReductionMultiplier != 0 { + b.startupBytesLost += packet.BytesLost + } } } } diff --git a/transport/tuic/congestion/cubic_sender.go b/transport/tuic/congestion/cubic_sender.go index e058ac7d..f544cd74 100644 --- a/transport/tuic/congestion/cubic_sender.go +++ b/transport/tuic/congestion/cubic_sender.go @@ -197,6 +197,10 @@ func (c *cubicSender) OnCongestionEvent(packetNumber congestion.PacketNumber, lo c.numAckedPackets = 0 } +func (b *cubicSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets []congestion.AckedPacketInfo, lostPackets []congestion.LostPacketInfo) { + // Stub +} + // Called when we receive an ack. Normal TCP tracks how many packets one ack // represents, but quic has a separate ack for each packet. func (c *cubicSender) maybeIncreaseCwnd(