Replace Looper with Queue for less CPU overhead

Signed-off-by: Leo Ma <begeekmyfriend@gmail.com>
camera2
Leo Ma 9 years ago
parent 73c6c8e451
commit 9bc3bc2e73

@ -367,4 +367,10 @@ public class MainActivity extends Activity implements SurfaceHolder.Callback, Ca
public void surfaceDestroyed(SurfaceHolder arg0) { public void surfaceDestroyed(SurfaceHolder arg0) {
Log.d(TAG, "surfaceDestroyed"); Log.d(TAG, "surfaceDestroyed");
} }
@Override
protected void onDestroy() {
super.onDestroy();
stopPublish();
}
} }

@ -2,16 +2,12 @@ package net.ossrs.sea;
import android.media.MediaCodec; import android.media.MediaCodec;
import android.media.MediaFormat; import android.media.MediaFormat;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.Log; import android.util.Log;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Comparator;
/** /**
* Created by winlin on 5/2/15. * Created by winlin on 5/2/15.
@ -46,22 +42,20 @@ import java.util.Comparator;
* muxer.release(); * muxer.release();
*/ */
public class SrsRtmp { public class SrsRtmp {
private boolean connected; private volatile boolean connected;
private SrsRtmpPublisher publisher; private SrsRtmpPublisher publisher;
private Thread worker; private Thread worker;
private Looper looper; private final Object txFrameLock = new Object();
private Handler handler;
private SrsFlv flv; private SrsFlv flv;
private boolean sequenceHeaderOk; private boolean sequenceHeaderOk;
private SrsFlvFrame videoSequenceHeader; private SrsFlvFrame videoSequenceHeader;
private SrsFlvFrame audioSequenceHeader; private SrsFlvFrame audioSequenceHeader;
// use cache queue to ensure audio and video monotonically increase.
private ArrayList<SrsFlvFrame> cache;
private int nb_videos; private int nb_videos;
private int nb_audios; private int nb_audios;
private ConcurrentLinkedQueue<SrsFlvFrame> frameCache = new ConcurrentLinkedQueue<SrsFlvFrame>();
private static final int VIDEO_TRACK = 100; private static final int VIDEO_TRACK = 100;
private static final int AUDIO_TRACK = 101; private static final int AUDIO_TRACK = 101;
@ -69,7 +63,7 @@ public class SrsRtmp {
/** /**
* constructor. * constructor.
* @param path the rtmp url to post to. * @param p the rtmp publisher.
*/ */
public SrsRtmp(SrsRtmpPublisher p) { public SrsRtmp(SrsRtmpPublisher p) {
nb_videos = 0; nb_videos = 0;
@ -77,7 +71,6 @@ public class SrsRtmp {
sequenceHeaderOk = false; sequenceHeaderOk = false;
connected = false; connected = false;
flv = new SrsFlv(); flv = new SrsFlv();
cache = new ArrayList<SrsFlvFrame>();
publisher = p; publisher = p;
} }
@ -140,11 +133,55 @@ public class SrsRtmp {
worker = new Thread(new Runnable() { worker = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
try { while (!Thread.interrupted()) {
cycle(); // Keep at least one audio and video frame in cache to ensure monotonically increasing.
} catch (IOException e) { while (!frameCache.isEmpty() && nb_videos > 1 && nb_audios > 1) {
Log.i(TAG, "worker: thread exception."); SrsFlvFrame frame = frameCache.poll();
e.printStackTrace(); try {
// only connect when got keyframe.
if (frame.is_keyframe()) {
connect();
}
// when sequence header required,
// adjust the dts by the current frame and sent it.
if (!sequenceHeaderOk) {
if (videoSequenceHeader != null) {
videoSequenceHeader.dts = frame.dts;
}
if (audioSequenceHeader != null) {
audioSequenceHeader.dts = frame.dts;
}
sendFlvTag(audioSequenceHeader);
sendFlvTag(videoSequenceHeader);
sequenceHeaderOk = true;
}
// try to send, ignore when not connected.
if (sequenceHeaderOk) {
sendFlvTag(frame);
}
// cache the sequence header.
if (frame.type == SrsCodecFlvTag.Video && frame.avc_aac_type == SrsCodecVideoAVCType.SequenceHeader) {
videoSequenceHeader = frame;
} else if (frame.type == SrsCodecFlvTag.Audio && frame.avc_aac_type == 0) {
audioSequenceHeader = frame;
}
} catch (IOException ioe) {
ioe.printStackTrace();
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(worker, ioe);
}
}
// Waiting for next frame
synchronized (txFrameLock) {
try {
// isEmpty() may take some time, so we set timeout to detect next frame
txFrameLock.wait(1000);
} catch (InterruptedException ie) {
worker.interrupt();
}
}
} }
} }
}); });
@ -168,21 +205,11 @@ public class SrsRtmp {
* stop the muxer, disconnect RTMP connection from SRS. * stop the muxer, disconnect RTMP connection from SRS.
*/ */
public void stop() throws IllegalStateException { public void stop() throws IllegalStateException {
clearCache();
if (worker == null || publisher == null) { if (worker == null || publisher == null) {
throw new IllegalStateException("Not init!"); throw new IllegalStateException("SrsRtmp Not init!");
} }
if (publisher != null) { disconnect();
publisher.closeStream();
publisher.shutdown();
connected = false;
}
if (looper != null) {
looper.quit();
}
if (worker != null) { if (worker != null) {
worker.interrupt(); worker.interrupt();
@ -228,7 +255,6 @@ public class SrsRtmp {
if (publisher != null) { if (publisher != null) {
publisher.closeStream(); publisher.closeStream();
publisher.shutdown(); publisher.shutdown();
publisher = null;
connected = false; connected = false;
} }
Log.i(TAG, "worker: disconnect SRS ok."); Log.i(TAG, "worker: disconnect SRS ok.");
@ -237,7 +263,6 @@ public class SrsRtmp {
private void clearCache() { private void clearCache() {
nb_videos = 0; nb_videos = 0;
nb_audios = 0; nb_audios = 0;
cache.clear();
sequenceHeaderOk = false; sequenceHeaderOk = false;
} }
@ -253,115 +278,26 @@ public class SrsRtmp {
clearCache(); clearCache();
} }
private void cycle() throws IOException { private void sendFlvTag(SrsFlvFrame frame) throws IllegalStateException, IOException {
// create the handler. if (!connected || frame == null || frame.tag.size <= 0) {
Looper.prepare();
looper = Looper.myLooper();
handler = new Handler(looper) {
@Override
public void handleMessage(Message msg) {
if (msg.what != SrsMessageType.FLV) {
Log.w(TAG, String.format("worker: drop unkown message, what=%d", msg.what));
return;
}
SrsFlvFrame frame = (SrsFlvFrame)msg.obj;
// only reconnect when got keyframe.
try {
// only connect when got keyframe.
if (frame.is_keyframe()) {
connect();
}
} catch (IOException e) {
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(worker, e);
}
try {
// when sequence header required,
// adjust the dts by the current frame and sent it.
if (!sequenceHeaderOk) {
if (videoSequenceHeader != null) {
videoSequenceHeader.dts = frame.dts;
}
if (audioSequenceHeader != null) {
audioSequenceHeader.dts = frame.dts;
}
sendFlvTag(audioSequenceHeader);
sendFlvTag(videoSequenceHeader);
sequenceHeaderOk = true;
}
// try to send, igore when not connected.
if (sequenceHeaderOk) {
sendFlvTag(frame);
}
// cache the sequence header.
if (frame.type == SrsCodecFlvTag.Video && frame.avc_aac_type == SrsCodecVideoAVCType.SequenceHeader) {
videoSequenceHeader = frame;
} else if (frame.type == SrsCodecFlvTag.Audio && frame.avc_aac_type == 0) {
audioSequenceHeader = frame;
}
} catch (IOException e) {
e.printStackTrace();
Log.e(TAG, String.format("worker: send flv tag failed, e=%s", e.getMessage()));
disconnect();
}
}
};
flv.setHandler(handler);
Looper.loop();
}
private void sendFlvTag(SrsFlvFrame frame) throws IOException {
if (frame == null) {
return;
}
if (frame.tag.size <= 0) {
return; return;
} }
if (frame.is_video()) { if (frame.is_video()) {
nb_videos++; nb_videos--;
} else if (frame.is_audio()) { if (publisher != null) {
nb_audios++; publisher.publishVideoData(frame.tag.frame.array(), frame.dts);
}
cache.add(frame);
// always keep one audio and one videos in cache.
if (nb_videos > 1 && nb_audios > 1) {
sendCachedFrames();
}
}
private void sendCachedFrames() throws IllegalStateException, IOException {
Collections.sort(cache, new Comparator<SrsFlvFrame>() {
@Override
public int compare(SrsFlvFrame lhs, SrsFlvFrame rhs) {
return lhs.dts - rhs.dts;
} }
}); } else if (frame.is_audio()) {
nb_audios--;
while (nb_videos > 1 && nb_audios > 1) { if (publisher != null) {
SrsFlvFrame frame = cache.remove(0); publisher.publishAudioData(frame.tag.frame.array(), frame.dts);
if (frame.is_video()) {
nb_videos--;
if (publisher != null) {
publisher.publishVideoData(frame.tag.frame.array(), frame.dts);
}
} else if (frame.is_audio()) {
nb_audios--;
if (publisher != null) {
publisher.publishAudioData(frame.tag.frame.array(), frame.dts);
}
} }
}
if (frame.is_keyframe()) { if (frame.is_keyframe()) {
Log.i(TAG, String.format("worker: send frame type=%d, dts=%d, size=%dB", Log.i(TAG, String.format("worker: send frame type=%d, dts=%d, size=%dB",
frame.type, frame.dts, frame.tag.size)); frame.type, frame.dts, frame.tag.size));
}
} }
} }
@ -935,7 +871,6 @@ public class SrsRtmp {
private int asample_rate; private int asample_rate;
private SrsUtils utils; private SrsUtils utils;
private Handler handler;
private SrsRawH264Stream avc; private SrsRawH264Stream avc;
private byte[] h264_sps; private byte[] h264_sps;
@ -943,7 +878,6 @@ public class SrsRtmp {
private byte[] h264_pps; private byte[] h264_pps;
private boolean h264_pps_changed; private boolean h264_pps_changed;
private boolean h264_sps_pps_sent; private boolean h264_sps_pps_sent;
private byte[] aac_specific_config; private byte[] aac_specific_config;
public SrsFlv() { public SrsFlv() {
@ -959,14 +893,6 @@ public class SrsRtmp {
aac_specific_config = null; aac_specific_config = null;
} }
/**
* set the handler to send message to work thread.
* @param h the handler to send the message.
*/
public void setHandler(Handler h) {
handler = h;
}
public void setVideoTrack(MediaFormat format) { public void setVideoTrack(MediaFormat format) {
videoTrack = format; videoTrack = format;
} }
@ -1183,17 +1109,15 @@ public class SrsRtmp {
frame.frame_type = frame_type; frame.frame_type = frame_type;
frame.avc_aac_type = avc_aac_type; frame.avc_aac_type = avc_aac_type;
// use handler to send the message. frameCache.add(frame);
// TODO: FIXME: we must wait for the handler to ready, for the sps/pps cannot be dropped. if (frame.is_video()) {
if (handler == null) { nb_videos++;
Log.w(TAG, "flv: drop frame for handler not ready."); } else if (frame.is_audio()) {
return; nb_audios++;
}
synchronized (txFrameLock) {
txFrameLock.notifyAll();
} }
Message msg = Message.obtain();
msg.what = SrsMessageType.FLV;
msg.obj = frame;
handler.sendMessage(msg);
//Log.i(TAG, String.format("flv: enqueue frame type=%d, dts=%d, size=%dB", frame.type, frame.dts, frame.tag.size)); //Log.i(TAG, String.format("flv: enqueue frame type=%d, dts=%d, size=%dB", frame.type, frame.dts, frame.tag.size));
} }
} }

@ -70,7 +70,7 @@ public class ReadThread extends Thread {
// Close inputstream // Close inputstream
try { try {
in.close(); in.close();
} catch (Exception ex) { } catch (IOException ex) {
Log.w(TAG, "Failed to close inputstream", ex); Log.w(TAG, "Failed to close inputstream", ex);
} }
Log.i(TAG, "exiting"); Log.i(TAG, "exiting");

@ -405,7 +405,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
if (socket != null) { if (socket != null) {
try { try {
socket.close(); socket.close();
} catch (Exception ex) { } catch (IOException ex) {
Log.w(TAG, "shutdown(): failed to close socket", ex); Log.w(TAG, "shutdown(): failed to close socket", ex);
} }
} }

@ -83,7 +83,7 @@ public class WriteThread extends Thread {
// Close outputstream // Close outputstream
try { try {
out.close(); out.close();
} catch (Exception ex) { } catch (IOException ex) {
Log.w(TAG, "WriteThread: Failed to close outputstream", ex); Log.w(TAG, "WriteThread: Failed to close outputstream", ex);
} }
Log.d(TAG, "exiting"); Log.d(TAG, "exiting");

Loading…
Cancel
Save