Add metadata on publishing

Video resolution set method.

Signed-off-by: Leo Ma <begeekmyfriend@gmail.com>
camera2
Leo Ma 9 years ago
parent 0307650290
commit bbd0e7df2f

@ -70,7 +70,7 @@ public class SrsEncoder {
public int start() { public int start() {
try { try {
muxer.start(rtmpUrl); muxer.start(rtmpUrl, VCROP_WIDTH, VCROP_HEIGHT);
} catch (IOException e) { } catch (IOException e) {
Log.e(TAG, "start muxer failed."); Log.e(TAG, "start muxer failed.");
e.printStackTrace(); e.printStackTrace();
@ -207,7 +207,7 @@ public class SrsEncoder {
public void onGetYuvFrame(byte[] data) { public void onGetYuvFrame(byte[] data) {
// Check video frame cache number to judge the networking situation. // Check video frame cache number to judge the networking situation.
// Just cache GOP / FPS seconds data according to latency. // Just cache GOP / FPS seconds data according to latency.
if (muxer.getVideoFrameCacheNumber() < VGOP) { if (muxer.getVideoFrameCacheNumber().get() < VGOP) {
preProcessYuvFrame(data); preProcessYuvFrame(data);
ByteBuffer[] inBuffers = vencoder.getInputBuffers(); ByteBuffer[] inBuffers = vencoder.getInputBuffers();
ByteBuffer[] outBuffers = vencoder.getOutputBuffers(); ByteBuffer[] outBuffers = vencoder.getOutputBuffers();
@ -549,4 +549,4 @@ public class SrsEncoder {
} }
return sb.toString(); return sb.toString();
} }
} }

@ -10,6 +10,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Created by winlin on 5/2/15. * Created by winlin on 5/2/15.
@ -72,7 +73,7 @@ public class SrsFlvMuxer {
/** /**
* get cached video frame number in publisher * get cached video frame number in publisher
*/ */
public int getVideoFrameCacheNumber() { public final AtomicInteger getVideoFrameCacheNumber() {
return publisher.getVideoFrameCacheNumber(); return publisher.getVideoFrameCacheNumber();
} }
@ -133,8 +134,10 @@ public class SrsFlvMuxer {
/** /**
* start to the remote SRS for remux. * start to the remote SRS for remux.
*/ */
public void start(String url) throws IOException { public void start(String url, int width, int height) throws IOException {
rtmpUrl = url; rtmpUrl = url;
publisher.setVideoResolution(width, height);
worker = new Thread(new Runnable() { worker = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {

@ -1,6 +1,8 @@
package net.ossrs.sea; package net.ossrs.sea;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import net.ossrs.sea.rtmp.RtmpPublisher; import net.ossrs.sea.rtmp.RtmpPublisher;
import net.ossrs.sea.rtmp.io.RtmpConnection; import net.ossrs.sea.rtmp.io.RtmpConnection;
@ -57,10 +59,15 @@ public class SrsRtmpPublisher implements RtmpPublisher {
} }
@Override @Override
public final int getVideoFrameCacheNumber() { public final AtomicInteger getVideoFrameCacheNumber() {
return rtmpConnection.getVideoFrameCacheNumber(); return rtmpConnection.getVideoFrameCacheNumber();
} }
@Override
public final EventHandler getEventHandler() {
return rtmpConnection.getEventHandler();
}
@Override @Override
public final String getServerIpAddr() { public final String getServerIpAddr() {
return rtmpConnection.getServerIpAddr(); return rtmpConnection.getServerIpAddr();
@ -75,4 +82,9 @@ public class SrsRtmpPublisher implements RtmpPublisher {
public final int getServerId() { public final int getServerId() {
return rtmpConnection.getServerId(); return rtmpConnection.getServerId();
} }
@Override
public void setVideoResolution(int width, int height) {
rtmpConnection.setVideoResolution(width, height);
}
} }

@ -1,6 +1,7 @@
package net.ossrs.sea.rtmp; package net.ossrs.sea.rtmp;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Simple RTMP publisher, using vanilla Java networking (no NIO) * Simple RTMP publisher, using vanilla Java networking (no NIO)
@ -48,10 +49,15 @@ public interface RtmpPublisher {
*/ */
void publishAudioData(byte[] data) throws IllegalStateException; void publishAudioData(byte[] data) throws IllegalStateException;
/**
* obtain event handler in publisher
*/
EventHandler getEventHandler();
/** /**
* obtain video frame number cached in publisher * obtain video frame number cached in publisher
*/ */
int getVideoFrameCacheNumber(); AtomicInteger getVideoFrameCacheNumber();
/** /**
* obtain the IP address of the peer if any * obtain the IP address of the peer if any
@ -68,6 +74,14 @@ public interface RtmpPublisher {
*/ */
int getServerId(); int getServerId();
/**
* set video resolution
*
* @param width
* @param height
*/
void setVideoResolution(int width, int height);
/** /**
* RTMP event handler. * RTMP event handler.
*/ */

@ -45,10 +45,10 @@ public class ReadThread extends Thread {
// packetRxHandler.handleRxPacket(war.getRtmpPacket()); // packetRxHandler.handleRxPacket(war.getRtmpPacket());
// } // }
} catch (SocketException se) { } catch (SocketException se) {
Log.e(TAG, "ReadThread: Caught SocketException while reading/decoding packet, shutting down...", se); Log.e(TAG, "ReadThread: Caught SocketException while reading/decoding packet, shutting down: " + se.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se); Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se);
} catch (IOException ioe) { } catch (IOException ioe) {
Log.e(TAG, "ReadThread: Caught exception while reading/decoding packet, shutting down...", ioe); Log.e(TAG, "ReadThread: Caught exception while reading/decoding packet, shutting down: " + ioe.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe); Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe);
} }
} }

@ -67,6 +67,8 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
private AmfString serverIpAddr; private AmfString serverIpAddr;
private AmfNumber serverPid; private AmfNumber serverPid;
private AmfNumber serverId; private AmfNumber serverId;
private int videoWidth;
private int videoHeight;
public RtmpConnection(RtmpPublisher.EventHandler handler) { public RtmpConnection(RtmpPublisher.EventHandler handler) {
mHandler = handler; mHandler = handler;
@ -114,7 +116,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
Log.d(TAG, "connect(): handshake done"); Log.d(TAG, "connect(): handshake done");
rtmpSessionInfo = new RtmpSessionInfo(); rtmpSessionInfo = new RtmpSessionInfo();
readThread = new ReadThread(rtmpSessionInfo, in, this); readThread = new ReadThread(rtmpSessionInfo, in, this);
writeThread = new WriteThread(rtmpSessionInfo, out, videoFrameCacheNumber, mHandler); writeThread = new WriteThread(rtmpSessionInfo, out, this);
readThread.start(); readThread.start();
writeThread.start(); writeThread.start();
@ -251,11 +253,22 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
} }
Log.d(TAG, "onMetaData(): Sending empty onMetaData..."); Log.d(TAG, "onMetaData(): Sending empty onMetaData...");
Data emptyMetaData = new Data("@setDataFrame"); Data metadata = new Data("@setDataFrame");
emptyMetaData.addData("onMetaData"); metadata.getHeader().setMessageStreamId(currentStreamId);
emptyMetaData.addData(new AmfNull()); metadata.addData("onMetaData");
emptyMetaData.getHeader().setMessageStreamId(currentStreamId); AmfMap ecmaArray = new AmfMap();
writeThread.send(emptyMetaData); ecmaArray.setProperty("duration", 0);
ecmaArray.setProperty("width", videoWidth);
ecmaArray.setProperty("height", videoHeight);
ecmaArray.setProperty("videodatarate", 0);
ecmaArray.setProperty("framerate", 0);
ecmaArray.setProperty("audiodatarate", 0);
ecmaArray.setProperty("audiosamplerate", 44100);
ecmaArray.setProperty("audiosamplesize", 16);
ecmaArray.setProperty("stereo", true);
ecmaArray.setProperty("filesize", 0);
metadata.addData(ecmaArray);
writeThread.send(metadata);
} }
@Override @Override
@ -281,14 +294,18 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
@Override @Override
public void shutdown() { public void shutdown() {
if (active) { if (active) {
// shutdown read thread readThread.shutdown();
writeThread.shutdown();
try { try {
// It will invoke EOFException in read thread // It will invoke EOFException in read thread
socket.shutdownInput(); socket.shutdownInput();
// It will invoke SocketException in write thread
socket.shutdownOutput();
} catch (IOException ioe) { } catch (IOException ioe) {
ioe.printStackTrace(); ioe.printStackTrace();
} }
readThread.shutdown();
try { try {
readThread.join(); readThread.join();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -296,8 +313,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
readThread.interrupt(); readThread.interrupt();
} }
// shutdown write thread
writeThread.shutdown();
try { try {
writeThread.join(); writeThread.join();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
@ -412,7 +427,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
case ABORT: case ABORT:
rtmpSessionInfo.getChunkStreamInfo(((Abort) rtmpPacket).getChunkStreamId()).clearStoredChunks(); rtmpSessionInfo.getChunkStreamInfo(((Abort) rtmpPacket).getChunkStreamId()).clearStoredChunks();
break; break;
case USER_CONTROL_MESSAGE: { case USER_CONTROL_MESSAGE:
UserControl ping = (UserControl) rtmpPacket; UserControl ping = (UserControl) rtmpPacket;
switch (ping.getType()) { switch (ping.getType()) {
case PING_REQUEST: case PING_REQUEST:
@ -426,7 +441,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
break; break;
} }
break; break;
}
case WINDOW_ACKNOWLEDGEMENT_SIZE: case WINDOW_ACKNOWLEDGEMENT_SIZE:
WindowAckSize windowAckSize = (WindowAckSize) rtmpPacket; WindowAckSize windowAckSize = (WindowAckSize) rtmpPacket;
int size = windowAckSize.getAcknowledgementWindowSize(); int size = windowAckSize.getAcknowledgementWindowSize();
@ -499,6 +513,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
} else if (commandName.equals("onStatus")) { } else if (commandName.equals("onStatus")) {
String code = ((AmfString) ((AmfObject) invoke.getData().get(1)).getProperty("code")).getValue(); String code = ((AmfString) ((AmfObject) invoke.getData().get(1)).getProperty("code")).getValue();
if (code.equals("NetStream.Publish.Start")) { if (code.equals("NetStream.Publish.Start")) {
onMetaData();
// We can now publish AV data // We can now publish AV data
publishPermitted = true; publishPermitted = true;
synchronized (publishLock) { synchronized (publishLock) {
@ -526,19 +541,34 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
return info; return info;
} }
public final int getVideoFrameCacheNumber() { @Override
return videoFrameCacheNumber.get(); public AtomicInteger getVideoFrameCacheNumber() {
return videoFrameCacheNumber;
}
@Override
public EventHandler getEventHandler() {
return mHandler;
} }
@Override
public final String getServerIpAddr() { public final String getServerIpAddr() {
return serverIpAddr == null ? null : serverIpAddr.getValue(); return serverIpAddr == null ? null : serverIpAddr.getValue();
} }
@Override
public final int getServerPid() { public final int getServerPid() {
return serverPid == null ? 0 : (int) serverPid.getValue(); return serverPid == null ? 0 : (int) serverPid.getValue();
} }
@Override
public final int getServerId() { public final int getServerId() {
return serverId == null ? 0 : (int) serverId.getValue(); return serverId == null ? 0 : (int) serverId.getValue();
} }
@Override
public void setVideoResolution(int width, int height) {
videoWidth = width;
videoHeight = height;
}
} }

@ -30,14 +30,13 @@ public class WriteThread extends Thread {
private volatile boolean active = true; private volatile boolean active = true;
private int videoFrameCount; private int videoFrameCount;
private long lastTimeMillis; private long lastTimeMillis;
private AtomicInteger videoFrameCacheNumber; private RtmpPublisher publisher;
public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, AtomicInteger count, RtmpPublisher.EventHandler handler) { public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, RtmpPublisher publisher) {
super("RtmpWriteThread"); super("RtmpWriteThread");
this.rtmpSessionInfo = rtmpSessionInfo; this.rtmpSessionInfo = rtmpSessionInfo;
this.out = out; this.out = out;
this.videoFrameCacheNumber = count; this.publisher = publisher;
this.handler = handler;
} }
@Override @Override
@ -56,18 +55,18 @@ public class WriteThread extends Thread {
rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName()); rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName());
} }
if (rtmpPacket instanceof Video) { if (rtmpPacket instanceof Video) {
videoFrameCacheNumber.getAndDecrement(); publisher.getVideoFrameCacheNumber().getAndDecrement();
calcFps(); calcFps();
} }
} }
out.flush(); out.flush();
} catch (SocketException se) { } catch (SocketException se) {
Log.e(TAG, "WriteThread: Caught SocketException during write loop, shutting down", se); Log.e(TAG, "WriteThread: Caught SocketException during write loop, shutting down: " + se.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se); Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se);
active = false; active = false;
continue; continue;
} catch (IOException ioe) { } catch (IOException ioe) {
Log.e(TAG, "WriteThread: Caught IOException during write loop, shutting down", ioe); Log.e(TAG, "WriteThread: Caught IOException during write loop, shutting down: " + ioe.getMessage());
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe); Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe);
active = false; active = false;
continue; continue;
@ -115,7 +114,7 @@ public class WriteThread extends Thread {
} else { } else {
if (++videoFrameCount >= 48) { if (++videoFrameCount >= 48) {
long diffTimeMillis = System.nanoTime() / 1000000 - lastTimeMillis; long diffTimeMillis = System.nanoTime() / 1000000 - lastTimeMillis;
handler.onRtmpOutputFps((double) videoFrameCount * 1000 / diffTimeMillis); publisher.getEventHandler().onRtmpOutputFps((double) videoFrameCount * 1000 / diffTimeMillis);
videoFrameCount = 0; videoFrameCount = 0;
} }
} }

Loading…
Cancel
Save