You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

519 lines
12 KiB
C

// RFC2326 A.1 RTP Data Header Validity Checks
#include "rtp-queue.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#define MAX_PACKET 3000
#define RTP_MISORDER 300
#define RTP_DROPOUT 1000
#define RTP_SEQUENTIAL 3
#define RTP_SEQMOD (1 << 16)
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#define MAX(a, b) ((a) > (b) ? (a) : (b))
struct rtp_item_t
{
struct rtp_packet_t* pkt;
// uint64_t clock;
};
struct rtp_queue_t
{
struct rtp_item_t* items;
int capacity;
int size;
int pos; // ring buffer read position
int probation;
int cycles;
uint16_t last_seq;
uint16_t first_seq;
int bad_count;
uint16_t bad_seq;
struct rtp_item_t bad_items[RTP_SEQUENTIAL+1];
int threshold;
int frequency;
void (*free)(void*, struct rtp_packet_t*);
void* param;
struct rtp_queue_stats_t stats;
};
static void rtp_queue_reset(struct rtp_queue_t* q);
static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq);
static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt);
struct rtp_queue_t* rtp_queue_create(int threshold, int frequency, void(*freepkt)(void*, struct rtp_packet_t*), void* param)
{
struct rtp_queue_t* q;
q = (struct rtp_queue_t*)calloc(1, sizeof(*q));
if(!q)
return NULL;
rtp_queue_reset(q);
q->probation = 1;
q->threshold = threshold;
q->frequency = frequency;
q->free = freepkt;
q->param = param;
return q;
}
int rtp_queue_destroy(struct rtp_queue_t* q)
{
rtp_queue_reset(q);
if (q->items)
{
assert(q->capacity > 0);
free(q->items);
q->items = 0;
}
free(q);
return 0;
}
static inline void rtp_queue_reset_bad_items(struct rtp_queue_t* q)
{
int i;
struct rtp_packet_t* pkt;
for (i = 0; i < q->bad_count; i++)
{
pkt = q->bad_items[i].pkt;
q->free(q->param, pkt);
}
q->bad_seq = 0;
q->bad_count = 0;
}
static void rtp_queue_reset(struct rtp_queue_t* q)
{
int i;
struct rtp_packet_t* pkt;
rtp_queue_reset_bad_items(q);
for (i = 0; i < q->size; i++)
{
pkt = q->items[(q->pos + i) % q->capacity].pkt;
q->free(q->param, pkt);
}
q->pos = 0;
q->size = 0;
q->probation = RTP_SEQUENTIAL;
}
static int rtp_queue_find(struct rtp_queue_t* q, uint16_t seq)
{
uint16_t v;
uint16_t vi;
int l, r, i;
l = q->pos;
r = q->pos + q->size;
v = q->last_seq - seq;
while (l < r)
{
i = (l + r) / 2;
vi = (uint16_t)q->last_seq - (uint16_t)q->items[i % q->capacity].pkt->rtp.seq;
if (vi == v)
{
return -1; // duplicate
}
else if (vi < v)
{
r = i;
}
else
{
assert(vi > v);
l = i + 1;
}
}
return l; // insert position
}
static int rtp_queue_insert(struct rtp_queue_t* q, int position, struct rtp_packet_t* pkt)
{
void* p;
int i, capacity;
assert(position >= q->pos && position <= q->pos + q->size);
if (q->size >= q->capacity)
{
if (q->size + 1 > MAX_PACKET)
return -E2BIG;
capacity = q->capacity + 250;
p = realloc(q->items, capacity * sizeof(struct rtp_item_t));
if (NULL == p)
return -ENOMEM;
q->items = (struct rtp_item_t*)p;
if (q->pos + q->size > q->capacity)
{
// move to tail
assert(q->pos < q->capacity);
memmove(&q->items[q->pos + capacity - q->capacity], &q->items[q->pos], (q->capacity - q->pos) * sizeof(struct rtp_item_t));
q->pos += capacity - q->capacity;
position += capacity - q->capacity;
}
q->capacity = capacity;
}
// move items
for (i = q->pos + q->size; i > position; i--)
memcpy(&q->items[i % q->capacity], &q->items[(i - 1) % q->capacity], sizeof(struct rtp_item_t));
q->items[position % q->capacity].pkt = pkt;
// q->items[position % q->capacity].clock = 0;
q->size++;
return 1;
}
/*
first last
^ ^
---too late---|------------------|----max drop---|-----another sequential---
--------------|------queue-------|-------------------------------------------->
*/
int rtp_queue_write(struct rtp_queue_t* q, struct rtp_packet_t* pkt)
{
int i, idx;
uint16_t delta;
q->stats.total++;
if (q->probation)
{
if (q->size > 0 && (uint16_t)pkt->rtp.seq == q->last_seq + 1)
{
if (0 == --q->probation)
q->first_seq = (uint16_t)q->items[q->pos].pkt->rtp.seq;
}
else if (q->size == 0 && q->probation == 1)
{
// init
q->first_seq = (uint16_t)pkt->rtp.seq;
--q->probation;
}
else
{
rtp_queue_reset(q);
}
q->last_seq = (uint16_t)pkt->rtp.seq;
return rtp_queue_insert(q, q->pos + q->size, pkt);
}
else
{
delta = (uint16_t)(pkt->rtp.seq - q->last_seq);
if (delta > 0 && delta < RTP_DROPOUT)
{
if (pkt->rtp.seq < q->last_seq)
q->cycles += RTP_SEQMOD;
rtp_queue_reset_bad_items(q);
q->last_seq = (uint16_t)pkt->rtp.seq;
return rtp_queue_insert(q, q->pos + q->size, pkt);
}
else if ( (int16_t)delta <= 0 && (int16_t)delta >= (int16_t)(q->first_seq - q->last_seq) )
{
// pkt->rtp.seq - q->first_seq < q->last_seq - q->first_seq
// duplicate or reordered packet
idx = rtp_queue_find(q, (uint16_t)pkt->rtp.seq);
if (-1 == idx)
{
++q->stats.duplicate;
return -1;
}
++q->stats.reorder;
rtp_queue_reset_bad_items(q);
return rtp_queue_insert(q, idx, pkt);
}
else if ((uint16_t)(q->first_seq - pkt->rtp.seq) < RTP_MISORDER)
{
// too late: pkt->req.seq < q->first_seq
++q->stats.late;
return -1;
}
else
{
if (q->bad_count > 0 && q->bad_seq == pkt->rtp.seq)
{
if (q->bad_count >= RTP_SEQUENTIAL)
{
// Two sequential packets -- assume that the other side
// restarted without telling us so just re-sync
// (i.e., pretend this was the first packet).
//rtp_queue_reset(q);
// copy saved items
for (i = 0; i < q->bad_count; i++)
rtp_queue_insert(q, q->pos + q->size, q->bad_items[i].pkt);
q->bad_count = 0;
q->last_seq = (uint16_t)pkt->rtp.seq;
return rtp_queue_insert(q, q->pos + q->size, pkt);
}
}
else
{
q->stats.bad++;
rtp_queue_reset_bad_items(q);
}
q->bad_seq = (pkt->rtp.seq + 1) % (RTP_SEQMOD-1);
q->bad_items[q->bad_count++].pkt = pkt;
return 1;
}
}
// for safety
assert(0);
return -1;
}
struct rtp_packet_t* rtp_queue_read(struct rtp_queue_t* q)
{
uint32_t threshold;
struct rtp_packet_t* pkt;
if (q->size < 1 || q->probation)
return NULL;
assert(q->pos < q->capacity);
pkt = q->items[q->pos].pkt;
if (q->first_seq == pkt->rtp.seq)
{
q->first_seq++;
q->size--;
q->pos = (q->pos + 1) % q->capacity;
return pkt;
}
else
{
threshold = (q->items[(q->pos + q->size - 1) % q->capacity].pkt->rtp.timestamp - pkt->rtp.timestamp);
threshold = (int32_t)threshold < 0 ? (uint32_t)(-(int32_t)threshold) : threshold; // fix h.264 b-frames pts order
threshold = (uint32_t)(((uint64_t)threshold) * 1000 / (uint64_t)q->frequency);
if (threshold < (uint32_t)q->threshold && q->size + 5 < MIN(RTP_DROPOUT, MAX_PACKET) )
return NULL;
q->stats.lost += pkt->rtp.seq - q->first_seq;
q->first_seq = (uint16_t)(pkt->rtp.seq + 1);
q->size--;
q->pos = (q->pos + 1) % q->capacity;
return pkt;
}
}
void rtp_queue_stats(struct rtp_queue_t* q, struct rtp_queue_stats_t* stats)
{
memcpy(stats, &q->stats, sizeof(*stats));
}
#if defined(_DEBUG) || defined(DEBUG)
#include <stdio.h>
static void rtp_queue_dump(struct rtp_queue_t* q)
{
int i;
printf("[%02d/%02d]: ", q->pos, q->size);
for (i = 0; i < q->size; i++)
{
printf("%u\t", (unsigned int)q->items[(i + q->pos) % q->capacity].pkt->rtp.seq);
}
printf("\n");
}
static void rtp_packet_free(void* param, struct rtp_packet_t* pkt)
{
free(pkt); (void)param;
}
static int rtp_queue_packet(rtp_queue_t* q, uint16_t seq)
{
struct rtp_packet_t* pkt;
pkt = (struct rtp_packet_t*)malloc(sizeof(*pkt));
if (pkt)
{
memset(pkt, 0, sizeof(*pkt));
pkt->rtp.seq = seq;
if (0 == rtp_queue_write(q, pkt))
free(pkt);
}
return 0;
}
static void rtp_queue_test2(void)
{
int i;
uint16_t seq;
rtp_queue_t* q;
struct rtp_packet_t* pkt;
static uint16_t s_seq[1000];
q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
for(i = 0; i < sizeof(s_seq)/sizeof(s_seq[0]); i++)
s_seq[i] = (uint16_t)(45000 + i);
// 45460, 45461, 45462, 45464, 45465, 45466, ...,
// 45490, 45491, 45492, 45503, 45504, 45505, 45463,
// 45506, 45507, 45493, 45494, 45495, 45496, 45497,
// 45498, 45499, 45500, 45501, 45502, 45508, 45509, ...
memmove(s_seq + 463, s_seq + 464, sizeof(s_seq[0]) * (509 - 464)); // lost 45463
s_seq[492] = 45503;
s_seq[493] = 45504;
s_seq[494] = 45505;
s_seq[495] = 45463;
s_seq[496] = 45506;
s_seq[497] = 45507;
s_seq[498] = 45493;
s_seq[499] = 45494;
s_seq[500] = 45495;
s_seq[501] = 45496;
s_seq[502] = 45497;
s_seq[503] = 45498;
s_seq[504] = 45499;
s_seq[505] = 45500;
s_seq[506] = 45501;
s_seq[507] = 45502;
s_seq[508] = 45508;
seq = s_seq[0];
for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
{
rtp_queue_packet(q, s_seq[i]);
pkt = rtp_queue_read(q);
if (pkt)
{
//printf("%u ", pkt->rtp.seq);
assert(0 == pkt->rtp.seq - seq++);
free(pkt);
}
}
assert(q->stats.total == sizeof(s_seq) / sizeof(s_seq[0]) && q->stats.reorder == 11 && q->stats.lost == 0 && q->stats.bad == 0 && q->stats.duplicate == 0 && q->stats.late == 0);
rtp_queue_destroy(q);
}
static void rtp_queue_test3(void)
{
int i;
uint16_t seq;
rtp_queue_t* q;
struct rtp_packet_t* pkt;
static uint16_t s_seq[] = {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11,12,13,14,15,16,17,18,19,20,
21,22,
31,32,33,34,35,36,37,38,39,40,
41,42,43,44,45,
23,24,25,26,27,28,29,30,
46,47,48,49,50
};
q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
seq = s_seq[0];
for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
{
rtp_queue_packet(q, s_seq[i]);
pkt = rtp_queue_read(q);
if (pkt)
{
//printf("%u ", pkt->rtp.seq);
assert(0 == pkt->rtp.seq - seq++);
free(pkt);
}
}
assert(q->stats.total == sizeof(s_seq) / sizeof(s_seq[0]) && q->stats.reorder == 8 && q->stats.lost == 0 && q->stats.bad == 0 && q->stats.duplicate == 0 && q->stats.late == 0);
rtp_queue_destroy(q);
}
static void rtp_queue_test4(void)
{
int i;
uint16_t seq;
rtp_queue_t* q;
struct rtp_packet_t* pkt;
// first packet
static uint16_t s_seq[] = {
1,
17,18,19,20,
21,22,
2, 3, 4, 5, 6, 7, 8, 9, 10,
11,12,13,14,15,16,
23,24,25,26,27,28,29,30,
31,32,33,34,35,36,37,38,39,40,
41,42,43,44,45,46,47,48,49,50
};
q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
seq = s_seq[0];
for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
{
rtp_queue_packet(q, s_seq[i]);
pkt = rtp_queue_read(q);
if (pkt)
{
//printf("%u ", pkt->rtp.seq);
assert(0 == pkt->rtp.seq - seq++);
free(pkt);
}
}
assert(q->stats.total == sizeof(s_seq) / sizeof(s_seq[0]) && q->stats.reorder == 15 && q->stats.lost == 0 && q->stats.bad == 0 && q->stats.duplicate == 0 && q->stats.late == 0);
rtp_queue_destroy(q);
}
void rtp_queue_test(void)
{
int i;
rtp_queue_t* q;
struct rtp_packet_t* pkt;
static uint16_t s_seq[] = {
836, 837, 859, 860, 822, 823, 824, 825,
826, 822, 830, 827, 831, 828, 829, 830,
832, 833, 834, 6000, 840, 841, 842, 843,
835, 836, 837, 838, 838, 844, 859, 811,
};
rtp_queue_test2();
rtp_queue_test3();
rtp_queue_test4();
q = rtp_queue_create(100, 90000, rtp_packet_free, NULL);
for (i = 0; i < sizeof(s_seq) / sizeof(s_seq[0]); i++)
{
rtp_queue_packet(q, s_seq[i]);
rtp_queue_dump(q);
pkt = rtp_queue_read(q);
if (pkt) free(pkt);
rtp_queue_dump(q);
}
assert(q->stats.total == sizeof(s_seq)/sizeof(s_seq[0]) && q->stats.lost == 0 && q->stats.bad == 1 && q->stats.duplicate == 1 && q->stats.late == 20 && q->stats.reorder == 6);
rtp_queue_destroy(q);
}
#endif