chore: fix bbr bugs

This commit is contained in:
wwqgtxx 2023-09-30 13:39:50 +08:00
parent 5f6de610e1
commit a526bb70ea
5 changed files with 79 additions and 119 deletions

4
go.mod
View file

@ -19,8 +19,8 @@ require (
github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40
github.com/mdlayher/netlink v1.7.2 github.com/mdlayher/netlink v1.7.2
github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759 github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759
github.com/metacubex/quic-go v0.39.1-0.20230926003849-db956da2a731 github.com/metacubex/quic-go v0.39.1-0.20230930051114-b486c7799a55
github.com/metacubex/sing-quic v0.0.0-20230926004739-7c7c534c2255 github.com/metacubex/sing-quic v0.0.0-20230930052455-ae588c275b9c
github.com/metacubex/sing-shadowsocks v0.2.5 github.com/metacubex/sing-shadowsocks v0.2.5
github.com/metacubex/sing-shadowsocks2 v0.1.4 github.com/metacubex/sing-shadowsocks2 v0.1.4
github.com/metacubex/sing-tun v0.1.13-0.20230926010214-4e9d1add2aee github.com/metacubex/sing-tun v0.1.13-0.20230926010214-4e9d1add2aee

8
go.sum
View file

@ -93,12 +93,12 @@ github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759 h1:cjd4biTvO
github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759/go.mod h1:UHOv2xu+RIgLwpXca7TLrXleEd4oR3sPatW6IF8wU88= github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759/go.mod h1:UHOv2xu+RIgLwpXca7TLrXleEd4oR3sPatW6IF8wU88=
github.com/metacubex/gvisor v0.0.0-20230611153922-78842f086475 h1:qSEOvPPaMrWggFyFhFYGyMR8i1HKyhXjdi1QYUAa2ww= github.com/metacubex/gvisor v0.0.0-20230611153922-78842f086475 h1:qSEOvPPaMrWggFyFhFYGyMR8i1HKyhXjdi1QYUAa2ww=
github.com/metacubex/gvisor v0.0.0-20230611153922-78842f086475/go.mod h1:wehEpqiogdeyncfhckJP5gD2LtBgJW0wnDC24mJ+8Jg= github.com/metacubex/gvisor v0.0.0-20230611153922-78842f086475/go.mod h1:wehEpqiogdeyncfhckJP5gD2LtBgJW0wnDC24mJ+8Jg=
github.com/metacubex/quic-go v0.39.1-0.20230926003849-db956da2a731 h1:F+xz1KCwUfCud5eWHKZr0QsWOeA+mqIFvn90r8hq+R8= github.com/metacubex/quic-go v0.39.1-0.20230930051114-b486c7799a55 h1:cAqp0BFOTr/1TpFicH1dA1q/6fp7E/JkqHBORfohqr4=
github.com/metacubex/quic-go v0.39.1-0.20230926003849-db956da2a731/go.mod h1:4pe6cY+nAMFU/Uxn1rfnxNIowsaJGDQ3uyy4VuiPkP4= github.com/metacubex/quic-go v0.39.1-0.20230930051114-b486c7799a55/go.mod h1:4pe6cY+nAMFU/Uxn1rfnxNIowsaJGDQ3uyy4VuiPkP4=
github.com/metacubex/sing v0.0.0-20230926010351-b23b466642d1 h1:MkYAvDyhb7cwuqL4ZLKU3Oi6tYjFnz1sz5LS82JmtDo= github.com/metacubex/sing v0.0.0-20230926010351-b23b466642d1 h1:MkYAvDyhb7cwuqL4ZLKU3Oi6tYjFnz1sz5LS82JmtDo=
github.com/metacubex/sing v0.0.0-20230926010351-b23b466642d1/go.mod h1:GQ673iPfUnkbK/dIPkfd1Xh1MjOGo36gkl/mkiHY7Jg= github.com/metacubex/sing v0.0.0-20230926010351-b23b466642d1/go.mod h1:GQ673iPfUnkbK/dIPkfd1Xh1MjOGo36gkl/mkiHY7Jg=
github.com/metacubex/sing-quic v0.0.0-20230926004739-7c7c534c2255 h1:NfdM4hDFIhq9QxDStJ9Rz1h73sRUO/2L4pRZ6lGWRz8= github.com/metacubex/sing-quic v0.0.0-20230930052455-ae588c275b9c h1:j7PKIUUhOAxJaLf/NmUKuIs9R06xNoYizwYgqf5HSrA=
github.com/metacubex/sing-quic v0.0.0-20230926004739-7c7c534c2255/go.mod h1:asoMecRyaA6pLSLVR+qFdp/vD24m8KZ1O/QDxWa7RsM= github.com/metacubex/sing-quic v0.0.0-20230930052455-ae588c275b9c/go.mod h1:TPAXFCHCtzW9Dm+wq1l1R/p0v/S/xmuRU0qfPR7WlOA=
github.com/metacubex/sing-shadowsocks v0.2.5 h1:O2RRSHlKGEpAVG/OHJQxyHqDy8uvvdCW/oW2TDBOIhc= github.com/metacubex/sing-shadowsocks v0.2.5 h1:O2RRSHlKGEpAVG/OHJQxyHqDy8uvvdCW/oW2TDBOIhc=
github.com/metacubex/sing-shadowsocks v0.2.5/go.mod h1:Xz2uW9BEYGEoA8B4XEpoxt7ERHClFCwsMAvWaruoyMo= github.com/metacubex/sing-shadowsocks v0.2.5/go.mod h1:Xz2uW9BEYGEoA8B4XEpoxt7ERHClFCwsMAvWaruoyMo=
github.com/metacubex/sing-shadowsocks2 v0.1.4 h1:OOCf8lgsVcpTOJUeaFAMzyKVebaQOBnKirDdUdBoKIE= github.com/metacubex/sing-shadowsocks2 v0.1.4 h1:OOCf8lgsVcpTOJUeaFAMzyKVebaQOBnKirDdUdBoKIE=

View file

@ -100,6 +100,10 @@ func (b *BrutalSender) OnCongestionEvent(number congestion.PacketNumber, lostByt
b.updateAckRate(currentTimestamp) b.updateAckRate(currentTimestamp)
} }
func (b *BrutalSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets []congestion.AckedPacketInfo, lostPackets []congestion.LostPacketInfo) {
// Stub
}
func (b *BrutalSender) SetMaxDatagramSize(size congestion.ByteCount) { func (b *BrutalSender) SetMaxDatagramSize(size congestion.ByteCount) {
b.maxDatagramSize = size b.maxDatagramSize = size
b.pacer.SetMaxDatagramSize(size) b.pacer.SetMaxDatagramSize(size)

View file

@ -347,15 +347,36 @@ func (b *bbrSender) MaybeExitSlowStart() {
} }
func (b *bbrSender) OnPacketAcked(number congestion.PacketNumber, ackedBytes congestion.ByteCount, priorInFlight congestion.ByteCount, eventTime time.Time) { func (b *bbrSender) OnPacketAcked(number congestion.PacketNumber, ackedBytes congestion.ByteCount, priorInFlight congestion.ByteCount, eventTime time.Time) {
// Stub
}
func (b *bbrSender) OnCongestionEvent(number congestion.PacketNumber, lostBytes congestion.ByteCount, priorInFlight congestion.ByteCount) {
// Stub
}
func (b *bbrSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets []congestion.AckedPacketInfo, lostPackets []congestion.LostPacketInfo) {
totalBytesAckedBefore := b.sampler.totalBytesAcked totalBytesAckedBefore := b.sampler.totalBytesAcked
isRoundStart, minRttExpired := false, false isRoundStart, minRttExpired := false, false
lastAckedPacket := number
isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket) if lostPackets != nil {
minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, number, ackedBytes) b.DiscardLostPackets(lostPackets)
b.UpdateRecoveryState(false, isRoundStart) }
bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore
excessAcked := b.UpdateAckAggregationBytes(eventTime, bytesAcked) // Input the new data into the BBR model of the connection.
var excessAcked congestion.ByteCount
if len(ackedPackets) > 0 {
lastAckedPacket := ackedPackets[len(ackedPackets)-1].PacketNumber
isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket)
minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, ackedPackets)
b.UpdateRecoveryState(len(lostPackets) > 0, isRoundStart)
bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore
excessAcked = b.UpdateAckAggregationBytes(eventTime, bytesAcked)
}
// Handle logic specific to PROBE_BW mode.
if b.mode == PROBE_BW {
b.UpdateGainCyclePhase(eventTime, priorInFlight, len(lostPackets) > 0)
}
// Handle logic specific to STARTUP and DRAIN modes. // Handle logic specific to STARTUP and DRAIN modes.
if isRoundStart && !b.isAtFullBandwidth { if isRoundStart && !b.isAtFullBandwidth {
@ -366,38 +387,12 @@ func (b *bbrSender) OnPacketAcked(number congestion.PacketNumber, ackedBytes con
// Handle logic specific to PROBE_RTT. // Handle logic specific to PROBE_RTT.
b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired) b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired)
// After the model is updated, recalculate the pacing rate and congestion
// window.
b.CalculatePacingRate()
b.CalculateCongestionWindow(bytesAcked, excessAcked)
b.CalculateRecoveryWindow(bytesAcked, congestion.ByteCount(0))
}
func (b *bbrSender) OnCongestionEvent(number congestion.PacketNumber, lostBytes congestion.ByteCount, priorInFlight congestion.ByteCount) {
eventTime := time.Now()
totalBytesAckedBefore := b.sampler.totalBytesAcked
isRoundStart, minRttExpired := false, false
b.DiscardLostPackets(number, lostBytes)
// Input the new data into the BBR model of the connection.
var excessAcked congestion.ByteCount
// Handle logic specific to PROBE_BW mode.
if b.mode == PROBE_BW {
b.UpdateGainCyclePhase(time.Now(), priorInFlight, true)
}
// Handle logic specific to STARTUP and DRAIN modes.
b.MaybeExitStartupOrDrain(eventTime)
// Handle logic specific to PROBE_RTT.
b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired)
// Calculate number of packets acked and lost. // Calculate number of packets acked and lost.
bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore
bytesLost := lostBytes bytesLost := congestion.ByteCount(0)
for _, packet := range lostPackets {
bytesLost += packet.BytesLost
}
// After the model is updated, recalculate the pacing rate and congestion // After the model is updated, recalculate the pacing rate and congestion
// window. // window.
@ -406,53 +401,6 @@ func (b *bbrSender) OnCongestionEvent(number congestion.PacketNumber, lostBytes
b.CalculateRecoveryWindow(bytesAcked, bytesLost) b.CalculateRecoveryWindow(bytesAcked, bytesLost)
} }
//func (b *bbrSender) OnCongestionEvent(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets, lostPackets []*congestion.Packet) {
// totalBytesAckedBefore := b.sampler.totalBytesAcked
// isRoundStart, minRttExpired := false, false
//
// if lostPackets != nil {
// b.DiscardLostPackets(lostPackets)
// }
//
// // Input the new data into the BBR model of the connection.
// var excessAcked congestion.ByteCount
// if len(ackedPackets) > 0 {
// lastAckedPacket := ackedPackets[len(ackedPackets)-1].PacketNumber
// isRoundStart = b.UpdateRoundTripCounter(lastAckedPacket)
// minRttExpired = b.UpdateBandwidthAndMinRtt(eventTime, ackedPackets)
// b.UpdateRecoveryState(lastAckedPacket, len(lostPackets) > 0, isRoundStart)
// bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore
// excessAcked = b.UpdateAckAggregationBytes(eventTime, bytesAcked)
// }
//
// // Handle logic specific to PROBE_BW mode.
// if b.mode == PROBE_BW {
// b.UpdateGainCyclePhase(eventTime, priorInFlight, len(lostPackets) > 0)
// }
//
// // Handle logic specific to STARTUP and DRAIN modes.
// if isRoundStart && !b.isAtFullBandwidth {
// b.CheckIfFullBandwidthReached()
// }
// b.MaybeExitStartupOrDrain(eventTime)
//
// // Handle logic specific to PROBE_RTT.
// b.MaybeEnterOrExitProbeRtt(eventTime, isRoundStart, minRttExpired)
//
// // Calculate number of packets acked and lost.
// bytesAcked := b.sampler.totalBytesAcked - totalBytesAckedBefore
// bytesLost := congestion.ByteCount(0)
// for _, packet := range lostPackets {
// bytesLost += packet.Length
// }
//
// // After the model is updated, recalculate the pacing rate and congestion
// // window.
// b.CalculatePacingRate()
// b.CalculateCongestionWindow(bytesAcked, excessAcked)
// b.CalculateRecoveryWindow(bytesAcked, bytesLost)
//}
//func (b *bbrSender) SetNumEmulatedConnections(n int) { //func (b *bbrSender) SetNumEmulatedConnections(n int) {
// //
//} //}
@ -553,30 +501,32 @@ func (b *bbrSender) UpdateRoundTripCounter(lastAckedPacket congestion.PacketNumb
return false return false
} }
func (b *bbrSender) UpdateBandwidthAndMinRtt(now time.Time, number congestion.PacketNumber, ackedBytes congestion.ByteCount) bool { func (b *bbrSender) UpdateBandwidthAndMinRtt(now time.Time, ackedPackets []congestion.AckedPacketInfo) bool {
sampleMinRtt := InfiniteRTT sampleMinRtt := InfiniteRTT
if !b.alwaysGetBwSampleWhenAcked && ackedBytes == 0 { for _, packet := range ackedPackets {
// Skip acked packets with 0 in flight bytes when updating bandwidth. if !b.alwaysGetBwSampleWhenAcked && packet.BytesAcked == 0 {
return false // Skip acked packets with 0 in flight bytes when updating bandwidth.
} return false
bandwidthSample := b.sampler.OnPacketAcked(now, number) }
if b.alwaysGetBwSampleWhenAcked && !bandwidthSample.stateAtSend.isValid { bandwidthSample := b.sampler.OnPacketAcked(now, packet.PacketNumber)
// From the sampler's perspective, the packet has never been sent, or the if b.alwaysGetBwSampleWhenAcked && !bandwidthSample.stateAtSend.isValid {
// packet has been acked or marked as lost previously. // From the sampler's perspective, the packet has never been sent, or the
return false // packet has been acked or marked as lost previously.
} return false
b.lastSampleIsAppLimited = bandwidthSample.stateAtSend.isAppLimited }
// has_non_app_limited_sample_ |= b.lastSampleIsAppLimited = bandwidthSample.stateAtSend.isAppLimited
// !bandwidth_sample.state_at_send.is_app_limited; // has_non_app_limited_sample_ |=
if !bandwidthSample.stateAtSend.isAppLimited { // !bandwidth_sample.state_at_send.is_app_limited;
b.hasNoAppLimitedSample = true if !bandwidthSample.stateAtSend.isAppLimited {
} b.hasNoAppLimitedSample = true
if bandwidthSample.rtt > 0 { }
sampleMinRtt = minRtt(sampleMinRtt, bandwidthSample.rtt) if bandwidthSample.rtt > 0 {
} sampleMinRtt = minRtt(sampleMinRtt, bandwidthSample.rtt)
if !bandwidthSample.stateAtSend.isAppLimited || bandwidthSample.bandwidth > b.BandwidthEstimate() { }
b.maxBandwidth.Update(int64(bandwidthSample.bandwidth), b.roundTripCount) if !bandwidthSample.stateAtSend.isAppLimited || bandwidthSample.bandwidth > b.BandwidthEstimate() {
b.maxBandwidth.Update(int64(bandwidthSample.bandwidth), b.roundTripCount)
}
} }
// If none of the RTT samples are valid, return immediately. // If none of the RTT samples are valid, return immediately.
@ -619,14 +569,16 @@ func (b *bbrSender) ShouldExtendMinRttExpiry() bool {
return false return false
} }
func (b *bbrSender) DiscardLostPackets(number congestion.PacketNumber, lostBytes congestion.ByteCount) { func (b *bbrSender) DiscardLostPackets(lostPackets []congestion.LostPacketInfo) {
b.sampler.OnCongestionEvent(number) for _, packet := range lostPackets {
if b.mode == STARTUP { b.sampler.OnCongestionEvent(packet.PacketNumber)
// if b.rttStats != nil { if b.mode == STARTUP {
// TODO: slow start. // if b.rttStats != nil {
// } // TODO: slow start.
if b.startupRateReductionMultiplier != 0 { // }
b.startupBytesLost += lostBytes if b.startupRateReductionMultiplier != 0 {
b.startupBytesLost += packet.BytesLost
}
} }
} }
} }

View file

@ -197,6 +197,10 @@ func (c *cubicSender) OnCongestionEvent(packetNumber congestion.PacketNumber, lo
c.numAckedPackets = 0 c.numAckedPackets = 0
} }
func (b *cubicSender) OnCongestionEventEx(priorInFlight congestion.ByteCount, eventTime time.Time, ackedPackets []congestion.AckedPacketInfo, lostPackets []congestion.LostPacketInfo) {
// Stub
}
// Called when we receive an ack. Normal TCP tracks how many packets one ack // Called when we receive an ack. Normal TCP tracks how many packets one ack
// represents, but quic has a separate ack for each packet. // represents, but quic has a separate ack for each packet.
func (c *cubicSender) maybeIncreaseCwnd( func (c *cubicSender) maybeIncreaseCwnd(