From 73c6c8e451b3876309ecc43253d31ea7954c9a79 Mon Sep 17 00:00:00 2001 From: Leo Ma Date: Wed, 20 Apr 2016 10:30:51 +0800 Subject: [PATCH] Add video cached frame counting To check networking situation according to cached frame counting. Signed-off-by: Leo Ma --- .../main/java/net/ossrs/sea/SrsEncoder.java | 86 +++++++++++-------- app/src/main/java/net/ossrs/sea/SrsRtmp.java | 13 ++- .../java/net/ossrs/sea/SrsRtmpPublisher.java | 8 ++ .../net/ossrs/sea/rtmp/io/RtmpConnection.java | 17 +++- .../net/ossrs/sea/rtmp/io/WriteThread.java | 12 ++- 5 files changed, 87 insertions(+), 49 deletions(-) diff --git a/app/src/main/java/net/ossrs/sea/SrsEncoder.java b/app/src/main/java/net/ossrs/sea/SrsEncoder.java index d35b41a..df5ccc5 100644 --- a/app/src/main/java/net/ossrs/sea/SrsEncoder.java +++ b/app/src/main/java/net/ossrs/sea/SrsEncoder.java @@ -35,6 +35,7 @@ public class SrsEncoder { public static final int ABITRATE = 128 * 1000; // 128kbps private SrsRtmp muxer; + private SrsRtmpPublisher publisher; private MediaCodec vencoder; private MediaCodecInfo vmci; @@ -47,15 +48,15 @@ public class SrsEncoder { private byte[] mCroppedFrameBuffer; private boolean mCameraFaceFront = true; private long mPresentTimeUs; + private int vfmt_color; private int vtrack; - private int vcolor; private int atrack; public SrsEncoder() { - vcolor = chooseVideoEncoder(); - if (vcolor == MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar) { + vfmt_color = chooseVideoEncoder(); + if (vfmt_color == MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar) { VFORMAT = ImageFormat.YV12; - } else if (vcolor == MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420SemiPlanar) { + } else if (vfmt_color == MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420SemiPlanar) { VFORMAT = ImageFormat.NV21; } else { throw new IllegalStateException("Unsupported color format!"); @@ -66,7 +67,8 @@ public class SrsEncoder { } public int start() { - muxer = new SrsRtmp(rtmpUrl); + publisher = new SrsRtmpPublisher(rtmpUrl); + muxer = new SrsRtmp(publisher); try { muxer.start(); } catch (IOException e) { @@ -113,7 +115,7 @@ public class SrsEncoder { // setup the vencoder. // Note: landscape to portrait, 90 degree rotation, so we need to switch VWIDTH and VHEIGHT in configuration MediaFormat videoFormat = MediaFormat.createVideoFormat(MediaFormat.MIMETYPE_VIDEO_AVC, VENC_WIDTH, VENC_HEIGHT); - videoFormat.setInteger(MediaFormat.KEY_COLOR_FORMAT, vcolor); + videoFormat.setInteger(MediaFormat.KEY_COLOR_FORMAT, vfmt_color); videoFormat.setInteger(MediaFormat.KEY_MAX_INPUT_SIZE, 0); videoFormat.setInteger(MediaFormat.KEY_BIT_RATE, vbitrate); videoFormat.setInteger(MediaFormat.KEY_FRAME_RATE, VFPS); @@ -171,7 +173,7 @@ public class SrsEncoder { private int preProcessYuvFrame(byte[] data) { if (mCameraFaceFront) { - switch (vcolor) { + switch (vfmt_color) { case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar: flipYUV420PlannerFrame(data, mFlippedFrameBuffer, VWIDTH, VHEIGHT); rotateYUV420PlannerFrame(mFlippedFrameBuffer, mRotatedFrameBuffer, VWIDTH, VHEIGHT); @@ -188,7 +190,7 @@ public class SrsEncoder { throw new IllegalStateException("Unsupported color format!"); } } else { - switch (vcolor) { + switch (vfmt_color) { case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar: rotateYUV420PlannerFrame(data, mRotatedFrameBuffer, VWIDTH, VHEIGHT); cropYUV420PlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH, @@ -208,27 +210,30 @@ public class SrsEncoder { } public void onGetYuvFrame(byte[] data) { - preProcessYuvFrame(data); - ByteBuffer[] inBuffers = vencoder.getInputBuffers(); - ByteBuffer[] outBuffers = vencoder.getOutputBuffers(); - - int inBufferIndex = vencoder.dequeueInputBuffer(-1); - if (inBufferIndex >= 0) { - ByteBuffer bb = inBuffers[inBufferIndex]; - bb.clear(); - bb.put(mCroppedFrameBuffer, 0, mCroppedFrameBuffer.length); - long pts = System.nanoTime() / 1000 - mPresentTimeUs; - vencoder.queueInputBuffer(inBufferIndex, 0, mCroppedFrameBuffer.length, pts, 0); - } + // Check if the networking is good enough. + if (publisher.getVideoFrameCacheNumber() < 128) { + preProcessYuvFrame(data); + ByteBuffer[] inBuffers = vencoder.getInputBuffers(); + ByteBuffer[] outBuffers = vencoder.getOutputBuffers(); + + int inBufferIndex = vencoder.dequeueInputBuffer(-1); + if (inBufferIndex >= 0) { + ByteBuffer bb = inBuffers[inBufferIndex]; + bb.clear(); + bb.put(mCroppedFrameBuffer, 0, mCroppedFrameBuffer.length); + long pts = System.nanoTime() / 1000 - mPresentTimeUs; + vencoder.queueInputBuffer(inBufferIndex, 0, mCroppedFrameBuffer.length, pts, 0); + } - for (; ; ) { - int outBufferIndex = vencoder.dequeueOutputBuffer(vebi, 0); - if (outBufferIndex >= 0) { - ByteBuffer bb = outBuffers[outBufferIndex]; - onEncodedAnnexbFrame(bb, vebi); - vencoder.releaseOutputBuffer(outBufferIndex, false); - } else { - break; + for (; ; ) { + int outBufferIndex = vencoder.dequeueOutputBuffer(vebi, 0); + if (outBufferIndex >= 0) { + ByteBuffer bb = outBuffers[outBufferIndex]; + onEncodedAnnexbFrame(bb, vebi); + vencoder.releaseOutputBuffer(outBufferIndex, false); + } else { + break; + } } } } @@ -278,12 +283,15 @@ public class SrsEncoder { // NV12 -> YUV420SP yyyy*2 uv uv // NV16 -> YUV422SP yyyy uv uv // YUY2 -> YUV422SP yuyv yuyv - private byte[] cropYUV420SemiPlannerFrame(byte[] input, int iw, int ih, byte[] output, int ow, int oh) { - assert(iw >= ow && ih >= oh); - // Note: the stride of resolution must be set as 16x for hard encoding with some chip like MTK - // Since Y component is quadruple size as U and V component, the stride must be set as 32x - assert(ow % 32 == 0 && oh % 32 == 0); + if (iw < ow || ih < oh) { + throw new AssertionError(); + } + if (ow % 32 != 0 || oh % 32 != 0) { + // Note: the stride of resolution must be set as 16x for hard encoding with some chip like MTK + // Since Y component is quadruple size as U and V component, the stride must be set as 32x + throw new AssertionError(); + } int iFrameSize = iw * ih; int oFrameSize = ow * oh; @@ -308,10 +316,14 @@ public class SrsEncoder { } private byte[] cropYUV420PlannerFrame(byte[] input, int iw, int ih, byte[] output, int ow, int oh) { - assert(iw >= ow && ih >= oh); - // Note: the stride of resolution must be set as 16x for hard encoding with some chip like MTK - // Since Y component is quadruple size as U and V component, the stride must be set as 32x - assert(ow % 32 == 0 && oh % 32 == 0); + if (iw < ow || ih < oh) { + throw new AssertionError(); + } + if (ow % 32 != 0 || oh % 32 != 0) { + // Note: the stride of resolution must be set as 16x for hard encoding with some chip like MTK + // Since Y component is quadruple size as U and V component, the stride must be set as 32x + throw new AssertionError(); + } int iFrameSize = iw * ih; int iQFrameSize = iFrameSize / 4; diff --git a/app/src/main/java/net/ossrs/sea/SrsRtmp.java b/app/src/main/java/net/ossrs/sea/SrsRtmp.java index de1a8a0..9c4a729 100644 --- a/app/src/main/java/net/ossrs/sea/SrsRtmp.java +++ b/app/src/main/java/net/ossrs/sea/SrsRtmp.java @@ -46,7 +46,6 @@ import java.util.Comparator; * muxer.release(); */ public class SrsRtmp { - private String url; private boolean connected; private SrsRtmpPublisher publisher; @@ -72,14 +71,14 @@ public class SrsRtmp { * constructor. * @param path the rtmp url to post to. */ - public SrsRtmp(String path) { + public SrsRtmp(SrsRtmpPublisher p) { nb_videos = 0; nb_audios = 0; sequenceHeaderOk = false; connected = false; - url = path; flv = new SrsFlv(); cache = new ArrayList(); + publisher = p; } /** @@ -162,7 +161,6 @@ public class SrsRtmp { stop(); } catch (IllegalStateException e) { // Ignore illegal state. - //Log.e(TAG, String.format("worker: stop failed. e=%s", e.getMessage())); } } @@ -198,7 +196,7 @@ public class SrsRtmp { publisher = null; } - Log.i(TAG, String.format("worker: muxer closed, url=%s", url)); + Log.i(TAG, String.format("worker: muxer closed, url=%s", publisher.getRtmpUrl())); } /** @@ -245,11 +243,10 @@ public class SrsRtmp { private void connect() throws IllegalStateException, IOException { if (!connected) { - Log.i(TAG, String.format("worker: connecting to RTMP server by url=%s\n", url)); - publisher = new SrsRtmpPublisher(url); + Log.i(TAG, String.format("worker: connecting to RTMP server by url=%s\n", publisher.getRtmpUrl())); publisher.connect(); publisher.publish("live"); - Log.i(TAG, String.format("worker: connect to RTMP server by url=%s\n", url)); + Log.i(TAG, String.format("worker: connect to RTMP server by url=%s\n", publisher.getRtmpUrl())); connected = true; } diff --git a/app/src/main/java/net/ossrs/sea/SrsRtmpPublisher.java b/app/src/main/java/net/ossrs/sea/SrsRtmpPublisher.java index 6a2721e..f63e393 100644 --- a/app/src/main/java/net/ossrs/sea/SrsRtmpPublisher.java +++ b/app/src/main/java/net/ossrs/sea/SrsRtmpPublisher.java @@ -66,4 +66,12 @@ public class SrsRtmpPublisher implements RtmpPublisher { } rtmpConnection.publishAudioData(data, dts); } + + public final int getVideoFrameCacheNumber() { + return ((RtmpConnection) rtmpConnection).getVideoFrameCacheNumber(); + } + + public final String getRtmpUrl() { + return ((RtmpConnection) rtmpConnection).getRtmpUrl(); + } } diff --git a/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java b/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java index fb0f8da..ec8faac 100644 --- a/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java +++ b/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java @@ -13,6 +13,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.concurrent.atomic.AtomicInteger; import android.util.Log; import net.ossrs.sea.rtmp.RtmpPublisher; import net.ossrs.sea.rtmp.amf.AmfNull; @@ -42,6 +43,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon private String host; private String streamName; private String publishType; + private String rtmpUrl = ""; private String swfUrl = ""; private String tcUrl = ""; private String pageUrl = ""; @@ -58,10 +60,12 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon private final Object connectingLock = new Object(); private final Object publishLock = new Object(); private volatile boolean connecting = false; + private AtomicInteger videoFrameCacheNumber = new AtomicInteger(0); private int currentStreamId = -1; public RtmpConnection(String url) { - this.tcUrl = url.substring(0, url.lastIndexOf('/')); + rtmpUrl = url; + this.tcUrl = rtmpUrl.substring(0, url.lastIndexOf('/')); Matcher matcher = rtmpUrlPattern.matcher(url); if (matcher.matches()) { this.host = matcher.group(1); @@ -89,7 +93,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon active = true; Log.d(TAG, "connect(): handshake done"); ReadThread readThread = new ReadThread(rtmpSessionInfo, in, this, this); - writeThread = new WriteThread(rtmpSessionInfo, out, this); + writeThread = new WriteThread(rtmpSessionInfo, out, videoFrameCacheNumber, this); readThread.start(); writeThread.start(); @@ -427,6 +431,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon video.getHeader().setMessageStreamId(currentStreamId); video.getHeader().setAbsoluteTimestamp(dts); writeThread.send(video); + videoFrameCacheNumber.getAndIncrement(); } @Override @@ -443,4 +448,12 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon audio.getHeader().setAbsoluteTimestamp(dts); writeThread.send(audio); } + + public final int getVideoFrameCacheNumber() { + return videoFrameCacheNumber.get(); + } + + public final String getRtmpUrl() { + return rtmpUrl; + } } \ No newline at end of file diff --git a/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java b/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java index 748c886..9f9eabb 100644 --- a/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java +++ b/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java @@ -5,9 +5,12 @@ import java.io.OutputStream; import java.net.SocketException; import java.util.Arrays; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + import android.util.Log; import net.ossrs.sea.rtmp.packets.Command; import net.ossrs.sea.rtmp.packets.RtmpPacket; +import net.ossrs.sea.rtmp.packets.Video; /** * RTMPConnection's write thread @@ -23,12 +26,14 @@ public class WriteThread extends Thread { private ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue(); private final Object txPacketLock = new Object(); private volatile boolean active = true; + private AtomicInteger videoFrameCacheNumber; private ThreadController threadController; - public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, ThreadController threadController) { + public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, AtomicInteger i, ThreadController threadController) { super("RtmpWriteThread"); this.rtmpSessionInfo = rtmpSessionInfo; this.out = out; + this.videoFrameCacheNumber = i; this.threadController = threadController; } @@ -39,13 +44,16 @@ public class WriteThread extends Thread { try { while (!writeQueue.isEmpty()) { RtmpPacket rtmpPacket = writeQueue.poll(); - final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId()); + ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId()); chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader()); rtmpPacket.writeTo(out, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo); Log.d(TAG, "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength()); if (rtmpPacket instanceof Command) { rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName()); } + if (rtmpPacket instanceof Video) { + videoFrameCacheNumber.getAndDecrement(); + } } out.flush(); } catch (SocketException se) {