camera2
Leo Ma 9 years ago
commit a2a499efb8

8
.gitignore vendored

@ -0,0 +1,8 @@
*.iml
.gradle
/local.properties
/.idea/workspace.xml
/.idea/libraries
.DS_Store
/build
/captures

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<resourceExtensions />
<wildcardResourcePatterns>
<entry name="!?*.java" />
<entry name="!?*.form" />
<entry name="!?*.class" />
<entry name="!?*.groovy" />
<entry name="!?*.scala" />
<entry name="!?*.flex" />
<entry name="!?*.kt" />
<entry name="!?*.clj" />
<entry name="!?*.aj" />
</wildcardResourcePatterns>
<annotationProcessing>
<profile default="true" name="Default" enabled="false">
<processorPath useClasspath="true" />
</profile>
</annotationProcessing>
</component>
</project>

@ -0,0 +1,3 @@
<component name="CopyrightManager">
<settings default="" />
</component>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Encoding">
<file url="PROJECT" charset="UTF-8" />
</component>
</project>

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GradleSettings">
<option name="linkedExternalProjectsSettings">
<GradleProjectSettings>
<option name="distributionType" value="DEFAULT_WRAPPED" />
<option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="modules">
<set>
<option value="$PROJECT_DIR$" />
<option value="$PROJECT_DIR$/app" />
</set>
</option>
<option name="myModules">
<set>
<option value="$PROJECT_DIR$" />
<option value="$PROJECT_DIR$/app" />
</set>
</option>
</GradleProjectSettings>
</option>
</component>
</project>

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EntryPointsManager">
<entry_points version="2.0" />
</component>
<component name="NullableNotNullManager">
<option name="myDefaultNullable" value="android.support.annotation.Nullable" />
<option name="myDefaultNotNull" value="android.support.annotation.NonNull" />
<option name="myNullables">
<value>
<list size="4">
<item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.Nullable" />
<item index="1" class="java.lang.String" itemvalue="javax.annotation.Nullable" />
<item index="2" class="java.lang.String" itemvalue="edu.umd.cs.findbugs.annotations.Nullable" />
<item index="3" class="java.lang.String" itemvalue="android.support.annotation.Nullable" />
</list>
</value>
</option>
<option name="myNotNulls">
<value>
<list size="4">
<item index="0" class="java.lang.String" itemvalue="org.jetbrains.annotations.NotNull" />
<item index="1" class="java.lang.String" itemvalue="javax.annotation.Nonnull" />
<item index="2" class="java.lang.String" itemvalue="edu.umd.cs.findbugs.annotations.NonNull" />
<item index="3" class="java.lang.String" itemvalue="android.support.annotation.NonNull" />
</list>
</value>
</option>
</component>
<component name="ProjectLevelVcsManager" settingsEditedManually="false">
<OptionsSetting value="true" id="Add" />
<OptionsSetting value="true" id="Remove" />
<OptionsSetting value="true" id="Checkout" />
<OptionsSetting value="true" id="Update" />
<OptionsSetting value="true" id="Status" />
<OptionsSetting value="true" id="Edit" />
<ConfirmationsSetting value="0" id="Add" />
<ConfirmationsSetting value="0" id="Remove" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" assert-keyword="true" jdk-15="true" project-jdk-name="1.8" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/build/classes" />
</component>
<component name="ProjectType">
<option name="id" value="Android" />
</component>
<component name="masterDetails">
<states>
<state key="ProjectJDKs.UI">
<settings>
<last-edited>1.8</last-edited>
<splitter-proportions>
<option name="proportions">
<list>
<option value="0.2" />
</list>
</option>
</splitter-proportions>
</settings>
</state>
</states>
</component>
</project>

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/Sea.iml" filepath="$PROJECT_DIR$/Sea.iml" />
<module fileurl="file://$PROJECT_DIR$/app/app.iml" filepath="$PROJECT_DIR$/app/app.iml" />
</modules>
</component>
</project>

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RunConfigurationProducerService">
<option name="ignoredProducers">
<set>
<option value="org.jetbrains.plugins.gradle.execution.test.runner.AllInPackageGradleConfigurationProducer" />
<option value="org.jetbrains.plugins.gradle.execution.test.runner.TestClassGradleConfigurationProducer" />
<option value="org.jetbrains.plugins.gradle.execution.test.runner.TestMethodGradleConfigurationProducer" />
</set>
</option>
</component>
</project>

@ -0,0 +1,17 @@
Yet Another Stream Encoder for Android
======================================
**yasea** is an RTMP streaming client written in pure Java for Android, for those who
want to avoid JNI development. It combines the source code of both [srs-sea](https://github.com/ossrs/srs-sea)
and [SimpleRtmp](https://github.com/faucamp/SimpleRtmp) to encode video in
H.264 and audio in AAC with hardware encoders, and uploads the packets to a server over RTMP.
Moreover, hardware encoding produces less CPU overhead than software does. And
the code does not depend on any native library.
Help
----
The project can currently sample video from the camera and audio from the
microphone of an Android device, and can connect and handshake with the remote
server. Unfortunately, the RTMP packets it produces are not yet correctly
formatted and cannot be parsed by the server or by Wireshark. Any help is welcome.

1
app/.gitignore vendored

@ -0,0 +1 @@
/build

@ -0,0 +1,26 @@
// Android application module build script for the publisher app.
apply plugin: 'com.android.application'

android {
    compileSdkVersion 23
    buildToolsVersion "23.0.2"

    defaultConfig {
        applicationId "net.ossrs.sea"
        // MediaCodec encoder APIs used by SrsEncoder require API 16 (Jelly Bean).
        minSdkVersion 16
        targetSdkVersion 23
        versionCode 1
        versionName "1.0"
    }
    buildTypes {
        release {
            // Minification disabled; only the default ProGuard config is applied.
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
}

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    testCompile 'junit:junit:4.12'
    compile 'com.android.support:appcompat-v7:23.2.0'
}

@ -0,0 +1,13 @@
package net.ossrs.sea;
import android.app.Application;
import android.test.ApplicationTestCase;
/**
 * Default instrumentation-test scaffold generated by Android Studio; contains
 * no project-specific assertions.
 *
 * <a href="http://d.android.com/tools/testing/testing_android.html">Testing Fundamentals</a>
 */
public class ApplicationTest extends ApplicationTestCase<Application> {
    public ApplicationTest() {
        super(Application.class);
    }
}

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="net.ossrs.sea">
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.CAMERA" />
<uses-permission android:name="android.permission.RECORD_AUDIO" />
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" />
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
</application>
</manifest>

@ -0,0 +1,388 @@
package net.ossrs.sea;
import android.app.Activity;
import android.content.SharedPreferences;
import android.hardware.Camera;
import android.hardware.Camera.Size;
import android.media.AudioRecord;
import android.media.MediaRecorder;
import android.os.Bundle;
import android.text.Editable;
import android.text.TextWatcher;
import android.util.Log;
import android.view.Menu;
import android.view.MenuItem;
import android.view.SurfaceHolder;
import android.view.SurfaceView;
import android.view.View;
import android.view.WindowManager;
import android.widget.Button;
import android.widget.EditText;
import java.io.IOException;
import java.util.List;
/**
 * Publisher UI: previews the camera, captures microphone PCM, and feeds both
 * raw streams into {@link SrsEncoder}, which encodes them in hardware and
 * uploads the result over RTMP.
 *
 * Implements {@link SurfaceHolder.Callback} for the preview surface lifecycle
 * and {@link Camera.PreviewCallback} to receive YUV frames from the camera.
 */
public class MainActivity extends Activity implements SurfaceHolder.Callback, Camera.PreviewCallback {
    private static final String TAG = "SrsPublisher";

    // Audio capture state.
    private AudioRecord mic = null;
    private boolean aloop = false;   // keeps the PCM read loop in startAudio() running
    private Thread aworker = null;   // worker thread executing startAudio()

    // Camera capture state.
    private SurfaceView mCameraView = null;
    private Camera mCamera = null;
    private int mPreviewRotation = 90;
    private int mDisplayRotation = 90;
    private int mCamId = Camera.getNumberOfCameras() - 1; // default camera
    private byte[] mYuvFrameBuffer;  // reused preview buffer: VWIDTH*VHEIGHT*3/2 bytes (NV21)

    private SrsEncoder mEncoder;

    // Settings storage for the RTMP url and video bitrate.
    private SharedPreferences sp;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        sp = getSharedPreferences("SrsPublisher", MODE_PRIVATE);

        getWindow().addFlags(WindowManager.LayoutParams.FLAG_KEEP_SCREEN_ON);
        setContentView(R.layout.activity_main);

        mEncoder = new SrsEncoder();
        mYuvFrameBuffer = new byte[SrsEncoder.VWIDTH * SrsEncoder.VHEIGHT * 3 / 2];

        // Restore persisted settings.
        SrsEncoder.rtmpUrl = sp.getString("SrsEncoder.rtmpUrl", SrsEncoder.rtmpUrl);
        SrsEncoder.vbitrate = sp.getInt("VBITRATE", SrsEncoder.vbitrate);
        Log.i(TAG, String.format("initialize rtmp url to %s, vbitrate=%dkbps", SrsEncoder.rtmpUrl, SrsEncoder.vbitrate));

        // RTMP url editor: persist any change.
        final EditText efu = (EditText) findViewById(R.id.url);
        efu.setText(SrsEncoder.rtmpUrl);
        efu.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {
            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
            }

            @Override
            public void afterTextChanged(Editable s) {
                String fu = efu.getText().toString();
                // FIX: compare String content with equals(); the original used
                // '==' (reference identity), which is almost never true here.
                if (fu.isEmpty() || fu.equals(SrsEncoder.rtmpUrl)) {
                    return;
                }

                SrsEncoder.rtmpUrl = fu;
                Log.i(TAG, String.format("flv url changed to %s", SrsEncoder.rtmpUrl));

                SharedPreferences.Editor editor = sp.edit();
                editor.putString("SrsEncoder.rtmpUrl", SrsEncoder.rtmpUrl);
                // apply() persists asynchronously; commit() would block the UI thread.
                editor.apply();
            }
        });

        // Video bitrate editor: persist any change.
        final EditText evb = (EditText) findViewById(R.id.vbitrate);
        evb.setText(String.format("%dkbps", SrsEncoder.vbitrate / 1000));
        evb.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {
            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
            }

            @Override
            public void afterTextChanged(Editable s) {
                int vb;
                try {
                    vb = Integer.parseInt(evb.getText().toString().replaceAll("kbps", ""));
                } catch (NumberFormatException e) {
                    // FIX: field may be empty or mid-edit; keep the previous
                    // bitrate instead of crashing on parse failure.
                    return;
                }
                if (vb * 1000 != SrsEncoder.vbitrate) {
                    SrsEncoder.vbitrate = vb * 1000;
                    SharedPreferences.Editor editor = sp.edit();
                    editor.putInt("VBITRATE", SrsEncoder.vbitrate);
                    editor.apply();
                }
            }
        });

        // For camera, @see https://developer.android.com/reference/android/hardware/Camera.html
        final Button btnPublish = (Button) findViewById(R.id.publish);
        final Button btnStop = (Button) findViewById(R.id.stop);
        final Button btnSwitch = (Button) findViewById(R.id.swCam);
        final Button btnRotate = (Button) findViewById(R.id.rotate);
        mCameraView = (SurfaceView) findViewById(R.id.preview);
        mCameraView.getHolder().addCallback(this);
        // mCameraView.getHolder().setFormat(SurfaceHolder.SURFACE_TYPE_HARDWARE);

        btnPublish.setEnabled(true);
        btnStop.setEnabled(false);

        btnPublish.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                startPublish();
                btnPublish.setEnabled(false);
                btnStop.setEnabled(true);
            }
        });

        btnStop.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                stopPublish();
                btnPublish.setEnabled(true);
                btnStop.setEnabled(false);
            }
        });

        btnSwitch.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (mCamera != null && mEncoder != null) {
                    // Cycle to the next camera and tell the encoder to adjust
                    // its YUV pre-processing (front camera needs flipping).
                    mCamId = (mCamId + 1) % Camera.getNumberOfCameras();
                    stopCamera();
                    mEncoder.swithCameraFace();
                    startCamera();
                }
            }
        });

        btnRotate.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (mCamera != null) {
                    mPreviewRotation = (mPreviewRotation + 90) % 360;
                    mCamera.setDisplayOrientation(mPreviewRotation);
                }
            }
        });
    }

    @Override
    protected void onResume() {
        super.onResume();
        final Button btn = (Button) findViewById(R.id.publish);
        btn.setEnabled(true);
    }

    @Override
    protected void onPause() {
        super.onPause();
    }

    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Inflate the menu; this adds items to the action bar if it is present.
        getMenuInflater().inflate(R.menu.menu_main, menu);
        return true;
    }

    @Override
    public boolean onOptionsItemSelected(MenuItem item) {
        // Handle action bar item clicks here. The action bar will
        // automatically handle clicks on the Home/Up button, so long
        // as you specify a parent activity in AndroidManifest.xml.
        int id = item.getItemId();
        //noinspection SimplifiableIfStatement
        if (id == R.id.action_settings) {
            return true;
        }
        return super.onOptionsItemSelected(item);
    }

    /**
     * Opens camera mCamId, configures preview size/fps/format to match the
     * encoder's expectations, and starts the preview with a single reusable
     * callback buffer.
     */
    private void startCamera() {
        if (mCamera != null) {
            Log.d(TAG, "start camera, already started. return");
            return;
        }
        if (mCamId > (Camera.getNumberOfCameras() - 1) || mCamId < 0) {
            Log.e(TAG, "####### start camera failed, inviald params, camera No.=" + mCamId);
            return;
        }

        mCamera = Camera.open(mCamId);
        Camera.CameraInfo info = new Camera.CameraInfo();
        Camera.getCameraInfo(mCamId, info);
        if (info.facing == Camera.CameraInfo.CAMERA_FACING_FRONT) {
            // Front camera is mirrored: compensate the display rotation.
            mDisplayRotation = (mPreviewRotation + 180) % 360;
            mDisplayRotation = (360 - mDisplayRotation) % 360;
        } else {
            mDisplayRotation = mPreviewRotation;
        }

        Camera.Parameters params = mCamera.getParameters();
        /* supported preview fps range */
        // List<int[]> spfr = params.getSupportedPreviewFpsRange();
        // Log.i("Cam", "! Supported Preview Fps Range:");
        // int rn = 0;
        // for (int[] r : spfr) {
        //     Log.i("Cam", "\tRange [" + rn++ + "]: " + r[0] + "~" + r[1]);
        // }
        // /* preview size */
        List<Size> sizes = params.getSupportedPreviewSizes();
        Log.i("Cam", "! Supported Preview Size:");
        for (int i = 0; i < sizes.size(); i++) {
            Log.i("Cam", "\tSize [" + i + "]: " + sizes.get(i).width + "x" + sizes.get(i).height);
        }
        /* picture size */
        sizes = params.getSupportedPictureSizes();
        Log.i("Cam", "! Supported Picture Size:");
        for (int i = 0; i < sizes.size(); i++) {
            Log.i("Cam", "\tSize [" + i + "]: " + sizes.get(i).width + "x" + sizes.get(i).height);
        }

        /***** set parameters *****/
        //params.set("orientation", "portrait");
        //params.set("orientation", "landscape");
        //params.setRotation(90);
        params.setPictureSize(SrsEncoder.VWIDTH, SrsEncoder.VHEIGHT);
        params.setPreviewSize(SrsEncoder.VWIDTH, SrsEncoder.VHEIGHT);
        int[] range = findClosestFpsRange(SrsEncoder.VFPS, params.getSupportedPreviewFpsRange());
        params.setPreviewFpsRange(range[0], range[1]);
        params.setPreviewFormat(SrsEncoder.VFORMAT);
        params.setFlashMode(Camera.Parameters.FLASH_MODE_OFF);
        params.setWhiteBalance(Camera.Parameters.WHITE_BALANCE_AUTO);
        params.setSceneMode(Camera.Parameters.SCENE_MODE_AUTO);
        mCamera.setParameters(params);

        mCamera.setDisplayOrientation(mPreviewRotation);
        mCamera.addCallbackBuffer(mYuvFrameBuffer);
        mCamera.setPreviewCallbackWithBuffer(this);
        try {
            mCamera.setPreviewDisplay(mCameraView.getHolder());
        } catch (IOException e) {
            e.printStackTrace();
        }
        mCamera.startPreview();
    }

    /** Stops the preview and releases the camera, if held. */
    private void stopCamera() {
        if (mCamera != null) {
            // need to SET NULL CB before stop preview!!!
            mCamera.setPreviewCallback(null);
            mCamera.stopPreview();
            mCamera.release();
            mCamera = null;
        }
    }

    private void onGetYuvFrame(byte[] data) {
        mEncoder.onGetYuvFrame(data);
    }

    @Override
    public void onPreviewFrame(byte[] data, Camera c) {
        onGetYuvFrame(data);
        // Return the buffer to the camera so the next frame can be delivered.
        c.addCallbackBuffer(mYuvFrameBuffer);
    }

    private void onGetPcmFrame(byte[] pcmBuffer, int size) {
        mEncoder.onGetPcmFrame(pcmBuffer, size);
    }

    /**
     * Blocking microphone capture loop; runs on {@link #aworker} until aloop
     * is cleared or the thread is interrupted. Each PCM chunk is forwarded to
     * the encoder.
     */
    private void startAudio() {
        if (mic != null) {
            return;
        }

        int bufferSize = 2 * AudioRecord.getMinBufferSize(SrsEncoder.ASAMPLERATE, SrsEncoder.ACHANNEL, SrsEncoder.AFORMAT);
        mic = new AudioRecord(MediaRecorder.AudioSource.MIC, SrsEncoder.ASAMPLERATE, SrsEncoder.ACHANNEL, SrsEncoder.AFORMAT, bufferSize);
        mic.startRecording();

        byte pcmBuffer[] = new byte[4096];
        while (aloop && !Thread.interrupted()) {
            int size = mic.read(pcmBuffer, 0, pcmBuffer.length);
            if (size <= 0) {
                Log.e(TAG, "***** audio ignored, no data to read.");
                break;
            }
            onGetPcmFrame(pcmBuffer, size);
        }
    }

    /** Stops the audio worker thread and releases the AudioRecord. */
    private void stopAudio() {
        aloop = false;
        if (aworker != null) {
            Log.i(TAG, "stop audio worker thread");
            aworker.interrupt();
            try {
                aworker.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            aworker = null;
        }
        if (mic != null) {
            mic.setRecordPositionUpdateListener(null);
            mic.stop();
            mic.release();
            mic = null;
        }
    }

    /** Starts the encoder, the camera preview, and the audio capture thread. */
    private void startPublish() {
        int ret = mEncoder.start();
        if (ret < 0) {
            return;
        }

        startCamera();

        aworker = new Thread(new Runnable() {
            @Override
            public void run() {
                android.os.Process.setThreadPriority(android.os.Process.THREAD_PRIORITY_AUDIO);
                startAudio();
            }
        });
        aloop = true;
        aworker.start();
    }

    /** Stops audio, camera, and encoder — the reverse of startPublish(). */
    private void stopPublish() {
        stopAudio();
        stopCamera();
        mEncoder.stop();
    }

    /**
     * Picks the supported preview fps range (units: fps * 1000) that contains
     * expectedFps and is tightest around it; falls back to the first range.
     */
    private int[] findClosestFpsRange(int expectedFps, List<int[]> fpsRanges) {
        expectedFps *= 1000;
        int[] closestRange = fpsRanges.get(0);
        int measure = Math.abs(closestRange[0] - expectedFps) + Math.abs(closestRange[1] - expectedFps);
        for (int[] range : fpsRanges) {
            if (range[0] <= expectedFps && range[1] >= expectedFps) {
                int curMeasure = Math.abs(range[0] - expectedFps) + Math.abs(range[1] - expectedFps);
                if (curMeasure < measure) {
                    closestRange = range;
                    measure = curMeasure;
                }
            }
        }
        return closestRange;
    }

    @Override
    public void surfaceChanged(SurfaceHolder holder, int format, int width, int height) {
        Log.d(TAG, "surfaceChanged");
    }

    @Override
    public void surfaceCreated(SurfaceHolder arg0) {
        Log.d(TAG, "surfaceCreated");
    }

    @Override
    public void surfaceDestroyed(SurfaceHolder arg0) {
        Log.d(TAG, "surfaceDestroyed");
    }
}

@ -0,0 +1,515 @@
package net.ossrs.sea;
import android.graphics.ImageFormat;
import android.media.AudioFormat;
import android.media.MediaCodec;
import android.media.MediaCodecInfo;
import android.media.MediaCodecList;
import android.media.MediaFormat;
import android.util.Log;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
 * Created by Leo Ma on 4/1/2016.
 *
 * Hardware A/V encoder: wraps two MediaCodec instances (H.264 video and AAC
 * audio), pre-processes the camera's YUV frames (flip for the front camera,
 * rotate 90°, crop to the encode size), and hands the encoded samples to the
 * SrsRtmp muxer for upload.
 */
public class SrsEncoder {
    private static final String TAG = "SrsEncoder";

    public static final String VCODEC = "video/avc";
    public static final String ACODEC = "audio/mp4a-latm";
    public static String rtmpUrl = "rtmp://10.10.10.135/ivp/test";//"ossrs.net:1935/live/sea"
    public static final int VWIDTH = 640;       // camera capture width
    public static final int VHEIGHT = 480;      // camera capture height
    public static int vbitrate = 800 * 1000;    // 800kbps, user-configurable
    public static final int VENC_WIDTH = 368;   // encoder output width (portrait)
    public static final int VENC_HEIGHT = 640;  // encoder output height (portrait)
    public static final int VFPS = 24;
    public static final int VGOP = 60;          // keyframe interval, in frames
    public static final int VFORMAT = ImageFormat.NV21;
    public static final int ASAMPLERATE = 44100;
    public static final int ACHANNEL = AudioFormat.CHANNEL_IN_STEREO;
    public static final int AFORMAT = AudioFormat.ENCODING_PCM_16BIT;
    public static final int ABITRATE = 128 * 1000; // 128kbps

    private SrsRtmp muxer;
    private MediaCodec vencoder;
    private MediaCodecInfo vmci;        // selected H.264 encoder info
    private MediaCodec.BufferInfo vebi;
    private MediaCodec aencoder;
    private MediaCodec.BufferInfo aebi;

    // Scratch buffers reused for every frame to avoid per-frame allocation.
    private byte[] mRotatedFrameBuffer;
    private byte[] mFlippedFrameBuffer;
    private byte[] mCroppedFrameBuffer;
    private boolean mCameraFaceFront = true;
    private long mPresentTimeMs;        // wall-clock reference for PTS of both tracks
    private int vtrack;
    private int vcolor;                 // negotiated encoder input color format
    private int atrack;

    public SrsEncoder() {
        vcolor = chooseVideoEncoder();
        mRotatedFrameBuffer = new byte[VWIDTH * VHEIGHT * 3 / 2];
        mFlippedFrameBuffer = new byte[VWIDTH * VHEIGHT * 3 / 2];
        mCroppedFrameBuffer = new byte[VENC_WIDTH * VENC_HEIGHT * 3 / 2];
    }

    /**
     * Connects the muxer and configures/starts both MediaCodec encoders.
     *
     * @return 0 on success, -1 if the muxer or an encoder could not be created
     */
    public int start() {
        muxer = new SrsRtmp(rtmpUrl);
        try {
            muxer.start();
        } catch (IOException e) {
            Log.e(TAG, "start muxer failed.");
            e.printStackTrace();
            return -1;
        }

        // the referent PTS for video and audio encoder.
        mPresentTimeMs = System.currentTimeMillis();

        // aencoder yuv to aac raw stream.
        // requires sdk level 16+, Android 4.1, 4.1.1, the JELLY_BEAN
        try {
            aencoder = MediaCodec.createEncoderByType(ACODEC);
        } catch (IOException e) {
            Log.e(TAG, "create aencoder failed.");
            e.printStackTrace();
            return -1;
        }
        aebi = new MediaCodec.BufferInfo();

        // setup the aencoder.
        // @see https://developer.android.com/reference/android/media/MediaCodec.html
        int ach = ACHANNEL == AudioFormat.CHANNEL_IN_STEREO ? 2 : 1;
        MediaFormat audioFormat = MediaFormat.createAudioFormat(MediaFormat.MIMETYPE_AUDIO_AAC, ASAMPLERATE, ach);
        audioFormat.setInteger(MediaFormat.KEY_BIT_RATE, ABITRATE);
        audioFormat.setInteger(MediaFormat.KEY_MAX_INPUT_SIZE, 0);
        aencoder.configure(audioFormat, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE);
        // add the audio tracker to muxer.
        atrack = muxer.addTrack(audioFormat);

        // vencoder yuv to 264 es stream.
        // requires sdk level 16+, Android 4.1, 4.1.1, the JELLY_BEAN
        try {
            vencoder = MediaCodec.createByCodecName(vmci.getName());
        } catch (IOException e) {
            Log.e(TAG, "create vencoder failed.");
            e.printStackTrace();
            return -1;
        }
        vebi = new MediaCodec.BufferInfo();

        // setup the vencoder.
        // Note: landscape to portrait, 90 degree rotation, so we need to switch VWIDTH and VHEIGHT in configuration
        MediaFormat videoFormat = MediaFormat.createVideoFormat(MediaFormat.MIMETYPE_VIDEO_AVC, VENC_WIDTH, VENC_HEIGHT);
        videoFormat.setInteger(MediaFormat.KEY_COLOR_FORMAT, vcolor);
        videoFormat.setInteger(MediaFormat.KEY_MAX_INPUT_SIZE, 0);
        // FIX: use the user-configured bitrate; this was hard-coded to 300000,
        // silently ignoring the vbitrate setting persisted by the UI.
        videoFormat.setInteger(MediaFormat.KEY_BIT_RATE, vbitrate);
        videoFormat.setInteger(MediaFormat.KEY_FRAME_RATE, VFPS);
        videoFormat.setInteger(MediaFormat.KEY_I_FRAME_INTERVAL, VGOP);
        vencoder.configure(videoFormat, null, null, MediaCodec.CONFIGURE_FLAG_ENCODE);
        // add the video tracker to muxer.
        vtrack = muxer.addTrack(videoFormat);

        // start device and encoder.
        try {
            Log.i(TAG, "start avc vencoder");
            vencoder.start();
            Log.i(TAG, "start aac aencoder");
            aencoder.start();
        } catch (Exception e) {
            Log.e(TAG, "Encoder start failed!");
        }
        return 0;
    }

    /** Stops and releases both encoders and the muxer. Safe to call twice. */
    public void stop() {
        if (aencoder != null) {
            Log.i(TAG, "stop aencoder");
            aencoder.stop();
            aencoder.release();
            aencoder = null;
        }
        if (vencoder != null) {
            Log.i(TAG, "stop vencoder");
            vencoder.stop();
            vencoder.release();
            vencoder = null;
        }
        if (muxer != null) {
            Log.i(TAG, "stop muxer to SRS over RTMP");
            muxer.release();
            muxer = null;
        }
    }

    /**
     * Toggles the camera-facing flag used by the YUV pre-processor (front
     * camera frames are mirrored and must be flipped before encoding).
     * The misspelled name is kept for caller compatibility.
     */
    public void swithCameraFace() {
        mCameraFaceFront = !mCameraFaceFront;
    }

    // when got encoded h264 es stream.
    private void onEncodedAnnexbFrame(ByteBuffer es, MediaCodec.BufferInfo bi) {
        try {
            muxer.writeSampleData(vtrack, es, bi);
        } catch (Exception e) {
            Log.e(TAG, "muxer write video sample failed.");
            e.printStackTrace();
        }
    }

    /**
     * Converts a raw camera frame into mCroppedFrameBuffer in the encoder's
     * color format: flip (front camera only), rotate 90°, then center-crop
     * from VWIDTHxVHEIGHT (rotated: VHEIGHTxVWIDTH) to VENC_WIDTHxVENC_HEIGHT.
     *
     * @return 0 on success, -1 for an unsupported color format
     */
    private int preProcessYuvFrame(byte[] data) {
        if (mCameraFaceFront) {
            switch (vcolor) {
                case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar:
                    flipYUV420PlannerFrame(data, mFlippedFrameBuffer, VHEIGHT, VWIDTH);
                    rotateYUV420PlannerFrame(mFlippedFrameBuffer, mRotatedFrameBuffer, VWIDTH, VHEIGHT);
                    cropYUV420PlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH,
                            mCroppedFrameBuffer, VENC_WIDTH, VENC_HEIGHT);
                    break;
                case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420PackedPlanar:
                    // NOTE(review): PackedPlanar is handled with the semi-planar
                    // routines here — looks suspicious; confirm on a device that
                    // actually reports this format.
                    flipYUV420SemiPlannerFrame(data, mFlippedFrameBuffer, VWIDTH, VHEIGHT);
                    rotateYUV420SemiPlannerFrame(mFlippedFrameBuffer, mRotatedFrameBuffer, VWIDTH, VHEIGHT);
                    cropYUV420SemiPlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH,
                            mCroppedFrameBuffer, VENC_WIDTH, VENC_HEIGHT);
                    break;
                case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420SemiPlanar:
                    flipYUV420SemiPlannerFrame(data, mFlippedFrameBuffer, VWIDTH, VHEIGHT);
                    rotateYUV420SemiPlannerFrame(mFlippedFrameBuffer, mRotatedFrameBuffer, VWIDTH, VHEIGHT);
                    cropYUV420SemiPlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH,
                            mCroppedFrameBuffer, VENC_WIDTH, VENC_HEIGHT);
                    break;
                default:
                    return -1;
            }
        } else {
            switch (vcolor) {
                case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar:
                    rotateYUV420PlannerFrame(data, mRotatedFrameBuffer, VWIDTH, VHEIGHT);
                    cropYUV420PlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH,
                            mCroppedFrameBuffer, VENC_WIDTH, VENC_HEIGHT);
                    break;
                case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420PackedPlanar:
                    // NOTE(review): same PackedPlanar-as-semi-planar concern as above.
                    rotateYUV420SemiPlannerFrame(data, mRotatedFrameBuffer, VWIDTH, VHEIGHT);
                    cropYUV420SemiPlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH,
                            mCroppedFrameBuffer, VENC_WIDTH, VENC_HEIGHT);
                    break;
                case MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420SemiPlanar:
                    rotateYUV420SemiPlannerFrame(data, mRotatedFrameBuffer, VWIDTH, VHEIGHT);
                    cropYUV420SemiPlannerFrame(mRotatedFrameBuffer, VHEIGHT, VWIDTH,
                            mCroppedFrameBuffer, VENC_WIDTH, VENC_HEIGHT);
                    break;
                default:
                    return -1;
            }
        }
        return 0;
    }

    /** Feeds one camera frame through the video encoder and drains its output. */
    public void onGetYuvFrame(byte[] data) {
        if (preProcessYuvFrame(data) >= 0) {
            ByteBuffer[] inBuffers = vencoder.getInputBuffers();
            ByteBuffer[] outBuffers = vencoder.getOutputBuffers();

            int inBufferIndex = vencoder.dequeueInputBuffer(-1);
            if (inBufferIndex >= 0) {
                ByteBuffer bb = inBuffers[inBufferIndex];
                bb.clear();
                bb.put(mCroppedFrameBuffer, 0, mCroppedFrameBuffer.length);
                // NOTE(review): pts is in milliseconds although MediaCodec
                // nominally expects microseconds — presumably SrsRtmp consumes
                // these as RTMP millisecond timestamps; confirm.
                long pts = System.currentTimeMillis() - mPresentTimeMs;
                vencoder.queueInputBuffer(inBufferIndex, 0, mCroppedFrameBuffer.length, pts, 0);
            }

            for (; ; ) {
                int outBufferIndex = vencoder.dequeueOutputBuffer(vebi, 0);
                if (outBufferIndex >= 0) {
                    ByteBuffer bb = outBuffers[outBufferIndex];
                    onEncodedAnnexbFrame(bb, vebi);
                    vencoder.releaseOutputBuffer(outBufferIndex, false);
                } else {
                    break;
                }
            }
        }
    }

    // when got encoded aac raw stream.
    private void onEncodedAacFrame(ByteBuffer es, MediaCodec.BufferInfo bi) {
        try {
            muxer.writeSampleData(atrack, es, bi);
        } catch (Exception e) {
            Log.e(TAG, "muxer write audio sample failed.");
            e.printStackTrace();
        }
    }

    /** Feeds one PCM chunk through the audio encoder and drains its output. */
    public void onGetPcmFrame(byte[] data, int size) {
        ByteBuffer[] inBuffers = aencoder.getInputBuffers();
        ByteBuffer[] outBuffers = aencoder.getOutputBuffers();

        int inBufferIndex = aencoder.dequeueInputBuffer(-1);
        if (inBufferIndex >= 0) {
            ByteBuffer bb = inBuffers[inBufferIndex];
            bb.clear();
            bb.put(data, 0, size);
            long pts = System.currentTimeMillis() - mPresentTimeMs;
            aencoder.queueInputBuffer(inBufferIndex, 0, size, pts, 0);
        }

        for (; ; ) {
            int outBufferIndex = aencoder.dequeueOutputBuffer(aebi, 0);
            if (outBufferIndex >= 0) {
                ByteBuffer bb = outBuffers[outBufferIndex];
                onEncodedAacFrame(bb, aebi);
                aencoder.releaseOutputBuffer(outBufferIndex, false);
            } else {
                break;
            }
        }
    }

    // Y, U (Cb) and V (Cr)
    // yuv420                     yuv yuv yuv yuv
    // yuv420p (planar)           yyyy*2 uu vv
    // yuv420sp (semi-planar)     yyyy*2 uv uv   — Y on one plane, interleaved UV on a second
    // I420 -> YUV420P   yyyy*2 uu vv
    // YV12 -> YUV420P   yyyy*2 vv uu
    // NV21 -> YUV420SP  yyyy*2 vu vu
    // NV12 -> YUV420SP  yyyy*2 uv uv
    // NV16 -> YUV422SP  yyyy uv uv
    // YUY2 -> YUV422SP  yuyv yuyv

    /** Center-crops a YUV420 semi-planar frame from iw x ih to ow x oh. */
    private byte[] cropYUV420SemiPlannerFrame(byte[] input, int iw, int ih, byte[] output, int ow, int oh) {
        assert (iw >= ow && ih >= oh);

        int i = 0;
        int iFrameSize = iw * ih;
        int oFrameSize = ow * oh;
        for (int row = (ih - oh) / 2; row < oh + (ih - oh) / 2; row++) {
            for (int col = (iw - ow) / 2; col < ow + (iw - ow) / 2; col++) {
                output[i++] = input[iw * row + col]; // Y
            }
        }
        i = 0;
        for (int row = (ih - oh) / 4; row < oh / 2 + (ih - oh) / 4; row++) {
            for (int col = (iw - ow) / 4; col < ow / 2 + (iw - ow) / 4; col++) {
                output[oFrameSize + 2 * i] = input[iFrameSize + iw * row + 2 * col];         // U
                output[oFrameSize + 2 * i + 1] = input[iFrameSize + iw * row + 2 * col + 1]; // V
                i++;
            }
        }
        return output;
    }

    /** Center-crops a YUV420 planar frame from iw x ih to ow x oh. */
    private byte[] cropYUV420PlannerFrame(byte[] input, int iw, int ih, byte[] output, int ow, int oh) {
        assert (iw >= ow && ih >= oh);

        int i = 0;
        int iFrameSize = iw * ih;
        int iQFrameSize = iFrameSize / 4;
        int oFrameSize = ow * oh;
        int oQFrameSize = oFrameSize / 4;
        for (int row = (ih - oh) / 2; row < oh + (ih - oh) / 2; row++) {
            for (int col = (iw - ow) / 2; col < ow + (iw - ow) / 2; col++) {
                output[i++] = input[iw * row + col]; // Y
            }
        }
        i = 0;
        for (int row = (ih - oh) / 4; row < oh / 2 + (ih - oh) / 4; row++) {
            for (int col = (iw - ow) / 4; col < ow / 2 + (iw - ow) / 4; col++) {
                output[oFrameSize + i++] = input[iFrameSize + iw * row + col]; // U
            }
        }
        i = 0;
        for (int row = (ih - oh) / 4; row < oh / 2 + (ih - oh) / 4; row++) {
            for (int col = (iw - ow) / 4; col < ow / 2 + (iw - ow) / 4; col++) {
                output[oFrameSize + oQFrameSize + i++] = input[iFrameSize + iQFrameSize + iw * row + col]; // V
            }
        }
        return output;
    }

    // 1. rotate 90 degree clockwise
    // 2. convert NV21 to NV12
    private byte[] rotateYUV420SemiPlannerFrame(byte[] input, byte[] output, int width, int height) {
        int frameSize = width * height;

        int i = 0;
        for (int col = 0; col < width; col++) {
            for (int row = height - 1; row >= 0; row--) {
                output[i++] = input[width * row + col]; // Y
            }
        }
        i = 0;
        for (int col = 0; col < width / 2; col++) {
            for (int row = height / 2 - 1; row >= 0; row--) {
                // Swap the chroma order while rotating: VU (NV21) -> UV (NV12).
                output[frameSize + i * 2 + 1] = input[frameSize + width * row + col * 2];   // Cb (U)
                output[frameSize + i * 2] = input[frameSize + width * row + col * 2 + 1];   // Cr (V)
                i++;
            }
        }
        return output;
    }

    // 1. rotate 90 degree clockwise
    // 2. convert NV21 to I420
    private byte[] rotateYUV420PlannerFrame(byte[] input, byte[] output, int width, int height) {
        int frameSize = width * height;
        int qFrameSize = frameSize / 4;

        int i = 0;
        for (int col = width - 1; col >= 0; col--) {
            for (int row = 0; row < height; row++) {
                output[i++] = input[width * row + col];
            }
        }
        i = 0;
        for (int col = width / 2 - 1; col >= 0; col--) {
            for (int row = 0; row < height / 2; row++) {
                output[frameSize + i++] = input[frameSize + width * row + col];
            }
        }
        i = 0;
        for (int col = width / 2 - 1; col >= 0; col--) {
            for (int row = 0; row < height / 2; row++) {
                output[frameSize + qFrameSize + i++] = input[frameSize + qFrameSize + width * row + col];
            }
        }
        return output;
    }

    /** Horizontally mirrors a semi-planar frame (front-camera un-mirroring). */
    private byte[] flipYUV420SemiPlannerFrame(byte[] input, byte[] output, int width, int height) {
        int frameSize = width * height;

        int i = 0;
        for (int row = 0; row < height; row++) {
            for (int col = width - 1; col >= 0; col--) {
                output[i++] = input[width * row + col]; // Y
            }
        }
        i = 0;
        for (int row = 0; row < height / 2; row++) {
            for (int col = width / 2 - 1; col >= 0; col--) {
                output[frameSize + i * 2] = input[frameSize + width * row + col * 2];         // Cb (U)
                output[frameSize + i * 2 + 1] = input[frameSize + width * row + col * 2 + 1]; // Cr (V)
                i++;
            }
        }
        return output;
    }

    /** Horizontally mirrors a planar frame (front-camera un-mirroring). */
    private byte[] flipYUV420PlannerFrame(byte[] input, byte[] output, int width, int height) {
        int frameSize = width * height;
        int qFrameSize = frameSize / 4;

        int i = 0;
        for (int row = 0; row < height; row++) {
            for (int col = width - 1; col >= 0; col--) {
                output[i++] = input[width * row + col]; // Y
            }
        }
        i = 0;
        for (int row = 0; row < height / 2; row++) {
            for (int col = width / 2 - 1; col >= 0; col--) {
                output[frameSize + i] = input[frameSize + width * row + col]; // Cb (U)
                i++;
            }
        }
        i = 0;
        for (int row = 0; row < height / 2; row++) {
            for (int col = width / 2 - 1; col >= 0; col--) {
                output[frameSize + qFrameSize + i] = input[frameSize + qFrameSize + width * row + col]; // Cr (V)
                i++;
            }
        }
        return output;
    }

    /**
     * Finds an H.264 encoder; if name is non-null, only encoders whose codec
     * name contains it are considered.
     *
     * @return the first matching encoder, or null if none is available
     */
    private MediaCodecInfo chooseVideoEncoder(String name) {
        int nbCodecs = MediaCodecList.getCodecCount();
        for (int i = 0; i < nbCodecs; i++) {
            MediaCodecInfo mci = MediaCodecList.getCodecInfoAt(i);
            if (!mci.isEncoder()) {
                continue;
            }
            String[] types = mci.getSupportedTypes();
            for (int j = 0; j < types.length; j++) {
                if (types[j].equalsIgnoreCase(VCODEC)) {
                    //Log.i(TAG, String.format("vencoder %s types: %s", mci.getName(), types[j]));
                    if (name == null) {
                        return mci;
                    }
                    if (mci.getName().contains(name)) {
                        return mci;
                    }
                }
            }
        }
        return null;
    }

    // choose the right supported color format. @see below:
    private int chooseVideoEncoder() {
        // choose the encoder "video/avc":
        //      1. select one when type matched.
        //      2. perfer google avc.
        //      3. perfer qcom avc.
        vmci = chooseVideoEncoder(null);
        //vmci = chooseVideoEncoder("google");
        //vmci = chooseVideoEncoder("qcom");

        int matchedColorFormat = 0;
        MediaCodecInfo.CodecCapabilities cc = vmci.getCapabilitiesForType(VCODEC);
        for (int i = 0; i < cc.colorFormats.length; i++) {
            int cf = cc.colorFormats[i];
            Log.i(TAG, String.format("vencoder %s supports color format 0x%x(%d)", vmci.getName(), cf, cf));

            // choose YUV for h.264, prefer the bigger one.
            // corresponding to the color space transform in onPreviewFrame
            if (cf >= MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420Planar
                    && cf <= MediaCodecInfo.CodecCapabilities.COLOR_FormatYUV420SemiPlanar) {
                if (cf > matchedColorFormat) {
                    matchedColorFormat = cf;
                }
            }
        }

        for (int i = 0; i < cc.profileLevels.length; i++) {
            MediaCodecInfo.CodecProfileLevel pl = cc.profileLevels[i];
            Log.i(TAG, String.format("vencoder %s support profile %d, level %d", vmci.getName(), pl.profile, pl.level));
        }

        Log.i(TAG, String.format("vencoder %s choose color format 0x%x(%d)", vmci.getName(), matchedColorFormat, matchedColorFormat));
        return matchedColorFormat;
    }
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,69 @@
package net.ossrs.sea;
import java.io.IOException;
import net.ossrs.sea.rtmp.RtmpPublisher;
import net.ossrs.sea.rtmp.io.RtmpConnection;
/**
 * Srs implementation of an RTMP publisher
 *
 * @author francois, leoma
 */
public class SrsRtmpPublisher implements RtmpPublisher {

    // Delegate that performs all actual network I/O.
    private RtmpPublisher connection;

    /**
     * Constructor for URLs in the format: rtmp://host[:port]/application[?streamName]
     *
     * @param url a RTMP URL in the format: rtmp://host[:port]/application[?streamName]
     */
    public SrsRtmpPublisher(String url) {
        connection = new RtmpConnection(url);
    }

    @Override
    public void connect() throws IOException {
        connection.connect();
    }

    @Override
    public void shutdown() {
        connection.shutdown();
    }

    @Override
    public void publish(String publishType) throws IllegalStateException, IOException {
        if (publishType == null) {
            throw new IllegalStateException("No publish type specified");
        }
        connection.publish(publishType);
    }

    @Override
    public void closeStream() throws IllegalStateException {
        connection.closeStream();
    }

    @Override
    public void publishVideoData(byte[] data, int dts) throws IllegalStateException {
        validate(data, dts, "Invalid Video Data");
        connection.publishVideoData(data, dts);
    }

    @Override
    public void publishAudioData(byte[] data, int dts) throws IllegalStateException {
        validate(data, dts, "Invalid Audio Data");
        connection.publishAudioData(data, dts);
    }

    /** Rejects empty payloads and negative timestamps before delegating. */
    private static void validate(byte[] data, int dts, String emptyMessage) {
        if (data == null || data.length == 0) {
            throw new IllegalStateException(emptyMessage);
        }
        if (dts < 0) {
            throw new IllegalStateException("Invalid DTS");
        }
    }
}

@ -0,0 +1,72 @@
package net.ossrs.sea.rtmp;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import android.util.Log;
/**
 * Some helper utilities for SHA256, mostly (used during handshake)
 * This is separated in order to be more easily replaced on platforms that
 * do not have the javax.crypto.* and/or java.security.* packages
 *
 * This implementation is directly inspired by the RTMPHandshake class of the
 * Red5 Open Source Flash Server project
 *
 * @author francois
 */
public class Crypto {

    private static final String TAG = "Crypto";

    // May remain null if HMAC-SHA256 is unavailable on this platform; the
    // calculate methods would then fail with a NullPointerException.
    private Mac hmacSHA256;

    public Crypto() {
        try {
            hmacSHA256 = Mac.getInstance("HmacSHA256");
        } catch (SecurityException e) {
            Log.e(TAG, "Security exception when getting HMAC", e);
        } catch (NoSuchAlgorithmException e) {
            // BUGFIX: previously the exception object was dropped; attach it
            // so the root cause shows up in the log.
            Log.e(TAG, "HMAC SHA256 does not exist", e);
        }
    }

    /**
     * Calculates an HMAC SHA256 hash using a default key length.
     *
     * @param input data to hash
     * @param key   full HMAC key
     * @return hmac hashed bytes, or {@code null} if the key was invalid
     */
    public byte[] calculateHmacSHA256(byte[] input, byte[] key) {
        byte[] output = null;
        try {
            hmacSHA256.init(new SecretKeySpec(key, "HmacSHA256"));
            output = hmacSHA256.doFinal(input);
        } catch (InvalidKeyException e) {
            Log.e(TAG, "Invalid key", e);
        }
        return output;
    }

    /**
     * Calculates an HMAC SHA256 hash using a set key length.
     *
     * @param input  data to hash
     * @param key    buffer holding the HMAC key
     * @param length number of bytes of {@code key} (from offset 0) to use
     * @return hmac hashed bytes, or {@code null} if the key was invalid
     */
    public byte[] calculateHmacSHA256(byte[] input, byte[] key, int length) {
        byte[] output = null;
        try {
            hmacSHA256.init(new SecretKeySpec(key, 0, length, "HmacSHA256"));
            output = hmacSHA256.doFinal(input);
        } catch (InvalidKeyException e) {
            Log.e(TAG, "Invalid key", e);
        }
        return output;
    }
}

@ -0,0 +1,165 @@
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.

@ -0,0 +1,46 @@
package net.ossrs.sea.rtmp;
import java.io.IOException;
/**
 * Simple RTMP publisher, using vanilla Java networking (no NIO)
 * This was created primarily to address a NIO bug in Android 2.2 when
 * used with Apache Mina, but also to provide an easy-to-use way to access
 * RTMP streams
 *
 * @author francois, leo
 */
public interface RtmpPublisher {

    /**
     * Opens the underlying connection to the RTMP server.
     *
     * @throws IOException if a network/IO error occurs
     */
    void connect() throws IOException;

    /**
     * Issues an RTMP "publish" command and write the media content stream packets (audio and video).
     *
     * @param publishType specify the way to publish raw RTMP packets among "live", "record" and "append"
     * @throws IllegalStateException if the client is not connected to a RTMP server
     * @throws IOException if a network/IO error occurs
     */
    void publish(String publishType) throws IllegalStateException, IOException;

    /**
     * Stops and closes the current RTMP stream
     */
    void closeStream() throws IllegalStateException;

    /**
     * Shuts down the RTMP client and stops all threads associated with it
     */
    void shutdown();

    /**
     * publish a video content packet to server
     *
     * @param data encoded video payload (must be non-empty)
     * @param dts  decoding timestamp (must be non-negative)
     */
    void publishVideoData(byte[] data, int dts) throws IllegalStateException;

    /**
     * publish an audio content packet to server
     *
     * @param data encoded audio payload (must be non-empty)
     * @param dts  decoding timestamp (must be non-negative)
     */
    void publishAudioData(byte[] data, int dts) throws IllegalStateException;
}

@ -0,0 +1,138 @@
package net.ossrs.sea.rtmp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * Misc utility methods: big-endian (and some little-endian) integer and
 * IEEE-754 double serialization on streams/byte arrays, plus hex helpers.
 *
 * @author francois
 */
public class Util {

    private static final String HEXES = "0123456789ABCDEF";

    /** Writes {@code value} as 4 big-endian bytes. */
    public static void writeUnsignedInt32(OutputStream out, int value) throws IOException {
        out.write((byte) (value >>> 24));
        out.write((byte) (value >>> 16));
        out.write((byte) (value >>> 8));
        out.write((byte) value);
    }

    /** Reads 4 big-endian bytes into an int (no EOF detection: EOF reads as 0xff). */
    public static int readUnsignedInt32(InputStream in) throws IOException {
        return ((in.read() & 0xff) << 24) | ((in.read() & 0xff) << 16) | ((in.read() & 0xff) << 8) | (in.read() & 0xff);
    }

    /** Reads 3 big-endian bytes into an int. */
    public static int readUnsignedInt24(InputStream in) throws IOException {
        return ((in.read() & 0xff) << 16) | ((in.read() & 0xff) << 8) | (in.read() & 0xff);
    }

    /** Reads 2 big-endian bytes into an int. */
    public static int readUnsignedInt16(InputStream in) throws IOException {
        return ((in.read() & 0xff) << 8) | (in.read() & 0xff);
    }

    /** Writes the low 24 bits of {@code value} as 3 big-endian bytes. */
    public static void writeUnsignedInt24(OutputStream out, int value) throws IOException {
        out.write((byte) (value >>> 16));
        out.write((byte) (value >>> 8));
        out.write((byte) value);
    }

    /** Writes the low 16 bits of {@code value} as 2 big-endian bytes. */
    public static void writeUnsignedInt16(OutputStream out, int value) throws IOException {
        out.write((byte) (value >>> 8));
        out.write((byte) value);
    }

    /** Interprets bytes[0..3] as a big-endian 32-bit value. */
    public static int toUnsignedInt32(byte[] bytes) {
        return (((int) bytes[0] & 0xff) << 24) | (((int) bytes[1] & 0xff) << 16) | (((int) bytes[2] & 0xff) << 8) | ((int) bytes[3] & 0xff);
    }

    /** Interprets bytes[0..3] as a little-endian 32-bit value. */
    public static int toUnsignedInt32LittleEndian(byte[] bytes) {
        return ((bytes[3] & 0xff) << 24) | ((bytes[2] & 0xff) << 16) | ((bytes[1] & 0xff) << 8) | (bytes[0] & 0xff);
    }

    /** Writes {@code value} as 4 little-endian bytes. */
    public static void writeUnsignedInt32LittleEndian(OutputStream out, int value) throws IOException {
        out.write((byte) value);
        out.write((byte) (value >>> 8));
        out.write((byte) (value >>> 16));
        out.write((byte) (value >>> 24));
    }

    /**
     * Interprets bytes[1..3] as a big-endian 24-bit value.
     * NOTE(review): starts at index 1, not 0 — presumably the callers pass a
     * 4-byte buffer whose first byte is a separate header field; confirm
     * against the RTMP header parsing code before changing.
     */
    public static int toUnsignedInt24(byte[] bytes) {
        return ((bytes[1] & 0xff) << 16) | ((bytes[2] & 0xff) << 8) | (bytes[3] & 0xff);
    }

    /**
     * Interprets bytes[2..3] as a big-endian 16-bit value.
     * NOTE(review): same index-offset convention as {@link #toUnsignedInt24}.
     */
    public static int toUnsignedInt16(byte[] bytes) {
        return ((bytes[2] & 0xff) << 8) | (bytes[3] & 0xff);
    }

    /** @return upper-case hex dump of {@code raw}, or {@code null} for null input */
    public static String toHexString(byte[] raw) {
        if (raw == null) {
            return null;
        }
        final StringBuilder hex = new StringBuilder(2 * raw.length);
        for (final byte b : raw) {
            hex.append(HEXES.charAt((b & 0xF0) >> 4)).append(HEXES.charAt((b & 0x0F)));
        }
        return hex.toString();
    }

    /** @return two upper-case hex digits for a single byte */
    public static String toHexString(byte b) {
        return new StringBuilder().append(HEXES.charAt((b & 0xF0) >> 4)).append(HEXES.charAt((b & 0x0F))).toString();
    }

    /**
     * Reads bytes from the specified inputstream into the specified target buffer until it is filled up
     *
     * @throws IOException on EOF before the buffer is full
     */
    public static void readBytesUntilFull(InputStream in, byte[] targetBuffer) throws IOException {
        int totalBytesRead = 0;
        int read;
        final int targetBytes = targetBuffer.length;
        do {
            read = in.read(targetBuffer, totalBytesRead, (targetBytes - totalBytesRead));
            if (read != -1) {
                totalBytesRead += read;
            } else {
                throw new IOException("Unexpected EOF reached before read buffer was filled");
            }
        } while (totalBytesRead < targetBytes);
    }

    /** Serializes a double as 8 big-endian IEEE-754 bytes. */
    public static byte[] toByteArray(double d) {
        long l = Double.doubleToRawLongBits(d);
        return new byte[]{
            (byte) ((l >> 56) & 0xff),
            (byte) ((l >> 48) & 0xff),
            (byte) ((l >> 40) & 0xff),
            (byte) ((l >> 32) & 0xff),
            (byte) ((l >> 24) & 0xff),
            (byte) ((l >> 16) & 0xff),
            (byte) ((l >> 8) & 0xff),
            (byte) (l & 0xff),};
    }

    /** Serializes {@code value} as 4 big-endian bytes. */
    public static byte[] unsignedInt32ToByteArray(int value) throws IOException {
        return new byte[]{
            (byte) (value >>> 24),
            (byte) (value >>> 16),
            (byte) (value >>> 8),
            (byte) value};
    }

    /** Reads an IEEE-754 double stored as 8 big-endian bytes. */
    public static double readDouble(InputStream in) throws IOException {
        // BUGFIX: every byte term is widened to long BEFORE shifting. The old
        // code computed the "<< 24" term as an int; when that byte's top bit
        // was set, the negative int sign-extended on widening and corrupted
        // bits 32..63 of the result (turning valid doubles into NaN).
        long bits = ((long) (in.read() & 0xff) << 56)
                | ((long) (in.read() & 0xff) << 48)
                | ((long) (in.read() & 0xff) << 40)
                | ((long) (in.read() & 0xff) << 32)
                | ((long) (in.read() & 0xff) << 24)
                | ((in.read() & 0xff) << 16)
                | ((in.read() & 0xff) << 8)
                | (in.read() & 0xff);
        return Double.longBitsToDouble(bits);
    }

    /** Writes a double as 8 big-endian IEEE-754 bytes. */
    public static void writeDouble(OutputStream out, double d) throws IOException {
        long l = Double.doubleToRawLongBits(d);
        out.write(new byte[]{
            (byte) ((l >> 56) & 0xff),
            (byte) ((l >> 48) & 0xff),
            (byte) ((l >> 40) & 0xff),
            (byte) ((l >> 32) & 0xff),
            (byte) ((l >> 24) & 0xff),
            (byte) ((l >> 16) & 0xff),
            (byte) ((l >> 8) & 0xff),
            (byte) (l & 0xff)});
    }
}

@ -0,0 +1,66 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import net.ossrs.sea.rtmp.Util;
/**
 * AMF Array
 *
 * @author francois
 */
public class AmfArray implements AmfData {

    private List<AmfData> items;
    // Cached wire size in bytes; -1 = not yet computed.
    private int size = -1;

    /** Serialization of arrays is not implemented. */
    @Override
    public void writeTo(OutputStream out) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Skip data type byte (we assume it's already read)
        int length = Util.readUnsignedInt32(in);
        size = 5; // 1 (type byte) + 4 (length)
        items = new ArrayList<AmfData>(length);
        for (int i = 0; i < length; i++) {
            AmfData dataItem = AmfDecoder.readFrom(in);
            size += dataItem.getSize();
            items.add(dataItem);
        }
    }

    @Override
    public int getSize() {
        if (size == -1) {
            size = 5; // 1 + 4
            if (items != null) {
                for (AmfData dataItem : items) {
                    size += dataItem.getSize();
                }
            }
        }
        return size;
    }

    /** @return the amount of items in this the array */
    public int getLength() {
        return items != null ? items.size() : 0;
    }

    public List<AmfData> getItems() {
        if (items == null) {
            items = new ArrayList<AmfData>();
        }
        return items;
    }

    public void addItem(AmfData dataItem) {
        // BUGFIX: previously added `this` to its own item list, discarding the
        // argument and making getSize() recurse on itself.
        getItems().add(dataItem);
        size = -1; // invalidate the cached size
    }
}

@ -0,0 +1,50 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * AMF0 Boolean: a type byte followed by one payload byte
 * (0x01 = true, any other value = false).
 *
 * @author francois
 */
public class AmfBoolean implements AmfData {

    private boolean value;

    public boolean isValue() {
        return value;
    }

    public void setValue(boolean value) {
        this.value = value;
    }

    public AmfBoolean(boolean value) {
        this.value = value;
    }

    public AmfBoolean() {
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
        out.write(AmfType.BOOLEAN.getValue());
        out.write(value ? 0x01 : 0x00);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Type byte already consumed by the caller; redundant ternary removed.
        value = in.read() == 0x01;
    }

    /** Reads just the payload byte (the type byte is assumed already read). */
    public static boolean readBooleanFrom(InputStream in) throws IOException {
        // Skip data type byte (we assume it's already read)
        return in.read() == 0x01;
    }

    @Override
    public int getSize() {
        return 2; // type byte + payload byte
    }
}

@ -0,0 +1,31 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * Base AMF data object. All other AMF data type instances derive from this
 * (including AmfObject)
 *
 * @author francois
 */
public interface AmfData {

    /**
     * Write/Serialize this AMF data instance (Object/string/integer etc) to
     * the specified OutputStream, including the leading AMF type byte.
     */
    void writeTo(OutputStream out) throws IOException;

    /**
     * Read and parse bytes from the specified input stream to populate this
     * AMFData instance (deserialize)
     *
     * <p>Implementations assume the AMF type byte has already been consumed
     * by the caller (typically {@code AmfDecoder.readFrom}).
     */
    void readFrom(InputStream in) throws IOException;

    /** @return the amount of bytes required for this object */
    int getSize();
}

@ -0,0 +1,48 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
/**
 * Decodes one complete AMF0 value (type byte + payload) from a stream.
 *
 * @author francois
 */
public class AmfDecoder {

    /**
     * Reads the AMF type byte and dispatches to the matching AmfData
     * implementation's {@code readFrom}.
     *
     * @throws IOException on stream errors, or when the type byte does not
     *         map to a known/implemented AMF0 type
     */
    public static AmfData readFrom(InputStream in) throws IOException {
        byte amfTypeByte = (byte) in.read();
        AmfType amfType = AmfType.valueOf(amfTypeByte);
        if (amfType == null) {
            // AmfType.valueOf() returns null for unmapped bytes; without this
            // check the switch below would throw an uninformative NPE.
            throw new IOException(String.format("Unknown AMF data type byte: 0x%02x", amfTypeByte));
        }
        AmfData amfData;
        switch (amfType) {
            case NUMBER:
                amfData = new AmfNumber();
                break;
            case BOOLEAN:
                amfData = new AmfBoolean();
                break;
            case STRING:
                amfData = new AmfString();
                break;
            case OBJECT:
                amfData = new AmfObject();
                break;
            case NULL:
                return new AmfNull();
            case UNDEFINED:
                return new AmfUndefined();
            case MAP:
                amfData = new AmfMap();
                break;
            case ARRAY:
                amfData = new AmfArray();
                break;
            default:
                throw new IOException("Unknown/unimplemented AMF data type: " + amfType);
        }
        amfData.readFrom(in);
        return amfData;
    }
}

@ -0,0 +1,52 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import net.ossrs.sea.rtmp.Util;
/**
 * AMF map; that is, an "object"-like structure of key/value pairs, but with
 * an array-like size indicator at the start (which is seemingly always 0)
 *
 * @author francois
 */
public class AmfMap extends AmfObject {

    @Override
    public void writeTo(OutputStream out) throws IOException {
        // Begin the map/object/array/whatever exactly this is
        out.write(AmfType.MAP.getValue());
        // Write the "array size" == 0
        Util.writeUnsignedInt32(out, 0);
        // Write key/value pairs in this object
        for (Map.Entry<String, AmfData> entry : properties.entrySet()) {
            // The key must be a STRING type, and thus the "type-definition" byte is implied (not included in message)
            AmfString.writeStringTo(out, entry.getKey(), true);
            entry.getValue().writeTo(out);
        }
        // End the object
        out.write(OBJECT_END_MARKER);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Consume (and discard) the 4-byte "array size" prefix — seemingly
        // always 0. The unused local was removed, but the call must stay:
        // it advances the stream past those 4 bytes.
        Util.readUnsignedInt32(in);
        super.readFrom(in);
        size += 4; // Add the bytes read for parsing the array size (length)
    }

    @Override
    public int getSize() {
        if (size == -1) {
            size = super.getSize();
            size += 4; // array length bytes
        }
        return size;
    }
}

@ -0,0 +1,35 @@
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * AMF0 "null" value. Carries no payload: serialization is the single type
 * byte, and deserialization consumes nothing further.
 *
 * @author francois
 */
public class AmfNull implements AmfData {

    @Override
    public void writeTo(OutputStream out) throws IOException {
        writeNullTo(out);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Nothing to read: the type byte was already consumed by the caller.
    }

    /** Emits a NULL marker without allocating an instance. */
    public static void writeNullTo(OutputStream out) throws IOException {
        out.write(AmfType.NULL.getValue());
    }

    @Override
    public int getSize() {
        return 1; // type byte only
    }
}

@ -0,0 +1,62 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.Util;
/**
 * AMF0 Number: an IEEE-754 double serialized as a type byte followed by
 * 8 big-endian payload bytes.
 *
 * @author francois
 */
public class AmfNumber implements AmfData {

    /** Size of an AMF number, in bytes (including type bit) */
    public static final int SIZE = 9;

    double value;

    public AmfNumber() {
    }

    public AmfNumber(double value) {
        this.value = value;
    }

    public double getValue() {
        return value;
    }

    public void setValue(double value) {
        this.value = value;
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
        writeNumberTo(out, value);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Type byte already consumed by the caller; only the payload remains.
        value = Util.readDouble(in);
    }

    /** Reads a full AMF number (type byte + 8 payload bytes) from the stream. */
    public static double readNumberFrom(InputStream in) throws IOException {
        // Discard the type byte.
        in.read();
        return Util.readDouble(in);
    }

    /** Writes a full AMF number (type byte + 8 payload bytes) to the stream. */
    public static void writeNumberTo(OutputStream out, double number) throws IOException {
        out.write(AmfType.NUMBER.getValue());
        Util.writeDouble(out, number);
    }

    @Override
    public int getSize() {
        return SIZE;
    }
}

@ -0,0 +1,109 @@
package net.ossrs.sea.rtmp.amf;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * AMF object: an ordered set of string-keyed AMF value properties, terminated
 * on the wire by the 3-byte end marker [0x00 0x00 0x09].
 *
 * @author francois
 */
public class AmfObject implements AmfData {

    protected Map<String, AmfData> properties = new LinkedHashMap<String, AmfData>();

    // Cached wire size in bytes; -1 = not yet computed.
    protected int size = -1;

    /** Byte sequence that marks the end of an AMF object */
    protected static final byte[] OBJECT_END_MARKER = new byte[]{0x00, 0x00, 0x09};

    public AmfObject() {
    }

    public AmfData getProperty(String key) {
        return properties.get(key);
    }

    public void setProperty(String key, AmfData value) {
        properties.put(key, value);
    }

    public void setProperty(String key, boolean value) {
        properties.put(key, new AmfBoolean(value));
    }

    public void setProperty(String key, String value) {
        properties.put(key, new AmfString(value, false));
    }

    public void setProperty(String key, int value) {
        properties.put(key, new AmfNumber(value));
    }

    public void setProperty(String key, double value) {
        properties.put(key, new AmfNumber(value));
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
        // Begin the object
        out.write(AmfType.OBJECT.getValue());
        // Write key/value pairs in this object
        for (Map.Entry<String, AmfData> entry : properties.entrySet()) {
            // The key must be a STRING type, and thus the "type-definition" byte is implied (not included in message)
            AmfString.writeStringTo(out, entry.getKey(), true);
            entry.getValue().writeTo(out);
        }
        // End the object
        out.write(OBJECT_END_MARKER);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Skip data type byte (we assume it's already read)
        size = 1;
        InputStream markInputStream = in.markSupported() ? in : new BufferedInputStream(in);
        while (true) {
            // Look for the 3-byte object end marker [0x00 0x00 0x09]
            markInputStream.mark(3);
            byte[] endMarker = new byte[3];
            markInputStream.read(endMarker);
            // NOTE(review): read() may return fewer than 3 bytes before EOF on
            // a slow stream; a short read here could mis-detect the marker.
            if (endMarker[0] == OBJECT_END_MARKER[0] && endMarker[1] == OBJECT_END_MARKER[1] && endMarker[2] == OBJECT_END_MARKER[2]) {
                // End marker found
                size += 3;
                return;
            } else {
                // End marker not found; reset the stream to the marked position and read an AMF property
                markInputStream.reset();
                // BUGFIX: the key must be read from markInputStream, not `in`.
                // When `in` lacks mark/reset support, markInputStream is a
                // BufferedInputStream wrapper that holds the 3 peeked bytes;
                // reading the key directly from `in` skipped them and
                // corrupted the parse.
                String key = AmfString.readStringFrom(markInputStream, true);
                size += AmfString.sizeOf(key, true);
                // ...and the property value
                AmfData value = AmfDecoder.readFrom(markInputStream);
                size += value.getSize();
                properties.put(key, value);
            }
        }
    }

    @Override
    public int getSize() {
        if (size == -1) {
            size = 1; // object marker
            for (Map.Entry<String, AmfData> entry : properties.entrySet()) {
                size += AmfString.sizeOf(entry.getKey(), true);
                size += entry.getValue().getSize();
            }
            size += 3; // end of object marker
        }
        return size;
    }
}

@ -0,0 +1,128 @@
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.String;
import android.util.Log;
import net.ossrs.sea.rtmp.Util;
/**
 * AMF0 String value (ASCII encoded). Also used for AMF object keys, in which
 * case the leading STRING type byte is omitted on the wire.
 *
 * @author francois
 */
public class AmfString implements AmfData {

    private static final String TAG = "AmfString";

    // The decoded/encoded string value.
    private String value;
    // True when this string is an object key (no type byte on the wire).
    private boolean key;
    // Cached wire size in bytes; -1 = not yet computed.
    private int size = -1;

    public AmfString() {
    }

    public AmfString(String value, boolean isKey) {
        this.value = value;
        this.key = isKey;
    }

    public AmfString(String value) {
        this(value, false);
    }

    public AmfString(boolean isKey) {
        this.key = isKey;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public boolean isKey() {
        return key;
    }

    public void setKey(boolean key) {
        this.key = key;
    }

    @Override
    public void writeTo(OutputStream out) throws IOException {
        // Strings are ASCII encoded
        byte[] byteValue = this.value.getBytes("ASCII");
        // Write the STRING data type definition (except if this String is used as a key)
        if (!key) {
            out.write(AmfType.STRING.getValue());
        }
        // Write 2 bytes indicating string length
        Util.writeUnsignedInt16(out, byteValue.length);
        // Write string
        out.write(byteValue);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Skip data type byte (we assume it's already read)
        int length = Util.readUnsignedInt16(in);
        // NOTE(review): this assumes a value string (1 type byte + 2 length
        // bytes + payload); for a key string the type byte is absent and this
        // cached size would be off by one — confirm intended usage.
        size = 3 + length; // 1 + 2 + length
        // Read string value
        byte[] byteValue = new byte[length];
        Util.readBytesUntilFull(in, byteValue);
        value = new String(byteValue, "ASCII");
    }

    /** Reads an AMF string from the stream; skips the type byte unless it is a key. */
    public static String readStringFrom(InputStream in, boolean isKey) throws IOException {
        if (!isKey) {
            // Read past the data type byte
            in.read();
        }
        int length = Util.readUnsignedInt16(in);
        // Read string value
        byte[] byteValue = new byte[length];
        Util.readBytesUntilFull(in, byteValue);
        return new String(byteValue, "ASCII");
    }

    /** Writes an AMF string to the stream; the type byte is omitted for keys. */
    public static void writeStringTo(OutputStream out, String string, boolean isKey) throws IOException {
        // Strings are ASCII encoded
        byte[] byteValue = string.getBytes("ASCII");
        // Write the STRING data type definition (except if this String is used as a key)
        if (!isKey) {
            out.write(AmfType.STRING.getValue());
        }
        // Write 2 bytes indicating string length
        Util.writeUnsignedInt16(out, byteValue.length);
        // Write string
        out.write(byteValue);
    }

    @Override
    public int getSize() {
        if (size == -1) {
            try {
                // (type byte unless key) + 2 length bytes + payload bytes.
                size = (isKey() ? 0 : 1) + 2 + value.getBytes("ASCII").length;
            } catch (UnsupportedEncodingException ex) {
                Log.e(TAG, "AmfString.getSize(): caught exception", ex);
                throw new RuntimeException(ex);
            }
        }
        return size;
    }

    /** @return the byte size of the resulting AMF string of the specified value */
    public static int sizeOf(String string, boolean isKey) {
        try {
            int size = (isKey ? 0 : 1) + 2 + string.getBytes("ASCII").length;
            return size;
        } catch (UnsupportedEncodingException ex) {
            Log.e(TAG, "AmfString.SizeOf(): caught exception", ex);
            throw new RuntimeException(ex);
        }
    }
}

@ -0,0 +1,45 @@
package net.ossrs.sea.rtmp.amf;
import java.util.HashMap;
import java.util.Map;
/**
 * AMF0 data type enum: maps each AMF0 wire-format marker byte to an enum
 * constant, with a reverse lookup table for decoding.
 *
 * @author francois
 */
public enum AmfType {
    /** Number (encoded as IEEE 64-bit double precision floating point number) */
    NUMBER(0x00),
    /** Boolean (Encoded as a single byte of value 0x00 or 0x01) */
    BOOLEAN(0x01),
    /** String (ASCII encoded) */
    STRING(0x02),
    /** Object - set of key/value pairs */
    OBJECT(0x03),
    NULL(0x05),
    UNDEFINED(0x06),
    MAP(0x08),
    ARRAY(0x0A);

    private final byte value;

    /** Reverse lookup: marker byte -> enum constant. */
    private static final Map<Byte, AmfType> BY_MARKER_BYTE = new HashMap<Byte, AmfType>();

    static {
        for (AmfType t : values()) {
            BY_MARKER_BYTE.put(t.getValue(), t);
        }
    }

    AmfType(int intValue) {
        this.value = (byte) intValue;
    }

    public byte getValue() {
        return value;
    }

    /** @return the matching type, or {@code null} if the byte is not a known AMF0 marker */
    public static AmfType valueOf(byte amfTypeByte) {
        return BY_MARKER_BYTE.get(amfTypeByte);
    }
}

@ -0,0 +1,35 @@
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package net.ossrs.sea.rtmp.amf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * AMF0 "undefined" value. Like AmfNull it carries no payload: serialization
 * is the single type byte, deserialization consumes nothing further.
 *
 * @author leoma
 */
public class AmfUndefined implements AmfData {

    @Override
    public void writeTo(OutputStream out) throws IOException {
        writeUndefinedTo(out);
    }

    @Override
    public void readFrom(InputStream in) throws IOException {
        // Nothing to read: the type byte was already consumed by the caller.
    }

    /** Emits an UNDEFINED marker without allocating an instance. */
    public static void writeUndefinedTo(OutputStream out) throws IOException {
        out.write(AmfType.UNDEFINED.getValue());
    }

    @Override
    public int getSize() {
        return 1; // type byte only
    }
}

@ -0,0 +1,76 @@
package net.ossrs.sea.rtmp.io;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.packets.RtmpHeader;
/**
* Chunk stream channel information
*
* @author francois
*/
/**
 * Chunk stream channel information: per-channel previous-header state (used for
 * RTMP header compression), transmit-timestamp bookkeeping, and a buffer that
 * reassembles packets whose payload spans multiple chunks.
 *
 * @author francois
 */
public class ChunkStreamInfo {

    // Well-known chunk stream (channel) IDs used by this implementation.
    public static final byte RTMP_STREAM_CHANNEL = 0x05;
    public static final byte RTMP_COMMAND_CHANNEL = 0x03;
    public static final byte RTMP_VIDEO_CHANNEL = 0x06;
    public static final byte RTMP_AUDIO_CHANNEL = 0x07;
    public static final byte RTMP_CONTROL_CHANNEL = 0x02;

    // Last header received / transmitted on this channel (null until first use).
    private RtmpHeader prevHeaderRx;
    private RtmpHeader prevHeaderTx;

    // State for markRealAbsoluteTimestampTx(); starts at 0 so the first call
    // yields an absolute wall-clock value.
    private long realLastTimestamp = 0;

    // Accumulates chunk payloads until a full packet has been received (128 KiB initial capacity).
    private ByteArrayOutputStream baos = new ByteArrayOutputStream(1024 * 128);

    /** @return the previous header that was received on this channel, or <code>null</code> if no previous header was received */
    public RtmpHeader prevHeaderRx() {
        return prevHeaderRx;
    }

    /** Sets the previous header that was received on this channel, or <code>null</code> if no previous header was sent */
    public void setPrevHeaderRx(RtmpHeader previousHeader) {
        this.prevHeaderRx = previousHeader;
    }

    /** @return the previous header that was transmitted on this channel */
    public RtmpHeader getPrevHeaderTx() {
        return prevHeaderTx;
    }

    /** @return <code>true</code> if a header of the same message type was previously sent, allowing a compressed (relative) header */
    public boolean canReusePrevHeaderTx(RtmpHeader.MessageType forMessageType) {
        return (prevHeaderTx != null && prevHeaderTx.getMessageType() == forMessageType);
    }

    /** Sets the previous header that was transmitted on this channel */
    public void setPrevHeaderTx(RtmpHeader prevHeaderTx) {
        this.prevHeaderTx = prevHeaderTx;
    }

    /** Utility method for calculating & synchronizing transmitted timestamps & timestamp deltas */
    public long markRealAbsoluteTimestampTx() {
        // NOTE(review): realLastTimestamp is overwritten with (now - previousValue)
        // each call, so successive calls alternate between an absolute wall-clock
        // value and a delta-like value. Confirm this is the intended timestamp
        // scheme for outgoing chunk headers.
        realLastTimestamp = System.currentTimeMillis() - realLastTimestamp;
        return realLastTimestamp;
    }

    /** @return <code>true</code> if all packet data has been stored, or <code>false</code> if not */
    public boolean storePacketChunk(InputStream in, int chunkSize) throws IOException {
        // Read at most one chunk's worth of the bytes still missing from the
        // packet announced by the previously-received header.
        final int remainingBytes = prevHeaderRx.getPacketLength() - baos.size();
        byte[] chunk = new byte[Math.min(remainingBytes, chunkSize)];
        Util.readBytesUntilFull(in, chunk);
        baos.write(chunk);
        return (baos.size() == prevHeaderRx.getPacketLength());
    }

    /** @return an input stream over the fully-reassembled packet body; also resets the internal buffer for the next packet */
    public ByteArrayInputStream getStoredPacketInputStream() {
        ByteArrayInputStream bis = new ByteArrayInputStream(baos.toByteArray());
        baos.reset();
        return bis;
    }

    /** Clears all currently-stored packet chunks (used when an ABORT packet is received) */
    public void clearStoredChunks() {
        baos.reset();
    }
}

@ -0,0 +1,14 @@
package net.ossrs.sea.rtmp.io;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
/**
* Handler interface for received RTMP packets
* @author francois
*/
public interface PacketRxHandler {

    /**
     * Called for every decoded RTMP packet; a <code>null</code> packet signals
     * end-of-stream (EOF) to the handler.
     */
    public void handleRxPacket(RtmpPacket rtmpPacket);

    /**
     * Called when enough bytes have been read that a window Acknowledgement
     * packet should be sent to the peer.
     *
     * @param numBytesReadThusFar total number of bytes received so far
     */
    public void notifyWindowAckRequired(final int numBytesReadThusFar);
}

@ -0,0 +1,69 @@
package net.ossrs.sea.rtmp.io;
import java.io.EOFException;
import java.io.InputStream;
import android.util.Log;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
/**
* RTMPConnection's read thread
*
* @author francois, leo
*/
/**
 * RTMPConnection's read thread: decodes packets from the input stream in a loop
 * and hands them to the {@link PacketRxHandler} until interrupted or EOF.
 *
 * @author francois, leo
 */
public class ReadThread extends Thread {

    private static final String TAG = "ReadThread";

    private RtmpDecoder rtmpDecoder;
    private InputStream in;
    private PacketRxHandler packetRxHandler;
    private ThreadController threadController;

    /**
     * @param rtmpSessionInfo shared session state used by the decoder
     * @param in stream to read packets from (closed when this thread exits)
     * @param packetRxHandler receiver for decoded packets
     * @param threadController notified when this thread exits; may be <code>null</code>
     */
    public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, PacketRxHandler packetRxHandler, ThreadController threadController) {
        super("RtmpReadThread");
        this.in = in;
        this.packetRxHandler = packetRxHandler;
        this.rtmpDecoder = new RtmpDecoder(rtmpSessionInfo);
        this.threadController = threadController;
    }

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                // readPacket() may return null for incomplete/consumed-internally
                // packets; the handler tolerates that.
                RtmpPacket rtmpPacket = rtmpDecoder.readPacket(in);
                packetRxHandler.handleRxPacket(rtmpPacket);
            } catch (EOFException eof) {
                // Signal end-of-stream; the handler thread will wait until it is invoked again.
                packetRxHandler.handleRxPacket(null);
            } catch (Exception ex) {
                if (!this.isInterrupted()) {
                    Log.e(TAG, "Caught exception while reading/decoding packet, shutting down...", ex);
                    this.interrupt();
                }
            }
        }
        // Close inputstream
        try {
            in.close();
        } catch (Exception ex) {
            Log.w(TAG, "Failed to close inputstream", ex);
        }
        Log.i(TAG, "exiting");
        if (threadController != null) {
            threadController.threadHasExited(this);
        }
    }

    /** Requests this thread to stop by interrupting its read loop. */
    public void shutdown() {
        Log.d(TAG, "Stopping read thread...");
        this.interrupt();
    }
}

@ -0,0 +1,437 @@
package net.ossrs.sea.rtmp.io;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import android.util.Log;
import net.ossrs.sea.rtmp.RtmpPublisher;
import net.ossrs.sea.rtmp.amf.AmfNull;
import net.ossrs.sea.rtmp.amf.AmfNumber;
import net.ossrs.sea.rtmp.amf.AmfObject;
import net.ossrs.sea.rtmp.packets.Abort;
import net.ossrs.sea.rtmp.packets.Acknowledgement;
import net.ossrs.sea.rtmp.packets.Handshake;
import net.ossrs.sea.rtmp.packets.Command;
import net.ossrs.sea.rtmp.packets.Audio;
import net.ossrs.sea.rtmp.packets.Video;
import net.ossrs.sea.rtmp.packets.UserControl;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
import net.ossrs.sea.rtmp.packets.WindowAckSize;
/**
* Main RTMP connection implementation class
*
* @author francois, leoma
*/
/**
 * Main RTMP connection implementation class.
 *
 * Owns the socket, performs the handshake, spawns the read/write threads and a
 * packet-handling loop, and implements the publish-side RTMP command flow
 * (connect, releaseStream, FCPublish, createStream, publish, closeStream).
 *
 * @author francois, leoma
 */
public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadController {

    private static final String TAG = "RtmpConnection";
    // rtmp://host[:port]/app[/streamName]
    private static final Pattern rtmpUrlPattern = Pattern.compile("^rtmp://([^/:]+)(:(\\d+))*/([^/]+)(/(.*))*$");

    private String appName;
    private String host;
    private String streamName;
    private String publishType;
    private String swfUrl = "";
    private String tcUrl = "";
    private String pageUrl = "";
    private int port;
    private Socket socket;
    private RtmpSessionInfo rtmpSessionInfo;
    // Incremented for each invoked command; server responses are matched by this id.
    private int transactionIdCounter = 0;
    private static final int SOCKET_CONNECT_TIMEOUT_MS = 3000;
    private WriteThread writeThread;
    private final ConcurrentLinkedQueue<RtmpPacket> rxPacketQueue;
    private final Object rxPacketLock = new Object();
    private boolean active = false;
    private volatile boolean fullyConnected = false;
    private final Object connectingLock = new Object();
    private final Object publishLock = new Object();
    private volatile boolean connecting = false;
    // Message stream id returned by createStream; -1 while no stream is open.
    private int currentStreamId = -1;

    /**
     * Parses the RTMP URL into host/port/app/stream components.
     *
     * @param url full RTMP URL, e.g. rtmp://host[:port]/application[/streamName]
     * @throws RuntimeException if the URL does not match the expected format
     */
    public RtmpConnection(String url) {
        this.tcUrl = url.substring(0, url.lastIndexOf('/'));
        Matcher matcher = rtmpUrlPattern.matcher(url);
        if (matcher.matches()) {
            this.host = matcher.group(1);
            String portStr = matcher.group(3);
            this.port = portStr != null ? Integer.parseInt(portStr) : 1935;
            this.appName = matcher.group(4);
            this.streamName = matcher.group(6);
            rtmpSessionInfo = new RtmpSessionInfo();
            rxPacketQueue = new ConcurrentLinkedQueue<RtmpPacket>();
        } else {
            throw new RuntimeException("Invalid RTMP URL. Must be in format: rtmp://host[:port]/application[/streamName]");
        }
    }

    /**
     * Opens the socket, performs the RTMP handshake, starts the read/write and
     * packet-handling threads, and sends the "connect" command.
     */
    @Override
    public void connect() throws IOException {
        Log.d(TAG, "connect() called. Host: " + host + ", port: " + port + ", appName: " + appName + ", publishPath: " + streamName);
        socket = new Socket();
        SocketAddress socketAddress = new InetSocketAddress(host, port);
        socket.connect(socketAddress, SOCKET_CONNECT_TIMEOUT_MS);
        BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
        BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream());
        Log.d(TAG, "connect(): socket connection established, doing handhake...");
        handshake(in, out);
        active = true;
        Log.d(TAG, "connect(): handshake done");
        ReadThread readThread = new ReadThread(rtmpSessionInfo, in, this, this);
        writeThread = new WriteThread(rtmpSessionInfo, out, this);
        readThread.start();
        writeThread.start();
        // Start the "main" handling thread
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Log.d(TAG, "starting main rx handler loop");
                    handleRxPacketLoop();
                } catch (IOException ex) {
                    Logger.getLogger(RtmpConnection.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }).start();
        rtmpConnect();
    }

    /**
     * Starts publishing; blocks until the in-progress "connect" exchange (if
     * any) finishes, then issues the stream-creation command sequence.
     *
     * @param type publish type, e.g. "live"
     */
    @Override
    public void publish(String type) throws IllegalStateException, IOException {
        if (connecting) {
            synchronized (connectingLock) {
                try {
                    connectingLock.wait();
                } catch (InterruptedException ex) {
                    // do nothing
                }
            }
        }
        this.publishType = type;
        createStream();
    }

    /**
     * Sends releaseStream, FCPublish and createStream, then blocks until the
     * server acknowledges publishing (publishLock is notified by onFCPublish).
     */
    private void createStream() {
        if (!fullyConnected) {
            throw new IllegalStateException("Not connected to RTMP server");
        }
        if (currentStreamId != -1) {
            throw new IllegalStateException("Current stream object has existed");
        }
        Log.d(TAG, "createStream(): Sending releaseStream command...");
        // transactionId == 2
        Command releaseStream = new Command("releaseStream", ++transactionIdCounter);
        releaseStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
        releaseStream.addData(new AmfNull()); // command object: null for "releaseStream"
        releaseStream.addData(streamName); // stream name to release
        writeThread.send(releaseStream);
        Log.d(TAG, "createStream(): Sending FCPublish command...");
        // transactionId == 3
        Command FCPublish = new Command("FCPublish", ++transactionIdCounter);
        FCPublish.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
        FCPublish.addData(new AmfNull()); // command object: null for "FCPublish"
        FCPublish.addData(streamName);
        writeThread.send(FCPublish);
        Log.d(TAG, "createStream(): Sending createStream command...");
        final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_COMMAND_CHANNEL);
        // transactionId == 4
        Command createStream = new Command("createStream", ++transactionIdCounter, chunkStreamInfo);
        createStream.addData(new AmfNull()); // command object: null for "createStream"
        writeThread.send(createStream);
        // Waiting for "publish" command response.
        synchronized (publishLock) {
            try {
                publishLock.wait();
            } catch (InterruptedException ex) {
                // do nothing
            }
        }
    }

    /** Sends the "publish" command on the stream obtained from createStream. */
    private void fmlePublish() throws IllegalStateException {
        if (!fullyConnected) {
            throw new IllegalStateException("Not connected to RTMP server");
        }
        if (currentStreamId == -1) {
            throw new IllegalStateException("No current stream object exists");
        }
        Log.d(TAG, "fmlePublish(): Sending publish command...");
        // transactionId == 0
        Command publish = new Command("publish", 0);
        publish.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
        publish.getHeader().setMessageStreamId(currentStreamId);
        publish.addData(new AmfNull()); // command object: null for "publish"
        publish.addData(streamName);
        publish.addData(publishType);
        writeThread.send(publish);
    }

    /** Sends "closeStream" for the current stream and resets the stream state. */
    @Override
    public void closeStream() throws IllegalStateException {
        if (!fullyConnected) {
            throw new IllegalStateException("Not connected to RTMP server");
        }
        if (currentStreamId == -1) {
            throw new IllegalStateException("No current stream object exists");
        }
        Log.d(TAG, "closeStream(): Sending closeStream command...");
        Command closeStream = new Command("closeStream", 0);
        closeStream.getHeader().setChunkStreamId(ChunkStreamInfo.RTMP_STREAM_CHANNEL);
        // Stamp the command with the stream being closed BEFORE resetting the id
        // (previously the id was reset first, so the command carried stream id -1).
        closeStream.getHeader().setMessageStreamId(currentStreamId);
        closeStream.addData(new AmfNull()); // command object: null for "closeStream"
        writeThread.send(closeStream);
        streamName = null;
        Log.d(TAG, "closeStream(): setting current stream ID to -1");
        currentStreamId = -1;
    }

    /**
     * Performs the RTMP handshake sequence with the server
     */
    private void handshake(InputStream in, OutputStream out) throws IOException {
        Handshake handshake = new Handshake();
        handshake.writeC0(out);
        handshake.writeC1(out); // Write C1 without waiting for S0
        out.flush();
        handshake.readS0(in);
        handshake.readS1(in);
        handshake.writeC2(out);
        handshake.readS2(in);
    }

    /** Builds and sends the AMF0 "connect" command. */
    private void rtmpConnect() throws IOException, IllegalStateException {
        if (fullyConnected || connecting) {
            throw new IllegalStateException("Already connecting, or connected to RTMP server");
        }
        Log.d(TAG, "rtmpConnect(): Building 'connect' invoke packet");
        Command invoke = new Command("connect", ++transactionIdCounter, rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_COMMAND_CHANNEL));
        invoke.getHeader().setMessageStreamId(0);
        AmfObject args = new AmfObject();
        args.setProperty("app", appName);
        args.setProperty("flashVer", "LNX 11,2,202,233"); // Flash player OS: Linux, version: 11.2.202.233
        args.setProperty("swfUrl", swfUrl);
        args.setProperty("tcUrl", tcUrl);
        args.setProperty("fpad", false);
        args.setProperty("capabilities", 239);
        args.setProperty("audioCodecs", 3575);
        args.setProperty("videoCodecs", 252);
        args.setProperty("videoFunction", 1);
        args.setProperty("pageUrl", pageUrl);
        args.setProperty("objectEncoding", 0);
        invoke.addData(args);
        connecting = true;
        Log.d(TAG, "rtmpConnect(): Writing 'connect' invoke packet");
        invoke.getHeader().setAbsoluteTimestamp(0);
        writeThread.send(invoke);
    }

    /** Queues a received packet (null = EOF marker) and wakes the handler loop. */
    @Override
    public void handleRxPacket(RtmpPacket rtmpPacket) {
        if (rtmpPacket != null) {
            rxPacketQueue.add(rtmpPacket);
        }
        synchronized (rxPacketLock) {
            rxPacketLock.notify();
        }
    }

    /** Drains and dispatches queued received packets until the connection is shut down. */
    private void handleRxPacketLoop() throws IOException {
        // Handle all queued received RTMP packets
        while (active) {
            while (!rxPacketQueue.isEmpty()) {
                RtmpPacket rtmpPacket = rxPacketQueue.poll();
                switch (rtmpPacket.getHeader().getMessageType()) {
                    case ABORT:
                        rtmpSessionInfo.getChunkStreamInfo(((Abort) rtmpPacket).getChunkStreamId()).clearStoredChunks();
                        break;
                    case USER_CONTROL_MESSAGE: {
                        UserControl ping = (UserControl) rtmpPacket;
                        switch (ping.getType()) {
                            case PING_REQUEST: {
                                ChunkStreamInfo channelInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL);
                                Log.d(TAG, "handleRxPacketLoop(): Sending PONG reply..");
                                UserControl pong = new UserControl(ping, channelInfo);
                                writeThread.send(pong);
                                break;
                            }
                            case STREAM_EOF:
                                Log.i(TAG, "handleRxPacketLoop(): Stream EOF reached, closing RTMP writer...");
                                break;
                        }
                        break;
                    }
                    case WINDOW_ACKNOWLEDGEMENT_SIZE:
                        WindowAckSize windowAckSize = (WindowAckSize) rtmpPacket;
                        Log.d(TAG, "handleRxPacketLoop(): Setting acknowledgement window size to: " + windowAckSize.getAcknowledgementWindowSize());
                        rtmpSessionInfo.setAcknowledgmentWindowSize(windowAckSize.getAcknowledgementWindowSize());
                        break;
                    case SET_PEER_BANDWIDTH:
                        // Reply with our current acknowledgement window size.
                        int acknowledgementWindowsize = rtmpSessionInfo.getAcknowledgementWindowSize();
                        final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(ChunkStreamInfo.RTMP_CONTROL_CHANNEL);
                        Log.d(TAG, "handleRxPacketLoop(): Send acknowledgement window size: " + acknowledgementWindowsize);
                        writeThread.send(new WindowAckSize(acknowledgementWindowsize, chunkStreamInfo));
                        break;
                    case COMMAND_AMF0:
                        handleRxInvoke((Command) rtmpPacket);
                        break;
                    default:
                        Log.w(TAG, "handleRxPacketLoop(): Not handling unimplemented/unknown packet of type: " + rtmpPacket.getHeader().getMessageType());
                        break;
                }
            }
            // Wait for next received packet
            synchronized (rxPacketLock) {
                try {
                    rxPacketLock.wait();
                } catch (InterruptedException ex) {
                    Log.w(TAG, "handleRxPacketLoop: Interrupted", ex);
                }
            }
        }
        shutdownImpl();
    }

    /** Handles an incoming AMF0 command packet (server invoke or result). */
    private void handleRxInvoke(Command invoke) throws IOException {
        String commandName = invoke.getCommandName();
        if (commandName.equals("_result")) {
            // This is the result of one of the methods invoked by us.
            // Note: method may be null if the transaction id is unknown, so
            // compare with literal-first equals() (contains() was wrong here:
            // it did substring matching and throws NPE on null).
            String method = rtmpSessionInfo.takeInvokedCommand(invoke.getTransactionId());
            Log.d(TAG, "handleRxInvoke: Got result for invoked method: " + method);
            if ("connect".equals(method)) {
                // We can now send createStream commands
                connecting = false;
                fullyConnected = true;
                synchronized (connectingLock) {
                    connectingLock.notifyAll();
                }
            } else if ("createStream".equals(method)) {
                // Get stream id
                currentStreamId = (int) ((AmfNumber) invoke.getData().get(1)).getValue();
                Log.d(TAG, "handleRxInvoke(): Stream ID to publish: " + currentStreamId);
                if (streamName != null && publishType != null) {
                    fmlePublish();
                }
            } else if ("releaseStream".equals(method)) {
                // Do nothing
            } else if ("FCPublish".equals(method)) {
                // Do nothing
            } else {
                Log.w(TAG, "handleRxInvoke(): '_result' message received for unknown method: " + method);
            }
        } else if (commandName.equals("onBWDone")) {
            // Do nothing
        } else if (commandName.equals("onFCPublish")) {
            Log.d(TAG, "handleRxInvoke(): 'onFCPublish'");
            // Publishing may proceed; wake the thread blocked in createStream().
            synchronized (publishLock) {
                publishLock.notifyAll();
            }
        } else if (commandName.equals("onStatus")) {
            // Do nothing
        } else {
            Log.e(TAG, "handleRxInvoke(): Unknown/unhandled server invoke: " + invoke);
        }
    }

    @Override
    public void threadHasExited(Thread thread) {
        shutdown();
    }

    /** Signals the packet-handling loop to exit; actual teardown happens in shutdownImpl(). */
    @Override
    public void shutdown() {
        active = false;
        synchronized (rxPacketLock) {
            rxPacketLock.notify();
        }
    }

    private void shutdownImpl() {
        // Shut down read/write threads, if necessary
        if (Thread.activeCount() > 1) {
            Log.i(TAG, "shutdown(): Shutting down read/write threads");
            Thread[] threads = new Thread[Thread.activeCount()];
            Thread.enumerate(threads);
            for (Thread thread : threads) {
                if (thread instanceof ReadThread && thread.isAlive()) {
                    ((ReadThread) thread).shutdown();
                } else if (thread instanceof WriteThread && thread.isAlive()) {
                    ((WriteThread) thread).shutdown();
                }
            }
        }
        if (socket != null) {
            try {
                socket.close();
            } catch (Exception ex) {
                Log.w(TAG, "shutdown(): failed to close socket", ex);
            }
        }
    }

    @Override
    public void notifyWindowAckRequired(final int numBytesReadThusFar) {
        Log.i(TAG, "notifyWindowAckRequired() called");
        // Create and send window bytes read acknowledgement
        writeThread.send(new Acknowledgement(numBytesReadThusFar));
    }

    /**
     * Queues a video data packet for transmission on the current stream.
     *
     * @param data encoded video payload
     * @param dts decoding timestamp, used as the packet's absolute timestamp
     */
    @Override
    public void publishVideoData(byte[] data, int dts) throws IllegalStateException {
        if (!fullyConnected) {
            throw new IllegalStateException("Not connected to RTMP server");
        }
        if (currentStreamId == -1) {
            throw new IllegalStateException("No current stream object exists");
        }
        Video video = new Video();
        video.setData(data);
        video.getHeader().setMessageStreamId(currentStreamId);
        video.getHeader().setAbsoluteTimestamp(dts);
        writeThread.send(video);
    }

    /**
     * Queues an audio data packet for transmission on the current stream.
     *
     * @param data encoded audio payload
     * @param dts decoding timestamp, used as the packet's absolute timestamp
     */
    @Override
    public void publishAudioData(byte[] data, int dts) throws IllegalStateException {
        if (!fullyConnected) {
            throw new IllegalStateException("Not connected to RTMP server");
        }
        if (currentStreamId == -1) {
            throw new IllegalStateException("No current stream object exists");
        }
        Audio audio = new Audio();
        audio.setData(data);
        audio.getHeader().setMessageStreamId(currentStreamId);
        audio.getHeader().setAbsoluteTimestamp(dts);
        writeThread.send(audio);
    }
}

@ -0,0 +1,95 @@
package net.ossrs.sea.rtmp.io;
import java.io.IOException;
import java.io.InputStream;
import android.util.Log;
import net.ossrs.sea.rtmp.packets.Abort;
import net.ossrs.sea.rtmp.packets.Audio;
import net.ossrs.sea.rtmp.packets.Command;
import net.ossrs.sea.rtmp.packets.Data;
import net.ossrs.sea.rtmp.packets.RtmpHeader;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
import net.ossrs.sea.rtmp.packets.SetChunkSize;
import net.ossrs.sea.rtmp.packets.SetPeerBandwidth;
import net.ossrs.sea.rtmp.packets.UserControl;
import net.ossrs.sea.rtmp.packets.Video;
import net.ossrs.sea.rtmp.packets.WindowAckSize;
/**
*
* @author francois
*/
/**
 * Decodes one RTMP packet per call from an input stream, reassembling
 * multi-chunk packets via the per-channel {@link ChunkStreamInfo} buffers.
 *
 * @author francois
 */
public class RtmpDecoder {

    private static final String TAG = "RtmpDecoder";

    // Shared session state: chunk size, per-channel previous headers/buffers.
    private RtmpSessionInfo rtmpSessionInfo;

    public RtmpDecoder(RtmpSessionInfo rtmpSessionInfo) {
        this.rtmpSessionInfo = rtmpSessionInfo;
    }

    /**
     * Reads the next RTMP packet from the stream.
     *
     * @return the decoded packet, or <code>null</code> when the packet is not yet
     *         complete (more chunks pending) or was consumed internally
     *         (SET_CHUNK_SIZE updates session state and produces no packet)
     * @throws IOException on read errors or unknown message types
     */
    public RtmpPacket readPacket(InputStream in) throws IOException {
        RtmpHeader header = RtmpHeader.readHeader(in, rtmpSessionInfo);
        RtmpPacket rtmpPacket;
        Log.d(TAG, "readPacket(): header.messageType: " + header.getMessageType());
        // Order matters: the header must be recorded as prevHeaderRx before
        // storePacketChunk() runs, since the buffer sizing reads it back.
        ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(header.getChunkStreamId());
        chunkStreamInfo.setPrevHeaderRx(header);
        if (header.getPacketLength() > rtmpSessionInfo.getChunkSize()) {
            // This packet consists of more than one chunk; store the chunks in the chunk stream until everything is read
            if (!chunkStreamInfo.storePacketChunk(in, rtmpSessionInfo.getChunkSize())) {
                Log.d(TAG, " readPacket(): returning null because of incomplete packet");
                return null; // packet is not yet complete
            } else {
                Log.d(TAG, " readPacket(): stored chunks complete packet; reading packet");
                // Swap the source: the body below is parsed from the
                // reassembled in-memory buffer, not the socket stream.
                in = chunkStreamInfo.getStoredPacketInputStream();
            }
        } else {
            // Packet fits in a single chunk; read the body directly from the stream.
        }
        // Instantiate the concrete packet type for this message; the body is
        // parsed afterwards by the shared readBody() call at the bottom.
        switch (header.getMessageType()) {
            case SET_CHUNK_SIZE: {
                // Consumed internally: updates the session chunk size, no packet emitted.
                SetChunkSize setChunkSize = new SetChunkSize(header);
                setChunkSize.readBody(in);
                Log.d(TAG, "readPacket(): Setting chunk size to: " + setChunkSize.getChunkSize());
                rtmpSessionInfo.setChunkSize(setChunkSize.getChunkSize());
                return null;
            }
            case ABORT:
                rtmpPacket = new Abort(header);
                break;
            case USER_CONTROL_MESSAGE:
                rtmpPacket = new UserControl(header);
                break;
            case WINDOW_ACKNOWLEDGEMENT_SIZE:
                rtmpPacket = new WindowAckSize(header);
                break;
            case SET_PEER_BANDWIDTH:
                rtmpPacket = new SetPeerBandwidth(header);
                break;
            case AUDIO:
                rtmpPacket = new Audio(header);
                break;
            case VIDEO:
                rtmpPacket = new Video(header);
                break;
            case COMMAND_AMF0:
                rtmpPacket = new Command(header);
                break;
            case DATA_AMF0:
                rtmpPacket = new Data(header);
                break;
            default:
                throw new IOException("No packet body implementation for message type: " + header.getMessageType());
        }
        rtmpPacket.readBody(in);
        return rtmpPacket;
    }
}

@ -0,0 +1,72 @@
package net.ossrs.sea.rtmp.io;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
/**
*
* @author francois
*/
/**
 * Shared per-connection RTMP session state: negotiated chunk size, window
 * acknowledgement bookkeeping, per-channel chunk stream info, and the map of
 * outstanding invoked commands awaiting a "_result".
 *
 * @author francois
 */
public class RtmpSessionInfo {

    /** The (total) number of bytes read for this window (resets to 0 if the agreed-upon RTMP window acknowledgement size is reached) */
    private int windowBytesRead;
    /** The window acknowledgement size for this RTMP session, in bytes; default to max to avoid unnecessary "Acknowledgment" messages from being sent */
    private int acknowledgementWindowSize = Integer.MAX_VALUE;
    /** Used internally to store the total number of bytes read (used when sending Acknowledgement messages) */
    private int totalBytesRead = 0;
    /** Default chunk size is 128 bytes */
    private int chunkSize = 128;
    /** Lazily-populated map of chunk stream id -> per-channel state. */
    private final Map<Integer, ChunkStreamInfo> chunkChannels = new HashMap<Integer, ChunkStreamInfo>();
    /** Outstanding invoked commands keyed by transaction id (concurrent: written by write thread, read by rx handler). */
    private final Map<Integer, String> invokedMethods = new ConcurrentHashMap<Integer, String>();

    /**
     * Returns the {@link ChunkStreamInfo} for the given chunk stream id,
     * creating it on first access.
     */
    public ChunkStreamInfo getChunkStreamInfo(int chunkStreamId) {
        ChunkStreamInfo chunkStreamInfo = chunkChannels.get(chunkStreamId);
        if (chunkStreamInfo == null) {
            chunkStreamInfo = new ChunkStreamInfo();
            chunkChannels.put(chunkStreamId, chunkStreamInfo);
        }
        return chunkStreamInfo;
    }

    /**
     * Removes and returns the command name registered for the given transaction
     * id, or <code>null</code> if none was registered.
     */
    public String takeInvokedCommand(int transactionId) {
        return invokedMethods.remove(transactionId);
    }

    /**
     * Registers an invoked command awaiting a server "_result".
     *
     * @return the command name previously registered under this transaction id, if any
     */
    public String addInvokedCommand(int transactionId, String commandName) {
        return invokedMethods.put(transactionId, commandName);
    }

    public int getChunkSize() {
        return chunkSize;
    }

    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    public int getAcknowledgementWindowSize() {
        return acknowledgementWindowSize;
    }

    public void setAcknowledgmentWindowSize(int acknowledgementWindowSize) {
        this.acknowledgementWindowSize = acknowledgementWindowSize;
    }

    /**
     * Adds the specified amount of bytes to the total number of bytes read for
     * this RTMP window.
     *
     * @param numBytes the number of bytes to add
     * @param packet the packet being read when the threshold was crossed (may be <code>null</code>)
     * @throws WindowAckRequired if the acknowledgement window threshold was
     *         reached and an "Acknowledgement" packet should be sent
     */
    public final void addToWindowBytesRead(final int numBytes, final RtmpPacket packet) throws WindowAckRequired {
        windowBytesRead += numBytes;
        totalBytesRead += numBytes;
        if (windowBytesRead >= acknowledgementWindowSize) {
            // Carry the overflow into the next window rather than resetting to 0.
            windowBytesRead -= acknowledgementWindowSize;
            throw new WindowAckRequired(totalBytesRead, packet);
        }
    }
}

@ -0,0 +1,16 @@
package net.ossrs.sea.rtmp.io;
/**
* Simple interface for the "parent" of one or more worker threads, so that
* these worker threads can signal the parent if they stop (e.g. in the event of
* parent/main thread not expecting a child thread to exit, such as when an irrecoverable
* error has occurred in that child thread).
*
* @author francois
*/
public interface ThreadController {

    /**
     * Called when a child thread has exited its run() loop.
     *
     * @param thread the worker thread that exited
     */
    void threadHasExited(Thread thread);
}

@ -0,0 +1,40 @@
package net.ossrs.sea.rtmp.io;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
/**
* Thrown by RTMP read thread when an Acknowledgement packet needs to be sent
* to acknowledge the RTMP window size. It contains the RTMP packet that was
* read when this event occurred (if any).
*
* @author francois
*/
/**
 * Thrown by the RTMP read path when an Acknowledgement packet needs to be sent
 * to acknowledge the RTMP window size. It carries the RTMP packet that was
 * being read when the threshold was crossed (if any).
 *
 * @author francois
 */
public class WindowAckRequired extends Exception {

    // Both values are fixed at construction time; final documents the
    // exception as immutable.
    private final RtmpPacket rtmpPacket;
    private final int bytesRead;

    /**
     * Used when the window acknowledgement size was reached, whilst fully reading
     * an RTMP packet or not. If a packet is present, it should still be handled as if it was returned
     * by the RTMP decoder.
     *
     * @param bytesReadThusFar The (total) number of bytes received so far
     * @param rtmpPacket The packet that was read (and thus should be handled), can be <code>null</code>
     */
    public WindowAckRequired(int bytesReadThusFar, RtmpPacket rtmpPacket) {
        this.rtmpPacket = rtmpPacket;
        this.bytesRead = bytesReadThusFar;
    }

    /**
     * @return The RTMP packet that should be handled, or <code>null</code> if no full packet is available
     */
    public RtmpPacket getRtmpPacket() {
        return rtmpPacket;
    }

    /** @return the total number of bytes received so far */
    public int getBytesRead() {
        return bytesRead;
    }
}

@ -0,0 +1,108 @@
package net.ossrs.sea.rtmp.io;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import android.util.Log;
import net.ossrs.sea.rtmp.packets.Command;
import net.ossrs.sea.rtmp.packets.RtmpPacket;
/**
* RTMPConnection's write thread
*
* @author francois, leo
*/
/**
 * RTMPConnection's write thread: drains a queue of outgoing packets, serializes
 * them to the output stream, and sleeps on a lock until more work arrives.
 *
 * @author francois, leo
 */
public class WriteThread extends Thread {

    private static final String TAG = "WriteThread";

    private RtmpSessionInfo rtmpSessionInfo;
    private OutputStream out;
    private ConcurrentLinkedQueue<RtmpPacket> writeQueue = new ConcurrentLinkedQueue<RtmpPacket>();
    private final Object txPacketLock = new Object();
    private volatile boolean active = true;
    private ThreadController threadController;

    /**
     * @param rtmpSessionInfo shared session state (chunk size, channel headers, invoked commands)
     * @param out stream to serialize packets to (closed when this thread exits)
     * @param threadController notified when this thread exits; may be <code>null</code>
     */
    public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, ThreadController threadController) {
        super("RtmpWriteThread");
        this.rtmpSessionInfo = rtmpSessionInfo;
        this.out = out;
        this.threadController = threadController;
    }

    @Override
    public void run() {
        while (active) {
            try {
                while (!writeQueue.isEmpty()) {
                    RtmpPacket rtmpPacket = writeQueue.poll();
                    final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo.getChunkStreamInfo(rtmpPacket.getHeader().getChunkStreamId());
                    // Record the outgoing header so subsequent packets on this
                    // channel can use compressed (relative) headers.
                    chunkStreamInfo.setPrevHeaderTx(rtmpPacket.getHeader());
                    rtmpPacket.writeTo(out, rtmpSessionInfo.getChunkSize(), chunkStreamInfo);
                    Log.d(TAG, "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket.getHeader().getPacketLength());
                    if (rtmpPacket instanceof Command) {
                        // Remember the transaction so the matching "_result" can be resolved.
                        rtmpSessionInfo.addInvokedCommand(((Command) rtmpPacket).getTransactionId(), ((Command) rtmpPacket).getCommandName());
                    }
                }
                out.flush();
            } catch (SocketException se) {
                Log.e(TAG, "Caught SocketException during write loop, shutting down", se);
                active = false;
                continue;
            } catch (IOException ex) {
                Log.e(TAG, "Caught IOException during write loop, shutting down", ex);
                active = false;
                continue; // Exit this thread
            }
            // Wait for the next packet -- but only if nothing was queued (and no
            // shutdown requested) while we were writing. Checking inside the lock
            // fixes a lost-wakeup race: send()/shutdown() notify under
            // txPacketLock, and an unconditional wait() could miss a notify that
            // fired between the drain loop above and reaching wait().
            synchronized (txPacketLock) {
                try {
                    if (active && writeQueue.isEmpty()) {
                        txPacketLock.wait();
                    }
                } catch (InterruptedException ex) {
                    Log.w(TAG, "Interrupted", ex);
                }
            }
        }
        // Close outputstream
        try {
            out.close();
        } catch (Exception ex) {
            Log.w(TAG, "Failed to close outputstream", ex);
        }
        Log.d(TAG, "exiting");
        if (threadController != null) {
            threadController.threadHasExited(this);
        }
    }

    /** Transmit the specified RTMP packet (thread-safe) */
    public void send(RtmpPacket rtmpPacket) {
        if (rtmpPacket != null) {
            writeQueue.offer(rtmpPacket);
        }
        synchronized (txPacketLock) {
            txPacketLock.notify();
        }
    }

    /** Transmit the specified RTMP packets (thread-safe) */
    public void send(RtmpPacket... rtmpPackets) {
        writeQueue.addAll(Arrays.asList(rtmpPackets));
        synchronized (txPacketLock) {
            txPacketLock.notify();
        }
    }

    /** Requests this thread to stop after the current drain pass. */
    public void shutdown() {
        Log.d(TAG, "Stopping write thread...");
        active = false;
        synchronized (txPacketLock) {
            txPacketLock.notify();
        }
    }
}

@ -0,0 +1,47 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
* A "Abort" RTMP control message, received on chunk stream ID 2 (control channel)
*
* @author francois
*/
/**
 * An "Abort" RTMP control message, received on chunk stream ID 2 (control channel).
 * Tells the peer to discard any partially-received chunks for the given chunk stream.
 *
 * @author francois
 */
public class Abort extends RtmpPacket {

    private int chunkStreamId;

    public Abort(RtmpHeader header) {
        super(header);
    }

    public Abort(int chunkStreamId) {
        // BUGFIX: the message type was SET_CHUNK_SIZE (copy-paste error); an
        // Abort packet must carry MessageType.ABORT so the peer dispatches it
        // to the abort handler.
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_1_RELATIVE_LARGE, ChunkStreamInfo.RTMP_CONTROL_CHANNEL, RtmpHeader.MessageType.ABORT));
        this.chunkStreamId = chunkStreamId;
    }

    /** @return the ID of the chunk stream to be aborted */
    public int getChunkStreamId() {
        return chunkStreamId;
    }

    /** Sets the ID of the chunk stream to be aborted */
    public void setChunkStreamId(int chunkStreamId) {
        this.chunkStreamId = chunkStreamId;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // Value is received in the 4 bytes of the body
        chunkStreamId = Util.readUnsignedInt32(in);
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        Util.writeUnsignedInt32(out, chunkStreamId);
    }
}

@ -0,0 +1,62 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
* (Window) Acknowledgement
*
* The client or the server sends the acknowledgment to the peer after
* receiving bytes equal to the window size. The window size is the
* maximum number of bytes that the sender sends without receiving
* acknowledgment from the receiver. The server sends the window size to
* the client after application connects. This message specifies the
* sequence number, which is the number of the bytes received so far.
*
* @author francois
*/
public class Acknowledgement extends RtmpPacket {

    // The sequence number: total number of bytes received so far (4-byte body).
    private int sequenceNumber;

    public Acknowledgement(RtmpHeader header) {
        super(header);
    }

    /**
     * Builds an Acknowledgement for the control channel reporting the given
     * byte count as the sequence number.
     */
    public Acknowledgement(int numBytesReadThusFar) {
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_CONTROL_CHANNEL, RtmpHeader.MessageType.ACKNOWLEDGEMENT));
        this.sequenceNumber = numBytesReadThusFar;
    }

    // NOTE(review): despite its name, this is just an alias for
    // getSequenceNumber(); there is no separate window-size field here.
    public int getAcknowledgementWindowSize() {
        return sequenceNumber;
    }

    /** @return the sequence number, which is the number of the bytes received so far */
    public int getSequenceNumber() {
        return sequenceNumber;
    }

    /** Sets the sequence number, which is the number of the bytes received so far */
    public void setSequenceNumber(int numBytesRead) {
        this.sequenceNumber = numBytesRead;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        sequenceNumber = Util.readUnsignedInt32(in);
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        Util.writeUnsignedInt32(out, sequenceNumber);
    }

    @Override
    public String toString() {
        return "RTMP Acknowledgment (sequence number: " + sequenceNumber + ")";
    }
}

@ -0,0 +1,24 @@
package net.ossrs.sea.rtmp.packets;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
* Audio data packet
*
* @author francois
*/
public class Audio extends ContentData {

    /** Wraps a received audio packet whose header was already decoded. */
    public Audio(RtmpHeader header) {
        super(header);
    }

    /** Builds an outgoing audio packet on the audio channel with a full (type 0) header. */
    public Audio() {
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_AUDIO_CHANNEL, RtmpHeader.MessageType.AUDIO));
    }

    @Override
    public String toString() {
        return "RTMP Audio";
    }
}

@ -0,0 +1,80 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.amf.AmfNumber;
import net.ossrs.sea.rtmp.amf.AmfString;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
 * Encapsulates a command/"invoke" RTMP packet
*
* Invoke/command packet structure (AMF encoded):
 * (String) <command name>
* (Number) <Transaction ID>
* (Mixed) <Argument> ex. Null, String, Object: {key1:value1, key2:value2 ... }
*
* @author francois
*/
public class Command extends VariableBodyRtmpPacket {

    private static final String TAG = "Command";

    /** The AMF-encoded command name, e.g. "connect" or "publish"; first element of the body. */
    private String commandName;
    /** The transaction ID used to pair this command with its response. */
    private int transactionId;

    public Command(RtmpHeader header) {
        super(header);
    }

    /**
     * Creates a command packet, reusing a relative header when the previous
     * transmitted header on this channel was also an AMF0 command.
     */
    public Command(String commandName, int transactionId, ChunkStreamInfo channelInfo) {
        super(new RtmpHeader((channelInfo.canReusePrevHeaderTx(RtmpHeader.MessageType.COMMAND_AMF0) ? RtmpHeader.ChunkType.TYPE_1_RELATIVE_LARGE : RtmpHeader.ChunkType.TYPE_0_FULL), ChunkStreamInfo.RTMP_COMMAND_CHANNEL, RtmpHeader.MessageType.COMMAND_AMF0));
        this.commandName = commandName;
        this.transactionId = transactionId;
    }

    /** Creates a command packet with a full (12-byte) header. */
    public Command(String commandName, int transactionId) {
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_COMMAND_CHANNEL, RtmpHeader.MessageType.COMMAND_AMF0));
        this.commandName = commandName;
        this.transactionId = transactionId;
    }

    public String getCommandName() {
        return commandName;
    }

    public void setCommandName(String commandName) {
        this.commandName = commandName;
    }

    public int getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(int transactionId) {
        this.transactionId = transactionId;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // The command name (AMF string) and transaction ID (AMF number) always
        // lead the body; whatever follows is the variable argument list.
        commandName = AmfString.readStringFrom(in, false);
        transactionId = (int) AmfNumber.readNumberFrom(in);
        final int consumed = AmfString.sizeOf(commandName, false) + AmfNumber.SIZE;
        readVariableData(in, consumed);
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        // Fixed prefix (name + transaction ID), then the variable body data.
        AmfString.writeStringTo(out, commandName, false);
        AmfNumber.writeNumberTo(out, transactionId);
        writeVariableData(out);
    }

    @Override
    public String toString() {
        StringBuilder description = new StringBuilder("RTMP Command (command: ");
        description.append(commandName).append(", transaction ID: ").append(transactionId).append(")");
        return description.toString();
    }
}

@ -0,0 +1,44 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.Util;
/**
* Content (audio/video) data packet base
*
* @author francois
*/
public abstract class ContentData extends RtmpPacket {

    // Raw media payload. The array is shared with callers (no defensive copy),
    // presumably to avoid copying large audio/video buffers -- callers must not
    // mutate it after handing it over.
    protected byte[] data;

    public ContentData(RtmpHeader header) {
        super(header);
    }

    /** @return the raw payload bytes (the internal array itself, not a copy) */
    public byte[] getData() {
        return data;
    }

    /** Sets the raw payload bytes (the array is stored as-is, not copied). */
    public void setData(byte[] data) {
        this.data = data;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // The payload size comes from the packet length declared in the header.
        data = new byte[this.header.getPacketLength()];
        Util.readBytesUntilFull(in, data);
    }

    /**
     * Method is public for content (audio/video)
     * Write this packet body without chunking;
     * useful for dumping audio/video streams
     */
    @Override
    public void writeBody(OutputStream out) throws IOException {
        out.write(data);
    }
}

@ -0,0 +1,59 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.amf.AmfString;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
* AMF Data packet
*
* Also known as NOTIFY in some RTMP implementations.
*
* The client or the server sends this message to send Metadata or any user data
* to the peer. Metadata includes details about the data (audio, video etc.)
* like creation time, duration, theme and so on.
*
* @author francois
*/
public class Data extends VariableBodyRtmpPacket {

    /** The notification type name; serialized as the first AMF string of the body. */
    private String type;

    public Data(RtmpHeader header) {
        super(header);
    }

    /** Creates an outgoing AMF0 data/notify packet of the given type on the command channel. */
    public Data(String type) {
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_COMMAND_CHANNEL, RtmpHeader.MessageType.DATA_AMF0));
        this.type = type;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // Read notification type
        type = AmfString.readStringFrom(in, false);
        int bytesRead = AmfString.sizeOf(type, false);
        // Read data body (everything after the type string is variable data)
        readVariableData(in, bytesRead);
    }

    /**
     * This method is public for Data to make it easy to dump its contents to
     * another output stream
     */
    @Override
    public void writeBody(OutputStream out) throws IOException {
        AmfString.writeStringTo(out, type, false);
        writeVariableData(out);
    }
}

@ -0,0 +1,221 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import android.util.Log;
import net.ossrs.sea.rtmp.Crypto;
import net.ossrs.sea.rtmp.Util;
/**
* Handles the RTMP handshake song 'n dance
*
* Thanks to http://thompsonng.blogspot.com/2010/11/rtmp-part-10-handshake.html for some very useful information on
 * the hidden "features" of the RTMP handshake
*
* @author francois
*/
public final class Handshake {

    private static final String TAG = "Handshake";

    /** S1 as sent by the server; echoed back verbatim as C2. */
    private byte[] s1;

    private static final int PROTOCOL_VERSION = 0x03;
    private static final int HANDSHAKE_SIZE = 1536;
    private static final int SHA256_DIGEST_SIZE = 32;
    /** Scheme-1 digest offset indicator position (the other scheme reads it at byte 8). */
    private static final int DIGEST_OFFSET_INDICATOR_POS = 772;

    /**
     * HMAC key used by genuine Flash Player clients: the ASCII string
     * "Genuine Adobe Flash Player 001" followed by 32 fixed key bytes.
     */
    private static final byte[] GENUINE_FP_KEY = {
            (byte) 0x47, (byte) 0x65, (byte) 0x6E, (byte) 0x75, (byte) 0x69, (byte) 0x6E, (byte) 0x65, (byte) 0x20,
            (byte) 0x41, (byte) 0x64, (byte) 0x6F, (byte) 0x62, (byte) 0x65, (byte) 0x20, (byte) 0x46, (byte) 0x6C,
            (byte) 0x61, (byte) 0x73, (byte) 0x68, (byte) 0x20, (byte) 0x50, (byte) 0x6C, (byte) 0x61, (byte) 0x79,
            (byte) 0x65, (byte) 0x72, (byte) 0x20, (byte) 0x30, (byte) 0x30, (byte) 0x31, // Genuine Adobe Flash Player 001
            (byte) 0xF0, (byte) 0xEE, (byte) 0xC2, (byte) 0x4A, (byte) 0x80, (byte) 0x68, (byte) 0xBE, (byte) 0xE8,
            (byte) 0x2E, (byte) 0x00, (byte) 0xD0, (byte) 0xD1, (byte) 0x02, (byte) 0x9E, (byte) 0x7E, (byte) 0x57,
            (byte) 0x6E, (byte) 0xEC, (byte) 0x5D, (byte) 0x2D, (byte) 0x29, (byte) 0x80, (byte) 0x6F, (byte) 0xAB,
            (byte) 0x93, (byte) 0xB8, (byte) 0xE6, (byte) 0x36, (byte) 0xCF, (byte) 0xEB, (byte) 0x31, (byte) 0xAE};

    /** Generates and writes the first handshake packet (C0): the 1-byte protocol version. */
    public final void writeC0(OutputStream out) throws IOException {
        Log.d(TAG, "writeC0");
        out.write(PROTOCOL_VERSION);
    }

    /**
     * Reads and validates S0 (the server's 1-byte protocol version).
     *
     * @throws IOException if the stream is closed or the version is not 0x03
     */
    public final void readS0(InputStream in) throws IOException {
        Log.d(TAG, "readS0");
        byte s0 = (byte) in.read();
        if (s0 != PROTOCOL_VERSION) {
            if (s0 == -1) {
                throw new IOException("InputStream closed");
            } else {
                throw new IOException("Invalid RTMP protocol version; expected " + PROTOCOL_VERSION + ", got " + s0);
            }
        }
    }

    /**
     * Generates and writes the second handshake packet (C1): 1536 bytes of
     * timestamp + Flash Player version + random data, with an HMAC-SHA256
     * digest embedded at a random offset (scheme 1, offset indicated at
     * bytes 772-775) so the packet looks like a genuine Flash Player C1.
     */
    public final void writeC1(OutputStream out) throws IOException {
        Log.d(TAG, "writeC1");
        Log.d(TAG, "writeC1(): Calculating digest offset");
        Random random = new Random();
        // Choose the digest offset point (scheme 1: offset is indicated by bytes 772 - 775).
        final int digestOffset = random.nextInt(HANDSHAKE_SIZE - DIGEST_OFFSET_INDICATOR_POS - 4 - 8 - SHA256_DIGEST_SIZE);
        final int absoluteDigestOffset = ((digestOffset % 728) + DIGEST_OFFSET_INDICATOR_POS + 4); // mod 728 as in librtmp
        Log.d(TAG, "writeC1(): (real value of) digestOffset: " + digestOffset);
        Log.d(TAG, "writeC1(): recalculated digestOffset: " + absoluteDigestOffset);
        // Encode the offset as four bytes whose sum equals digestOffset
        // (this is how the peer recovers the value: it sums the four bytes).
        int remaining = digestOffset;
        final byte[] digestOffsetBytes = new byte[4];
        for (int i = 3; i >= 0; i--) {
            if (remaining > 255) {
                digestOffsetBytes[i] = (byte) 255;
                remaining -= 255;
            } else {
                digestOffsetBytes[i] = (byte) remaining;
                remaining -= remaining;
            }
        }
        Log.d(TAG, "writeC1(): digestOffsetBytes: " + Util.toHexString(digestOffsetBytes));
        // Random filler up to the digest insertion point.
        byte[] partBeforeDigest = new byte[absoluteDigestOffset];
        Log.d(TAG, "partBeforeDigest(): size: " + partBeforeDigest.length);
        random.nextBytes(partBeforeDigest);
        Log.d(TAG, "writeC1(): Writing timestamp and Flash Player version");
        byte[] timeStamp = Util.unsignedInt32ToByteArray((int) (System.currentTimeMillis() / 1000));
        System.arraycopy(timeStamp, 0, partBeforeDigest, 0, 4); // Bytes 0 - 3: current epoch timestamp
        System.arraycopy(new byte[]{(byte) 0x80, 0x00, 0x07, 0x02}, 0, partBeforeDigest, 4, 4); // Bytes 4 - 7: Flash player version 11.2.202.233
        // Random filler after the digest.
        byte[] partAfterDigest = new byte[HANDSHAKE_SIZE - absoluteDigestOffset - SHA256_DIGEST_SIZE];
        Log.d(TAG, "partAfterDigest(): size: " + partAfterDigest.length);
        random.nextBytes(partAfterDigest);
        // Write the encoded offset at the scheme-1 indicator position.
        Log.d(TAG, "copying digest offset bytes in partBeforeDigest");
        System.arraycopy(digestOffsetBytes, 0, partBeforeDigest, 772, 4);
        Log.d(TAG, "writeC1(): Calculating digest");
        // Digest is computed over the whole packet minus the digest slot itself.
        byte[] tempBuffer = new byte[HANDSHAKE_SIZE - SHA256_DIGEST_SIZE];
        System.arraycopy(partBeforeDigest, 0, tempBuffer, 0, partBeforeDigest.length);
        System.arraycopy(partAfterDigest, 0, tempBuffer, partBeforeDigest.length, partAfterDigest.length);
        Crypto crypto = new Crypto();
        // Only the first 30 key bytes ("Genuine Adobe Flash Player 001") are used for C1.
        byte[] digest = crypto.calculateHmacSHA256(tempBuffer, GENUINE_FP_KEY, 30);
        // Now write the packet: filler, digest, filler.
        Log.d(TAG, "writeC1(): writing C1 packet");
        out.write(partBeforeDigest);
        out.write(digest);
        out.write(partAfterDigest);
    }

    /**
     * Reads S1 (1536 bytes) and stores it for the C2 echo. The content is not validated.
     *
     * @throws IOException on EOF before the full 1536 bytes arrive
     */
    public final void readS1(InputStream in) throws IOException {
        Log.d(TAG, "readS1");
        s1 = new byte[HANDSHAKE_SIZE];
        int totalBytesRead = 0;
        int read;
        do {
            read = in.read(s1, totalBytesRead, (HANDSHAKE_SIZE - totalBytesRead));
            if (read != -1) {
                totalBytesRead += read;
            }
            // Bug fix: the loop previously spun forever when read() hit EOF (-1),
            // making the short-read check below unreachable. Exit on EOF instead.
        } while (totalBytesRead < HANDSHAKE_SIZE && read != -1);
        if (totalBytesRead != HANDSHAKE_SIZE) {
            throw new IOException("Unexpected EOF while reading S1, expected " + HANDSHAKE_SIZE + " bytes, but only read " + totalBytesRead + " bytes");
        } else {
            Log.d(TAG, "readS1(): S1 total bytes read OK");
        }
    }

    /** Generates and writes the third handshake packet (C2), which is an echo of S1. */
    public final void writeC2(OutputStream out) throws IOException {
        Log.d(TAG, "writeC2");
        // C2 is an echo of S1
        if (s1 == null) {
            throw new IllegalStateException("C2 cannot be written without S1 being read first");
        }
        out.write(s1);
    }

    /**
     * Reads S2 (1536 bytes). S2 should be an echo of C1, but the content is
     * deliberately not verified (we are not strict).
     *
     * @throws IOException on EOF before the full packet arrives
     */
    public final void readS2(InputStream in) throws IOException {
        Log.d(TAG, "readS2");
        byte[] sr_serverTime = new byte[4];
        byte[] s2_serverVersion = new byte[4];
        byte[] s2_rest = new byte[HANDSHAKE_SIZE - 8]; // subtract 4+4 bytes for time and version
        // Read server time (4 bytes)
        int totalBytesRead = 0;
        int read;
        do {
            read = in.read(sr_serverTime, totalBytesRead, (4 - totalBytesRead));
            if (read == -1) {
                throw new IOException("Unexpected EOF while reading S2 bytes 0-3");
            } else {
                totalBytesRead += read;
            }
        } while (totalBytesRead < 4);
        // Read server version (4 bytes)
        totalBytesRead = 0;
        do {
            read = in.read(s2_serverVersion, totalBytesRead, (4 - totalBytesRead));
            if (read == -1) {
                throw new IOException("Unexpected EOF while reading S2 bytes 4-7");
            } else {
                totalBytesRead += read;
            }
        } while (totalBytesRead < 4);
        // Read the remaining 1528 bytes (S2 total size is 1536 bytes)
        final int remainingBytes = HANDSHAKE_SIZE - 8;
        totalBytesRead = 0;
        do {
            read = in.read(s2_rest, totalBytesRead, (remainingBytes - totalBytesRead));
            if (read != -1) {
                totalBytesRead += read;
            }
        } while (totalBytesRead < remainingBytes && read != -1);
        if (totalBytesRead != remainingBytes) {
            throw new IOException("Unexpected EOF while reading remainder of S2, expected " + remainingBytes + " bytes, but only read " + totalBytesRead + " bytes");
        } else {
            Log.d(TAG, "readS2(): S2 total bytes read OK");
        }
        // Technically we should check that S2 == C1, but for now this is ignored
    }
}

@ -0,0 +1,431 @@
/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package net.ossrs.sea.rtmp.packets;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import android.util.Log;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
import net.ossrs.sea.rtmp.io.RtmpSessionInfo;
/**
*
* @author francois
*/
public class RtmpHeader {

    private static final String TAG = "RtmpHeader";

    /**
     * RTMP packet/message type definitions.
     * Note: docstrings are adapted from the official Adobe RTMP spec:
     * http://www.adobe.com/devnet/rtmp/
     */
    public static enum MessageType {

        /** Protocol control message 1: notifies the peer of a new maximum chunk size. */
        SET_CHUNK_SIZE(0x01),
        /**
         * Protocol control message 2: tells the peer to discard a partially
         * received message on a chunk stream and abort processing it.
         */
        ABORT(0x02),
        /**
         * Protocol control message 3: sent after receiving bytes equal to the
         * window size (the maximum number of bytes sent without acknowledgment).
         */
        ACKNOWLEDGEMENT(0x03),
        /**
         * Protocol control message 4: carries user control events (event type +
         * event data). Also known as a PING message in some RTMP implementations.
         */
        USER_CONTROL_MESSAGE(0x04),
        /**
         * Protocol control message 5: informs the peer of the window size to use
         * when sending acknowledgments. Also known as ServerBW ("server bandwidth").
         */
        WINDOW_ACKNOWLEDGEMENT_SIZE(0x05),
        /**
         * Protocol control message 6: updates the output bandwidth of the peer
         * (same value as the peer's window size). Also known as ClientBW
         * ("client bandwidth").
         */
        SET_PEER_BANDWIDTH(0x06),
        /** RTMP audio packet (0x08). */
        AUDIO(0x08),
        /** RTMP video packet (0x09). */
        VIDEO(0x09),
        /** Metadata/user data message, AMF3-encoded (0x0F). */
        DATA_AMF3(0x0F),
        /** Shared object message, AMF3 version (0x10): kMsgContainerEx=16. */
        SHARED_OBJECT_AMF3(0x10),
        /**
         * Command message, AMF3-encoded (0x11): command name, transaction ID,
         * and a command object with related parameters.
         */
        COMMAND_AMF3(0x11),
        /** Metadata/user data message, AMF0-encoded (0x12). */
        DATA_AMF0(0x12),
        /**
         * Command message, AMF0-encoded (0x14): command name, transaction ID,
         * and a command object. Also known as INVOKE in some implementations.
         */
        COMMAND_AMF0(0x14),
        /** Shared object message, AMF0 version (0x13): kMsgContainer=19. */
        SHARED_OBJECT_AMF0(0x13),
        /** Aggregate message (0x16): a single message containing a list of sub-messages. */
        AGGREGATE_MESSAGE(0x16);

        private byte value;
        // Reverse lookup table: byte value -> enum constant.
        private static final Map<Byte, MessageType> quickLookupMap = new HashMap<Byte, MessageType>();

        static {
            for (MessageType messageTypId : MessageType.values()) {
                quickLookupMap.put(messageTypId.getValue(), messageTypId);
            }
        }

        private MessageType(int value) {
            this.value = (byte) value;
        }

        /** Returns the byte value of this message type */
        public byte getValue() {
            return value;
        }

        /**
         * @throws IllegalArgumentException if the byte does not map to a known message type
         */
        public static MessageType valueOf(byte messageTypeId) {
            if (quickLookupMap.containsKey(messageTypeId)) {
                return quickLookupMap.get(messageTypeId);
            } else {
                throw new IllegalArgumentException("Unknown message type byte: " + Util.toHexString(messageTypeId));
            }
        }
    }

    public static enum ChunkType {
        /** Full 12-byte RTMP chunk header */
        TYPE_0_FULL(0x00, 12),
        /** Relative 8-byte RTMP chunk header (message stream ID is not included) */
        TYPE_1_RELATIVE_LARGE(0x01, 8),
        /** Relative 4-byte RTMP chunk header (only timestamp delta) */
        TYPE_2_RELATIVE_TIMESTAMP_ONLY(0x02, 4),
        /** Relative 1-byte RTMP chunk header (just the 1 byte indicating chunk header type & chunk stream ID) */
        TYPE_3_RELATIVE_SINGLE_BYTE(0x03, 1);

        /** The byte value of this chunk header type */
        private byte value;
        /** The full size (in bytes) of this RTMP header (including the basic header byte) */
        private int size;
        // Reverse lookup table: byte value -> enum constant.
        private static final Map<Byte, ChunkType> quickLookupMap = new HashMap<Byte, ChunkType>();

        static {
            for (ChunkType messageTypId : ChunkType.values()) {
                quickLookupMap.put(messageTypId.getValue(), messageTypId);
            }
        }

        private ChunkType(int byteValue, int fullHeaderSize) {
            this.value = (byte) byteValue;
            this.size = fullHeaderSize;
        }

        /** Returns the byte value of this chunk header type */
        public byte getValue() {
            return value;
        }

        public int getSize() {
            return size;
        }

        /**
         * @throws IllegalArgumentException if the byte does not map to a known chunk type
         */
        public static ChunkType valueOf(byte chunkHeaderType) {
            if (quickLookupMap.containsKey(chunkHeaderType)) {
                return quickLookupMap.get(chunkHeaderType);
            } else {
                throw new IllegalArgumentException("Unknown chunk header type byte: " + Util.toHexString(chunkHeaderType));
            }
        }
    }

    private ChunkType chunkType;
    private int chunkStreamId;
    private int absoluteTimestamp;
    // -1 marks "not yet set"; writeTo() fills it in from the chunk stream clock.
    private int timestampDelta = -1;
    private int packetLength;
    private MessageType messageType;
    private int messageStreamId;

    public RtmpHeader() {
    }

    public RtmpHeader(ChunkType chunkType, int chunkStreamId, MessageType messageType) {
        this.chunkType = chunkType;
        this.chunkStreamId = chunkStreamId;
        this.messageType = messageType;
    }

    /** Reads a complete chunk header from the stream, resolving relative headers via the session state. */
    public static RtmpHeader readHeader(InputStream in, RtmpSessionInfo rtmpSessionInfo) throws IOException {
        RtmpHeader rtmpHeader = new RtmpHeader();
        rtmpHeader.readHeaderImpl(in, rtmpSessionInfo);
        return rtmpHeader;
    }

    private void readHeaderImpl(InputStream in, RtmpSessionInfo rtmpSessionInfo) throws IOException {
        int basicHeaderByte = in.read();
        if (basicHeaderByte == -1) {
            throw new EOFException("Unexpected EOF while reading RTMP packet basic header");
        }
        // Read byte 0: chunk type and chunk stream ID
        parseBasicHeader((byte) basicHeaderByte);
        // NOTE(review): extended timestamps (values >= 0xFFFFFF) are not handled here.
        switch (chunkType) {
            case TYPE_0_FULL: { // b00 = 12 byte header (full header)
                // Read bytes 1-3: Absolute timestamp
                absoluteTimestamp = Util.readUnsignedInt24(in);
                timestampDelta = 0;
                // Read bytes 4-6: Packet length
                packetLength = Util.readUnsignedInt24(in);
                // Read byte 7: Message type ID
                messageType = MessageType.valueOf((byte) in.read());
                // Read bytes 8-11: Message stream ID (little-endian order)
                byte[] messageStreamIdBytes = new byte[4];
                Util.readBytesUntilFull(in, messageStreamIdBytes);
                messageStreamId = Util.toUnsignedInt32LittleEndian(messageStreamIdBytes);
                break;
            }
            case TYPE_1_RELATIVE_LARGE: { // b01 = 8 bytes - like type 0, minus the message stream ID
                // Read bytes 1-3: Timestamp delta
                timestampDelta = Util.readUnsignedInt24(in);
                // Read bytes 4-6: Packet length
                packetLength = Util.readUnsignedInt24(in);
                // Read byte 7: Message type ID
                messageType = MessageType.valueOf((byte) in.read());
                RtmpHeader prevHeader = rtmpSessionInfo.getChunkStreamInfo(chunkStreamId).prevHeaderRx();
                if (prevHeader != null) {
                    messageStreamId = prevHeader.messageStreamId;
                    absoluteTimestamp = prevHeader.absoluteTimestamp + timestampDelta;
                } else {
                    // No previous header on this chunk stream (unexpected for a
                    // relative header); fall back to sane defaults.
                    // (Replaces the original catch-of-NullPointerException.)
                    messageStreamId = 0;
                    absoluteTimestamp = timestampDelta;
                }
                break;
            }
            case TYPE_2_RELATIVE_TIMESTAMP_ONLY: { // b10 = 4 bytes - basic header + timestamp delta
                // Read bytes 1-3: Timestamp delta
                timestampDelta = Util.readUnsignedInt24(in);
                RtmpHeader prevHeader = rtmpSessionInfo.getChunkStreamInfo(chunkStreamId).prevHeaderRx();
                packetLength = prevHeader.packetLength;
                messageType = prevHeader.messageType;
                messageStreamId = prevHeader.messageStreamId;
                absoluteTimestamp = prevHeader.absoluteTimestamp + timestampDelta;
                break;
            }
            case TYPE_3_RELATIVE_SINGLE_BYTE: { // b11 = 1 byte: basic header only; everything else carried over
                RtmpHeader prevHeader = rtmpSessionInfo.getChunkStreamInfo(chunkStreamId).prevHeaderRx();
                timestampDelta = prevHeader.timestampDelta;
                absoluteTimestamp = prevHeader.absoluteTimestamp + timestampDelta;
                packetLength = prevHeader.packetLength;
                messageType = prevHeader.messageType;
                messageStreamId = prevHeader.messageStreamId;
                break;
            }
            default:
                Log.e(TAG, "readHeaderImpl(): Invalid chunk type; basic header byte was: " + Util.toHexString((byte) basicHeaderByte));
                throw new IOException("Invalid chunk type; basic header byte was: " + Util.toHexString((byte) basicHeaderByte));
        }
    }

    /** Writes this header, computing relative timestamp deltas from the chunk stream's clock when unset. */
    public void writeTo(OutputStream out, final ChunkStreamInfo chunkStreamInfo) throws IOException {
        // Write basic header byte: 2 bits chunk type + 6 bits chunk stream ID
        out.write(((byte) (chunkType.getValue() << 6) | chunkStreamId));
        switch (chunkType) {
            case TYPE_0_FULL: { // b00 = 12 byte header (full header)
                chunkStreamInfo.markRealAbsoluteTimestampTx();
                Util.writeUnsignedInt24(out, absoluteTimestamp);
                Util.writeUnsignedInt24(out, packetLength);
                out.write(messageType.getValue());
                Util.writeUnsignedInt32LittleEndian(out, messageStreamId);
                break;
            }
            case TYPE_1_RELATIVE_LARGE: { // b01 = 8 bytes - like type 0, minus the message stream ID
                if (timestampDelta == -1) {
                    timestampDelta = (int) chunkStreamInfo.markRealAbsoluteTimestampTx();
                }
                absoluteTimestamp = chunkStreamInfo.getPrevHeaderTx().getAbsoluteTimestamp() + timestampDelta;
                Util.writeUnsignedInt24(out, timestampDelta);
                Util.writeUnsignedInt24(out, packetLength);
                out.write(messageType.getValue());
                break;
            }
            case TYPE_2_RELATIVE_TIMESTAMP_ONLY: { // b10 = 4 bytes - basic header + timestamp delta
                if (timestampDelta == -1) {
                    timestampDelta = (int) chunkStreamInfo.markRealAbsoluteTimestampTx();
                }
                absoluteTimestamp = chunkStreamInfo.getPrevHeaderTx().getAbsoluteTimestamp() + timestampDelta;
                Util.writeUnsignedInt24(out, timestampDelta);
                break;
            }
            case TYPE_3_RELATIVE_SINGLE_BYTE: { // b11 = 1 byte: basic header only
                if (timestampDelta == -1) {
                    timestampDelta = (int) chunkStreamInfo.markRealAbsoluteTimestampTx();
                }
                absoluteTimestamp = chunkStreamInfo.getPrevHeaderTx().getAbsoluteTimestamp() + timestampDelta;
                break;
            }
            default:
                throw new IOException("Invalid chunk type: " + chunkType);
        }
    }

    private void parseBasicHeader(byte basicHeaderByte) {
        chunkType = ChunkType.valueOf((byte) ((0xff & basicHeaderByte) >>> 6)); // 2 most significant bits define the chunk type
        chunkStreamId = basicHeaderByte & 0x3F; // 6 least significant bits define chunk stream ID
    }

    /** @return the RTMP chunk stream ID (channel ID) for this chunk */
    public int getChunkStreamId() {
        return chunkStreamId;
    }

    public ChunkType getChunkType() {
        return chunkType;
    }

    public int getPacketLength() {
        return packetLength;
    }

    public int getMessageStreamId() {
        return messageStreamId;
    }

    public MessageType getMessageType() {
        return messageType;
    }

    public int getAbsoluteTimestamp() {
        return absoluteTimestamp;
    }

    public void setAbsoluteTimestamp(int absoluteTimestamp) {
        this.absoluteTimestamp = absoluteTimestamp;
    }

    public int getTimestampDelta() {
        return timestampDelta;
    }

    public void setTimestampDelta(int timestampDelta) {
        this.timestampDelta = timestampDelta;
    }

    /** Sets the RTMP chunk stream ID (channel ID) for this chunk */
    public void setChunkStreamId(int channelId) {
        this.chunkStreamId = channelId;
    }

    public void setChunkType(ChunkType chunkType) {
        this.chunkType = chunkType;
    }

    public void setMessageStreamId(int messageStreamId) {
        this.messageStreamId = messageStreamId;
    }

    public void setMessageType(MessageType messageType) {
        this.messageType = messageType;
    }

    public void setPacketLength(int packetLength) {
        this.packetLength = packetLength;
    }

    /** Writes a type-3 (1-byte) continuation chunk header: bits '11' + this header's chunk stream ID. */
    public void writeAggregateHeaderByte(OutputStream out) throws IOException {
        out.write(0xC0 | chunkStreamId);
    }

    /** Writes a type-3 (1-byte) continuation chunk header for the given chunk stream ID. */
    public static void writeAggregateHeaderByte(OutputStream out, int chunkStreamId) throws IOException {
        out.write(0xC0 | chunkStreamId);
    }
}

@ -0,0 +1,48 @@
package net.ossrs.sea.rtmp.packets;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
*
* @author francois
*/
public abstract class RtmpPacket {

    protected RtmpHeader header;

    public RtmpPacket(RtmpHeader header) {
        this.header = header;
    }

    public RtmpHeader getHeader() {
        return header;
    }

    /** Deserializes this packet's body from the stream. */
    public abstract void readBody(InputStream in) throws IOException;

    /** Serializes this packet's body (without any chunking) to the stream. */
    protected abstract void writeBody(OutputStream out) throws IOException;

    /**
     * Serializes the full packet: the body is rendered to a buffer first (so the
     * header can carry its exact length), then written out in chunks of at most
     * {@code chunkSize} bytes, each continuation chunk preceded by a 1-byte
     * type-3 header.
     */
    public void writeTo(OutputStream out, final int chunkSize, final ChunkStreamInfo chunkStreamInfo) throws IOException {
        ByteArrayOutputStream bodyBuffer = new ByteArrayOutputStream();
        writeBody(bodyBuffer);
        byte[] body = bodyBuffer.toByteArray();
        header.setPacketLength(body.length);
        // Full header precedes the first chunk only.
        header.writeTo(out, chunkStreamInfo);
        int offset = 0;
        int bytesLeft = body.length;
        while (bytesLeft > chunkSize) {
            out.write(body, offset, chunkSize);
            offset += chunkSize;
            bytesLeft -= chunkSize;
            // Each subsequent chunk gets a 1-byte continuation header.
            header.writeAggregateHeaderByte(out);
        }
        out.write(body, offset, bytesLeft);
    }
}

@ -0,0 +1,45 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
* A "Set chunk size" RTMP message, received on chunk stream ID 2 (control channel)
*
* @author francois
*/
public class SetChunkSize extends RtmpPacket {

    /** The new maximum chunk size, in bytes. */
    private int chunkSize;

    public SetChunkSize(RtmpHeader header) {
        super(header);
    }

    /** Creates an outgoing Set Chunk Size message on the RTMP control channel. */
    public SetChunkSize(int chunkSize) {
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_1_RELATIVE_LARGE, ChunkStreamInfo.RTMP_CONTROL_CHANNEL, RtmpHeader.MessageType.SET_CHUNK_SIZE));
        this.chunkSize = chunkSize;
    }

    public int getChunkSize() {
        return chunkSize;
    }

    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // Value is received in the 4 bytes of the body
        chunkSize = Util.readUnsignedInt32(in);
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        Util.writeUnsignedInt32(out, chunkSize);
    }
}

@ -0,0 +1,104 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
* Set Peer Bandwidth
*
 * Also known as ClientBW ("client bandwidth") in some RTMP implementations.
*
* @author francois
*/
public class SetPeerBandwidth extends RtmpPacket {

    /**
     * Bandwidth limiting type
     */
    public static enum LimitType {

        /**
         * In a hard (0) request, the peer must send the data in the provided bandwidth.
         */
        HARD(0),
        /**
         * In a soft (1) request, the bandwidth is at the discretion of the peer
         * and the sender can limit the bandwidth.
         */
        SOFT(1),
        /**
         * In a dynamic (2) request, the bandwidth can be hard or soft.
         */
        DYNAMIC(2);

        private int intValue;
        // Reverse lookup table: int value -> enum constant.
        private static final Map<Integer, LimitType> quickLookupMap = new HashMap<Integer, LimitType>();

        static {
            for (LimitType type : LimitType.values()) {
                quickLookupMap.put(type.getIntValue(), type);
            }
        }

        private LimitType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return intValue;
        }

        /** @return the matching limit type, or {@code null} if the value is unknown */
        public static LimitType valueOf(int intValue) {
            return quickLookupMap.get(intValue);
        }
    }

    /** Acknowledgement window size, in bytes. */
    private int acknowledgementWindowSize;
    private LimitType limitType;

    public SetPeerBandwidth(RtmpHeader header) {
        super(header);
    }

    /**
     * Creates an outgoing Set Peer Bandwidth message on the RTMP control channel.
     *
     * @param acknowledgementWindowSize the window size, in bytes
     * @param limitType                 how strictly the peer must apply the limit
     * @param channelInfo               used to decide whether a relative header can be reused
     */
    public SetPeerBandwidth(int acknowledgementWindowSize, LimitType limitType, ChunkStreamInfo channelInfo) {
        // Bug fix: the message type was previously WINDOW_ACKNOWLEDGEMENT_SIZE (0x05),
        // which mislabeled this packet on the wire; it must be SET_PEER_BANDWIDTH (0x06),
        // consistent with the canReusePrevHeaderTx() check on the same line.
        super(new RtmpHeader(channelInfo.canReusePrevHeaderTx(RtmpHeader.MessageType.SET_PEER_BANDWIDTH) ? RtmpHeader.ChunkType.TYPE_2_RELATIVE_TIMESTAMP_ONLY : RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_CONTROL_CHANNEL, RtmpHeader.MessageType.SET_PEER_BANDWIDTH));
        this.acknowledgementWindowSize = acknowledgementWindowSize;
        this.limitType = limitType;
    }

    public int getAcknowledgementWindowSize() {
        return acknowledgementWindowSize;
    }

    public void setAcknowledgementWindowSize(int acknowledgementWindowSize) {
        this.acknowledgementWindowSize = acknowledgementWindowSize;
    }

    public LimitType getLimitType() {
        return limitType;
    }

    public void setLimitType(LimitType limitType) {
        this.limitType = limitType;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // Body: 4-byte window size followed by a 1-byte limit type.
        acknowledgementWindowSize = Util.readUnsignedInt32(in);
        limitType = LimitType.valueOf(in.read());
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        Util.writeUnsignedInt32(out, acknowledgementWindowSize);
        out.write(limitType.getIntValue());
    }

    @Override
    public String toString() {
        return "RTMP Set Peer Bandwidth";
    }
}

@ -0,0 +1,241 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
 * User Control message, such as ping.
 *
 * @author francois
 */
public class UserControl extends RtmpPacket {

    /**
     * Control message type.
     * Docstring adapted from the official Adobe RTMP spec, section 3.7
     */
    public static enum Type {

        /**
         * Type: 0
         * The server sends this event to notify the client that a stream has become
         * functional and can be used for communication. By default, this event
         * is sent on ID 0 after the application connect command is successfully
         * received from the client.
         *
         * Event Data:
         * eventData[0] (int) the stream ID of the stream that became functional
         */
        STREAM_BEGIN(0),
        /**
         * Type: 1
         * The server sends this event to notify the client that the playback of
         * data is over as requested on this stream. No more data is sent without
         * issuing additional commands. The client discards the messages received
         * for the stream.
         *
         * Event Data:
         * eventData[0]: the ID of the stream on which playback has ended.
         */
        STREAM_EOF(1),
        /**
         * Type: 2
         * The server sends this event to notify the client that there is no
         * more data on the stream. If the server does not detect any message for
         * a time period, it can notify the subscribed clients that the stream is
         * dry.
         *
         * Event Data:
         * eventData[0]: the stream ID of the dry stream.
         */
        STREAM_DRY(2),
        /**
         * Type: 3
         * The client sends this event to inform the server of the buffer size
         * (in milliseconds) that is used to buffer any data coming over a stream.
         * This event is sent before the server starts processing the stream.
         *
         * Event Data:
         * eventData[0]: the stream ID and
         * eventData[1]: the buffer length, in milliseconds.
         */
        SET_BUFFER_LENGTH(3),
        /**
         * Type: 4
         * The server sends this event to notify the client that the stream is a
         * recorded stream.
         *
         * Event Data:
         * eventData[0]: the stream ID of the recorded stream.
         */
        STREAM_IS_RECORDED(4),
        /**
         * Type: 6
         * The server sends this event to test whether the client is reachable.
         *
         * Event Data:
         * eventData[0]: a timestamp representing the local server time when the server dispatched the command.
         *
         * The client responds with PONG_REPLY on receiving PING_REQUEST.
         */
        PING_REQUEST(6),
        /**
         * Type: 7
         * The client sends this event to the server in response to the ping request.
         *
         * Event Data:
         * eventData[0]: the 4-byte timestamp which was received with the PING_REQUEST.
         */
        PONG_REPLY(7),
        /**
         * Type: 31 (0x1F)
         *
         * This user control type is not specified in any official documentation, but
         * is sent by Flash Media Server 3.5. Thanks to the rtmpdump devs for their
         * explanation:
         *
         * Buffer Empty (unofficial name): After the server has sent a complete buffer, and
         * sends this Buffer Empty message, it will wait until the play
         * duration of that buffer has passed before sending a new buffer.
         * The Buffer Ready message will be sent when the new buffer starts.
         *
         * (see also: http://repo.or.cz/w/rtmpdump.git/blob/8880d1456b282ee79979adbe7b6a6eb8ad371081:/librtmp/rtmp.c#l2787)
         */
        BUFFER_EMPTY(31),
        /**
         * Type: 32 (0x20)
         *
         * This user control type is not specified in any official documentation, but
         * is sent by Flash Media Server 3.5. Thanks to the rtmpdump devs for their
         * explanation:
         *
         * Buffer Ready (unofficial name): After the server has sent a complete buffer, and
         * sends a Buffer Empty message, it will wait until the play
         * duration of that buffer has passed before sending a new buffer.
         * The Buffer Ready message will be sent when the new buffer starts.
         * (There is no BufferReady message for the very first buffer;
         * presumably the Stream Begin message is sufficient for that
         * purpose.)
         *
         * (see also: http://repo.or.cz/w/rtmpdump.git/blob/8880d1456b282ee79979adbe7b6a6eb8ad371081:/librtmp/rtmp.c#l2787)
         */
        BUFFER_READY(32);

        // Wire-format integer for this control type; never changes after construction.
        private final int intValue;

        // Reverse lookup table from wire value to enum constant.
        private static final Map<Integer, Type> quickLookupMap = new HashMap<Integer, Type>();

        static {
            for (Type type : Type.values()) {
                quickLookupMap.put(type.getIntValue(), type);
            }
        }

        private Type(int intValue) {
            this.intValue = intValue;
        }

        /** @return the integer used to encode this control type on the wire */
        public int getIntValue() {
            return intValue;
        }

        /**
         * Looks up the control type for a wire-format integer.
         * NOTE(review): returns null for unrecognized values — callers must handle it.
         */
        public static Type valueOf(int intValue) {
            return quickLookupMap.get(intValue);
        }
    }

    private Type type;
    // One value for most types, two (stream ID + buffer length) for SET_BUFFER_LENGTH.
    private int[] eventData;

    /** Wraps a User Control packet read from the wire with its already-parsed header. */
    public UserControl(RtmpHeader header) {
        super(header);
    }

    /** Creates an outgoing User Control packet on the RTMP control channel. */
    public UserControl(ChunkStreamInfo channelInfo) {
        super(new RtmpHeader(channelInfo.canReusePrevHeaderTx(RtmpHeader.MessageType.USER_CONTROL_MESSAGE) ? RtmpHeader.ChunkType.TYPE_2_RELATIVE_TIMESTAMP_ONLY : RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_CONTROL_CHANNEL, RtmpHeader.MessageType.USER_CONTROL_MESSAGE));
    }

    /** Convenience constructor that creates a "pong" message for the specified ping */
    public UserControl(UserControl replyToPing, ChunkStreamInfo channelInfo) {
        this(Type.PONG_REPLY, channelInfo);
        this.eventData = replyToPing.eventData;
    }

    /** Creates an outgoing User Control packet of the given type. */
    public UserControl(Type type, ChunkStreamInfo channelInfo) {
        this(channelInfo);
        this.type = type;
    }

    public Type getType() {
        return type;
    }

    public void setType(Type type) {
        this.type = type;
    }

    /**
     * Convenience method for getting the first event data item, as most user control
     * message types only have one event data item anyway.
     * This is equivalent to calling <code>getEventData()[0]</code>
     */
    public int getFirstEventData() {
        return eventData[0];
    }

    public int[] getEventData() {
        return eventData;
    }

    /**
     * Used to set (a single) event data for most user control message types.
     *
     * @throws IllegalStateException if the type is SET_BUFFER_LENGTH, which needs two values
     */
    public void setEventData(int eventData) {
        if (type == Type.SET_BUFFER_LENGTH) {
            throw new IllegalStateException("SET_BUFFER_LENGTH requires two event data values; use setEventData(int, int) instead");
        }
        this.eventData = new int[]{eventData};
    }

    /**
     * Used to set event data for the SET_BUFFER_LENGTH user control message type.
     *
     * @throws IllegalStateException if the type is not SET_BUFFER_LENGTH
     */
    public void setEventData(int streamId, int bufferLength) {
        if (type != Type.SET_BUFFER_LENGTH) {
            throw new IllegalStateException("User control type " + type + " requires only one event data value; use setEventData(int) instead");
        }
        this.eventData = new int[]{streamId, bufferLength};
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // Bytes 0-1: first parameter: ping type (mandatory)
        type = Type.valueOf(Util.readUnsignedInt16(in));
        int bytesRead = 2;
        // Event data (1 for most types, 2 for SET_BUFFER_LENGTH)
        if (type == Type.SET_BUFFER_LENGTH) {
            setEventData(Util.readUnsignedInt32(in), Util.readUnsignedInt32(in));
            bytesRead += 8;
        } else {
            setEventData(Util.readUnsignedInt32(in));
            bytesRead += 4;
        }
        // To ensure some strange non-specified UserControl/ping message does not slip through
        assert header.getPacketLength() == bytesRead;
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        // Write the user control message type
        Util.writeUnsignedInt16(out, type.getIntValue());
        // Now write the event data
        Util.writeUnsignedInt32(out, eventData[0]);
        if (type == Type.SET_BUFFER_LENGTH) {
            Util.writeUnsignedInt32(out, eventData[1]);
        }
    }

    @Override
    public String toString() {
        // Bug fix: concatenating the bare int[] reference rendered only its
        // identity hash (e.g. "[I@1a2b3c"); Arrays.toString shows the values.
        return "RTMP User Control (type: " + type + ", event data: " + java.util.Arrays.toString(eventData) + ")";
    }
}

@ -0,0 +1,77 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import net.ossrs.sea.rtmp.amf.AmfBoolean;
import net.ossrs.sea.rtmp.amf.AmfData;
import net.ossrs.sea.rtmp.amf.AmfDecoder;
import net.ossrs.sea.rtmp.amf.AmfNull;
import net.ossrs.sea.rtmp.amf.AmfNumber;
import net.ossrs.sea.rtmp.amf.AmfString;
/**
 * Base class for RTMP packets whose body layout is not fixed but depends on
 * other state carried by the packet (for example Command and Data packets).
 * Exists mainly so that the AMF read/write plumbing is shared.
 *
 * @author francois
 */
public abstract class VariableBodyRtmpPacket extends RtmpPacket {

    // Lazily-created list of AMF items making up the variable body.
    protected List<AmfData> data;

    public VariableBodyRtmpPacket(RtmpHeader header) {
        super(header);
    }

    /** @return the AMF items accumulated so far, or null if none were added */
    public List<AmfData> getData() {
        return data;
    }

    /** Appends a string value, wrapped as an AmfString. */
    public void addData(String string) {
        addData(new AmfString(string));
    }

    /** Appends a numeric value, wrapped as an AmfNumber. */
    public void addData(double number) {
        addData(new AmfNumber(number));
    }

    /** Appends a boolean value, wrapped as an AmfBoolean. */
    public void addData(boolean bool) {
        addData(new AmfBoolean(bool));
    }

    /**
     * Appends an arbitrary AMF item; a null argument is stored as AmfNull.
     * The backing list is created on first use.
     */
    public void addData(AmfData dataItem) {
        final AmfData item = (dataItem == null) ? new AmfNull() : dataItem;
        if (data == null) {
            data = new ArrayList<AmfData>();
        }
        data.add(item);
    }

    /**
     * Decodes AMF items from the stream until the byte count recorded in the
     * packet header has been consumed. At least one item is always read.
     */
    protected void readVariableData(final InputStream in, int bytesAlreadyRead) throws IOException {
        int consumed = bytesAlreadyRead;
        while (true) {
            final AmfData item = AmfDecoder.readFrom(in);
            addData(item);
            consumed += item.getSize();
            if (consumed >= header.getPacketLength()) {
                break;
            }
        }
    }

    /**
     * Encodes every accumulated AMF item to the stream; when nothing was
     * added, a single AMF null marker is written instead.
     */
    protected void writeVariableData(final OutputStream out) throws IOException {
        if (data == null) {
            // Nothing to serialize — emit an AMF null placeholder.
            AmfNull.writeNullTo(out);
            return;
        }
        for (AmfData item : data) {
            item.writeTo(out);
        }
    }
}

@ -0,0 +1,24 @@
package net.ossrs.sea.rtmp.packets;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
 * Video data packet.
 *
 * @author francois
 */
public class Video extends ContentData {

    /** Creates an outgoing video packet on the RTMP video channel with a full header. */
    public Video() {
        super(new RtmpHeader(RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_VIDEO_CHANNEL, RtmpHeader.MessageType.VIDEO));
    }

    /** Wraps a video packet read from the wire with its already-parsed header. */
    public Video(RtmpHeader header) {
        super(header);
    }

    @Override
    public String toString() {
        return "RTMP Video";
    }
}

@ -0,0 +1,52 @@
package net.ossrs.sea.rtmp.packets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import net.ossrs.sea.rtmp.Util;
import net.ossrs.sea.rtmp.io.ChunkStreamInfo;
/**
 * Window Acknowledgement Size
 *
 * Also known as ServerBW ("Server bandwidth") in some RTMP implementations.
 *
 * @author francois
 */
public class WindowAckSize extends RtmpPacket {

    // Number of bytes the peer may send before an acknowledgement is expected.
    private int windowSize;

    /** Wraps a packet read from the wire with its already-parsed header. */
    public WindowAckSize(RtmpHeader header) {
        super(header);
    }

    /** Builds an outgoing Window Acknowledgement Size packet on the control channel. */
    public WindowAckSize(int acknowledgementWindowSize, ChunkStreamInfo channelInfo) {
        super(new RtmpHeader(channelInfo.canReusePrevHeaderTx(RtmpHeader.MessageType.WINDOW_ACKNOWLEDGEMENT_SIZE) ? RtmpHeader.ChunkType.TYPE_2_RELATIVE_TIMESTAMP_ONLY : RtmpHeader.ChunkType.TYPE_0_FULL, ChunkStreamInfo.RTMP_CONTROL_CHANNEL, RtmpHeader.MessageType.WINDOW_ACKNOWLEDGEMENT_SIZE));
        windowSize = acknowledgementWindowSize;
    }

    /** @return the acknowledgement window size, in bytes */
    public int getAcknowledgementWindowSize() {
        return windowSize;
    }

    /** Sets the acknowledgement window size, in bytes. */
    public void setAcknowledgementWindowSize(int acknowledgementWindowSize) {
        windowSize = acknowledgementWindowSize;
    }

    @Override
    public void readBody(InputStream in) throws IOException {
        // Body is a single 4-byte unsigned integer.
        windowSize = Util.readUnsignedInt32(in);
    }

    @Override
    protected void writeBody(OutputStream out) throws IOException {
        Util.writeUnsignedInt32(out, windowSize);
    }

    @Override
    public String toString() {
        return "RTMP Window Acknowledgment Size";
    }
}

@ -0,0 +1,73 @@
<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:paddingBottom="@dimen/activity_vertical_margin"
android:paddingLeft="@dimen/activity_horizontal_margin"
android:paddingRight="@dimen/activity_horizontal_margin"
android:paddingTop="@dimen/activity_vertical_margin"
tools:context="net.ossrs.sea.MainActivity">
<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="publish"
android:id="@+id/publish"
android:layout_alignParentTop="true" />
<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="stop"
android:id="@+id/stop"
android:layout_toRightOf="@id/publish"
android:layout_marginTop="0dp" />
<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="switch"
android:id="@+id/swCam"
android:layout_alignBottom="@id/stop"
android:layout_toRightOf="@id/stop" />
<Button
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="rotate"
android:id="@+id/rotate"
android:layout_above="@+id/url"
android:layout_toRightOf="@id/swCam" />
<EditText
android:id="@+id/vbitrate"
android:textSize="14dp"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_below="@id/publish"
android:layout_marginTop="0dp" />
<EditText
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:textSize="14dp"
android:id="@+id/url"
android:layout_below="@id/publish"
android:layout_above="@+id/frameLayout"
android:layout_toRightOf="@id/vbitrate" />
<FrameLayout
android:layout_width="match_parent"
android:layout_height="match_parent"
android:layout_below="@id/vbitrate"
android:layout_centerHorizontal="true"
android:layout_marginTop="0dp"
android:id="@+id/frameLayout">
<SurfaceView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:id="@+id/preview" />
</FrameLayout>
</RelativeLayout>

@ -0,0 +1,6 @@
<menu xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools" tools:context="net.ossrs.sea.MainActivity">
<item android:id="@+id/action_settings" android:title="action_settings"
android:orderInCategory="100" app:showAsAction="never" />
</menu>

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.7 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

@ -0,0 +1,6 @@
<resources>
<!-- Example customization of dimensions originally defined in res/values/dimens.xml
(such as screen margins) for screens with more than 820dp of available width. This
would include 7" and 10" devices in landscape (~960dp and ~1280dp respectively). -->
<dimen name="activity_horizontal_margin">64dp</dimen>
</resources>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<color name="colorPrimary">#3F51B5</color>
<color name="colorPrimaryDark">#303F9F</color>
<color name="colorAccent">#FF4081</color>
</resources>

@ -0,0 +1,5 @@
<resources>
<!-- Default screen margins, per the Android Design guidelines. -->
<dimen name="activity_horizontal_margin">16dp</dimen>
<dimen name="activity_vertical_margin">16dp</dimen>
</resources>

@ -0,0 +1,3 @@
<resources>
<string name="app_name">Sea</string>
</resources>

@ -0,0 +1,11 @@
<resources>
<!-- Base application theme. -->
<style name="AppTheme" parent="Theme.AppCompat.Light.DarkActionBar">
<!-- Customize your theme here. -->
<item name="colorPrimary">@color/colorPrimary</item>
<item name="colorPrimaryDark">@color/colorPrimaryDark</item>
<item name="colorAccent">@color/colorAccent</item>
</style>
</resources>

@ -0,0 +1,15 @@
package net.ossrs.sea;
import org.junit.Test;
import static org.junit.Assert.*;
/**
 * To work on unit tests, switch the Test Artifact in the Build Variants view.
 */
public class ExampleUnitTest {
    // Placeholder sanity test generated by the Android Studio project template.
    @Test
    public void addition_isCorrect() throws Exception {
        assertEquals(4, 2 + 2);
    }
}

@ -0,0 +1,23 @@
// Top-level build file where you can add configuration options common to all sub-projects/modules.
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        // Android Gradle plugin used to build the app module.
        classpath 'com.android.tools.build:gradle:2.0.0-rc1'
        // NOTE: Do not place your application dependencies here; they belong
        // in the individual module build.gradle files
    }
}
allprojects {
    repositories {
        jcenter()
    }
}
// Removes the root build directory when running `gradle clean`.
task clean(type: Delete) {
    delete rootProject.buildDir
}

@ -0,0 +1 @@
include ':app'
Loading…
Cancel
Save