@ -2,6 +2,7 @@ package net.ossrs.yasea.rtmp.io;
import java.io.BufferedInputStream ;
import java.io.BufferedOutputStream ;
import java.io.EOFException ;
import java.io.IOException ;
import java.io.InputStream ;
import java.io.OutputStream ;
@ -30,6 +31,7 @@ import net.ossrs.yasea.rtmp.packets.Data;
import net.ossrs.yasea.rtmp.packets.Handshake ;
import net.ossrs.yasea.rtmp.packets.Command ;
import net.ossrs.yasea.rtmp.packets.Audio ;
import net.ossrs.yasea.rtmp.packets.SetPeerBandwidth ;
import net.ossrs.yasea.rtmp.packets.Video ;
import net.ossrs.yasea.rtmp.packets.UserControl ;
import net.ossrs.yasea.rtmp.packets.RtmpPacket ;
@ -46,24 +48,6 @@ public class RtmpConnection implements RtmpPublisher {
private static final Pattern rtmpUrlPattern = Pattern . compile ( "^rtmp://([^/:]+)(:(\\d+))*/([^/]+)(/(.*))*$" ) ;
private RtmpPublisher . EventHandler mHandler ;
private PacketRxHandler rxHandler = new PacketRxHandler ( ) {
@Override
public void handleRxPacket ( RtmpPacket rtmpPacket ) {
if ( rtmpPacket ! = null ) {
rxPacketQueue . add ( rtmpPacket ) ;
}
synchronized ( rxPacketLock ) {
rxPacketLock . notify ( ) ;
}
}
@Override
public void notifyWindowAckRequired ( final int numBytesReadThusFar ) {
Log . i ( TAG , "notifyWindowAckRequired() called" ) ;
// Create and send window bytes read acknowledgement
sendRtmpPacket ( new Acknowledgement ( numBytesReadThusFar ) ) ;
}
} ;
private String appName ;
private String streamName ;
private String publishType ;
@ -71,20 +55,18 @@ public class RtmpConnection implements RtmpPublisher {
private String tcUrl ;
private String pageUrl ;
private Socket socket ;
private RtmpSessionInfo rtmpSessionInfo ;
private RtmpSessionInfo rtmpSessionInfo = new RtmpSessionInfo ( ) ;
private RtmpDecoder rtmpDecoder = new RtmpDecoder ( rtmpSessionInfo ) ;
private BufferedInputStream inputStream ;
private BufferedOutputStream outputStream ;
private ReadThread readThread ;
private final ConcurrentLinkedQueue < RtmpPacket > rxPacketQueue = new ConcurrentLinkedQueue < > ( ) ;
private final Object rxPacketLock = new Object ( ) ;
private volatile boolean active = false ;
private Thread rxPacketHandler ;
private volatile boolean connecting = false ;
private volatile boolean fullyConnected = false ;
private volatile boolean publishPermitted = false ;
private final Object connectingLock = new Object ( ) ;
private final Object publishLock = new Object ( ) ;
private AtomicInteger videoFrameCacheNumber = new AtomicInteger ( 0 ) ;
private int currentStreamId = - 1 ;
private int currentStreamId = 0 ;
private int transactionIdCounter = 0 ;
private AmfString serverIpAddr ;
private AmfNumber serverPid ;
@ -117,8 +99,8 @@ public class RtmpConnection implements RtmpPublisher {
Matcher matcher = rtmpUrlPattern . matcher ( url ) ;
if ( matcher . matches ( ) ) {
tcUrl = url . substring ( 0 , url . lastIndexOf ( '/' ) ) ;
swfUrl = "" ;
pageUrl = "" ;
swfUrl = "" ;
pageUrl = "" ;
host = matcher . group ( 1 ) ;
String portStr = matcher . group ( 3 ) ;
port = portStr ! = null ? Integer . parseInt ( portStr ) : 1935 ;
@ -137,14 +119,10 @@ public class RtmpConnection implements RtmpPublisher {
outputStream = new BufferedOutputStream ( socket . getOutputStream ( ) ) ;
Log . d ( TAG , "connect(): socket connection established, doing handhake..." ) ;
handshake ( inputStream , outputStream ) ;
active = true ;
Log . d ( TAG , "connect(): handshake done" ) ;
rtmpSessionInfo = new RtmpSessionInfo ( ) ;
readThread = new ReadThread ( rtmpSessionInfo , inputStream , rxHandler ) ;
readThread . start ( ) ;
// Start the "main" handling thread
new Thread ( new Runnable ( ) {
rxPacketHandler = new Thread ( new Runnable ( ) {
@Override
public void run ( ) {
@ -155,7 +133,8 @@ public class RtmpConnection implements RtmpPublisher {
Logger . getLogger ( RtmpConnection . class . getName ( ) ) . log ( Level . SEVERE , null , ex ) ;
}
}
} ) . start ( ) ;
} ) ;
rxPacketHandler . start ( ) ;
rtmpConnect ( ) ;
}
@ -211,7 +190,7 @@ public class RtmpConnection implements RtmpPublisher {
if ( ! fullyConnected ) {
throw new IllegalStateException ( "Not connected to RTMP server" ) ;
}
if ( currentStreamId ! = - 1 ) {
if ( currentStreamId ! = 0 ) {
throw new IllegalStateException ( "Current stream object has existed" ) ;
}
@ -252,7 +231,7 @@ public class RtmpConnection implements RtmpPublisher {
if ( ! fullyConnected ) {
throw new IllegalStateException ( "Not connected to RTMP server" ) ;
}
if ( currentStreamId = = - 1 ) {
if ( currentStreamId = = 0 ) {
throw new IllegalStateException ( "No current stream object exists" ) ;
}
@ -271,7 +250,7 @@ public class RtmpConnection implements RtmpPublisher {
if ( ! fullyConnected ) {
throw new IllegalStateException ( "Not connected to RTMP server" ) ;
}
if ( currentStreamId = = - 1 ) {
if ( currentStreamId = = 0 ) {
throw new IllegalStateException ( "No current stream object exists" ) ;
}
@ -299,13 +278,13 @@ public class RtmpConnection implements RtmpPublisher {
if ( ! fullyConnected ) {
throw new IllegalStateException ( "Not connected to RTMP server" ) ;
}
if ( currentStreamId = = - 1 ) {
if ( currentStreamId = = 0 ) {
throw new IllegalStateException ( "No current stream object exists" ) ;
}
if ( ! publishPermitted ) {
throw new IllegalStateException ( "Not get the _result(Netstream.Publish.Start)" ) ;
}
Log . d ( TAG , "closeStream(): setting current stream ID to -1 ") ;
Log . d ( TAG , "closeStream(): setting current stream ID to 0 ") ;
Command closeStream = new Command ( "closeStream" , 0 ) ;
closeStream . getHeader ( ) . setChunkStreamId ( ChunkStreamInfo . RTMP_STREAM_CHANNEL ) ;
closeStream . getHeader ( ) . setMessageStreamId ( currentStreamId ) ;
@ -316,67 +295,53 @@ public class RtmpConnection implements RtmpPublisher {
@Override
public void shutdown ( ) {
if ( active ) {
readThread . shutdown ( ) ;
try {
// It will raise EOFException in handleRxPacketThread
socket . shutdownInput ( ) ;
// It will raise SocketException in sendRtmpPacket
socket . shutdownOutput ( ) ;
} catch ( IOException ioe ) {
ioe . printStackTrace ( ) ;
}
try {
// It will invoke EOFException in read thread
socket . shutdownInput ( ) ;
// It will invoke SocketException in write thread
socket . shutdownOutput ( ) ;
} catch ( IOException ioe ) {
ioe . printStackTrace ( ) ;
}
// shutdown rxPacketHandler
try {
rxPacketHandler . join ( ) ;
} catch ( InterruptedException ie ) {
rxPacketHandler . interrupt ( ) ;
}
// shutdown socket as well as its input and output stream
if ( socket ! = null ) {
try {
readThread . join ( ) ;
} catch ( InterruptedException ie ) {
ie . printStackTrace ( ) ;
readThread . interrupt ( ) ;
}
// shutdown handleRxPacketLoop
rxPacketQueue . clear ( ) ;
active = false ;
synchronized ( rxPacketLock ) {
rxPacketLock . notify ( ) ;
}
// shutdown socket as well as its input and output stream
if ( socket ! = null ) {
try {
socket . close ( ) ;
Log . d ( TAG , "socket closed" ) ;
} catch ( IOException ex ) {
Log . e ( TAG , "shutdown(): failed to close socket" , ex ) ;
}
socket . close ( ) ;
Log . d ( TAG , "socket closed" ) ;
} catch ( IOException ex ) {
Log . e ( TAG , "shutdown(): failed to close socket" , ex ) ;
}
mHandler . onRtmpDisconnected ( "disconnected" ) ;
}
mHandler . onRtmpDisconnected ( "disconnected" ) ;
reset ( ) ;
}
private void reset ( ) {
active = false ;
connecting = false ;
fullyConnected = false ;
publishPermitted = false ;
tcUrl = null ;
swfUrl = null ;
pageUrl = null ;
swfUrl = null ;
pageUrl = null ;
appName = null ;
streamName = null ;
publishType = null ;
currentStreamId = - 1 ;
currentStreamId = 0 ;
transactionIdCounter = 0 ;
videoFrameCacheNumber . set ( 0 ) ;
socketExceptionCause = "" ;
serverIpAddr = null ;
serverPid = null ;
serverId = null ;
rtmpSessionInfo = null ;
}
@Override
@ -384,7 +349,7 @@ public class RtmpConnection implements RtmpPublisher {
if ( ! fullyConnected ) {
throw new IllegalStateException ( "Not connected to RTMP server" ) ;
}
if ( currentStreamId = = - 1 ) {
if ( currentStreamId = = 0 ) {
throw new IllegalStateException ( "No current stream object exists" ) ;
}
if ( ! publishPermitted ) {
@ -403,7 +368,7 @@ public class RtmpConnection implements RtmpPublisher {
if ( ! fullyConnected ) {
throw new IllegalStateException ( "Not connected to RTMP server" ) ;
}
if ( currentStreamId = = - 1 ) {
if ( currentStreamId = = 0 ) {
throw new IllegalStateException ( "No current stream object exists" ) ;
}
if ( ! publishPermitted ) {
@ -419,7 +384,9 @@ public class RtmpConnection implements RtmpPublisher {
mHandler . onRtmpVideoStreaming ( "video streaming" ) ;
}
/** Transmit the specified RTMP packet */
/ * *
* Transmit the specified RTMP packet
* /
public void sendRtmpPacket ( RtmpPacket rtmpPacket ) {
try {
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo . getChunkStreamInfo ( rtmpPacket . getHeader ( ) . getChunkStreamId ( ) ) ;
@ -458,66 +425,72 @@ public class RtmpConnection implements RtmpPublisher {
}
}
public interface PacketRxHandler {
public void handleRxPacket ( RtmpPacket rtmpPacket ) ;
public void notifyWindowAckRequired ( final int numBytesReadThusFar ) ;
}
private void handleRxPacketLoop ( ) throws IOException {
// Handle all queued received RTMP packets
while ( active ) {
while ( ! rxPacketQueue . isEmpty ( ) ) {
RtmpPacket rtmpPacket = rxPacketQueue . poll ( ) ;
//Log.d(TAG, "handleRxPacketLoop(): RTMP rx packet message type: " + rtmpPacket.getHeader().getMessageType());
switch ( rtmpPacket . getHeader ( ) . getMessageType ( ) ) {
case ABORT :
rtmpSessionInfo . getChunkStreamInfo ( ( ( Abort ) rtmpPacket ) . getChunkStreamId ( ) ) . clearStoredChunks ( ) ;
break ;
case USER_CONTROL_MESSAGE :
UserControl ping = ( UserControl ) rtmpPacket ;
switch ( ping . getType ( ) ) {
case PING_REQUEST :
ChunkStreamInfo channelInfo = rtmpSessionInfo . getChunkStreamInfo ( ChunkStreamInfo . RTMP_CONTROL_CHANNEL ) ;
Log . d ( TAG , "handleRxPacketLoop(): Sending PONG reply.." ) ;
UserControl pong = new UserControl ( ping , channelInfo ) ;
sendRtmpPacket ( pong ) ;
break ;
case STREAM_EOF :
Log . i ( TAG , "handleRxPacketLoop(): Stream EOF reached, closing RTMP writer..." ) ;
break ;
}
break ;
case WINDOW_ACKNOWLEDGEMENT_SIZE :
WindowAckSize windowAckSize = ( WindowAckSize ) rtmpPacket ;
int size = windowAckSize . getAcknowledgementWindowSize ( ) ;
Log . d ( TAG , "handleRxPacketLoop(): Setting acknowledgement window size: " + size ) ;
rtmpSessionInfo . setAcknowledgmentWindowSize ( size ) ;
// Set socket option
socket . setSendBufferSize ( size ) ;
break ;
case SET_PEER_BANDWIDTH :
int acknowledgementWindowsize = rtmpSessionInfo . getAcknowledgementWindowSize ( ) ;
final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo . getChunkStreamInfo ( ChunkStreamInfo . RTMP_CONTROL_CHANNEL ) ;
Log . d ( TAG , "handleRxPacketLoop(): Send acknowledgement window size: " + acknowledgementWindowsize ) ;
sendRtmpPacket ( new WindowAckSize ( acknowledgementWindowsize , chunkStreamInfo ) ) ;
break ;
case COMMAND_AMF0 :
handleRxInvoke ( ( Command ) rtmpPacket ) ;
break ;
default :
Log . w ( TAG , "handleRxPacketLoop(): Not handling unimplemented/unknown packet of type: " + rtmpPacket . getHeader ( ) . getMessageType ( ) ) ;
break ;
}
}
// Wait for next received packet
synchronized ( rxPacketLock ) {
try {
rxPacketLock . wait ( 500 ) ;
} catch ( InterruptedException ex ) {
Log . w ( TAG , "handleRxPacketLoop: Interrupted" , ex ) ;
while ( ! Thread . interrupted ( ) ) {
try {
// It will be blocked when no data in input stream buffer
RtmpPacket rtmpPacket = rtmpDecoder . readPacket ( inputStream ) ;
if ( rtmpPacket ! = null ) {
//Log.d(TAG, "handleRxPacketLoop(): RTMP rx packet message type: " + rtmpPacket.getHeader().getMessageType());
switch ( rtmpPacket . getHeader ( ) . getMessageType ( ) ) {
case ABORT :
rtmpSessionInfo . getChunkStreamInfo ( ( ( Abort ) rtmpPacket ) . getChunkStreamId ( ) ) . clearStoredChunks ( ) ;
break ;
case USER_CONTROL_MESSAGE :
UserControl user = ( UserControl ) rtmpPacket ;
switch ( user . getType ( ) ) {
case STREAM_BEGIN :
if ( currentStreamId ! = user . getFirstEventData ( ) ) {
throw new IllegalStateException ( "Current stream ID error!" ) ;
}
break ;
case PING_REQUEST :
ChunkStreamInfo channelInfo = rtmpSessionInfo . getChunkStreamInfo ( ChunkStreamInfo . RTMP_CONTROL_CHANNEL ) ;
Log . d ( TAG , "handleRxPacketLoop(): Sending PONG reply.." ) ;
UserControl pong = new UserControl ( user , channelInfo ) ;
sendRtmpPacket ( pong ) ;
break ;
case STREAM_EOF :
Log . i ( TAG , "handleRxPacketLoop(): Stream EOF reached, closing RTMP writer..." ) ;
break ;
default :
// Ignore...
break ;
}
break ;
case WINDOW_ACKNOWLEDGEMENT_SIZE :
WindowAckSize windowAckSize = ( WindowAckSize ) rtmpPacket ;
int size = windowAckSize . getAcknowledgementWindowSize ( ) ;
Log . d ( TAG , "handleRxPacketLoop(): Setting acknowledgement window size: " + size ) ;
rtmpSessionInfo . setAcknowledgmentWindowSize ( size ) ;
break ;
case SET_PEER_BANDWIDTH :
SetPeerBandwidth bw = ( SetPeerBandwidth ) rtmpPacket ;
rtmpSessionInfo . setAcknowledgmentWindowSize ( bw . getAcknowledgementWindowSize ( ) ) ;
int acknowledgementWindowsize = rtmpSessionInfo . getAcknowledgementWindowSize ( ) ;
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo . getChunkStreamInfo ( ChunkStreamInfo . RTMP_CONTROL_CHANNEL ) ;
Log . d ( TAG , "handleRxPacketLoop(): Send acknowledgement window size: " + acknowledgementWindowsize ) ;
sendRtmpPacket ( new WindowAckSize ( acknowledgementWindowsize , chunkStreamInfo ) ) ;
// Set socket option
socket . setSendBufferSize ( acknowledgementWindowsize ) ;
break ;
case COMMAND_AMF0 :
handleRxInvoke ( ( Command ) rtmpPacket ) ;
break ;
default :
Log . w ( TAG , "handleRxPacketLoop(): Not handling unimplemented/unknown packet of type: " + rtmpPacket . getHeader ( ) . getMessageType ( ) ) ;
break ;
}
}
} catch ( EOFException eof ) {
Thread . currentThread ( ) . interrupt ( ) ;
} catch ( SocketException se ) {
Log . e ( TAG , "Caught SocketException while reading/decoding packet, shutting down: " + se . getMessage ( ) ) ;
Thread . getDefaultUncaughtExceptionHandler ( ) . uncaughtException ( Thread . currentThread ( ) , se ) ;
} catch ( IOException ioe ) {
Log . e ( TAG , "Caught exception while reading/decoding packet, shutting down: " + ioe . getMessage ( ) ) ;
Thread . getDefaultUncaughtExceptionHandler ( ) . uncaughtException ( Thread . currentThread ( ) , ioe ) ;
}
}
}