diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt index 8e43a057e7..8853ae0fda 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/rtp/TransportCcEngine.kt @@ -22,7 +22,6 @@ import org.jitsi.nlj.util.DataSize import org.jitsi.nlj.util.NEVER import org.jitsi.nlj.util.Rfc3711IndexTracker import org.jitsi.nlj.util.formatMilli -import org.jitsi.nlj.util.instantOfEpochMicro import org.jitsi.rtp.rtcp.RtcpPacket import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.PacketReport import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.ReceivedPacketReport @@ -126,7 +125,7 @@ class TransportCcEngine( private fun tccReceived(tccPacket: RtcpFbTccPacket) { val now = clock.instant() - var currArrivalTimestamp = instantOfEpochMicro(tccPacket.GetBaseTimeUs()) + var currArrivalTimestamp = tccPacket.BaseTime() if (remoteReferenceTime == NEVER) { remoteReferenceTime = currArrivalTimestamp localReferenceTime = now diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt index e22fb088da..c12996ce82 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/transform/node/incoming/TccGeneratorNode.kt @@ -25,7 +25,6 @@ import org.jitsi.nlj.util.NEVER import org.jitsi.nlj.util.ReadOnlyStreamInformationStore import org.jitsi.nlj.util.Rfc3711IndexTracker import org.jitsi.nlj.util.bytes -import org.jitsi.nlj.util.toEpochMicro import org.jitsi.rtp.rtcp.RtcpPacket import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacketBuilder @@ -182,20 +181,19 @@ class TccGeneratorNode( mediaSourceSsrc = mediaSsrc, feedbackPacketSeqNum = currTccSeqNum++ ) - currentTccPacket.SetBase(windowStartSeq, firstEntry.value.toEpochMicro()) + currentTccPacket.SetBase(windowStartSeq, firstEntry.value) var nextSequenceNumber = windowStartSeq val feedbackBlockPackets = packetArrivalTimes.tailMap(windowStartSeq) feedbackBlockPackets.forEach { (seq, timestamp) -> - val timestampUs = timestamp.toEpochMicro() - if (!currentTccPacket.AddReceivedPacket(seq, timestampUs)) { + if (!currentTccPacket.AddReceivedPacket(seq, timestamp)) { tccPackets.add(currentTccPacket.build()) currentTccPacket = RtcpFbTccPacketBuilder( mediaSourceSsrc = mediaSsrc, feedbackPacketSeqNum = currTccSeqNum++ ).apply { - SetBase(seq, timestampUs) - AddReceivedPacket(seq, timestampUs) + SetBase(seq, timestamp) + AddReceivedPacket(seq, timestamp) } } nextSequenceNumber = seq + 1 diff --git a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/util/ClockUtils.kt b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/util/ClockUtils.kt index d5f7a2573d..823ef31e90 100644 --- a/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/util/ClockUtils.kt +++ b/jitsi-media-transform/src/main/kotlin/org/jitsi/nlj/util/ClockUtils.kt @@ -29,47 +29,6 @@ fun Instant.formatMilli(): String = TimeUtils.formatTimeAsFullMillis(this.epochS fun Duration.formatMilli(): String = TimeUtils.formatTimeAsFullMillis(this.seconds, this.nano) -/** - * Converts this instant to the number of microseconds from the epoch - * of 1970-01-01T00:00:00Z. - * - * If this instant represents a point on the time-line too far in the future - * or past to fit in a [Long] microseconds, then an exception is thrown. - * - * If this instant has greater than microsecond precision, then the conversion - * will drop any excess precision information as though the amount in nanoseconds - * was subject to integer division by one thousand. - * - * @return the number of microseconds since the epoch of 1970-01-01T00:00:00Z - * @throws ArithmeticException if numeric overflow occurs - */ -fun Instant.toEpochMicro(): Long { - return if (this.epochSecond < 0 && this.nano > 0) { - val micros = Math.multiplyExact(this.epochSecond + 1, 1000_000L) - val adjustment: Long = (this.nano / 1000 - 1000_000).toLong() - Math.addExact(micros, adjustment) - } else { - val micros = Math.multiplyExact(this.epochSecond, 1000_000L) - Math.addExact(micros, (this.nano / 1000).toLong()) - } -} - -/** - * Obtains an instance of [Instant] using microseconds from the - * epoch of 1970-01-01T00:00:00Z. - *

- * The seconds and nanoseconds are extracted from the specified milliseconds. - * - * @param epochMicro the number of microseconds from 1970-01-01T00:00:00Z - * @return an instant, not null - * @throws DateTimeException if the instant exceeds the maximum or minimum instant - */ -fun instantOfEpochMicro(epochMicro: Long): Instant { - val secs = Math.floorDiv(epochMicro, 1000_000L) - val micros = Math.floorMod(epochMicro, 1000_000L) - return Instant.ofEpochSecond(secs, micros * 1000L) -} - fun Iterable.sumOf(selector: (T) -> Duration): Duration { var sum: Duration = Duration.ZERO for (element in this) { diff --git a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt index 2bde7dd289..b0b480f01b 100644 --- a/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt +++ b/jitsi-media-transform/src/test/kotlin/org/jitsi/nlj/rtp/TransportCcEngineTest.kt @@ -23,6 +23,7 @@ import org.jitsi.nlj.resources.logging.StdoutLogger import org.jitsi.nlj.rtp.bandwidthestimation.BandwidthEstimator import org.jitsi.nlj.util.bytes import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacketBuilder +import org.jitsi.utils.instantOfEpochMicro import org.jitsi.utils.time.FakeClock import java.util.logging.Level @@ -58,11 +59,11 @@ class TransportCcEngineTest : FunSpec() { transportCcEngine.mediaPacketSent(4, 1300.bytes) val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 0)) { - SetBase(1, 100) - AddReceivedPacket(1, 100) - AddReceivedPacket(2, 110) - AddReceivedPacket(3, 120) - AddReceivedPacket(4, 130) + SetBase(1, instantOfEpochMicro(100)) + AddReceivedPacket(1, instantOfEpochMicro(100)) + AddReceivedPacket(2, instantOfEpochMicro(110)) + AddReceivedPacket(3, instantOfEpochMicro(120)) + AddReceivedPacket(4, instantOfEpochMicro(130)) build() } @@ -81,15 +82,15 @@ class TransportCcEngineTest : FunSpec() { transportCcEngine.mediaPacketSent(4, 1300.bytes) val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 1)) { - SetBase(4, 130) - AddReceivedPacket(4, 130) + SetBase(4, instantOfEpochMicro(130)) + AddReceivedPacket(4, instantOfEpochMicro(130)) build() } transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant()) val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) { - SetBase(4, 130) - AddReceivedPacket(4, 130) + SetBase(4, instantOfEpochMicro(130)) + AddReceivedPacket(4, instantOfEpochMicro(130)) build() } transportCcEngine.rtcpPacketReceived(tccPacket2, clock.instant()) @@ -108,8 +109,8 @@ class TransportCcEngineTest : FunSpec() { transportCcEngine.mediaPacketSent(5, 1300.bytes) val tccPacket = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 1)) { - SetBase(4, 130) - AddReceivedPacket(5, 130) + SetBase(4, instantOfEpochMicro(130)) + AddReceivedPacket(5, instantOfEpochMicro(130)) build() } transportCcEngine.rtcpPacketReceived(tccPacket, clock.instant()) @@ -118,8 +119,8 @@ class TransportCcEngineTest : FunSpec() { lossListener.numLost shouldBe 1 val tccPacket2 = with(RtcpFbTccPacketBuilder(mediaSourceSsrc = 123, feedbackPacketSeqNum = 2)) { - SetBase(4, 130) - AddReceivedPacket(4, 130) + SetBase(4, instantOfEpochMicro(130)) + AddReceivedPacket(4, instantOfEpochMicro(130)) build() } diff --git a/pom.xml b/pom.xml index 3a00efafcb..631ee0f0d6 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 2.0.0 5.9.1 5.10.2 - 1.0-132-g83984af + 1.0-133-g6af1020 1.1-141-g30ec741 1.13.11 3.2.0 diff --git a/rtp/src/main/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacket.kt b/rtp/src/main/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacket.kt index 8566f34421..65a08be936 100644 --- a/rtp/src/main/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacket.kt +++ b/rtp/src/main/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacket.kt @@ -30,7 +30,7 @@ import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companio import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kDeltaScaleFactor import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kMaxReportedPackets import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kMaxSizeBytes -import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTimeWrapPeriodUs +import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTimeWrapPeriod import org.jitsi.rtp.rtcp.rtcpfb.transport_layer_fb.tcc.RtcpFbTccPacket.Companion.kTransportFeedbackHeaderSizeBytes import org.jitsi.rtp.rtp.RtpSequenceNumber import org.jitsi.rtp.rtp.toRtpSequenceNumber @@ -39,8 +39,12 @@ import org.jitsi.rtp.util.RtpUtils import org.jitsi.rtp.util.get3BytesAsInt import org.jitsi.rtp.util.getByteAsInt import org.jitsi.rtp.util.getShortAsInt +import org.jitsi.utils.micros +import org.jitsi.utils.times +import org.jitsi.utils.toEpochMicro +import org.jitsi.utils.toMicros import java.time.Duration -import java.time.temporal.ChronoUnit +import java.time.Instant sealed class PacketReport(val seqNum: Int) @@ -52,7 +56,7 @@ typealias DeltaSize = Int class ReceivedPacketReport(seqNum: Int, val deltaTicks: Short) : PacketReport(seqNum) { val deltaDuration: Duration - get() = Duration.of(deltaTicks * 250L, ChronoUnit.MICROS) + get() = deltaTicks.toInt() * kDeltaScaleFactor } /** @@ -95,23 +99,25 @@ class RtcpFbTccPacketBuilder( // The size of the entire packet, in bytes private var size_bytes_ = kTransportFeedbackHeaderSizeBytes - private var last_timestamp_us_: Long = 0 + private var last_timestamp_: Instant = Instant.EPOCH private val packets_ = mutableListOf() - fun SetBase(base_sequence: Int, ref_timestamp_us: Long) { + fun SetBase(base_sequence: Int, ref_timestamp: Instant) { base_seq_no_ = base_sequence.toRtpSequenceNumber() - base_time_ticks_ = (ref_timestamp_us % kTimeWrapPeriodUs) / kBaseScaleFactor - last_timestamp_us_ = GetBaseTimeUs() + base_time_ticks_ = (ref_timestamp.toEpochMicro() % kTimeWrapPeriod.toMicros()) / kBaseScaleFactor.toMicros() + last_timestamp_ = BaseTime() } - fun AddReceivedPacket(seqNum: Int, timestamp_us: Long): Boolean { + fun AddReceivedPacket(seqNum: Int, timestamp: Instant): Boolean { val sequence_number = seqNum.toRtpSequenceNumber() - var delta_full = (timestamp_us - last_timestamp_us_) % kTimeWrapPeriodUs - if (delta_full > kTimeWrapPeriodUs / 2) { - delta_full -= kTimeWrapPeriodUs + var delta_full = Duration.between(last_timestamp_, timestamp).toMicros() % kTimeWrapPeriod.toMicros() + if (delta_full > kTimeWrapPeriod.toMicros() / 2) { + delta_full -= kTimeWrapPeriod.toMicros() + delta_full -= kDeltaScaleFactor.toMicros() / 2 + } else { + delta_full += kDeltaScaleFactor.toMicros() / 2 } - delta_full += if (delta_full < 0) -(kDeltaScaleFactor / 2) else kDeltaScaleFactor / 2 - delta_full /= kDeltaScaleFactor + delta_full /= kDeltaScaleFactor.toMicros() val delta = delta_full.toShort() // If larger than 16bit signed, we can't represent it - need new fb packet. @@ -137,13 +143,15 @@ class RtcpFbTccPacketBuilder( } packets_.add(ReceivedPacketReport(sequence_number.value, delta)) - last_timestamp_us_ += delta * kDeltaScaleFactor + last_timestamp_ += delta.toInt() * kDeltaScaleFactor size_bytes_ += delta_size return true } - fun GetBaseTimeUs(): Long = base_time_ticks_ * kBaseScaleFactor + fun BaseTime(): Instant { + return Instant.EPOCH + base_time_ticks_ * kBaseScaleFactor + } private fun AddDeltaSize(deltaSize: DeltaSize): Boolean { if (num_seq_no_ == kMaxReportedPackets) { @@ -295,7 +303,7 @@ class RtcpFbTccPacket( val encoded_chunks_: MutableList, var last_chunk_: LastChunk, var num_seq_no_: Int, - var last_timestamp_us_: Long, + var last_timestamp_: Instant, val packets_: MutableList ) @@ -305,7 +313,7 @@ class RtcpFbTccPacket( val encoded_chunks_ = mutableListOf() val last_chunk_ = LastChunk() val num_seq_no_: Int - var last_timestamp_us_: Long = 0 + var last_timestamp_: Instant = Instant.EPOCH val packets_ = mutableListOf() val base_time_ticks_ = getReferenceTimeTicks(buffer, offset) @@ -343,13 +351,13 @@ class RtcpFbTccPacket( 1 -> { val delta = buffer[index] packets_.add(ReceivedPacketReport(seq_no.value, delta.toPositiveShort())) - last_timestamp_us_ += delta * kDeltaScaleFactor + last_timestamp_ += delta.toInt() * kDeltaScaleFactor index += delta_size } 2 -> { val delta = buffer.getShortAsInt(index) packets_.add(ReceivedPacketReport(seq_no.value, delta.toShort())) - last_timestamp_us_ += delta * kDeltaScaleFactor + last_timestamp_ += delta * kDeltaScaleFactor index += delta_size } 3 -> { @@ -376,7 +384,7 @@ class RtcpFbTccPacket( encoded_chunks_, last_chunk_, num_seq_no_, - last_timestamp_us_, + last_timestamp_, packets_ ) } @@ -401,10 +409,10 @@ class RtcpFbTccPacket( } private val packets_: MutableList get() = data.packets_ - private var last_timestamp_us_: Long - get() = data.last_timestamp_us_ + private var last_timestamp_: Instant + get() = data.last_timestamp_ set(value) { - data.last_timestamp_us_ = value + data.last_timestamp_ = value } // The reference time, in ticks. @@ -416,7 +424,9 @@ class RtcpFbTccPacket( val feedbackSeqNum: Int = getFeedbackPacketCount(buffer, offset) - fun GetBaseTimeUs(): Long = base_time_ticks_ * kBaseScaleFactor + fun BaseTime(): Instant { + return Instant.EPOCH + base_time_ticks_ * kBaseScaleFactor + } override fun iterator(): Iterator = packets_.iterator() @@ -426,7 +436,7 @@ class RtcpFbTccPacket( const val FMT = 15 // Convert to multiples of 0.25ms - const val kDeltaScaleFactor = 250 + val kDeltaScaleFactor = 250.micros // Maximum number of packets_ (including missing) TransportFeedback can report. const val kMaxReportedPackets = 0xFFFF @@ -442,11 +452,11 @@ class RtcpFbTccPacket( const val kTransportFeedbackHeaderSizeBytes = 4 + 8 + 8 // Used to convert from microseconds to multiples of 64ms - const val kBaseScaleFactor = kDeltaScaleFactor * (1 shl 8) + val kBaseScaleFactor = kDeltaScaleFactor * (1 shl 8) // The reference time field is 24 bits and are represented as multiples of 64ms // When the reference time field would need to wrap around - const val kTimeWrapPeriodUs: Long = (1 shl 24).toLong() * kBaseScaleFactor + val kTimeWrapPeriod = (1 shl 24).toLong() * kBaseScaleFactor const val BASE_SEQ_NUM_OFFSET = RtcpFbPacket.HEADER_SIZE const val PACKET_STATUS_COUNT_OFFSET = RtcpFbPacket.HEADER_SIZE + 2 diff --git a/rtp/src/test/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacketTest.kt b/rtp/src/test/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacketTest.kt index 6144aa2230..12146eeff4 100644 --- a/rtp/src/test/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacketTest.kt +++ b/rtp/src/test/kotlin/org/jitsi/rtp/rtcp/rtcpfb/transport_layer_fb/tcc/RtcpFbTccPacketTest.kt @@ -24,6 +24,8 @@ import io.kotest.matchers.shouldBe import io.kotest.matchers.types.beInstanceOf import org.jitsi.rtp.rtcp.RtcpHeaderBuilder import org.jitsi.rtp.util.byteBufferOf +import org.jitsi.utils.instantOfEpochMicro +import org.jitsi.utils.micros import java.time.Duration class RtcpFbTccPacketTest : ShouldSpec() { @@ -177,13 +179,13 @@ class RtcpFbTccPacketTest : ShouldSpec() { mediaSourceSsrc = 2397376430, feedbackPacketSeqNum = 162 ) - rtcpFbTccPacketBuilder.SetBase(6227, 107784064) - rtcpFbTccPacketBuilder.AddReceivedPacket(6228, 107784064) shouldBe true + rtcpFbTccPacketBuilder.SetBase(6227, instantOfEpochMicro(107784064)) + rtcpFbTccPacketBuilder.AddReceivedPacket(6228, instantOfEpochMicro(107784064)) shouldBe true } context("Creating and parsing an RtcpFbTccPacket") { context("with missing packets") { val kBaseSeqNo = 1000 - val kBaseTimestampUs = 10000L + val kBaseTimestamp = instantOfEpochMicro(10000L) val rtcpFbTccPacketBuilder = RtcpFbTccPacketBuilder( rtcpHeader = RtcpHeaderBuilder( senderSsrc = 839852602 @@ -191,9 +193,9 @@ class RtcpFbTccPacketTest : ShouldSpec() { mediaSourceSsrc = 2397376430, feedbackPacketSeqNum = 163 ) - rtcpFbTccPacketBuilder.SetBase(kBaseSeqNo, kBaseTimestampUs) - rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestampUs) - rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestampUs + 2000) + rtcpFbTccPacketBuilder.SetBase(kBaseSeqNo, kBaseTimestamp) + rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 0, kBaseTimestamp) + rtcpFbTccPacketBuilder.AddReceivedPacket(kBaseSeqNo + 3, kBaseTimestamp + 2000.micros) val coded = rtcpFbTccPacketBuilder.build()