28import java.io.IOException;
29import java.io.InputStream;
30import java.io.OutputStream;
31import java.io.PrintStream;
32import java.io.RandomAccessFile;
33import java.lang.ref.WeakReference;
34import java.nio.ByteBuffer;
35import java.nio.MappedByteBuffer;
36import java.nio.channels.FileChannel;
37import java.nio.channels.FileChannel.MapMode;
39import org.jau.sys.Debug;
40import org.jau.sys.PlatformProps;
41import org.jau.sys.PlatformTypes;
/**
 * Resize handler that refuses every request: the visible body unconditionally
 * throws an {@link IOException} and never reads {@code newSize}.
 *
 * @param newSize requested file size (unused in the visible code)
 * @throws IOException always, with message "file size change not supported"
 */
111 public void setLength(
final long newSize)
throws IOException {
112 throw new IOException(
"file size change not supported");
// Gates the dbgDump(..) calls made at several points of this class (constructor, close, length change, ..).
134 static final boolean DEBUG;
// Slice size exponent: one slice spans (1L << sliceShift) bytes.
146 private final int sliceShift;
// Channel the slices are mapped from via fc.map(..).
147 private final FileChannel fc;
// Map mode handed to fc.map(..); also decides whether buffers get synced.
148 private final FileChannel.MapMode mmode;
// Number of slices needed to cover totalSize.
151 private int sliceCount;
// Strongly held slice buffers, indexed by slice; entries are null-checked before use.
152 private ByteBuffer[] slices;
// Weakly held slice buffers, indexed by slice; filled by flushSlice(..) and re-used by currentSlice().
153 private WeakReference<ByteBuffer>[] slices2GC;
// Total number of bytes addressed by this stream.
154 private long totalSize;
// Bookkeeping counters, reported by dbgDump(..) as "mapped" and "GC-queue";
// cleanAllSlices(..) expects both to be zero once it is done.
155 private int slicesEntries, slices2GCEntries;
// Passed as the 'syncBuffer' argument when slices are flushed or cleaned.
156 private boolean synchronous;
// Zero is treated as "stream closed" by checkOpen() and remaining().
158 private int refCount;
// Index of the current slice within 'slices'.
162 private int sliceIdx;
/**
 * Prints a diagnostic snapshot of this stream's state to {@code out}.
 *
 * @param prefix text prepended to every printed line
 * @param out destination stream
 */
165 final void dbgDump(
final String prefix,
final PrintStream out) {
// Recount of the non-null entries in 'slices', printed next to the tracked slicesEntries counter.
166 int _slicesEntries = 0;
167 for(
int i=0; i<sliceCount; i++) {
168 if(
null != slices[i] ) {
// Recount of the weak-reference queue, including how many referents are still alive.
172 int _slices2GCEntries = 0;
173 int _slices2GCAliveEntries = 0;
174 for(
int i=0; i<sliceCount; i++) {
175 final WeakReference<ByteBuffer> ref = slices2GC[i];
178 if(
null != ref.get() ) {
179 _slices2GCAliveEntries++;
// Default to zero; the statements filling these values sit in try blocks not fully visible here.
183 long fcSz = 0, pos = 0, rem = 0;
187 }
catch (
final IOException e) {
194 rem = totalSize - pos;
195 }
catch (
final IOException e) {
// Actual array length, printed next to sliceCount; zero when 'slices' is null.
199 final int sliceCount2 =
null != slices ? slices.length : 0;
200 out.println(prefix+
" refCount "+refCount+
", fcSize "+fcSz+
", totalSize "+totalSize);
201 out.println(prefix+
" position "+pos+
", remaining "+rem);
202 out.println(prefix+
" mmode "+mmode+
", cmode "+cmode+
", fileResizeOp "+fileResizeOp);
203 out.println(prefix+
" slice "+sliceIdx+
" / "+sliceCount+
" ("+sliceCount2+
"), synchronous "+synchronous);
// Tracked counter first, recounted value second.
204 out.println(prefix+
" mapped "+slicesEntries+
" / "+_slicesEntries);
205 out.println(prefix+
" GC-queue "+slices2GCEntries+
" / "+_slices2GCEntries+
" (alive "+_slices2GCAliveEntries+
")");
// Shift value and the resulting slice size in bytes.
206 out.println(prefix+
" sliceShift "+sliceShift+
" -> "+(1L << sliceShift));
/**
 * Full constructor; only part of its body is visible in this excerpt.
 *
 * @param fc file channel to map from
 * @param mmode map mode for the slices
 * @param cmode cache mode
 * @param sliceShift slice size exponent, stored in {@link #sliceShift}
 * @param totalSize total size in bytes, must not be negative
 * @param currSliceIdx initial value of {@link #sliceIdx}
 * @throws IllegalArgumentException if {@code totalSize} is negative
 * @throws IOException declared; the throwing statements are not visible here
 */
209 MappedByteBufferInputStream(
final FileChannel fc,
final FileChannel.MapMode mmode,
final CacheMode cmode,
210 final int sliceShift,
final long totalSize,
final int currSliceIdx)
throws IOException {
211 this.sliceShift = sliceShift;
215 if( 0 > totalSize ) {
216 throw new IllegalArgumentException(
"Negative size "+totalSize);
226 this.sliceIdx = currSliceIdx;
// Dump the freshly constructed state when debugging is enabled.
231 if( MappedByteBufferInputStream.DEBUG ) {
232 this.dbgDump(
"CTOR", System.err);
248 final FileChannel.MapMode mmode,
250 final int sliceShift)
throws IOException {
// Delegates to the full constructor, using the channel's current size as total size and slice index 0.
251 this(fileChannel, mmode, cmode, sliceShift, fileChannel.size(), 0);
/**
 * Verifies that this stream is still referenced, i.e. {@code refCount} is not zero.
 *
 * @throws IOException with message "stream closed" if {@code refCount} is zero
 */
305 final synchronized void checkOpen() throws IOException {
306 if( 0 == refCount ) {
307 throw new IOException(
"stream closed");
/**
 * Closes this stream. Only part of the body is visible in this excerpt; the
 * reference-count handling preceding the zero check is not shown.
 *
 * @throws IOException declared; may originate from the slice cleanup or flush calls
 */
312 public final synchronized void close() throws IOException {
315 if( 0 == refCount ) {
// Clean every slice, syncing its buffer first.
317 cleanAllSlices(
true );
// Flush with metaData == true; syncBuffer == false since slices were synced by the cleanup above.
319 flushImpl(
true ,
false );
328 this.dbgDump(
"Close", System.err);
/** Returns the {@link FileChannel.MapMode} stored in {@code mmode}. */
332 final FileChannel.MapMode getMapMode() {
return mmode; }
// Reject the request if a resize op other than NoFileResize is already set
// and the given one is a different instance (a null argument also differs).
339 if( NoFileResize != this.fileResizeOp && this.fileResizeOp != fileResizeOp ) {
340 throw new IllegalStateException(
"FileResizeOp already set, this value differs");
// A null argument is stored as NoFileResize.
342 this.fileResizeOp =
null != fileResizeOp ? fileResizeOp : NoFileResize;
/**
 * Requests a new total size for this stream. Only fragments of the body are
 * visible in this excerpt; the comments below describe those fragments only.
 *
 * @param newTotalSize requested total size in bytes
 * @throws IOException declared; may originate from the channel, cleanup or flush calls
 */
358 public final synchronized void setLength(
final long newTotalSize)
throws IOException {
359 final long currentPosition;
// Branch taken for a non-zero size that differs from the current total size.
360 if( 0 != newTotalSize && totalSize != newTotalSize ) {
// -1 tells notifyLengthChangeImpl(..) to query position() itself.
363 currentPosition = -1L;
// Branch taken when the underlying channel size differs from the requested size.
365 if( fc.size() != newTotalSize ) {
371 cleanAllSlices( synchronous );
376 flushImpl(
true ,
false );
379 notifyLengthChangeImpl(newTotalSize, currentPosition);
392 notifyLengthChangeImpl(newTotalSize, -1L);
/**
 * Adapts the slice bookkeeping to a new total size. Parts of the body are not
 * visible in this excerpt.
 *
 * @param newTotalSize new total size in bytes
 * @param currentPosition position to restore afterwards; a negative value makes
 *        the method query {@link #position()} itself
 * @throws IOException declared; may originate from slice cleanup or positioning
 */
394 private final synchronized void notifyLengthChangeImpl(
final long newTotalSize,
final long currentPosition)
throws IOException {
// Unchanged size (handling of this branch is not visible here).
399 if( totalSize == newTotalSize ) {
402 }
else if( 0 == newTotalSize ) {
// Zero size: drop all slices and install one-element arrays holding a single empty heap buffer.
404 cleanAllSlices( synchronous );
405 @SuppressWarnings(
"unchecked")
406 final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ 1 ];
407 slices2GC = newSlices2GC;
408 slices = new ByteBuffer[1];
409 slices[0] = ByteBuffer.allocate(0);
// Use the caller-supplied position if it is non-negative, otherwise the current one.
415 final long prePosition = 0 <= currentPosition ? currentPosition :
position();
417 final long sliceSize = 1L << sliceShift;
// Number of slices needed for the new size, rounded up.
418 final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
419 @SuppressWarnings(
"unchecked")
420 final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ newSliceCount ];
421 final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ];
// At most the old slices except the last one are carried over.
422 final
int copySliceCount = Math.min(newSliceCount, sliceCount-1);
423 if( 0 <= copySliceCount ) {
424 if( 0 < copySliceCount ) {
425 System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
426 System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
// Every old slice that was not carried over gets cleaned.
428 for(
int i=copySliceCount; i<sliceCount; i++) {
429 cleanSlice(i, synchronous);
432 slices2GC = newSlices2GC;
434 sliceCount = newSliceCount;
435 totalSize = newTotalSize;
// A mark beyond the new size is handled here (the handling itself is not visible).
436 if( newTotalSize < mark ) {
// Restore the previous position, clamped to the new total size.
439 position2( Math.min(prePosition, newTotalSize) );
441 if( MappedByteBufferInputStream.DEBUG ) {
442 this.dbgDump(
"NotifyLengthChange", System.err);
/**
 * Flushes this stream by calling {@code flushImpl(metaData, true)}, i.e. with
 * buffer syncing requested.
 *
 * @param metaData forwarded unchanged to {@code flushImpl}
 * @throws IOException if the flush fails
 */
453 public final synchronized void flush(
final boolean metaData)
throws IOException {
455 flushImpl(metaData,
true);
/**
 * Flush implementation; the tail of the body is not visible in this excerpt.
 *
 * @param metaData flag supplied by the caller (its use is not visible here)
 * @param syncBuffer if true and the map mode is READ_WRITE, all held and all
 *        weakly referenced slices are synced
 * @throws IOException if syncing fails
 */
457 private final synchronized void flushImpl(
final boolean metaData,
final boolean syncBuffer)
throws IOException {
// The visible work is limited to map modes other than READ_ONLY.
458 if( FileChannel.MapMode.READ_ONLY != mmode ) {
459 if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) {
// Strongly held slices.
460 for(
int i=0; i<sliceCount; i++) {
461 syncSlice(slices[i],
true);
// Weakly held slices; syncSlice(..) ignores a null referent.
463 for(
int i=0; i<sliceCount; i++) {
464 final WeakReference<ByteBuffer> ref = slices2GC[i];
466 syncSlice(ref.get(),
true);
484 throws IllegalStateException, IOException
/**
 * Provides the buffer of the current slice ({@code sliceIdx}). The visible
 * fragments show three sources: the strongly held entry, a still-alive weakly
 * held entry, or a fresh mapping from the file channel.
 *
 * @throws IOException if mapping the slice fails
 */
503 public final synchronized ByteBuffer
currentSlice() throws IOException {
504 final ByteBuffer s0 = slices[sliceIdx];
// Try to revive the weakly referenced buffer and move it back into 'slices'.
509 final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx];
511 final ByteBuffer mbb = ref.get();
512 slices2GC[sliceIdx] =
null;
515 slices[sliceIdx] = mbb;
// Map a new region: starts at the slice's byte offset, length capped by the remaining total size.
521 final long pos = (long)sliceIdx << sliceShift;
522 final MappedByteBuffer s1 = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
523 slices[sliceIdx] = s1;
/**
 * Advances towards the next slice; only the beginning of the body is visible.
 *
 * @throws IOException if flushing the current slice fails
 */
538 public final synchronized ByteBuffer
nextSlice() throws IOException {
// Only when the current slice is not the last one: flush it before moving on.
539 if ( sliceIdx < sliceCount - 1 ) {
540 flushSlice(sliceIdx, synchronous);
// Flush every slice, provided the slice array exists.
555 if(
null != slices ) {
556 for(
int i=0; i<sliceCount; i++) {
557 flushSlice(i, synchronous);
561 this.dbgDump(
"FlushSlices", System.err);
/**
 * Syncs the given slice, using the {@code synchronous} field as the
 * {@code syncBuffer} flag.
 *
 * @param s slice to sync
 * @throws IOException declared by the two-argument overload
 */
565 synchronized void syncSlice(
final ByteBuffer s)
throws IOException {
566 syncSlice(s, synchronous);
/**
 * Forces the given slice's content out via {@link MappedByteBuffer#force()},
 * but only if {@code syncBuffer} is true, {@code s} is non-null and the map
 * mode is READ_WRITE.
 *
 * @param s slice to sync, may be null
 * @param syncBuffer whether syncing is requested at all
 * @throws IOException declared; the throwing statement is not visible here
 */
568 synchronized void syncSlice(
final ByteBuffer s,
final boolean syncBuffer)
throws IOException {
569 if( syncBuffer &&
null != s && FileChannel.MapMode.READ_WRITE == mmode ) {
// NOTE(review): the cast assumes every slice is a MappedByteBuffer; the empty slice created with
// ByteBuffer.allocate(0) for a zero total size is not - confirm the catch below is meant to cover that.
571 ((MappedByteBuffer)s).force();
572 }
catch(
final Throwable t ) {
// The visible handling reports only the message to System.err.
578 System.err.println(
"Caught "+t.getMessage());
/**
 * Flushes slice {@code i} according to the cache mode. Only fragments are
 * visible; the branch layout described below is inferred from them and
 * should be confirmed against the full source.
 *
 * @param i index of the slice to flush
 * @param syncBuffer whether the buffer shall be synced
 * @throws IOException if syncing or cleaning fails
 */
584 private synchronized void flushSlice(
final int i,
final boolean syncBuffer)
throws IOException {
585 final ByteBuffer s = slices[i];
587 if( CacheMode.FLUSH_NONE != cmode ) {
590 if( CacheMode.FLUSH_PRE_HARD == cmode ) {
// Hard flush: if cleaning the buffer did not succeed, keep it weakly referenced instead.
591 if( !cleanBuffer(s, syncBuffer) ) {
593 slices2GC[i] =
new WeakReference<ByteBuffer>(s);
// Presumably the remaining non-FLUSH_NONE modes: sync, then keep the buffer weakly referenced.
597 syncSlice(s, syncBuffer);
598 slices2GC[i] =
new WeakReference<ByteBuffer>(s);
// Presumably the FLUSH_NONE path: sync only.
602 syncSlice(s, syncBuffer);
/**
 * Cleans every slice and then verifies that both bookkeeping counters have
 * dropped to zero.
 *
 * @param syncBuffers forwarded to {@code cleanSlice} for each slice
 * @throws IOException if cleaning a slice fails
 * @throws InternalError if {@code slicesEntries} or {@code slices2GCEntries}
 *         is non-zero afterwards
 */
606 private synchronized void cleanAllSlices(
final boolean syncBuffers)
throws IOException {
607 if(
null != slices ) {
608 for(
int i=0; i<sliceCount; i++) {
609 cleanSlice(i, syncBuffers);
// Leftover entries indicate broken bookkeeping: dump the state, then fail hard.
611 if( 0 != slicesEntries || 0 != slices2GCEntries ) {
612 final String err =
"mappedSliceCount "+slicesEntries+
", slices2GCEntries "+slices2GCEntries;
613 dbgDump(err+
": ", System.err);
614 throw new InternalError(err);
/**
 * Cleans slice {@code i}, looking at both the strongly held buffer and the
 * weakly referenced one. Only fragments of the body are visible.
 *
 * @param i index of the slice to clean
 * @param syncBuffer forwarded to {@code cleanBuffer}
 * @throws IOException if cleaning fails
 */
619 private synchronized void cleanSlice(
final int i,
final boolean syncBuffer)
throws IOException {
620 final ByteBuffer s1 = slices[i];
623 final WeakReference<ByteBuffer> ref = slices2GC[i];
// Strongly held buffer.
635 cleanBuffer(s1, syncBuffer);
// Consistency failure; the guarding condition is not visible in this excerpt.
637 throw new InternalError(
"XXX");
639 }
else if(
null != s2 ) {
// Otherwise clean the weakly held buffer, if present.
640 cleanBuffer(s2, syncBuffer);
/**
 * Syncs the given buffer and then attempts to release it via
 * {@code Buffers.Cleaner.clean}. The bodies of both branches are not visible.
 *
 * @param mbb buffer to clean
 * @param syncBuffer forwarded to {@code syncSlice}
 * @return a boolean; {@code flushSlice} falls back to a weak reference when it is false
 * @throws IOException if syncing fails
 */
643 private synchronized boolean cleanBuffer(
final ByteBuffer mbb,
final boolean syncBuffer)
throws IOException {
644 syncSlice(mbb, syncBuffer);
// Non-direct buffers are special-cased (handling not visible here).
645 if( !mbb.isDirect() ) {
// A failed clean is special-cased for the FLUSH_PRE_HARD cache mode (handling not visible here).
648 if( !Buffers.Cleaner.clean(mbb) && CacheMode.FLUSH_PRE_HARD == cmode ) {
674 public final synchronized long length() {
/**
 * Returns the number of bytes between the current position and the total
 * size, or zero if {@code refCount} is not positive.
 *
 * @throws IOException if determining the position fails
 */
690 public final synchronized long remaining() throws IOException {
691 return 0 < refCount ? totalSize -
position() : 0;
702 public final synchronized int available() throws IOException {
/**
 * Returns the absolute position: the byte offset of the current slice plus
 * the position within that slice's buffer.
 *
 * @throws IOException if obtaining the current slice fails
 */
715 public final synchronized long position() throws IOException {
717 return ( (
long)sliceIdx << sliceShift ) +
currentSlice().position();
// Valid positions are [0..totalSize], the end position included.
735 if ( totalSize < newPosition || 0 > newPosition ) {
736 throw new IllegalArgumentException(
"new position "+newPosition+
" not within [0.."+totalSize+
"]");
738 final int preSlice = sliceIdx;
740 if ( totalSize == newPosition ) {
// End position: select the last slice (or slice 0 if there is none) and move to its capacity.
742 sliceIdx = Math.max(0, sliceCount - 1);
// Flush the slice being left, if the slice changed.
743 if( preSlice != sliceIdx ) {
744 flushSlice(preSlice, synchronous);
747 s.position( s.capacity() );
// Regular position: the slice index is the position divided by the slice size.
749 sliceIdx = (int)( newPosition >>> sliceShift );
750 if( preSlice != sliceIdx ) {
751 flushSlice(preSlice, synchronous);
// Offset within the slice is the position minus the slice's byte offset.
753 currentSlice().position( (
int)( newPosition - ( (
long)sliceIdx << sliceShift ) ) );
/**
 * Sets the position without the visible range check and without flushing the
 * previously selected slice.
 *
 * @param newPosition absolute position to set
 * @throws IOException if obtaining the target slice fails
 */
757 private final synchronized void position2(
final long newPosition )
throws IOException {
758 if ( totalSize == newPosition ) {
// End position: select the last slice (or slice 0 if there is none) and move to its capacity.
760 sliceIdx = Math.max(0, sliceCount - 1);
762 s.position( s.capacity() );
// Regular position: select the slice by index, then set the offset within it.
764 sliceIdx = (int)( newPosition >>> sliceShift );
765 currentSlice().position( (
int)( newPosition - ( (
long)sliceIdx << sliceShift ) ) );
/**
 * Sets the mark; the statements doing so are not visible in this excerpt.
 * An {@link IOException} raised while doing so is rethrown wrapped in a
 * {@link RuntimeException}, since this method declares no checked exception.
 *
 * @param readlimit read limit argument (its use is not visible here)
 */
782 public final synchronized void mark(
final int readlimit ) {
786 }
catch (
final IOException e) {
787 throw new RuntimeException(e);
/**
 * Resets the stream to its mark; only the failure path is visible here.
 *
 * @throws IOException with message "mark not set" (the guarding condition is not visible)
 */
798 public final synchronized void reset() throws IOException {
801 throw new IOException(
"mark not set");
/**
 * Skips bytes; only the computation of the skip count is visible here.
 *
 * @param n requested number of bytes to skip
 * @throws IOException declared; the throwing statements are not visible here
 */
811 public final synchronized long skip(
final long n )
throws IOException {
// Never skip past the end: cap the request at the bytes remaining up to totalSize.
817 final long rem = totalSize - pos;
818 final long s = Math.min( rem, n );
/**
 * Reads a single byte; the handling of an exhausted slice is not visible here.
 *
 * @return the byte read, as an unsigned value in 0..255 on the visible return path
 * @throws IOException declared; the throwing statements are not visible here
 */
824 public final synchronized int read() throws IOException {
827 if ( !slice.hasRemaining() ) {
// Mask to convert the signed byte into an unsigned int.
832 return slice.get() & 0xFF;
/**
 * Reads up to {@code len} bytes into {@code b} starting at {@code off},
 * crossing slice boundaries as needed. Only fragments of the body are visible.
 *
 * @param b destination array
 * @param off start offset within {@code b}
 * @param len maximum number of bytes to read
 * @throws NullPointerException on the visible first failure path (condition not visible)
 * @throws IndexOutOfBoundsException for invalid {@code off}/{@code len} (full condition not visible)
 * @throws IOException declared; the throwing statements are not visible here
 */
836 public final synchronized int read(
final byte[] b,
final int off,
final int len )
throws IOException {
839 throw new NullPointerException();
840 }
else if( off < 0 ||
844 throw new IndexOutOfBoundsException(
"offset "+off+
", length "+len+
", b.length "+b.length);
845 }
else if ( 0 == len ) {
849 if ( 0 == totalRem ) {
// Never read more than what is left in the stream.
852 final int maxLen = (int)Math.min( totalRem, len );
854 while(
read < maxLen ) {
856 int currRem = slice.remaining();
// Current slice exhausted: a following slice is expected, otherwise this is an internal error.
857 if ( 0 == currRem ) {
859 throw new InternalError(
"Unexpected EOT");
861 currRem = slice.remaining();
// Copy as much as both the request and the current slice allow.
863 final int currLen = Math.min( maxLen -
read, currRem );
864 slice.get( b, off +
read, currLen );
/**
 * Reads up to {@code len} bytes into the given buffer, crossing slice
 * boundaries as needed. Only fragments of the body are visible.
 *
 * @param b destination buffer
 * @param len maximum number of bytes to read, must be within [0..b.remaining()]
 * @throws NullPointerException on the visible first failure path (condition not visible)
 * @throws IndexOutOfBoundsException if {@code len} is negative or exceeds {@code b.remaining()}
 * @throws IOException declared; the throwing statements are not visible here
 */
879 public final synchronized int read(
final ByteBuffer b,
final int len)
throws IOException {
882 throw new NullPointerException();
883 }
else if (len < 0 || len > b.remaining()) {
884 throw new IndexOutOfBoundsException(
"length "+len+
", b "+b);
885 }
else if ( 0 == len ) {
889 if ( 0 == totalRem ) {
// Never read more than what is left in the stream.
892 final int maxLen = (int)Math.min( totalRem, len );
894 while(
read < maxLen ) {
896 int currRem = slice.remaining();
// Current slice exhausted: a following slice is expected, otherwise this is an internal error.
897 if ( 0 == currRem ) {
899 throw new InternalError(
"Unexpected EOT");
901 currRem = slice.remaining();
903 final int currLen = Math.min( maxLen -
read, currRem );
// Both buffers array-backed: copy between the backing arrays and advance both positions manually.
904 if( slice.hasArray() && b.hasArray() ) {
905 System.arraycopy(slice.array(), slice.arrayOffset() + slice.position(),
906 b.array(), b.arrayOffset() + b.position(),
908 slice.position( slice.position() + currLen );
909 b.position( b.position() + currLen );
910 }
else if( currLen == currRem ) {
// Partial transfer: remember the slice's limit before narrowing it temporarily.
// NOTE(review): the limit is set to currLen rather than slice.position() + currLen - confirm this
// is intended when the slice position is non-zero.
913 final int _limit = slice.limit();
914 slice.limit(currLen);
An OutputStream implementation based on an underlying FileChannel's memory-mapped ByteBuffer.
Helper routines for logging and debugging.
static final boolean debug(final String subcomponent)