// RFC2326 A.1 RTP Data Header Validity Checks #include "rtp-queue.h" #include #include #include #include #define MAX_PACKET 3000 #define RTP_MISORDER 300 #define RTP_DROPOUT 1000 #define RTP_SEQUENTIAL 3 #define RTP_SEQMOD (1 << 16) #define MIN(a, b) ((a) < (b) ? (a) : (b)) #define MAX(a, b) ((a) > (b) ? (a) : (b)) struct rtp_item_t { struct rtp_packet_t* pkt; // uint64_t clock; }; struct rtp_queue_t { struct rtp_item_t* items; int capacity; int size; int pos; // ring buffer read position int probation; int cycles; uint16_t last_seq; uint16_t first_seq; int bad_count; uint16_t bad_seq; struct rtp_item_t bad_items[RTP_SEQUENTIAL+1]; int threshold; int frequency; void (*free)(void*, struct rtp_packet_t*); void* param; struct rtp_queue_stats_t stats; }; static void rtp_queue_reset(struct rtp_queue_t* q); static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq); static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt); struct rtp_queue_t* rtp_queue_create(int threshold, int frequency, void(*freepkt)(void*, struct rtp_packet_t*), void* param) { struct rtp_queue_t* q; q = (struct rtp_queue_t*)calloc(1, sizeof(*q)); if(!q) return NULL; rtp_queue_reset(q); q->probation = 1; q->threshold = threshold; q->frequency = frequency; q->free = freepkt; q->param = param; return q; } int rtp_queue_destroy(struct rtp_queue_t* q) { rtp_queue_reset(q); if (q->items) { assert(q->capacity > 0); free(q->items); q->items = 0; } free(q); return 0; } static inline void rtp_queue_reset_bad_items(struct rtp_queue_t* q) { int i; struct rtp_packet_t* pkt; for (i = 0; i < q->bad_count; i++) { pkt = q->bad_items[i].pkt; q->free(q->param, pkt); } q->bad_seq = 0; q->bad_count = 0; } static void rtp_queue_reset(struct rtp_queue_t* q) { int i; struct rtp_packet_t* pkt; rtp_queue_reset_bad_items(q); for (i = 0; i < q->size; i++) { pkt = q->items[(q->pos + i) % q->capacity].pkt; q->free(q->param, pkt); } q->pos = 0; q->size = 0; q->probation = RTP_SEQUENTIAL; } static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq) { uint16_t v; uint16_t vi; int l, r, i; l = q->pos; r = q->pos + q->size; v = q->last_seq - seq; while (l < r) { i = (l + r) / 2; vi = (uint16_t)q->last_seq - (uint16_t)q->items[i % q->capacity].pkt->rtp.seq; if (vi == v) { return -1; // duplicate } else if (vi < v) { r = i; } else { assert(vi > v); l = i + 1; } } return l; // insert position } static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt) { void* p; int i, capacity; assert(position >= q->pos && position <= q->pos + q->size); if (q->size >= q->capacity) { if (q->size + 1 > MAX_PACKET) return -E2BIG; capacity = q->capacity + 250; p = realloc(q->items, capacity * sizeof(struct rtp_item_t)); if (NULL == p) return -ENOMEM; q->items = (struct rtp_item_t*)p; if (q->pos + q->size > q->capacity) { // move to tail assert(q->pos < q->capacity); memmove(&q->items[q->pos + capacity - q->capacity], &q->items[q->pos], (q->capacity - q->pos) * sizeof(struct rtp_item_t)); q->pos += capacity - q->capacity; position += capacity - q->capacity; } q->capacity = capacity; } // move items for (i = q->pos + q->size; i > position; i--) memcpy(&q->items[i % q->capacity], &q->items[(i - 1) % q->capacity], sizeof(struct rtp_item_t)); q->items[position % q->capacity].pkt = pkt; // q->items[position % q->capacity].clock = 0; q->size++; return 1; } /* first last ^ ^ ---too late---|------------------|----max drop---|-----another sequential--- --------------|------queue-------|--------------------------------------------> */ int rtp_queue_write(struct rtp_queue_t* q, struct rtp_packet_t* pkt) { int i, idx; uint16_t delta; q->stats.total++; if (q->probation) { if (q->size > 0 && (uint16_t)pkt->rtp.seq == q->last_seq + 1) { if (0 == --q->probation) q->first_seq = (uint16_t)q->items[q->pos].pkt->rtp.seq; } else if (q->size == 0 && q->probation == 1) { // init q->first_seq = (uint16_t)pkt->rtp.seq; --q->probation; } else { rtp_queue_reset(q); } q->last_seq = (uint16_t)pkt->rtp.seq; return rtp_queue_insert(q, q->pos + q->size, pkt); } else { delta = (uint16_t)(pkt->rtp.seq - q->last_seq); if (delta > 0 && delta < RTP_DROPOUT) { if (pkt->rtp.seq < q->last_seq) q->cycles += RTP_SEQMOD; rtp_queue_reset_bad_items(q); q->last_seq = (uint16_t)pkt->rtp.seq; return rtp_queue_insert(q, q->pos + q->size, pkt); } else if ( (int16_t)delta <= 0 && (int16_t)delta >= (int16_t)(q->first_seq - q->last_seq) ) { // pkt->rtp.seq - q->first_seq < q->last_seq - q->first_seq // duplicate or reordered packet idx = rtp_queue_find(q, (uint16_t)pkt->rtp.seq); if (-1 == idx) { ++q->stats.duplicate; return -1; } ++q->stats.reorder; rtp_queue_reset_bad_items(q); return rtp_queue_insert(q, idx, pkt); } else if ((uint16_t)(q->first_seq - pkt->rtp.seq) < RTP_MISORDER) { // too late: pkt->req.seq < q->first_seq ++q->stats.late; return -1; } else { if (q->bad_count > 0 && q->bad_seq == pkt->rtp.seq) { if (q->bad_count >= RTP_SEQUENTIAL) { // Two sequential packets -- assume that the other side // restarted without telling us so just re-sync // (i.e., pretend this was the first packet). //rtp_queue_reset(q); // copy saved items for (i = 0; i < q->bad_count; i++) rtp_queue_insert(q, q->pos + q->size, q->bad_items[i].pkt); q->bad_count = 0; q->last_seq = (uint16_t)pkt->rtp.seq; return rtp_queue_insert(q, q->pos + q->size, pkt); } } else { q->stats.bad++; rtp_queue_reset_bad_items(q); } q->bad_seq = (pkt->rtp.seq + 1) % (RTP_SEQMOD-1); q->bad_items[q->bad_count++].pkt = pkt; return 1; } } // for safety assert(0); return -1; } struct rtp_packet_t* rtp_queue_read(struct rtp_queue_t* q) { uint32_t threshold; struct rtp_packet_t* pkt; if (q->size < 1 || q->probation) return NULL; assert(q->pos < q->capacity); pkt = q->items[q->pos].pkt; if (q->first_seq == pkt->rtp.seq) { q->first_seq++; q->size--; q->pos = (q->pos + 1) % q->capacity; return pkt; } else { threshold = (q->items[(q->pos + q->size - 1) % q->capacity].pkt->rtp.timestamp - pkt->rtp.timestamp); threshold = (int32_t)threshold < 0 ? (uint32_t)(-(int32_t)threshold) : threshold; // fix h.264 b-frames pts order threshold = (uint32_t)(((uint64_t)threshold) * 1000 / (uint64_t)q->frequency); if (threshold < (uint32_t)q->threshold && q->size + 5 < MIN(RTP_DROPOUT, MAX_PACKET) ) return NULL; q->stats.lost += pkt->rtp.seq - q->first_seq; q->first_seq = (uint16_t)(pkt->rtp.seq + 1); q->size--; q->pos = (q->pos + 1) % q->capacity; return pkt; } } void rtp_queue_stats(struct rtp_queue_t* q, struct rtp_queue_stats_t* stats) { memcpy(stats, &q->stats, sizeof(*stats)); } #if defined(_DEBUG) || defined(DEBUG) #include static void rtp_queue_dump(struct rtp_queue_t* q) { int i; printf("[%02d/%02d]: ", q->pos, q->size); for (i = 0; i < q->size; i++) { printf("%u\t", (unsigned int)q->items[(i + q->pos) % q->capacity].pkt->rtp.seq); } printf("\n"); } static void rtp_packet_free(void* param, struct rtp_packet_t* pkt) { free(pkt); (void)param; } static int rtp_queue_packet(rtp_queue_t* q, uint16_t seq) { struct rtp_packet_t* pkt; pkt = (struct rtp_packet_t*)malloc(sizeof(*pkt)); if (pkt) { memset(pkt, 0, sizeof(*pkt)); pkt->rtp.seq = seq; if (0 == rtp_queue_write(q, pkt)) free(pkt); } return 0; } static void rtp_queue_test2(void) { int i; uint16_t seq; rtp_queue_t* q; struct rtp_packet_t* pkt; static uint16_t s_seq[1000]; q = rtp_queue_create(100, 90000, rtp_packet_free, NULL); for(i = 0; i < sizeof(s_seq)/sizeof(s_seq[0]); i++) s_seq[i] = (uint16_t)(45000 + i); // 45460, 45461, 45462, 45464, 45465, 45466, ..., // 45490, 45491, 45492, 45503, 45504, 45505, 45463, // 45506, 45507, 45493, 45494, 45495, 45496, 45497, // 45498, 45499, 45500, 45501, 45502, 45508, 45509, ... memmove(s_seq + 463, s_seq + 464, sizeof(s_seq[0]) * (509 - 464)); // lost 45463 s_seq[492] = 45503; s_seq[493] = 45504; s_seq[494] = 45505; s_seq[495] = 45463; s_seq[496] = 45506; s_seq[497] = 45507; s_seq[498] = 45493; s_seq[499] = 45494; s_seq[500] = 45495; s_seq[501] = 45496; s_seq[502] = 45497; s_seq[503] = 45498; s_seq[504] = 45499; s_seq[505] = 45500; s_seq[506] = 45501; s_seq[507] = 45502; s_seq[508] = 45508; seq = s_seq[0]; for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++) { rtp_queue_packet(q, s_seq[i]); pkt = rtp_queue_read(q); if (pkt) { //printf("%u ", pkt->rtp.seq); assert(0 == pkt->rtp.seq - seq++); free(pkt); } } assert(q->stats.total == sizeof(s_seq) / sizeof(s_seq[0]) && q->stats.reorder == 11 && q->stats.lost == 0 && q->stats.bad == 0 && q->stats.duplicate == 0 && q->stats.late == 0); rtp_queue_destroy(q); } static void rtp_queue_test3(void) { int i; uint16_t seq; rtp_queue_t* q; struct rtp_packet_t* pkt; static uint16_t s_seq[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,12,13,14,15,16,17,18,19,20, 21,22, 31,32,33,34,35,36,37,38,39,40, 41,42,43,44,45, 23,24,25,26,27,28,29,30, 46,47,48,49,50 }; q = rtp_queue_create(100, 90000, rtp_packet_free, NULL); seq = s_seq[0]; for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++) { rtp_queue_packet(q, s_seq[i]); pkt = rtp_queue_read(q); if (pkt) { //printf("%u ", pkt->rtp.seq); assert(0 == pkt->rtp.seq - seq++); free(pkt); } } assert(q->stats.total == sizeof(s_seq) / sizeof(s_seq[0]) && q->stats.reorder == 8 && q->stats.lost == 0 && q->stats.bad == 0 && q->stats.duplicate == 0 && q->stats.late == 0); rtp_queue_destroy(q); } static void rtp_queue_test4(void) { int i; uint16_t seq; rtp_queue_t* q; struct rtp_packet_t* pkt; // first packet static uint16_t s_seq[] = { 1, 17,18,19,20, 21,22, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,12,13,14,15,16, 23,24,25,26,27,28,29,30, 31,32,33,34,35,36,37,38,39,40, 41,42,43,44,45,46,47,48,49,50 }; q = rtp_queue_create(100, 90000, rtp_packet_free, NULL); seq = s_seq[0]; for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++) { rtp_queue_packet(q, s_seq[i]); pkt = rtp_queue_read(q); if (pkt) { //printf("%u ", pkt->rtp.seq); assert(0 == pkt->rtp.seq - seq++); free(pkt); } } assert(q->stats.total == sizeof(s_seq) / sizeof(s_seq[0]) && q->stats.reorder == 15 && q->stats.lost == 0 && q->stats.bad == 0 && q->stats.duplicate == 0 && q->stats.late == 0); rtp_queue_destroy(q); } void rtp_queue_test(void) { int i; rtp_queue_t* q; struct rtp_packet_t* pkt; static uint16_t s_seq[] = { 836, 837, 859, 860, 822, 823, 824, 825, 826, 822, 830, 827, 831, 828, 829, 830, 832, 833, 834, 6000, 840, 841, 842, 843, 835, 836, 837, 838, 838, 844, 859, 811, }; rtp_queue_test2(); rtp_queue_test3(); rtp_queue_test4(); q = rtp_queue_create(100, 90000, rtp_packet_free, NULL); for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++) { rtp_queue_packet(q, s_seq[i]); rtp_queue_dump(q); pkt = rtp_queue_read(q); if (pkt) free(pkt); rtp_queue_dump(q); } assert(q->stats.total == sizeof(s_seq)/sizeof(s_seq[0]) && q->stats.lost == 0 && q->stats.bad == 1 && q->stats.duplicate == 1 && q->stats.late == 20 && q->stats.reorder == 6); rtp_queue_destroy(q); } #endif