From 9bc3bc2e73d33f4826b54ecfe48242aee4a8f0fe Mon Sep 17 00:00:00 2001 From: Leo Ma Date: Wed, 20 Apr 2016 15:26:39 +0800 Subject: [PATCH] Replace Looper with Queue for less CPU overhead Signed-off-by: Leo Ma --- .../main/java/net/ossrs/sea/MainActivity.java | 6 + app/src/main/java/net/ossrs/sea/SrsRtmp.java | 232 ++++++------------ .../net/ossrs/sea/rtmp/io/ReadThread.java | 2 +- .../net/ossrs/sea/rtmp/io/RtmpConnection.java | 4 +- .../net/ossrs/sea/rtmp/io/WriteThread.java | 2 +- 5 files changed, 88 insertions(+), 158 deletions(-) diff --git a/app/src/main/java/net/ossrs/sea/MainActivity.java b/app/src/main/java/net/ossrs/sea/MainActivity.java index 32544a8..6833b74 100644 --- a/app/src/main/java/net/ossrs/sea/MainActivity.java +++ b/app/src/main/java/net/ossrs/sea/MainActivity.java @@ -367,4 +367,10 @@ public class MainActivity extends Activity implements SurfaceHolder.Callback, Ca public void surfaceDestroyed(SurfaceHolder arg0) { Log.d(TAG, "surfaceDestroyed"); } + + @Override + protected void onDestroy() { + super.onDestroy(); + stopPublish(); + } } \ No newline at end of file diff --git a/app/src/main/java/net/ossrs/sea/SrsRtmp.java b/app/src/main/java/net/ossrs/sea/SrsRtmp.java index 9c4a729..8380f54 100644 --- a/app/src/main/java/net/ossrs/sea/SrsRtmp.java +++ b/app/src/main/java/net/ossrs/sea/SrsRtmp.java @@ -2,16 +2,12 @@ package net.ossrs.sea; import android.media.MediaCodec; import android.media.MediaFormat; -import android.os.Handler; -import android.os.Looper; -import android.os.Message; import android.util.Log; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; +import java.util.concurrent.ConcurrentLinkedQueue; /** * Created by winlin on 5/2/15. @@ -46,22 +42,20 @@ import java.util.Comparator; * muxer.release(); */ public class SrsRtmp { - private boolean connected; + private volatile boolean connected; private SrsRtmpPublisher publisher; private Thread worker; - private Looper looper; - private Handler handler; + private final Object txFrameLock = new Object(); private SrsFlv flv; private boolean sequenceHeaderOk; private SrsFlvFrame videoSequenceHeader; private SrsFlvFrame audioSequenceHeader; - // use cache queue to ensure audio and video monotonically increase. - private ArrayList cache; private int nb_videos; private int nb_audios; + private ConcurrentLinkedQueue frameCache = new ConcurrentLinkedQueue(); private static final int VIDEO_TRACK = 100; private static final int AUDIO_TRACK = 101; @@ -69,7 +63,7 @@ public class SrsRtmp { /** * constructor. - * @param path the rtmp url to post to. + * @param p the rtmp publisher. */ public SrsRtmp(SrsRtmpPublisher p) { nb_videos = 0; @@ -77,7 +71,6 @@ public class SrsRtmp { sequenceHeaderOk = false; connected = false; flv = new SrsFlv(); - cache = new ArrayList(); publisher = p; } @@ -140,11 +133,55 @@ public class SrsRtmp { worker = new Thread(new Runnable() { @Override public void run() { - try { - cycle(); - } catch (IOException e) { - Log.i(TAG, "worker: thread exception."); - e.printStackTrace(); + while (!Thread.interrupted()) { + // Keep at least one audio and video frame in cache to ensure monotonically increasing. + while (!frameCache.isEmpty() && nb_videos > 1 && nb_audios > 1) { + SrsFlvFrame frame = frameCache.poll(); + try { + // only connect when got keyframe. + if (frame.is_keyframe()) { + connect(); + } + // when sequence header required, + // adjust the dts by the current frame and sent it. + if (!sequenceHeaderOk) { + if (videoSequenceHeader != null) { + videoSequenceHeader.dts = frame.dts; + } + if (audioSequenceHeader != null) { + audioSequenceHeader.dts = frame.dts; + } + + sendFlvTag(audioSequenceHeader); + sendFlvTag(videoSequenceHeader); + sequenceHeaderOk = true; + } + + // try to send, ignore when not connected. + if (sequenceHeaderOk) { + sendFlvTag(frame); + } + + // cache the sequence header. + if (frame.type == SrsCodecFlvTag.Video && frame.avc_aac_type == SrsCodecVideoAVCType.SequenceHeader) { + videoSequenceHeader = frame; + } else if (frame.type == SrsCodecFlvTag.Audio && frame.avc_aac_type == 0) { + audioSequenceHeader = frame; + } + } catch (IOException ioe) { + ioe.printStackTrace(); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(worker, ioe); + } + } + // Waiting for next frame + synchronized (txFrameLock) { + try { + // isEmpty() may take some time, so we set timeout to detect next frame + txFrameLock.wait(1000); + } catch (InterruptedException ie) { + worker.interrupt(); + } + } } } }); @@ -168,21 +205,11 @@ public class SrsRtmp { * stop the muxer, disconnect RTMP connection from SRS. */ public void stop() throws IllegalStateException { - clearCache(); - if (worker == null || publisher == null) { - throw new IllegalStateException("Not init!"); + throw new IllegalStateException("SrsRtmp Not init!"); } - if (publisher != null) { - publisher.closeStream(); - publisher.shutdown(); - connected = false; - } - - if (looper != null) { - looper.quit(); - } + disconnect(); if (worker != null) { worker.interrupt(); @@ -228,7 +255,6 @@ public class SrsRtmp { if (publisher != null) { publisher.closeStream(); publisher.shutdown(); - publisher = null; connected = false; } Log.i(TAG, "worker: disconnect SRS ok."); @@ -237,7 +263,6 @@ public class SrsRtmp { private void clearCache() { nb_videos = 0; nb_audios = 0; - cache.clear(); sequenceHeaderOk = false; } @@ -252,116 +277,27 @@ public class SrsRtmp { clearCache(); } - - private void cycle() throws IOException { - // create the handler. - Looper.prepare(); - looper = Looper.myLooper(); - handler = new Handler(looper) { - @Override - public void handleMessage(Message msg) { - if (msg.what != SrsMessageType.FLV) { - Log.w(TAG, String.format("worker: drop unkown message, what=%d", msg.what)); - return; - } - SrsFlvFrame frame = (SrsFlvFrame)msg.obj; - // only reconnect when got keyframe. - try { - // only connect when got keyframe. - if (frame.is_keyframe()) { - connect(); - } - } catch (IOException e) { - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(worker, e); - } - - try { - // when sequence header required, - // adjust the dts by the current frame and sent it. - if (!sequenceHeaderOk) { - if (videoSequenceHeader != null) { - videoSequenceHeader.dts = frame.dts; - } - if (audioSequenceHeader != null) { - audioSequenceHeader.dts = frame.dts; - } - - sendFlvTag(audioSequenceHeader); - sendFlvTag(videoSequenceHeader); - sequenceHeaderOk = true; - } - - // try to send, igore when not connected. - if (sequenceHeaderOk) { - sendFlvTag(frame); - } - - // cache the sequence header. - if (frame.type == SrsCodecFlvTag.Video && frame.avc_aac_type == SrsCodecVideoAVCType.SequenceHeader) { - videoSequenceHeader = frame; - } else if (frame.type == SrsCodecFlvTag.Audio && frame.avc_aac_type == 0) { - audioSequenceHeader = frame; - } - } catch (IOException e) { - e.printStackTrace(); - Log.e(TAG, String.format("worker: send flv tag failed, e=%s", e.getMessage())); - disconnect(); - } - } - }; - flv.setHandler(handler); - Looper.loop(); - } - - private void sendFlvTag(SrsFlvFrame frame) throws IOException { - if (frame == null) { - return; - } - - if (frame.tag.size <= 0) { + + private void sendFlvTag(SrsFlvFrame frame) throws IllegalStateException, IOException { + if (!connected || frame == null || frame.tag.size <= 0) { return; } if (frame.is_video()) { - nb_videos++; - } else if (frame.is_audio()) { - nb_audios++; - } - cache.add(frame); - - // always keep one audio and one videos in cache. - if (nb_videos > 1 && nb_audios > 1) { - sendCachedFrames(); - } - } - - private void sendCachedFrames() throws IllegalStateException, IOException { - Collections.sort(cache, new Comparator() { - @Override - public int compare(SrsFlvFrame lhs, SrsFlvFrame rhs) { - return lhs.dts - rhs.dts; + nb_videos--; + if (publisher != null) { + publisher.publishVideoData(frame.tag.frame.array(), frame.dts); } - }); - - while (nb_videos > 1 && nb_audios > 1) { - SrsFlvFrame frame = cache.remove(0); - - if (frame.is_video()) { - nb_videos--; - if (publisher != null) { - publisher.publishVideoData(frame.tag.frame.array(), frame.dts); - } - } else if (frame.is_audio()) { - nb_audios--; - if (publisher != null) { - publisher.publishAudioData(frame.tag.frame.array(), frame.dts); - } + } else if (frame.is_audio()) { + nb_audios--; + if (publisher != null) { + publisher.publishAudioData(frame.tag.frame.array(), frame.dts); } + } - if (frame.is_keyframe()) { - Log.i(TAG, String.format("worker: send frame type=%d, dts=%d, size=%dB", - frame.type, frame.dts, frame.tag.size)); - } + if (frame.is_keyframe()) { + Log.i(TAG, String.format("worker: send frame type=%d, dts=%d, size=%dB", + frame.type, frame.dts, frame.tag.size)); } } @@ -935,7 +871,6 @@ public class SrsRtmp { private int asample_rate; private SrsUtils utils; - private Handler handler; private SrsRawH264Stream avc; private byte[] h264_sps; @@ -943,7 +878,6 @@ public class SrsRtmp { private byte[] h264_pps; private boolean h264_pps_changed; private boolean h264_sps_pps_sent; - private byte[] aac_specific_config; public SrsFlv() { @@ -959,14 +893,6 @@ public class SrsRtmp { aac_specific_config = null; } - /** - * set the handler to send message to work thread. - * @param h the handler to send the message. - */ - public void setHandler(Handler h) { - handler = h; - } - public void setVideoTrack(MediaFormat format) { videoTrack = format; } @@ -1183,17 +1109,15 @@ public class SrsRtmp { frame.frame_type = frame_type; frame.avc_aac_type = avc_aac_type; - // use handler to send the message. - // TODO: FIXME: we must wait for the handler to ready, for the sps/pps cannot be dropped. - if (handler == null) { - Log.w(TAG, "flv: drop frame for handler not ready."); - return; + frameCache.add(frame); + if (frame.is_video()) { + nb_videos++; + } else if (frame.is_audio()) { + nb_audios++; + } + synchronized (txFrameLock) { + txFrameLock.notifyAll(); } - - Message msg = Message.obtain(); - msg.what = SrsMessageType.FLV; - msg.obj = frame; - handler.sendMessage(msg); //Log.i(TAG, String.format("flv: enqueue frame type=%d, dts=%d, size=%dB", frame.type, frame.dts, frame.tag.size)); } } diff --git a/app/src/main/java/net/ossrs/sea/rtmp/io/ReadThread.java b/app/src/main/java/net/ossrs/sea/rtmp/io/ReadThread.java index be97fa1..f699491 100644 --- a/app/src/main/java/net/ossrs/sea/rtmp/io/ReadThread.java +++ b/app/src/main/java/net/ossrs/sea/rtmp/io/ReadThread.java @@ -70,7 +70,7 @@ public class ReadThread extends Thread { // Close inputstream try { in.close(); - } catch (Exception ex) { + } catch (IOException ex) { Log.w(TAG, "Failed to close inputstream", ex); } Log.i(TAG, "exiting"); diff --git a/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java b/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java index ec8faac..67e8f96 100644 --- a/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java +++ b/app/src/main/java/net/ossrs/sea/rtmp/io/RtmpConnection.java @@ -405,7 +405,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon if (socket != null) { try { socket.close(); - } catch (Exception ex) { + } catch (IOException ex) { Log.w(TAG, "shutdown(): failed to close socket", ex); } } @@ -456,4 +456,4 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon public final String getRtmpUrl() { return rtmpUrl; } -} \ No newline at end of file +} diff --git a/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java b/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java index 9f9eabb..98acf37 100644 --- a/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java +++ b/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java @@ -83,7 +83,7 @@ public class WriteThread extends Thread { // Close outputstream try { out.close(); - } catch (Exception ex) { + } catch (IOException ex) { Log.w(TAG, "WriteThread: Failed to close outputstream", ex); } Log.d(TAG, "exiting");