HDDS-380. Remove synchronization from ChunkGroupOutputStream and ChunkOutputStream. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
3fa4639421
commit
0bd4217194
@ -99,7 +99,7 @@ public ByteBuffer getBuffer() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void write(int b) throws IOException {
|
public void write(int b) throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
int rollbackPosition = buffer.position();
|
int rollbackPosition = buffer.position();
|
||||||
int rollbackLimit = buffer.limit();
|
int rollbackLimit = buffer.limit();
|
||||||
@ -110,7 +110,7 @@ public synchronized void write(int b) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void write(byte[] b, int off, int len)
|
public void write(byte[] b, int off, int len)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (b == null) {
|
if (b == null) {
|
||||||
throw new NullPointerException();
|
throw new NullPointerException();
|
||||||
@ -137,7 +137,7 @@ public synchronized void write(byte[] b, int off, int len)
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
checkOpen();
|
checkOpen();
|
||||||
if (buffer.position() > 0) {
|
if (buffer.position() > 0) {
|
||||||
int rollbackPosition = buffer.position();
|
int rollbackPosition = buffer.position();
|
||||||
@ -147,7 +147,7 @@ public synchronized void flush() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (xceiverClientManager != null && xceiverClient != null
|
if (xceiverClientManager != null && xceiverClient != null
|
||||||
&& buffer != null) {
|
&& buffer != null) {
|
||||||
if (buffer.position() > 0) {
|
if (buffer.position() > 0) {
|
||||||
@ -164,7 +164,7 @@ public synchronized void close() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void cleanup() {
|
public void cleanup() {
|
||||||
xceiverClientManager.releaseClient(xceiverClient);
|
xceiverClientManager.releaseClient(xceiverClient);
|
||||||
xceiverClientManager = null;
|
xceiverClientManager = null;
|
||||||
xceiverClient = null;
|
xceiverClient = null;
|
||||||
@ -176,7 +176,7 @@ public synchronized void cleanup() {
|
|||||||
*
|
*
|
||||||
* @throws IOException if stream is closed
|
* @throws IOException if stream is closed
|
||||||
*/
|
*/
|
||||||
private synchronized void checkOpen() throws IOException {
|
private void checkOpen() throws IOException {
|
||||||
if (xceiverClient == null) {
|
if (xceiverClient == null) {
|
||||||
throw new IOException("ChunkOutputStream has been closed.");
|
throw new IOException("ChunkOutputStream has been closed.");
|
||||||
}
|
}
|
||||||
@ -191,7 +191,7 @@ private synchronized void checkOpen() throws IOException {
|
|||||||
* @param rollbackLimit limit to restore in buffer if write fails
|
* @param rollbackLimit limit to restore in buffer if write fails
|
||||||
* @throws IOException if there is an I/O error while performing the call
|
* @throws IOException if there is an I/O error while performing the call
|
||||||
*/
|
*/
|
||||||
private synchronized void flushBufferToChunk(int rollbackPosition,
|
private void flushBufferToChunk(int rollbackPosition,
|
||||||
int rollbackLimit) throws IOException {
|
int rollbackLimit) throws IOException {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
@ -213,7 +213,7 @@ private synchronized void flushBufferToChunk(int rollbackPosition,
|
|||||||
*
|
*
|
||||||
* @throws IOException if there is an I/O error while performing the call
|
* @throws IOException if there is an I/O error while performing the call
|
||||||
*/
|
*/
|
||||||
private synchronized void writeChunkToContainer() throws IOException {
|
private void writeChunkToContainer() throws IOException {
|
||||||
buffer.flip();
|
buffer.flip();
|
||||||
ByteString data = ByteString.copyFrom(buffer);
|
ByteString data = ByteString.copyFrom(buffer);
|
||||||
ChunkInfo chunk = ChunkInfo
|
ChunkInfo chunk = ChunkInfo
|
||||||
|
@ -105,7 +105,7 @@ public ChunkGroupOutputStream() {
|
|||||||
* @param length
|
* @param length
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized void addStream(OutputStream outputStream, long length) {
|
public void addStream(OutputStream outputStream, long length) {
|
||||||
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
|
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -227,7 +227,7 @@ public long getByteOffset() {
|
|||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void write(int b) throws IOException {
|
public void write(int b) throws IOException {
|
||||||
byte[] buf = new byte[1];
|
byte[] buf = new byte[1];
|
||||||
buf[0] = (byte) b;
|
buf[0] = (byte) b;
|
||||||
write(buf, 0, 1);
|
write(buf, 0, 1);
|
||||||
@ -246,7 +246,7 @@ public synchronized void write(int b) throws IOException {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void write(byte[] b, int off, int len)
|
public void write(byte[] b, int off, int len)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
checkNotClosed();
|
checkNotClosed();
|
||||||
handleWrite(b, off, len);
|
handleWrite(b, off, len);
|
||||||
@ -404,7 +404,7 @@ private void allocateNewBlock(int index) throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
checkNotClosed();
|
checkNotClosed();
|
||||||
handleFlushOrClose(false);
|
handleFlushOrClose(false);
|
||||||
}
|
}
|
||||||
@ -450,7 +450,7 @@ private void handleFlushOrClose(boolean close) throws IOException {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -585,7 +585,7 @@ long getRemaining() {
|
|||||||
return length - currentPosition;
|
return length - currentPosition;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void checkStream() {
|
private void checkStream() {
|
||||||
if (this.outputStream == null) {
|
if (this.outputStream == null) {
|
||||||
this.outputStream = new ChunkOutputStream(blockID,
|
this.outputStream = new ChunkOutputStream(blockID,
|
||||||
key, xceiverClientManager, xceiverClient,
|
key, xceiverClientManager, xceiverClient,
|
||||||
|
Loading…
Reference in New Issue
Block a user