From e54db1cd8b7ae506f2198cb4265fb1f3d2705af1 Mon Sep 17 00:00:00 2001 From: Leo Ma Date: Tue, 19 Apr 2016 18:01:33 +0800 Subject: [PATCH] Use ConcurrentLinkedQueue to reduce CPU overhead Since isEmpty method of ConcurrentLinkedQueue may take some time, and therefore it would not detect new elements offered, we have to wait a time out to ensure ConcurrentLinkedQueue can detect new elements. Signed-off-by: Leo Ma --- .../net/ossrs/sea/rtmp/io/WriteThread.java | 76 ++++++++++++------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java b/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java index bf37926..748c886 100644 --- a/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java +++ b/app/src/main/java/net/ossrs/sea/rtmp/io/WriteThread.java @@ -3,9 +3,8 @@ package net.ossrs.sea.rtmp.io; import java.io.IOException; import java.io.OutputStream; import java.net.SocketException; -import android.os.Handler; -import android.os.Looper; -import android.os.Message; +import java.util.Arrays; +import java.util.concurrent.ConcurrentLinkedQueue; import android.util.Log; import net.ossrs.sea.rtmp.packets.Command; import net.ossrs.sea.rtmp.packets.RtmpPacket; @@ -21,9 +20,10 @@ public class WriteThread extends Thread { private RtmpSessionInfo rtmpSessionInfo; private OutputStream out; - private Handler handler; + private ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue(); + private final Object txPacketLock = new Object(); + private volatile boolean active = true; private ThreadController threadController; - private Thread t; public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, ThreadController threadController) { super("RtmpWriteThread"); @@ -34,31 +34,43 @@ public class WriteThread extends Thread { @Override public void run() { - t = this; - Looper.prepare(); - handler = new Handler(Looper.myLooper()) { - @Override - public void handleMessage(Message msg) { - try { - RtmpPacket rtmpPacket = (RtmpPacket) msg.obj; - ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId()); + + while (active) { + try { + while (!writeQueue.isEmpty()) { + RtmpPacket rtmpPacket = writeQueue.poll(); + final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId()); chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader()); rtmpPacket.writeTo(out, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo); - out.flush(); Log.d(TAG, "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength()); if (rtmpPacket instanceof Command) { rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName()); } - } catch (SocketException se) { - Log.e(TAG, "WriteThread: Caught SocketException during write loop, shutting down", se); - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(t, se); - } catch (IOException ex) { - Log.e(TAG, "WriteThread: Caught IOException during write loop, shutting down", ex); - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(t, ex); + } + out.flush(); + } catch (SocketException se) { + Log.e(TAG, "WriteThread: Caught SocketException during write loop, shutting down", se); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se); + active = false; + continue; + } catch (IOException ioe) { + Log.e(TAG, "WriteThread: Caught IOException during write loop, shutting down", ioe); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe); + active = false; + continue; + } + + // Waiting for next packet + Log.d(TAG, "WriteThread: waiting..."); + synchronized (txPacketLock) { + try { + // isEmpty() may take some time, so time out should be set to wait next offer + txPacketLock.wait(1000); + } catch (InterruptedException ex) { + Log.w(TAG, "Interrupted", ex); } } - }; - Looper.loop(); + } // Close outputstream try { @@ -75,14 +87,26 @@ public class WriteThread extends Thread { /** Transmit the specified RTMP packet (thread-safe) */ public void send(RtmpPacket rtmpPacket) { if (rtmpPacket != null) { - Message msg = Message.obtain(); - msg.obj = rtmpPacket; - handler.sendMessage(msg); + writeQueue.offer(rtmpPacket); + } + synchronized (txPacketLock) { + txPacketLock.notify(); + } + } + + /** Transmit the specified RTMP packet (thread-safe) */ + public void send(RtmpPacket... rtmpPackets) { + writeQueue.addAll(Arrays.asList(rtmpPackets)); + synchronized (txPacketLock) { + txPacketLock.notify(); } } public void shutdown() { Log.d(TAG, "Stopping write thread..."); - Looper.myLooper().quit(); + active = false; + synchronized (txPacketLock) { + txPacketLock.notify(); + } } }