Optimize RTMP writer thread

Merge RTMP writer thread into FLV muxer thread as well as connection thread.
And seperate connection method from the frame cache fetch loop.

Signed-off-by: Leo Ma <begeekmyfriend@gmail.com>
camera2
Leo Ma 9 years ago
parent 9339e9d544
commit b1f35ffe89

@ -46,7 +46,6 @@ import net.ossrs.yasea.rtmp.RtmpPublisher;
*/
public class SrsFlvMuxer {
private volatile boolean connected = false;
private String rtmpUrl;
private SrsRtmpPublisher publisher;
private Thread worker;
@ -116,7 +115,8 @@ public class SrsFlvMuxer {
Log.i(TAG, "worker: disconnect SRS ok.");
}
private void connect(String url) throws IllegalStateException, IOException {
private void connect(String url) {
try {
if (!connected) {
Log.i(TAG, String.format("worker: connecting to RTMP server by url=%s\n", url));
publisher.connect(url);
@ -125,6 +125,10 @@ public class SrsFlvMuxer {
connected = true;
sequenceHeaderOk = false;
}
} catch (IOException ioe) {
ioe.printStackTrace();
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), ioe);
}
}
private void sendFlvTag(SrsFlvFrame frame) throws IllegalStateException, IOException {
@ -147,18 +151,18 @@ public class SrsFlvMuxer {
/**
* start to the remote SRS for remux.
*/
public void start(String url) throws IOException {
rtmpUrl = url;
public void start(final String rtmpUrl) throws IOException {
worker = new Thread(new Runnable() {
@Override
public void run() {
connect(rtmpUrl);
while (!Thread.interrupted()) {
// Keep at least one audio and video frame in cache to ensure monotonically increasing.
while (!frameCache.isEmpty()) {
SrsFlvFrame frame = frameCache.poll();
try {
connect(rtmpUrl);
// when sequence header required,
// adjust the dts by the current frame and sent it.
if (!sequenceHeaderOk) {

@ -1,14 +0,0 @@
package net.ossrs.yasea.rtmp.io;
import net.ossrs.yasea.rtmp.packets.RtmpPacket;
/**
* Handler interface for received RTMP packets
* @author francois
*/
public interface PacketRxHandler {
public void handleRxPacket(RtmpPacket rtmpPacket);
public void notifyWindowAckRequired(final int numBytesReadThusFar);
}

@ -20,9 +20,9 @@ public class ReadThread extends Thread {
private RtmpDecoder rtmpDecoder;
private InputStream in;
private PacketRxHandler packetRxHandler;
private RtmpConnection.PacketRxHandler packetRxHandler;
public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, PacketRxHandler packetRxHandler) {
public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, RtmpConnection.PacketRxHandler packetRxHandler) {
super("RtmpReadThread");
this.in = in;
this.packetRxHandler = packetRxHandler;

@ -8,6 +8,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -39,12 +40,30 @@ import net.ossrs.yasea.rtmp.packets.WindowAckSize;
*
* @author francois, leoma
*/
public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
public class RtmpConnection implements RtmpPublisher {
private static final String TAG = "RtmpConnection";
private static final Pattern rtmpUrlPattern = Pattern.compile("^rtmp://([^/:]+)(:(\\d+))*/([^/]+)(/(.*))*$");
private RtmpPublisher.EventHandler mHandler;
private PacketRxHandler rxHandler = new PacketRxHandler() {
@Override
public void handleRxPacket(RtmpPacket rtmpPacket) {
if (rtmpPacket != null) {
rxPacketQueue.add(rtmpPacket);
}
synchronized (rxPacketLock) {
rxPacketLock.notify();
}
}
@Override
public void notifyWindowAckRequired(final int numBytesReadThusFar) {
Log.i(TAG, "notifyWindowAckRequired() called");
// Create and send window bytes read acknowledgement
sendRtmpPacket(new Acknowledgement(numBytesReadThusFar));
}
};
private String appName;
private String streamName;
private String publishType;
@ -53,8 +72,9 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
private String pageUrl;
private Socket socket;
private RtmpSessionInfo rtmpSessionInfo;
private BufferedInputStream inputStream;
private BufferedOutputStream outputStream;
private ReadThread readThread;
private WriteThread writeThread;
private final ConcurrentLinkedQueue<RtmpPacket> rxPacketQueue = new ConcurrentLinkedQueue<>();
private final Object rxPacketLock = new Object();
private volatile boolean active = false;
@ -69,8 +89,11 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
private AmfString serverIpAddr;
private AmfNumber serverPid;
private AmfNumber serverId;
private String socketExceptionCause = "";
private int videoWidth;
private int videoHeight;
private int videoFrameCount;
private long lastTimeMillis;
public RtmpConnection(RtmpPublisher.EventHandler handler) {
mHandler = handler;
@ -110,17 +133,15 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
socket = new Socket();
SocketAddress socketAddress = new InetSocketAddress(host, port);
socket.connect(socketAddress, 3000);
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream());
inputStream = new BufferedInputStream(socket.getInputStream());
outputStream = new BufferedOutputStream(socket.getOutputStream());
Log.d(TAG, "connect(): socket connection established, doing handhake...");
handshake(in, out);
handshake(inputStream, outputStream);
active = true;
Log.d(TAG, "connect(): handshake done");
rtmpSessionInfo = new RtmpSessionInfo();
readThread = new ReadThread(rtmpSessionInfo, in, this);
writeThread = new WriteThread(rtmpSessionInfo, out, this);
readThread = new ReadThread(rtmpSessionInfo, inputStream, rxHandler);
readThread.start();
writeThread.start();
// Start the "main" handling thread
new Thread(new Runnable() {
@ -139,7 +160,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
rtmpConnect();
}
private void rtmpConnect() throws IOException, IllegalStateException {
private void rtmpConnect() throws IllegalStateException {
if (fullyConnected || connecting) {
throw new IllegalStateException("Already connected or connecting to RTMP server");
}
@ -164,7 +185,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
args.setProperty("pageUrl", pageUrl);
args.setProperty("objectEncoding", 0);
invoke.addData(args);
writeThread.send(invoke);
sendRtmpPacket(invoke);
connecting = true;
mHandler.onRtmpConnecting("connecting");
@ -200,7 +221,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
releaseStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
releaseStream.addData(new AmfNull()); // command object: null for "createStream"
releaseStream.addData(streamName); // command object: null for "releaseStream"
writeThread.send(releaseStream);
sendRtmpPacket(releaseStream);
Log.d(TAG, "createStream(): Sending FCPublish command...");
// transactionId == 3
@ -208,14 +229,14 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
FCPublish.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
FCPublish.addData(new AmfNull()); // command object: null for "FCPublish"
FCPublish.addData(streamName);
writeThread.send(FCPublish);
sendRtmpPacket(FCPublish);
Log.d(TAG, "createStream(): Sending createStream command...");
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_COMMAND_CHANNEL);
// transactionId == 4
Command createStream = new Command("createStream", ++transactionIdCounter, chunkStreamInfo);
createStream.addData(new AmfNull()); // command object: null for "createStream"
writeThread.send(createStream);
sendRtmpPacket(createStream);
// Waiting for "NetStream.Publish.Start" response.
synchronized (publishLock) {
@ -243,7 +264,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
publish.addData(new AmfNull()); // command object: null for "publish"
publish.addData(streamName);
publish.addData(publishType);
writeThread.send(publish);
sendRtmpPacket(publish);
}
private void onMetaData() throws IllegalStateException {
@ -270,7 +291,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
ecmaArray.setProperty("stereo", true);
ecmaArray.setProperty("filesize", 0);
metadata.addData(ecmaArray);
writeThread.send(metadata);
sendRtmpPacket(metadata);
}
@Override
@ -289,7 +310,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
closeStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
closeStream.getHeader().setMessageStreamId(currentStreamId);
closeStream.addData(new AmfNull());
writeThread.send(closeStream);
sendRtmpPacket(closeStream);
mHandler.onRtmpStopped("stopped");
}
@ -297,7 +318,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
public void shutdown() {
if (active) {
readThread.shutdown();
writeThread.shutdown();
try {
// It will invoke EOFException in read thread
@ -315,13 +335,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
readThread.interrupt();
}
try {
writeThread.join();
} catch (InterruptedException ie) {
ie.printStackTrace();
writeThread.interrupt();
}
// shutdown handleRxPacketLoop
rxPacketQueue.clear();
active = false;
@ -359,19 +372,13 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
currentStreamId = -1;
transactionIdCounter = 0;
videoFrameCacheNumber.set(0);
socketExceptionCause = "";
serverIpAddr = null;
serverPid = null;
serverId = null;
rtmpSessionInfo = null;
}
@Override
public void notifyWindowAckRequired(final int numBytesReadThusFar) {
Log.i(TAG, "notifyWindowAckRequired() called");
// Create and send window bytes read acknowledgement
writeThread.send(new Acknowledgement(numBytesReadThusFar));
}
@Override
public void publishAudioData(byte[] data, int dts) throws IllegalStateException {
if (!fullyConnected) {
@ -387,7 +394,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
audio.setData(data);
audio.getHeader().setAbsoluteTimestamp(dts);
audio.getHeader().setMessageStreamId(currentStreamId);
writeThread.send(audio);
sendRtmpPacket(audio);
mHandler.onRtmpAudioStreaming("audio streaming");
}
@ -406,20 +413,60 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
video.setData(data);
video.getHeader().setAbsoluteTimestamp(dts);
video.getHeader().setMessageStreamId(currentStreamId);
writeThread.send(video);
sendRtmpPacket(video);
videoFrameCacheNumber.getAndIncrement();
mHandler.onRtmpVideoStreaming("video streaming");
}
@Override
public void handleRxPacket(RtmpPacket rtmpPacket) {
if (rtmpPacket != null) {
rxPacketQueue.add(rtmpPacket);
/** Transmit the specified RTMP packet */
public void sendRtmpPacket(RtmpPacket rtmpPacket) {
try {
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId());
chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader());
if (!(rtmpPacket instanceof Video || rtmpPacket instanceof Audio)) {
rtmpPacket.getHeader().setAbsoluteTimestamp((int) chunkStreamInfo.markAbsoluteTimestampTx());
}
rtmpPacket.writeTo(outputStream, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo);
Log.d(TAG, "wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength());
if (rtmpPacket instanceof Command) {
rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName());
}
if (rtmpPacket instanceof Video) {
getVideoFrameCacheNumber().getAndDecrement();
calcFps();
}
outputStream.flush();
} catch (SocketException se) {
if (!socketExceptionCause.contentEquals(se.getMessage())) {
socketExceptionCause = se.getMessage();
Log.e(TAG, "Caught SocketException during write loop, shutting down: " + se.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), se);
}
synchronized (rxPacketLock) {
rxPacketLock.notify();
} catch (IOException ioe) {
Log.e(TAG, "Caught IOException during write loop, shutting down: " + ioe.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), ioe);
}
}
private void calcFps() {
if (videoFrameCount == 0) {
lastTimeMillis = System.nanoTime() / 1000000;
videoFrameCount++;
} else {
if (++videoFrameCount >= 48) {
long diffTimeMillis = System.nanoTime() / 1000000 - lastTimeMillis;
mHandler.onRtmpOutputFps((double) videoFrameCount * 1000 / diffTimeMillis);
videoFrameCount = 0;
}
}
}
public interface PacketRxHandler {
public void handleRxPacket(RtmpPacket rtmpPacket);
public void notifyWindowAckRequired(final int numBytesReadThusFar);
}
private void handleRxPacketLoop() throws IOException {
// Handle all queued received RTMP packets
@ -438,7 +485,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
ChunkStreamInfo channelInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL);
Log.d(TAG, "handleRxPacketLoop(): Sending PONG reply..");
UserControl pong = new UserControl(ping, channelInfo);
writeThread.send(pong);
sendRtmpPacket(pong);
break;
case STREAM_EOF:
Log.i(TAG, "handleRxPacketLoop(): Stream EOF reached, closing RTMP writer...");
@ -457,7 +504,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
int acknowledgementWindowsize = rtmpSessionInfo.getAcknowledgementWindowSize();
final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL);
Log.d(TAG, "handleRxPacketLoop(): Send acknowledgement window size: " + acknowledgementWindowsize);
writeThread.send(new WindowAckSize(acknowledgementWindowsize, chunkStreamInfo));
sendRtmpPacket(new WindowAckSize(acknowledgementWindowsize, chunkStreamInfo));
break;
case COMMAND_AMF0:
handleRxInvoke((Command) rtmpPacket);
@ -488,7 +535,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
Log.d(TAG, "handleRxInvoke: Got result for invoked method: " + method);
if ("connect".equals(method)) {
// Capture server ip/pid/id information if any
String serverInfo = onSrsServerInfo(invoke);
String serverInfo = onXmlyServerInfo(invoke);
mHandler.onRtmpConnected("connected" + serverInfo);
// We can now send createStream commands
connecting = false;
@ -529,7 +576,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
}
}
private String onSrsServerInfo(Command invoke) {
private String onXmlyServerInfo(Command invoke) {
// SRS server special information
AmfObject objData = (AmfObject) invoke.getData().get(1);
if ((objData).getProperty("data") instanceof AmfObject) {

@ -1,124 +0,0 @@
package net.ossrs.yasea.rtmp.io;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import android.util.Log;
import net.ossrs.yasea.rtmp.RtmpPublisher;
import net.ossrs.yasea.rtmp.packets.Audio;
import net.ossrs.yasea.rtmp.packets.Command;
import net.ossrs.yasea.rtmp.packets.RtmpPacket;
import net.ossrs.yasea.rtmp.packets.Video;
/**
* RTMPConnection's write thread
*
* @author francois, leo
*/
public class WriteThread extends Thread {
private static final String TAG = "WriteThread";
private RtmpSessionInfo rtmpSessionInfo;
private OutputStream out;
private ConcurrentLinkedQueue<RtmpPacket> writeQueue = new ConcurrentLinkedQueue<RtmpPacket>();
private final Object txPacketLock = new Object();
private volatile boolean active = true;
private int videoFrameCount;
private long lastTimeMillis;
private RtmpPublisher publisher;
public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, RtmpPublisher publisher) {
super("RtmpWriteThread");
this.rtmpSessionInfo = rtmpSessionInfo;
this.out = out;
this.publisher = publisher;
}
@Override
public void run() {
while (active) {
try {
while (!writeQueue.isEmpty()) {
RtmpPacket rtmpPacket = writeQueue.poll();
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId());
chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader());
if (!(rtmpPacket instanceof Video || rtmpPacket instanceof Audio)) {
rtmpPacket.getHeader().setAbsoluteTimestamp((int) chunkStreamInfo.markAbsoluteTimestampTx());
}
rtmpPacket.writeTo(out, rtmpSessionInfo.getTxChunkSize(), chunkStreamInfo);
Log.d(TAG, "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength());
if (rtmpPacket instanceof Command) {
rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName());
}
if (rtmpPacket instanceof Video) {
publisher.getVideoFrameCacheNumber().getAndDecrement();
calcFps();
}
}
out.flush();
} catch (SocketException se) {
Log.e(TAG, "WriteThread: Caught SocketException during write loop, shutting down: " + se.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se);
active = false;
continue;
} catch (IOException ioe) {
Log.e(TAG, "WriteThread: Caught IOException during write loop, shutting down: " + ioe.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe);
active = false;
continue;
}
// Waiting for next packet
Log.d(TAG, "WriteThread: waiting...");
synchronized (txPacketLock) {
try {
// isEmpty() may take some time, so time out should be set to wait next offer
txPacketLock.wait(500);
} catch (InterruptedException ex) {
Log.w(TAG, "Interrupted", ex);
this.interrupt();
}
}
}
Log.d(TAG, "exit");
}
/** Transmit the specified RTMP packet (thread-safe) */
public void send(RtmpPacket rtmpPacket) {
if (rtmpPacket != null) {
writeQueue.add(rtmpPacket);
}
synchronized (txPacketLock) {
txPacketLock.notify();
}
}
public void shutdown() {
Log.d(TAG, "Stopping");
writeQueue.clear();
active = false;
synchronized (txPacketLock) {
txPacketLock.notify();
}
}
private void calcFps() {
if (videoFrameCount == 0) {
lastTimeMillis = System.nanoTime() / 1000000;
videoFrameCount++;
} else {
if (++videoFrameCount >= 48) {
long diffTimeMillis = System.nanoTime() / 1000000 - lastTimeMillis;
publisher.getEventHandler().onRtmpOutputFps((double) videoFrameCount * 1000 / diffTimeMillis);
videoFrameCount = 0;
}
}
}
}
Loading…
Cancel
Save