Fix socket and thread close method

1. Use boolean flag in write thread to avoid InterruptedException while
it is blocked and use interrupted flag in read thread.

2. Do NOT close input stream in read thread and output stream in write
thread when shutting down. socket.close() will handle it automatically.
Otherwise SocketException will be raised in the corresponding thread.

3. Do socket.shutdownInput() before ending read thread and capture
EOFException which may be the only correct way to return the blocking
input stream read method.

Signed-off-by: Leo Ma <begeekmyfriend@gmail.com>
camera2
Leo Ma 9 years ago
parent 33f3195c7a
commit 5f37bd808b

@ -19,30 +19,24 @@ public class ReadThread extends Thread {
private RtmpDecoder rtmpDecoder;
private InputStream in;
private PacketRxHandler packetRxHandler;
private ThreadController threadController;
public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, PacketRxHandler packetRxHandler, ThreadController threadController) {
public ReadThread(RtmpSessionInfo rtmpSessionInfo, InputStream in, PacketRxHandler packetRxHandler) {
super("RtmpReadThread");
this.in = in;
this.packetRxHandler = packetRxHandler;
this.rtmpDecoder = new RtmpDecoder(rtmpSessionInfo);
this.threadController = threadController;
}
@Override
public void run() {
boolean isEof = false;
while (!Thread.interrupted()) {
try {
// It will be blocked when no data in input stream buffer
RtmpPacket rtmpPacket = rtmpDecoder.readPacket(in);
packetRxHandler.handleRxPacket(rtmpPacket);
if (isEof) {
isEof = false;
Thread.sleep(500);
}
} catch (EOFException eof) {
isEof = true;
this.interrupt();
// } catch (WindowAckRequired war) {
// Log.i(TAG, "Window Acknowledgment required, notifying packet handler...");
// packetRxHandler.notifyWindowAckRequired(war.getBytesRead());
@ -51,36 +45,18 @@ public class ReadThread extends Thread {
// packetRxHandler.handleRxPacket(war.getRtmpPacket());
// }
} catch (SocketException se) {
if (!this.isInterrupted()) {
Log.e(TAG, "ReadThread: Caught SocketException while reading/decoding packet, shutting down...", se);
this.interrupt();
}
Log.e(TAG, "ReadThread: Caught SocketException while reading/decoding packet, shutting down...", se);
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, se);
} catch (IOException ioe) {
if (!this.isInterrupted()) {
Log.e(TAG, "ReadThread: Caught exception while reading/decoding packet, shutting down...", ioe);
this.interrupt();
}
Log.e(TAG, "ReadThread: Caught exception while reading/decoding packet, shutting down...", ioe);
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(this, ioe);
} catch (InterruptedException ie) {
ie.printStackTrace();
this.interrupt();
}
}
// Close inputstream
try {
in.close();
} catch (IOException ex) {
Log.w(TAG, "Failed to close inputstream", ex);
}
Log.i(TAG, "exiting");
if (threadController != null) {
threadController.threadHasExited(this);
}
Log.i(TAG, "exit");
}
public void shutdown() {
Log.d(TAG, "Stopping read thread...");
this.interrupt();
Log.d(TAG, "Stopping");
}
}

@ -34,7 +34,7 @@ import net.ossrs.sea.rtmp.packets.WindowAckSize;
*
* @author francois, leoma
*/
public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadController {
public class RtmpConnection implements RtmpPublisher, PacketRxHandler {
private static final String TAG = "RtmpConnection";
private static final Pattern rtmpUrlPattern = Pattern.compile("^rtmp://([^/:]+)(:(\\d+))*/([^/]+)(/(.*))*$");
@ -52,6 +52,7 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
private RtmpSessionInfo rtmpSessionInfo;
private int transactionIdCounter = 0;
private static final int SOCKET_CONNECT_TIMEOUT_MS = 3000;
private ReadThread readThread;
private WriteThread writeThread;
private final ConcurrentLinkedQueue<RtmpPacket> rxPacketQueue;
private final Object rxPacketLock = new Object();
@ -92,8 +93,8 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
handshake(in, out);
active = true;
Log.d(TAG, "connect(): handshake done");
ReadThread readThread = new ReadThread(rtmpSessionInfo, in, this, this);
writeThread = new WriteThread(rtmpSessionInfo, out, videoFrameCacheNumber, this);
readThread = new ReadThread(rtmpSessionInfo, in, this);
writeThread = new WriteThread(rtmpSessionInfo, out, videoFrameCacheNumber);
readThread.start();
writeThread.start();
@ -176,7 +177,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
if (!fullyConnected) {
throw new IllegalStateException("Not connected to RTMP server");
}
if (currentStreamId == -1) {
throw new IllegalStateException("No current stream object exists");
}
@ -190,7 +190,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
publish.addData(streamName);
publish.addData(publishType);
writeThread.send(publish);
}
@Override
@ -322,8 +321,6 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
}
}
}
shutdownImpl();
}
private void handleRxInvoke(Command invoke) throws IOException {
@ -369,44 +366,45 @@ public class RtmpConnection implements RtmpPublisher, PacketRxHandler, ThreadCon
}
}
@Override
public void threadHasExited(Thread thread) {
shutdown();
}
@Override
public void shutdown() {
// shutdown read thread
try {
// It will invoke EOFException in read thread
socket.shutdownInput();
} catch (IOException ioe) {
ioe.printStackTrace();
}
readThread.shutdown();
try {
readThread.join();
} catch (InterruptedException ie) {
ie.printStackTrace();
readThread.interrupt();
}
// shutdown write thread
writeThread.shutdown();
try {
writeThread.join();
} catch (InterruptedException ie) {
ie.printStackTrace();
writeThread.interrupt();
}
// shutdown handleRxPacketLoop
active = false;
synchronized (rxPacketLock) {
rxPacketLock.notify();
}
}
private void shutdownImpl() {
// Shut down read/write threads, if necessary
if (Thread.activeCount() > 1) {
Log.i(TAG, "shutdown(): Shutting down read/write threads");
Thread[] threads = new Thread[Thread.activeCount()];
Thread.enumerate(threads);
for (Thread thread : threads) {
if (thread instanceof ReadThread && thread.isAlive()) {
((ReadThread) thread).shutdown();
} else if (thread instanceof WriteThread && thread.isAlive()) {
((WriteThread) thread).shutdown();
}
try {
thread.join();
} catch (InterruptedException ie) {
ie.printStackTrace();
thread.interrupt();
}
}
}
// shutdown socket as well as its input and output stream
if (socket != null) {
try {
socket.close();
Log.d(TAG, "socket closed");
} catch (IOException ex) {
Log.w(TAG, "shutdown(): failed to close socket", ex);
Log.e(TAG, "shutdown(): failed to close socket", ex);
}
}
}

@ -3,7 +3,6 @@ package net.ossrs.sea.rtmp.io;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketException;
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
@ -27,14 +26,12 @@ public class WriteThread extends Thread {
private final Object txPacketLock = new Object();
private volatile boolean active = true;
private AtomicInteger videoFrameCacheNumber;
private ThreadController threadController;
public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, AtomicInteger i, ThreadController threadController) {
public WriteThread(RtmpSessionInfo rtmpSessionInfo, OutputStream out, AtomicInteger count) {
super("RtmpWriteThread");
this.rtmpSessionInfo = rtmpSessionInfo;
this.out = out;
this.videoFrameCacheNumber = i;
this.threadController = threadController;
this.videoFrameCacheNumber = count;
}
@Override
@ -73,23 +70,15 @@ public class WriteThread extends Thread {
synchronized (txPacketLock) {
try {
// isEmpty() may take some time, so time out should be set to wait next offer
txPacketLock.wait(1000);
txPacketLock.wait(500);
} catch (InterruptedException ex) {
Log.w(TAG, "Interrupted", ex);
this.interrupt();
}
}
}
// Close outputstream
try {
out.close();
} catch (IOException ex) {
Log.w(TAG, "WriteThread: Failed to close outputstream", ex);
}
Log.d(TAG, "exiting");
if (threadController != null) {
threadController.threadHasExited(this);
}
Log.d(TAG, "exit");
}
/** Transmit the specified RTMP packet (thread-safe) */
@ -102,16 +91,8 @@ public class WriteThread extends Thread {
}
}
/** Transmit the specified RTMP packet (thread-safe) */
public void send(RtmpPacket... rtmpPackets) {
writeQueue.addAll(Arrays.asList(rtmpPackets));
synchronized (txPacketLock) {
txPacketLock.notify();
}
}
public void shutdown() {
Log.d(TAG, "Stopping write thread...");
Log.d(TAG, "Stopping");
active = false;
synchronized (txPacketLock) {
txPacketLock.notify();

Loading…
Cancel
Save