@ -3,9 +3,8 @@ package net.ossrs.sea.rtmp.io;
import java.io.IOException ;
import java.io.OutputStream ;
import java.net.SocketException ;
import android.os.Handler ;
import android.os.Looper ;
import android.os.Message ;
import java.util.Arrays ;
import java.util.concurrent.ConcurrentLinkedQueue ;
import android.util.Log ;
import net.ossrs.sea.rtmp.packets.Command ;
import net.ossrs.sea.rtmp.packets.RtmpPacket ;
@ -21,9 +20,10 @@ public class WriteThread extends Thread {
private RtmpSessionInfo rtmpSessionInfo ;
private OutputStream out ;
private Handler handler ;
private ConcurrentLinkedQueue < RtmpPacket > writeQueue = new ConcurrentLinkedQueue < RtmpPacket > ( ) ;
private final Object txPacketLock = new Object ( ) ;
private volatile boolean active = true ;
private ThreadController threadController ;
private Thread t ;
public WriteThread ( RtmpSessionInfo rtmpSessionInfo , OutputStream out , ThreadController threadController ) {
super ( "RtmpWriteThread" ) ;
@ -34,31 +34,43 @@ public class WriteThread extends Thread {
@Override
public void run ( ) {
t = this ;
Looper . prepare ( ) ;
handler = new Handler ( Looper . myLooper ( ) ) {
@Override
public void handleMessage ( Message msg ) {
try {
RtmpPacket rtmpPacket = ( RtmpPacket ) msg . obj ;
ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo . getChunkStreamInfo ( rtmpPacket . getHeader ( ) . getChunkStreamId ( ) ) ;
while ( active ) {
try {
while ( ! writeQueue . isEmpty ( ) ) {
RtmpPacket rtmpPacket = writeQueue . poll ( ) ;
final ChunkStreamInfo chunkStreamInfo = rtmpSessionInfo . getChunkStreamInfo ( rtmpPacket . getHeader ( ) . getChunkStreamId ( ) ) ;
chunkStreamInfo . setPrevHeaderTx ( rtmpPacket . getHeader ( ) ) ;
rtmpPacket . writeTo ( out , rtmpSessionInfo . getTxChunkSize ( ) , chunkStreamInfo ) ;
out . flush ( ) ;
Log . d ( TAG , "WriteThread: wrote packet: " + rtmpPacket + ", size: " + rtmpPacket . getHeader ( ) . getPacketLength ( ) ) ;
if ( rtmpPacket instanceof Command ) {
rtmpSessionInfo . addInvokedCommand ( ( ( Command ) rtmpPacket ) . getTransactionId ( ) , ( ( Command ) rtmpPacket ) . getCommandName ( ) ) ;
}
} catch ( SocketException se ) {
Log . e ( TAG , "WriteThread: Caught SocketException during write loop, shutting down" , se ) ;
Thread . getDefaultUncaughtExceptionHandler ( ) . uncaughtException ( t , se ) ;
} catch ( IOException ex ) {
Log . e ( TAG , "WriteThread: Caught IOException during write loop, shutting down" , ex ) ;
Thread . getDefaultUncaughtExceptionHandler ( ) . uncaughtException ( t , ex ) ;
}
out . flush ( ) ;
} catch ( SocketException se ) {
Log . e ( TAG , "WriteThread: Caught SocketException during write loop, shutting down" , se ) ;
Thread . getDefaultUncaughtExceptionHandler ( ) . uncaughtException ( this , se ) ;
active = false ;
continue ;
} catch ( IOException ioe ) {
Log . e ( TAG , "WriteThread: Caught IOException during write loop, shutting down" , ioe ) ;
Thread . getDefaultUncaughtExceptionHandler ( ) . uncaughtException ( this , ioe ) ;
active = false ;
continue ;
}
} ;
Looper . loop ( ) ;
// Waiting for next packet
Log . d ( TAG , "WriteThread: waiting..." ) ;
synchronized ( txPacketLock ) {
try {
// isEmpty() may take some time, so time out should be set to wait next offer
txPacketLock . wait ( 1000 ) ;
} catch ( InterruptedException ex ) {
Log . w ( TAG , "Interrupted" , ex ) ;
}
}
}
// Close outputstream
try {
@ -75,14 +87,26 @@ public class WriteThread extends Thread {
/** Transmit the specified RTMP packet (thread-safe) */
public void send ( RtmpPacket rtmpPacket ) {
if ( rtmpPacket ! = null ) {
Message msg = Message . obtain ( ) ;
msg . obj = rtmpPacket ;
handler . sendMessage ( msg ) ;
writeQueue . offer ( rtmpPacket ) ;
}
synchronized ( txPacketLock ) {
txPacketLock . notify ( ) ;
}
}
/** Transmit the specified RTMP packet (thread-safe) */
public void send ( RtmpPacket . . . rtmpPackets ) {
writeQueue . addAll ( Arrays . asList ( rtmpPackets ) ) ;
synchronized ( txPacketLock ) {
txPacketLock . notify ( ) ;
}
}
public void shutdown ( ) {
Log . d ( TAG , "Stopping write thread..." ) ;
Looper . myLooper ( ) . quit ( ) ;
active = false ;
synchronized ( txPacketLock ) {
txPacketLock . notify ( ) ;
}
}
}