diff --git a/app/src/main/java/net/ossrs/yasea/SrsFlvMuxer.java b/app/src/main/java/net/ossrs/yasea/SrsFlvMuxer.java index 4e184a1..784271a 100644 --- a/app/src/main/java/net/ossrs/yasea/SrsFlvMuxer.java +++ b/app/src/main/java/net/ossrs/yasea/SrsFlvMuxer.java @@ -46,7 +46,6 @@ import net.ossrs.yasea.rtmp.RtmpPublisher; */ public class SrsFlvMuxer { private volatile boolean connected = false; - private String rtmpUrl; private SrsRtmpPublisher publisher; private Thread worker; @@ -116,14 +115,19 @@ public class SrsFlvMuxer { Log.i(TAG, "worker: disconnect SRS ok."); } - private void connect(String url) throws IllegalStateException, IOException { - if (!connected) { - Log.i(TAG, String.format("worker: connecting to RTMP server by url=%s\n", url)); - publisher.connect(url); - publisher.publish("live"); - Log.i(TAG, String.format("worker: connect to RTMP server by url=%s\n", url)); - connected = true; - sequenceHeaderOk = false; + private void connect(String url) { + try { + if (!connected) { + Log.i(TAG, String.format("worker: connecting to RTMP server by url=%s\n", url)); + publisher.connect(url); + publisher.publish("live"); + Log.i(TAG, String.format("worker: connect to RTMP server by url=%s\n", url)); + connected = true; + sequenceHeaderOk = false; + } + } catch (IOException ioe) { + ioe.printStackTrace(); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), ioe); } } @@ -147,18 +151,18 @@ public class SrsFlvMuxer { /** * start to the remote SRS for remux. */ - public void start(String url) throws IOException { - rtmpUrl = url; + public void start(final String rtmpUrl) throws IOException { worker = new Thread(new Runnable() { @Override public void run() { + connect(rtmpUrl); + while (!Thread.interrupted()) { // Keep at least one audio and video frame in cache to ensure monotonically increasing. while (!frameCache.isEmpty()) { SrsFlvFrame frame = frameCache.poll(); try { - connect(rtmpUrl); // when sequence header required, // adjust the dts by the current frame and sent it. if (!sequenceHeaderOk) { diff --git a/app/src/main/java/net/ossrs/yasea/rtmp/io/PacketRxHandler.java b/app/src/main/java/net/ossrs/yasea/rtmp/io/PacketRxHandler.java deleted file mode 100644 index 6d2328f..0000000 --- a/app/src/main/java/net/ossrs/yasea/rtmp/io/PacketRxHandler.java +++ /dev/null @@ -1,14 +0,0 @@ -package net.ossrs.yasea.rtmp.io; - -import net.ossrs.yasea.rtmp.packets.RtmpPacket; - -/** - * Handler interface for received RTMP packets - * @author francois - */ -public interface PacketRxHandler { - - public void handleRxPacket(RtmpPacket rtmpPacket); - - public void notifyWindowAckRequired(final int numBytesReadThusFar); -} diff --git a/app/src/main/java/net/ossrs/yasea/rtmp/io/ReadThread.java b/app/src/main/java/net/ossrs/yasea/rtmp/io/ReadThread.java index b475f06..7969bfa 100644 --- a/app/src/main/java/net/ossrs/yasea/rtmp/io/ReadThread.java +++ b/app/src/main/java/net/ossrs/yasea/rtmp/io/ReadThread.java @@ -20,9 +20,9 @@ public class ReadThread extends Thread { private RtmpDecoder rtmpDecoder; private InputStream in; - private PacketRxHandler packetRxHandler; + private RtmpConnection.PacketRxHandler packetRxHandler; - public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, PacketRxHandler packetRxHandler) { + public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, RtmpConnection.PacketRxHandler packetRxHandler) { super("RtmpReadThread"); this.in = in; this.packetRxHandler = packetRxHandler; diff --git a/app/src/main/java/net/ossrs/yasea/rtmp/io/RtmpConnection.java b/app/src/main/java/net/ossrs/yasea/rtmp/io/RtmpConnection.java index 96b78f8..12f6c56 100644 --- a/app/src/main/java/net/ossrs/yasea/rtmp/io/RtmpConnection.java +++ b/app/src/main/java/net/ossrs/yasea/rtmp/io/RtmpConnection.java @@ -8,6 +8,7 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; +import java.net.SocketException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -39,12 +40,30 @@ import net.ossrs.yasea.rtmp.packets.WindowAckSize; * * @author francois, leoma */ -public class RtmpConnection implements RtmpPublisher, PacketRxHandler { +public class RtmpConnection implements RtmpPublisher { private static final String TAG = "RtmpConnection"; private static final Pattern rtmpUrlPattern = Pattern.compile("^rtmp://([^/:]+)(:(\\d+))*/([^/]+)(/(.*))*$"); private RtmpPublisher.EventHandler mHandler; + private PacketRxHandler rxHandler = new PacketRxHandler() { + @Override + public void handleRxPacket(RtmpPacket rtmpPacket) { + if (rtmpPacket != null) { + rxPacketQueue.add(rtmpPacket); + } + synchronized (rxPacketLock) { + rxPacketLock.notify(); + } + } + + @Override + public void notifyWindowAckRequired(final int numBytesReadThusFar) { + Log.i(TAG, "notifyWindowAckRequired() called"); + // Create and send window bytes read acknowledgement + sendRtmpPacket(new Acknowledgement(numBytesReadThusFar)); + } + }; private String appName; private String streamName; private String publishType; @@ -53,8 +72,9 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { private String pageUrl; private Socket socket; private RtmpSessionInfo rtmpSessionInfo; + private BufferedInputStream inputStream; + private BufferedOutputStream outputStream; private ReadThread readThread; - private WriteThread writeThread; private final ConcurrentLinkedQueue rxPacketQueue = new ConcurrentLinkedQueue<>(); private final Object rxPacketLock = new Object(); private volatile boolean active = false; @@ -69,8 +89,11 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { private AmfString serverIpAddr; private AmfNumber serverPid; private AmfNumber serverId; + private String socketExceptionCause = ""; private int videoWidth; private int videoHeight; + private int videoFrameCount; + private long lastTimeMillis; public RtmpConnection(RtmpPublisher.EventHandler handler) { mHandler = handler; @@ -110,17 +133,15 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { socket = new Socket(); SocketAddress socketAddress = new InetSocketAddress(host, port); socket.connect(socketAddress, 3000); - BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); - BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream()); + inputStream = new BufferedInputStream(socket.getInputStream()); + outputStream = new BufferedOutputStream(socket.getOutputStream()); Log.d(TAG, "connect(): socket connection established, doing handhake..."); - handshake(in, out); + handshake(inputStream, outputStream); active = true; Log.d(TAG, "connect(): handshake done"); rtmpSessionInfo = new RtmpSessionInfo(); - readThread = new ReadThread(rtmpSessionInfo, in, this); - writeThread = new WriteThread(rtmpSessionInfo, out, this); + readThread = new ReadThread(rtmpSessionInfo, inputStream, rxHandler); readThread.start(); - writeThread.start(); // Start the "main" handling thread new Thread(new Runnable() { @@ -139,7 +160,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { rtmpConnect(); } - private void rtmpConnect() throws IOException, IllegalStateException { + private void rtmpConnect() throws IllegalStateException { if (fullyConnected || connecting) { throw new IllegalStateException("Already connected or connecting to RTMP server"); } @@ -164,7 +185,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { args.setProperty("pageUrl", pageUrl); args.setProperty("objectEncoding", 0); invoke.addData(args); - writeThread.send(invoke); + sendRtmpPacket(invoke); connecting = true; mHandler.onRtmpConnecting("connecting"); @@ -200,7 +221,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { releaseStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL); releaseStream.addData(new AmfNull()); // command object: null for "createStream" releaseStream.addData(streamName); // command object: null for "releaseStream" - writeThread.send(releaseStream); + sendRtmpPacket(releaseStream); Log.d(TAG, "createStream(): Sending FCPublish command..."); // transactionId == 3 @@ -208,14 +229,14 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { FCPublish.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL); FCPublish.addData(new AmfNull()); // command object: null for "FCPublish" FCPublish.addData(streamName); - writeThread.send(FCPublish); + sendRtmpPacket(FCPublish); Log.d(TAG, "createStream(): Sending createStream command..."); ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_COMMAND_CHANNEL); // transactionId == 4 Command createStream = new Command("createStream", ++transactionIdCounter, chunkStreamInfo); createStream.addData(new AmfNull()); // command object: null for "createStream" - writeThread.send(createStream); + sendRtmpPacket(createStream); // Waiting for "NetStream.Publish.Start" response. synchronized (publishLock) { @@ -243,7 +264,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { publish.addData(new AmfNull()); // command object: null for "publish" publish.addData(streamName); publish.addData(publishType); - writeThread.send(publish); + sendRtmpPacket(publish); } private void onMetaData() throws IllegalStateException { @@ -270,7 +291,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { ecmaArray.setProperty("stereo", true); ecmaArray.setProperty("filesize", 0); metadata.addData(ecmaArray); - writeThread.send(metadata); + sendRtmpPacket(metadata); } @Override @@ -289,7 +310,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { closeStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL); closeStream.getHeader().setMessageStreamId(currentStreamId); closeStream.addData(new AmfNull()); - writeThread.send(closeStream); + sendRtmpPacket(closeStream); mHandler.onRtmpStopped("stopped"); } @@ -297,7 +318,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { public void shutdown() { if (active) { readThread.shutdown(); - writeThread.shutdown(); try { // It will invoke EOFException in read thread @@ -315,13 +335,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { readThread.interrupt(); } - try { - writeThread.join(); - } catch (InterruptedException ie) { - ie.printStackTrace(); - writeThread.interrupt(); - } - // shutdown handleRxPacketLoop rxPacketQueue.clear(); active = false; @@ -359,19 +372,13 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { currentStreamId = -1; transactionIdCounter = 0; videoFrameCacheNumber.set(0); + socketExceptionCause = ""; serverIpAddr = null; serverPid = null; serverId = null; rtmpSessionInfo = null; } - @Override - public void notifyWindowAckRequired(final int numBytesReadThusFar) { - Log.i(TAG, "notifyWindowAckRequired() called"); - // Create and send window bytes read acknowledgement - writeThread.send(new Acknowledgement(numBytesReadThusFar)); - } - @Override public void publishAudioData(byte[] data, int dts) throws IllegalStateException { if (!fullyConnected) { @@ -387,7 +394,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { audio.setData(data); audio.getHeader().setAbsoluteTimestamp(dts); audio.getHeader().setMessageStreamId(currentStreamId); - writeThread.send(audio); + sendRtmpPacket(audio); mHandler.onRtmpAudioStreaming("audio streaming"); } @@ -406,21 +413,61 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { video.setData(data); video.getHeader().setAbsoluteTimestamp(dts); video.getHeader().setMessageStreamId(currentStreamId); - writeThread.send(video); + sendRtmpPacket(video); videoFrameCacheNumber.getAndIncrement(); mHandler.onRtmpVideoStreaming("video streaming"); } - @Override - public void handleRxPacket(RtmpPacket rtmpPacket) { - if (rtmpPacket != null) { - rxPacketQueue.add(rtmpPacket); + /** Transmit the specified RTMP packet */ + public void sendRtmpPacket(RtmpPacket rtmpPacket) { + try { + ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId()); + chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader()); + if (!(rtmpPacket instanceof Video || rtmpPacket instanceof Audio)) { + rtmpPacket.getHeader().setAbsoluteTimestamp((int) chunkStreamInfo.markAbsoluteTimestampTx()); + } + rtmpPacket.writeTo(outputStream, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo); + Log.d(TAG, "wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength()); + if (rtmpPacket instanceof Command) { + rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName()); + } + if (rtmpPacket instanceof Video) { + getVideoFrameCacheNumber().getAndDecrement(); + calcFps(); + } + outputStream.flush(); + } catch (SocketException se) { + if (!socketExceptionCause.contentEquals(se.getMessage())) { + socketExceptionCause = se.getMessage(); + Log.e(TAG, "Caught SocketException during write loop, shutting down: " + se.getMessage()); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), se); + } + } catch (IOException ioe) { + Log.e(TAG, "Caught IOException during write loop, shutting down: " + ioe.getMessage()); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), ioe); } - synchronized (rxPacketLock) { - rxPacketLock.notify(); + } + + private void calcFps() { + if (videoFrameCount == 0) { + lastTimeMillis = System.nanoTime() / 1000000; + videoFrameCount++; + } else { + if (++videoFrameCount >= 48) { + long diffTimeMillis = System.nanoTime() / 1000000 - lastTimeMillis; + mHandler.onRtmpOutputFps((double) videoFrameCount * 1000 / diffTimeMillis); + videoFrameCount = 0; + } } } + public interface PacketRxHandler { + + public void handleRxPacket(RtmpPacket rtmpPacket); + + public void notifyWindowAckRequired(final int numBytesReadThusFar); + } + private void handleRxPacketLoop() throws IOException { // Handle all queued received RTMP packets while (active) { @@ -438,7 +485,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { ChunkStreamInfo channelInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL); Log.d(TAG, "handleRxPacketLoop(): Sending PONG reply.."); UserControl pong = new UserControl(ping, channelInfo); - writeThread.send(pong); + sendRtmpPacket(pong); break; case STREAM_EOF: Log.i(TAG, "handleRxPacketLoop(): Stream EOF reached, closing RTMP writer..."); @@ -457,7 +504,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { int acknowledgementWindowsize = rtmpSessionInfo.getAcknowledgementWindowSize(); final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL); Log.d(TAG, "handleRxPacketLoop(): Send acknowledgement window size: " + acknowledgementWindowsize); - writeThread.send(new WindowAckSize(acknowledgementWindowsize, chunkStreamInfo)); + sendRtmpPacket(new WindowAckSize(acknowledgementWindowsize, chunkStreamInfo)); break; case COMMAND_AMF0: handleRxInvoke((Command) rtmpPacket); @@ -488,7 +535,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { Log.d(TAG, "handleRxInvoke: Got result for invoked method: " + method); if ("connect".equals(method)) { // Capture server ip/pid/id information if any - String serverInfo = onSrsServerInfo(invoke); + String serverInfo = onXmlyServerInfo(invoke); mHandler.onRtmpConnected("connected" + serverInfo); // We can now send createStream commands connecting = false; @@ -529,7 +576,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler { } } - private String onSrsServerInfo(Command invoke) { + private String onXmlyServerInfo(Command invoke) { // SRS server special information AmfObject objData = (AmfObject) invoke.getData().get(1); if ((objData).getProperty("data") instanceof AmfObject) { diff --git a/app/src/main/java/net/ossrs/yasea/rtmp/io/WriteThread.java b/app/src/main/java/net/ossrs/yasea/rtmp/io/WriteThread.java deleted file mode 100644 index a46c209..0000000 --- a/app/src/main/java/net/ossrs/yasea/rtmp/io/WriteThread.java +++ /dev/null @@ -1,124 +0,0 @@ -package net.ossrs.yasea.rtmp.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.SocketException; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import android.util.Log; - -import net.ossrs.yasea.rtmp.RtmpPublisher; -import net.ossrs.yasea.rtmp.packets.Audio; -import net.ossrs.yasea.rtmp.packets.Command; -import net.ossrs.yasea.rtmp.packets.RtmpPacket; -import net.ossrs.yasea.rtmp.packets.Video; - -/** - * RTMPConnection's write thread - * - * @author francois, leo - */ -public class WriteThread extends Thread { - - private static final String TAG = "WriteThread"; - - private RtmpSessionInfo rtmpSessionInfo; - private OutputStream out; - private ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue(); - private final Object txPacketLock = new Object(); - private volatile boolean active = true; - private int videoFrameCount; - private long lastTimeMillis; - private RtmpPublisher publisher; - - public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, RtmpPublisher publisher) { - super("RtmpWriteThread"); - this.rtmpSessionInfo = rtmpSessionInfo; - this.out = out; - this.publisher = publisher; - } - - @Override - public void run() { - - while (active) { - try { - while (!writeQueue.isEmpty()) { - RtmpPacket rtmpPacket = writeQueue.poll(); - ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId()); - chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader()); - if (!(rtmpPacket instanceof Video || rtmpPacket instanceof Audio)) { - rtmpPacket.getHeader().setAbsoluteTimestamp((int) chunkStreamInfo.markAbsoluteTimestampTx()); - } - rtmpPacket.writeTo(out, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo); - Log.d(TAG, "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength()); - if (rtmpPacket instanceof Command) { - rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName()); - } - if (rtmpPacket instanceof Video) { - publisher.getVideoFrameCacheNumber().getAndDecrement(); - calcFps(); - } - } - out.flush(); - } catch (SocketException se) { - Log.e(TAG, "WriteThread: Caught SocketException during write loop, shutting down: " + se.getMessage()); - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se); - active = false; - continue; - } catch (IOException ioe) { - Log.e(TAG, "WriteThread: Caught IOException during write loop, shutting down: " + ioe.getMessage()); - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe); - active = false; - continue; - } - - // Waiting for next packet - Log.d(TAG, "WriteThread: waiting..."); - synchronized (txPacketLock) { - try { - // isEmpty() may take some time, so time out should be set to wait next offer - txPacketLock.wait(500); - } catch (InterruptedException ex) { - Log.w(TAG, "Interrupted", ex); - this.interrupt(); - } - } - } - - Log.d(TAG, "exit"); - } - - /** Transmit the specified RTMP packet (thread-safe) */ - public void send(RtmpPacket rtmpPacket) { - if (rtmpPacket != null) { - writeQueue.add(rtmpPacket); - } - synchronized (txPacketLock) { - txPacketLock.notify(); - } - } - - public void shutdown() { - Log.d(TAG, "Stopping"); - writeQueue.clear(); - active = false; - synchronized (txPacketLock) { - txPacketLock.notify(); - } - } - - private void calcFps() { - if (videoFrameCount == 0) { - lastTimeMillis = System.nanoTime() / 1000000; - videoFrameCount++; - } else { - if (++videoFrameCount >= 48) { - long diffTimeMillis = System.nanoTime() / 1000000 - lastTimeMillis; - publisher.getEventHandler().onRtmpOutputFps((double) videoFrameCount * 1000 / diffTimeMillis); - videoFrameCount = 0; - } - } - } -}