jaulib v1.3.0
Jau Support Library (C++, Java, ..)
MappedByteBufferInputStream.java
Go to the documentation of this file.
1/**
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2021 Gothel Software e.K.
4 * Copyright (c) 2014 Gothel Software e.K.
5 * Copyright (c) 2014 JogAmp Community.
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining
8 * a copy of this software and associated documentation files (the
9 * "Software"), to deal in the Software without restriction, including
10 * without limitation the rights to use, copy, modify, merge, publish,
11 * distribute, sublicense, and/or sell copies of the Software, and to
12 * permit persons to whom the Software is furnished to do so, subject to
13 * the following conditions:
14 *
15 * The above copyright notice and this permission notice shall be
16 * included in all copies or substantial portions of the Software.
17 *
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
20 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
22 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 */
26package org.jau.io;
27
28import java.io.IOException;
29import java.io.InputStream;
30import java.io.OutputStream;
31import java.io.PrintStream;
32import java.io.RandomAccessFile;
33import java.lang.ref.WeakReference;
34import java.nio.ByteBuffer;
35import java.nio.MappedByteBuffer;
36import java.nio.channels.FileChannel;
37import java.nio.channels.FileChannel.MapMode;
38
39import org.jau.sys.Debug;
40import org.jau.sys.PlatformProps;
41import org.jau.sys.PlatformTypes;
42
43/**
44 * An {@link InputStream} implementation based on an underlying {@link FileChannel}'s memory mapped {@link ByteBuffer},
45 * {@link #markSupported() supporting} {@link #mark(int) mark} and {@link #reset()}.
46 * <p>
47 * Implementation allows full memory mapped {@link ByteBuffer} coverage via {@link FileChannel#map(MapMode, long, long) FileChannel}
48 * beyond its size limitation of {@link Integer#MAX_VALUE} utilizing an array of {@link ByteBuffer} slices.<br>
49 * </p>
50 * <p>
51 * Implementation further allows full random access via {@link #position()} and {@link #position(long)}
52 * and accessing the memory mapped {@link ByteBuffer} slices directly via {@link #currentSlice()} and {@link #nextSlice()}.
53 * </p>
54 * @since 0.3.0
55 */
56public class MappedByteBufferInputStream extends InputStream {
/**
 * Strategy for handling a previously mapped buffer slice
 * once the stream moves on to another slice.
 */
public enum CacheMode {
    /**
     * Keep all previous lazily cached buffer slices alive, useful for hopping readers,
     * i.e. random access via {@link MappedByteBufferInputStream#position(long) position(p)}
     * or {@link MappedByteBufferInputStream#reset() reset()}.
     * <p>
     * Note that without flushing, the platform may fail memory mapping
     * due to virtual address space exhaustion.<br>
     * In such case an {@link OutOfMemoryError} may be thrown directly,
     * or encapsulated as {@link IOException#getCause() the cause}
     * of a thrown {@link IOException}.
     * </p>
     */
    FLUSH_NONE,
    /**
     * Soft flush the previous lazily cached buffer slice when caching the next buffer slice,
     * useful for sequential forward readers, as well as for hopping readers like {@link #FLUSH_NONE}
     * in case of relatively short periods between hopping across slices.
     * <p>
     * Implementation clears the buffer slice reference
     * while preserving a {@link WeakReference} to allow its resurrection if not yet
     * {@link System#gc() garbage collected}.
     * </p>
     */
    FLUSH_PRE_SOFT,
    /**
     * Hard flush the previous lazily cached buffer slice when caching the next buffer slice,
     * useful for sequential forward readers.
     * <p>
     * Besides clearing the buffer slice reference,
     * implementation attempts to hard flush the mapped buffer
     * using a {@code sun.misc.Cleaner} by reflection.
     * In case such method does not exist or does not work, implementation falls back to {@link #FLUSH_PRE_SOFT}.
     * </p>
     * <p>
     * This is the default.
     * </p>
     */
    FLUSH_PRE_HARD
}
97
/**
 * Callback used to change the size of the underlying file,
 * e.g. by delegating to {@link RandomAccessFile#setLength(long)}.
 */
public interface FileResizeOp {
    /**
     * Resizes the file to the given size.
     *
     * @param newSize the new file size
     * @throws IOException if file size change is not supported or any other I/O error occurs
     */
    void setLength(long newSize) throws IOException;
}
109 private static final FileResizeOp NoFileResize = new FileResizeOp() {
110 @Override
111 public void setLength(final long newSize) throws IOException {
112 throw new IOException("file size change not supported");
113 }
114 };
115
116 /**
117 * Default slice shift, i.e. 1L << shift, denoting slice size in MiB:
118 * <ul>
119 * <li>{@link Platform#is64Bit() 64bit machines} -> 30 = 1024 MiB</li>
120 * <li>{@link Platform#is32Bit() 32bit machines} -> 29 = 512 MiB</li>
121 * </ul>
122 * <p>
123 * In case the default slice size uses up too much address space, one may choose other values:
124 * <ul>
125 * <li>29 -> 512 MiB</li>
126 * <li>28 -> 256 MiB</li>
127 * <li>27 -> 128 MiB</li>
128 * <li>26 -> 64 MiB</li>
129 * </ul>
130 * </p>
131 */
132 public static final int DEFAULT_SLICE_SHIFT;
133
134 static final boolean DEBUG;
135
136 static {
139 } else {
141 }
142
143 DEBUG = Debug.debug("ByteBufferInputStream");
144 }
145
146 private final int sliceShift;
147 private final FileChannel fc;
148 private final FileChannel.MapMode mmode;
149 private FileResizeOp fileResizeOp = NoFileResize;
150
151 private int sliceCount;
152 private ByteBuffer[] slices;
153 private WeakReference<ByteBuffer>[] slices2GC;
154 private long totalSize;
155 private int slicesEntries, slices2GCEntries;
156 private boolean synchronous;
157
158 private int refCount;
159
160 private CacheMode cmode;
161
162 private int sliceIdx;
163 private long mark;
164
165 final void dbgDump(final String prefix, final PrintStream out) {
166 int _slicesEntries = 0;
167 for(int i=0; i<sliceCount; i++) {
168 if( null != slices[i] ) {
169 _slicesEntries++;
170 }
171 }
172 int _slices2GCEntries = 0;
173 int _slices2GCAliveEntries = 0;
174 for(int i=0; i<sliceCount; i++) {
175 final WeakReference<ByteBuffer> ref = slices2GC[i];
176 if( null != ref ) {
177 _slices2GCEntries++;
178 if( null != ref.get() ) {
179 _slices2GCAliveEntries++;
180 }
181 }
182 }
183 long fcSz = 0, pos = 0, rem = 0;
184 if( fc.isOpen() ) {
185 try {
186 fcSz = fc.size();
187 } catch (final IOException e) {
188 e.printStackTrace();
189 }
190 }
191 if( 0 < refCount ) {
192 try {
193 pos = position();
194 rem = totalSize - pos;
195 } catch (final IOException e) {
196 e.printStackTrace();
197 }
198 }
199 final int sliceCount2 = null != slices ? slices.length : 0;
200 out.println(prefix+" refCount "+refCount+", fcSize "+fcSz+", totalSize "+totalSize);
201 out.println(prefix+" position "+pos+", remaining "+rem);
202 out.println(prefix+" mmode "+mmode+", cmode "+cmode+", fileResizeOp "+fileResizeOp);
203 out.println(prefix+" slice "+sliceIdx+" / "+sliceCount+" ("+sliceCount2+"), synchronous "+synchronous);
204 out.println(prefix+" mapped "+slicesEntries+" / "+_slicesEntries);
205 out.println(prefix+" GC-queue "+slices2GCEntries+" / "+_slices2GCEntries+" (alive "+_slices2GCAliveEntries+")");
206 out.println(prefix+" sliceShift "+sliceShift+" -> "+(1L << sliceShift));
207 }
208
209 MappedByteBufferInputStream(final FileChannel fc, final FileChannel.MapMode mmode, final CacheMode cmode,
210 final int sliceShift, final long totalSize, final int currSliceIdx) throws IOException {
211 this.sliceShift = sliceShift;
212 this.fc = fc;
213 this.mmode = mmode;
214
215 if( 0 > totalSize ) {
216 throw new IllegalArgumentException("Negative size "+totalSize);
217 }
218 // trigger notifyLengthChange
219 this.totalSize = -1;
220 this.sliceCount = 0;
221 notifyLengthChange( totalSize );
222
223 this.refCount = 1;
224 this.cmode = cmode;
225
226 this.sliceIdx = currSliceIdx;
227 this.mark = -1;
228
229 currentSlice().position(0);
230
231 if( MappedByteBufferInputStream.DEBUG ) {
232 this.dbgDump("CTOR", System.err);
233 }
234 }
235
/**
 * Creates a new instance on the given {@link FileChannel} with an explicit
 * mapping mode, cache mode and slice size.
 * <p>
 * The total size is taken from {@link FileChannel#size()} and
 * the {@link ByteBuffer} slices are mapped lazily at first usage.
 * </p>
 * @param fileChannel the file channel to be mapped lazily.
 * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
 * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_HARD}.
 * @param sliceShift the pow2 slice size, default is {@link #DEFAULT_SLICE_SHIFT}.
 * @throws IOException if querying the channel size or a buffer slice operation failed
 */
public MappedByteBufferInputStream(final FileChannel fileChannel,
                                   final FileChannel.MapMode mmode,
                                   final CacheMode cmode,
                                   final int sliceShift) throws IOException
{
    this(fileChannel, mmode, cmode, sliceShift, fileChannel.size() /* totalSize */, 0 /* currSliceIdx */);
}
253
/**
 * Creates a new instance on the given {@link FileChannel} with an explicit
 * mapping mode and cache mode, using the {@link #DEFAULT_SLICE_SHIFT}.
 * <p>
 * The {@link ByteBuffer} slices are mapped lazily at first usage.
 * </p>
 * @param fileChannel the file channel to be used.
 * @param mmode the map mode, default is {@link FileChannel.MapMode#READ_ONLY}.
 * @param cmode the caching mode, default is {@link CacheMode#FLUSH_PRE_HARD}.
 * @throws IOException if querying the channel size or a buffer slice operation failed
 */
public MappedByteBufferInputStream(final FileChannel fileChannel,
                                   final FileChannel.MapMode mmode,
                                   final CacheMode cmode) throws IOException
{
    this(fileChannel, mmode, cmode, DEFAULT_SLICE_SHIFT);
}
268
/**
 * Creates a new instance on the given {@link FileChannel} using the defaults:
 * {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode,
 * {@link CacheMode#FLUSH_PRE_HARD} and the {@link #DEFAULT_SLICE_SHIFT}.
 * <p>
 * The {@link ByteBuffer} slices are mapped {@link FileChannel.MapMode#READ_ONLY} lazily at first usage.
 * </p>
 * @param fileChannel the file channel to be used.
 * @throws IOException if querying the channel size or a buffer slice operation failed
 */
public MappedByteBufferInputStream(final FileChannel fileChannel) throws IOException {
    this(fileChannel, MapMode.READ_ONLY, CacheMode.FLUSH_PRE_HARD, DEFAULT_SLICE_SHIFT);
}
282
283 /**
284 * Enable or disable synchronous mode.
285 * <p>
286 * If synchronous mode is enabled, mapped buffers will be {@link #flush(boolean) flushed}
287 * if {@link #notifyLengthChange(long) resized}, <i>written to</i> or {@link #close() closing} in {@link FileChannel.MapMode#READ_WRITE read-write} mapping mode.
288 * </p>
289 * <p>
290 * If synchronous mode is enabled, {@link FileChannel#force(boolean)} is issued
291 * if {@link #setLength(long) resizing} or {@link #close() closing} and not in {@link FileChannel.MapMode#READ_ONLY read-only} mapping mode.
292 * </p>
293 * @param s {@code true} to enable synchronous mode
294 */
295 public final synchronized void setSynchronous(final boolean s) {
296 synchronous = s;
297 }
298 /**
299 * Return {@link #setSynchronous(boolean) synchronous mode}.
300 */
301 public final synchronized boolean getSynchronous() {
302 return synchronous ;
303 }
304
305 final synchronized void checkOpen() throws IOException {
306 if( 0 == refCount ) {
307 throw new IOException("stream closed");
308 }
309 }
310
311 @Override
312 public final synchronized void close() throws IOException {
313 if( 0 < refCount ) {
314 refCount--;
315 if( 0 == refCount ) {
316 try {
317 cleanAllSlices( true /* syncBuffer */ );
318 } finally {
319 flushImpl(true /* metaData */, false /* syncBuffer */);
320 fc.close();
321 mark = -1;
322 sliceIdx = -1;
323 super.close();
324 }
325 }
326 }
327 if( MappedByteBufferInputStream.DEBUG ) {
328 this.dbgDump("Close", System.err);
329 }
330 }
331
332 final FileChannel.MapMode getMapMode() { return mmode; }
333
334 /**
335 * @param fileResizeOp the new {@link FileResizeOp}.
336 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
337 */
338 public final synchronized void setFileResizeOp(final FileResizeOp fileResizeOp) throws IllegalStateException {
339 if( NoFileResize != this.fileResizeOp && this.fileResizeOp != fileResizeOp ) {
340 throw new IllegalStateException("FileResizeOp already set, this value differs");
341 }
342 this.fileResizeOp = null != fileResizeOp ? fileResizeOp : NoFileResize;
343 }
344
345 /**
346 * Resize the underlying {@link FileChannel}'s size and adjusting this instance
347 * via {@link #notifyLengthChange(long) accordingly}.
348 * <p>
349 * User must have a {@link FileResizeOp} {@link #setFileResizeOp(FileResizeOp) registered} before.
350 * </p>
351 * <p>
352 * Implementation calls {@link #notifyLengthChange(long)} after {@link FileResizeOp#setLength(long)}.
353 * </p>
354 * @param newTotalSize the new total size
355 * @throws IOException if no {@link FileResizeOp} has been {@link #setFileResizeOp(FileResizeOp) registered}
356 * or if a buffer slice operation failed
357 */
358 public final synchronized void setLength(final long newTotalSize) throws IOException {
359 final long currentPosition;
360 if( 0 != newTotalSize && totalSize != newTotalSize ) {
361 currentPosition = position();
362 } else {
363 currentPosition = -1L;
364 }
365 if( fc.size() != newTotalSize ) {
367 // On Windows, we have to close all mapped slices.
368 // Otherwise we will receive:
369 // java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
370 // at java.io.RandomAccessFile.setLength(Native Method)
371 cleanAllSlices( synchronous );
372 }
373 fileResizeOp.setLength(newTotalSize);
374 if( synchronous ) {
375 // buffers will be synchronized in notifyLengthChangeImpl(..)
376 flushImpl( true /* metaData */, false /* syncBuffer */);
377 }
378 }
379 notifyLengthChangeImpl(newTotalSize, currentPosition);
380 }
381
382 /**
383 * Notify this instance that the underlying {@link FileChannel}'s size has been changed
384 * and adjusting this instances buffer slices and states accordingly.
385 * <p>
386 * Should be called by user API when aware of such event.
387 * </p>
388 * @param newTotalSize the new total size
389 * @throws IOException if a buffer slice operation failed
390 */
391 public final synchronized void notifyLengthChange(final long newTotalSize) throws IOException {
392 notifyLengthChangeImpl(newTotalSize, -1L);
393 }
394 private final synchronized void notifyLengthChangeImpl(final long newTotalSize, final long currentPosition) throws IOException {
395 /* if( DEBUG ) {
396 System.err.println("notifyLengthChange.0: "+totalSize+" -> "+newTotalSize);
397 dbgDump("notifyLengthChange.0:", System.err);
398 } */
399 if( totalSize == newTotalSize ) {
400 // NOP
401 return;
402 } else if( 0 == newTotalSize ) {
403 // ZERO - ensure one entry avoiding NULL checks
404 cleanAllSlices( synchronous );
405 @SuppressWarnings("unchecked")
406 final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ 1 ];
407 slices2GC = newSlices2GC;
408 slices = new ByteBuffer[1];
409 slices[0] = ByteBuffer.allocate(0);
410 sliceCount = 0;
411 totalSize = 0;
412 mark = -1;
413 sliceIdx = 0;
414 } else {
415 final long prePosition = 0 <= currentPosition ? currentPosition : position();
416
417 final long sliceSize = 1L << sliceShift;
418 final int newSliceCount = (int)( ( newTotalSize + ( sliceSize - 1 ) ) / sliceSize );
419 @SuppressWarnings("unchecked")
420 final WeakReference<ByteBuffer>[] newSlices2GC = new WeakReference[ newSliceCount ];
421 final ByteBuffer[] newSlices = new ByteBuffer[ newSliceCount ];
422 final int copySliceCount = Math.min(newSliceCount, sliceCount-1); // drop last (resize)
423 if( 0 <= copySliceCount ) {
424 if( 0 < copySliceCount ) {
425 System.arraycopy(slices2GC, 0, newSlices2GC, 0, copySliceCount);
426 System.arraycopy(slices, 0, newSlices, 0, copySliceCount);
427 }
428 for(int i=copySliceCount; i<sliceCount; i++) { // clip shrunken slices + 1 (last), incl. slices2GC!
429 cleanSlice(i, synchronous);
430 }
431 }
432 slices2GC = newSlices2GC;
433 slices = newSlices;
434 sliceCount = newSliceCount;
435 totalSize = newTotalSize;
436 if( newTotalSize < mark ) {
437 mark = -1;
438 }
439 position2( Math.min(prePosition, newTotalSize) ); // -> clipped position (set currSlice and re-map/-pos buffer)
440 }
441 if( MappedByteBufferInputStream.DEBUG ) {
442 this.dbgDump("NotifyLengthChange", System.err);
443 }
444 }
445
446 /**
447 * Similar to {@link OutputStream#flush()}, synchronizes all mapped buffers
448 * from local storage via {@link MappedByteBuffer#force()}
449 * as well as the {@link FileChannel#force(boolean)} w/o {@code metaData}.
450 * @param metaData TODO
451 * @throws IOException if this stream has been {@link #close() closed}.
452 */
453 public final synchronized void flush(final boolean metaData) throws IOException {
454 checkOpen();
455 flushImpl(metaData, true);
456 }
457 private final synchronized void flushImpl(final boolean metaData, final boolean syncBuffer) throws IOException {
458 if( FileChannel.MapMode.READ_ONLY != mmode ) {
459 if( syncBuffer && FileChannel.MapMode.READ_WRITE == mmode ) {
460 for(int i=0; i<sliceCount; i++) {
461 syncSlice(slices[i], true);
462 }
463 for(int i=0; i<sliceCount; i++) {
464 final WeakReference<ByteBuffer> ref = slices2GC[i];
465 if( null != ref ) {
466 syncSlice(ref.get(), true);
467 }
468 }
469 }
470 fc.force(metaData);
471 }
472 }
473
474
475 /**
476 * Returns a new MappedByteBufferOutputStream instance sharing
477 * all resources of this input stream, including all buffer slices.
478 *
479 * @throws IllegalStateException if attempting to set the {@link FileResizeOp} to a different value than before
480 * @throws IOException if this instance was opened w/ {@link FileChannel.MapMode#READ_ONLY}
481 * or if this stream has been {@link #close() closed}.
482 */
483 public final synchronized MappedByteBufferOutputStream getOutputStream(final FileResizeOp fileResizeOp)
484 throws IllegalStateException, IOException
485 {
486 checkOpen();
487 final MappedByteBufferOutputStream res = new MappedByteBufferOutputStream(this, fileResizeOp);
488 refCount++;
489 return res;
490 }
491
492 /**
493 * Return the mapped {@link ByteBuffer} slice at the current {@link #position()}.
494 * <p>
495 * Due to the nature of using sliced buffers mapping the whole region,
496 * user has to determine whether the returned buffer covers the desired region
497 * and may fetch the {@link #nextSlice()} until satisfied.<br>
498 * It is also possible to repeat this operation after reposition the stream via {@link #position(long)}
499 * or {@link #skip(long)} to a position within the next block, similar to {@link #nextSlice()}.
500 * </p>
501 * @throws IOException if a buffer slice operation failed.
502 */
503 public final synchronized ByteBuffer currentSlice() throws IOException {
504 final ByteBuffer s0 = slices[sliceIdx];
505 if ( null != s0 ) {
506 return s0;
507 } else {
508 if( CacheMode.FLUSH_PRE_SOFT == cmode ) {
509 final WeakReference<ByteBuffer> ref = slices2GC[sliceIdx];
510 if( null != ref ) {
511 final ByteBuffer mbb = ref.get();
512 slices2GC[sliceIdx] = null;
513 slices2GCEntries--;
514 if( null != mbb ) {
515 slices[sliceIdx] = mbb;
516 slicesEntries++;
517 return mbb;
518 }
519 }
520 }
521 final long pos = (long)sliceIdx << sliceShift;
522 final MappedByteBuffer s1 = fc.map(mmode, pos, Math.min(1L << sliceShift, totalSize - pos));
523 slices[sliceIdx] = s1;
524 slicesEntries++;
525 return s1;
526 }
527 }
528
529 /**
530 * Return the <i>next</i> mapped {@link ByteBuffer} slice from the current {@link #position()},
531 * implicitly setting {@link #position(long)} to the start of the returned <i>next</i> slice,
532 * see {@link #currentSlice()}.
533 * <p>
534 * If no subsequent slice is available, {@code null} is being returned.
535 * </p>
536 * @throws IOException if a buffer slice operation failed.
537 */
538 public final synchronized ByteBuffer nextSlice() throws IOException {
539 if ( sliceIdx < sliceCount - 1 ) {
540 flushSlice(sliceIdx, synchronous);
541 sliceIdx++;
542 final ByteBuffer slice = currentSlice();
543 slice.position( 0 );
544 return slice;
545 } else {
546 return null;
547 }
548 }
549
550 /**
551 * Releases the mapped {@link ByteBuffer} slices.
552 * @throws IOException if a buffer slice operation failed.
553 */
554 public final synchronized void flushSlices() throws IOException {
555 if( null != slices ) {
556 for(int i=0; i<sliceCount; i++) {
557 flushSlice(i, synchronous);
558 }
559 }
560 if( MappedByteBufferInputStream.DEBUG ) {
561 this.dbgDump("FlushSlices", System.err);
562 }
563 }
564
565 synchronized void syncSlice(final ByteBuffer s) throws IOException {
566 syncSlice(s, synchronous);
567 }
568 synchronized void syncSlice(final ByteBuffer s, final boolean syncBuffer) throws IOException {
569 if( syncBuffer && null != s && FileChannel.MapMode.READ_WRITE == mmode ) {
570 try {
571 ((MappedByteBuffer)s).force();
572 } catch( final Throwable t ) {
573 // On Windows .. this may happen, like:
574 // java.io.IOException: The process cannot access the file because another process has locked a portion of the file
575 // at java.nio.MappedByteBuffer.force0(Native Method)
576 // at java.nio.MappedByteBuffer.force(MappedByteBuffer.java:203)
577 if( DEBUG ) {
578 System.err.println("Caught "+t.getMessage());
579 t.printStackTrace();
580 }
581 }
582 }
583 }
584 private synchronized void flushSlice(final int i, final boolean syncBuffer) throws IOException {
585 final ByteBuffer s = slices[i];
586 if ( null != s ) {
587 if( CacheMode.FLUSH_NONE != cmode ) {
588 slices[i] = null; // trigger slice GC
589 slicesEntries--;
590 if( CacheMode.FLUSH_PRE_HARD == cmode ) {
591 if( !cleanBuffer(s, syncBuffer) ) {
592 // buffer already synced in cleanBuffer(..) if requested
593 slices2GC[i] = new WeakReference<ByteBuffer>(s);
594 slices2GCEntries++;
595 }
596 } else {
597 syncSlice(s, syncBuffer);
598 slices2GC[i] = new WeakReference<ByteBuffer>(s);
599 slices2GCEntries++;
600 }
601 } else {
602 syncSlice(s, syncBuffer);
603 }
604 }
605 }
606 private synchronized void cleanAllSlices(final boolean syncBuffers) throws IOException {
607 if( null != slices ) {
608 for(int i=0; i<sliceCount; i++) {
609 cleanSlice(i, syncBuffers);
610 }
611 if( 0 != slicesEntries || 0 != slices2GCEntries ) { // FIXME
612 final String err = "mappedSliceCount "+slicesEntries+", slices2GCEntries "+slices2GCEntries;
613 dbgDump(err+": ", System.err);
614 throw new InternalError(err);
615 }
616 }
617 }
618
619 private synchronized void cleanSlice(final int i, final boolean syncBuffer) throws IOException {
620 final ByteBuffer s1 = slices[i];
621 final ByteBuffer s2;
622 {
623 final WeakReference<ByteBuffer> ref = slices2GC[i];
624 slices2GC[i] = null;
625 if( null != ref ) {
626 slices2GCEntries--;
627 s2 = ref.get();
628 } else {
629 s2 = null;
630 }
631 }
632 if( null != s1 ) {
633 slices[i] = null;
634 slicesEntries--;
635 cleanBuffer(s1, syncBuffer);
636 if( null != s2 ) {
637 throw new InternalError("XXX");
638 }
639 } else if( null != s2 ) {
640 cleanBuffer(s2, syncBuffer);
641 }
642 }
643 private synchronized boolean cleanBuffer(final ByteBuffer mbb, final boolean syncBuffer) throws IOException {
644 syncSlice(mbb, syncBuffer);
645 if( !mbb.isDirect() ) {
646 return false;
647 }
648 if( !Buffers.Cleaner.clean(mbb) && CacheMode.FLUSH_PRE_HARD == cmode ) {
649 cmode = CacheMode.FLUSH_PRE_SOFT;
650 return false;
651 } else {
652 return true;
653 }
654 }
655
656 /**
657 * Return the used {@link CacheMode}.
658 * <p>
659 * If a desired {@link CacheMode} is not available, it may fall back to an available one at runtime,
660 * see {@link CacheMode#FLUSH_PRE_HARD}.<br>
661 * This evaluation only happens if the {@link CacheMode} != {@link CacheMode#FLUSH_NONE}
662 * and while attempting to flush an unused buffer slice.
663 * </p>
664 */
665 public final synchronized CacheMode getCacheMode() { return cmode; }
666
667 /**
668 * Returns the total size in bytes of the {@link InputStream}
669 * <pre>
670 * <code>0 <= {@link #position()} <= {@link #length()}</code>
671 * </pre>
672 */
673 // @Override
674 public final synchronized long length() {
675 return totalSize;
676 }
677
678 /**
679 * Returns the number of remaining available bytes of the {@link InputStream},
680 * i.e. <code>{@link #length()} - {@link #position()}</code>.
681 * <pre>
682 * <code>0 <= {@link #position()} <= {@link #length()}</code>
683 * </pre>
684 * <p>
685 * In contrast to {@link InputStream}'s {@link #available()} method,
686 * this method returns the proper return type {@code long}.
687 * </p>
688 * @throws IOException if a buffer slice operation failed.
689 */
690 public final synchronized long remaining() throws IOException {
691 return 0 < refCount ? totalSize - position() : 0;
692 }
693
694 /**
695 * <i>See {@link #remaining()} for an accurate variant.</i>
696 * <p>
697 * {@inheritDoc}
698 * </p>
699 * @throws IOException if a buffer slice operation failed.
700 */
701 @Override
702 public final synchronized int available() throws IOException {
703 final long available = remaining();
704 return available <= Integer.MAX_VALUE ? (int)available : Integer.MAX_VALUE;
705 }
706
707 /**
708 * Returns the absolute position of the {@link InputStream}.
709 * <pre>
710 * <code>0 <= {@link #position()} <= {@link #length()}</code>
711 * </pre>
712 * @throws IOException if a buffer slice operation failed.
713 */
714 // @Override
715 public final synchronized long position() throws IOException {
716 if( 0 < refCount ) {
717 return ( (long)sliceIdx << sliceShift ) + currentSlice().position();
718 } else {
719 return 0;
720 }
721 }
722
723 /**
724 * Sets the absolute position of the {@link InputStream} to {@code newPosition}.
725 * <pre>
726 * <code>0 <= {@link #position()} <= {@link #length()}</code>
727 * </pre>
728 * @param newPosition The new position, which must be non-negative and &le; {@link #length()}.
729 * @return this instance
730 * @throws IOException if a buffer slice operation failed or stream is {@link #close() closed}.
731 */
732 // @Override
733 public final synchronized MappedByteBufferInputStream position( final long newPosition ) throws IOException {
734 checkOpen();
735 if ( totalSize < newPosition || 0 > newPosition ) {
736 throw new IllegalArgumentException("new position "+newPosition+" not within [0.."+totalSize+"]");
737 }
738 final int preSlice = sliceIdx;
739
740 if ( totalSize == newPosition ) {
741 // EOF, pos == maxPos + 1
742 sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
743 if( preSlice != sliceIdx ) {
744 flushSlice(preSlice, synchronous);
745 }
746 final ByteBuffer s = currentSlice();
747 s.position( s.capacity() );
748 } else {
749 sliceIdx = (int)( newPosition >>> sliceShift );
750 if( preSlice != sliceIdx ) {
751 flushSlice(preSlice, synchronous);
752 }
753 currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) );
754 }
755 return this;
756 }
757 private final synchronized void position2( final long newPosition ) throws IOException {
758 if ( totalSize == newPosition ) {
759 // EOF, pos == maxPos + 1
760 sliceIdx = Math.max(0, sliceCount - 1); // handle zero size
761 final ByteBuffer s = currentSlice();
762 s.position( s.capacity() );
763 } else {
764 sliceIdx = (int)( newPosition >>> sliceShift );
765 currentSlice().position( (int)( newPosition - ( (long)sliceIdx << sliceShift ) ) );
766 }
767 }
768
769 @Override
770 public final boolean markSupported() {
771 return true;
772 }
773
774 /**
775 * {@inheritDoc}
776 * <p>
777 * <i>Parameter {@code readLimit} is not used in this implementation,
778 * since 0.3.0
779 * </p>
780 */
781 @Override
782 public final synchronized void mark( final int readlimit ) {
783 if( 0 < refCount ) {
784 try {
785 mark = position();
786 } catch (final IOException e) {
787 throw new RuntimeException(e); // FIXME: oops
788 }
789 }
790 }
791
792 /**
793 * {@inheritDoc}
794 * @throws IOException if this stream has not been marked,
795 * a buffer slice operation failed or stream has been {@link #close() closed}.
796 */
797 @Override
798 public final synchronized void reset() throws IOException {
799 checkOpen();
800 if ( mark == -1 ) {
801 throw new IOException("mark not set");
802 }
803 position( mark );
804 }
805
806 /**
807 * {@inheritDoc}
808 * @throws IOException if a buffer slice operation failed or stream is {@link #close() closed}.
809 */
810 @Override
811 public final synchronized long skip( final long n ) throws IOException {
812 checkOpen();
813 if( 0 > n ) {
814 return 0;
815 }
816 final long pos = position();
817 final long rem = totalSize - pos; // remaining
818 final long s = Math.min( rem, n );
819 position( pos + s );
820 return s;
821 }
822
823 @Override
824 public final synchronized int read() throws IOException {
825 checkOpen();
826 ByteBuffer slice = currentSlice();
827 if ( !slice.hasRemaining() ) {
828 if ( null == ( slice = nextSlice() ) ) {
829 return -1;
830 }
831 }
832 return slice.get() & 0xFF;
833 }
834
835 @Override
836 public final synchronized int read( final byte[] b, final int off, final int len ) throws IOException {
837 checkOpen();
838 if (b == null) {
839 throw new NullPointerException();
840 } else if( off < 0 ||
841 len < 0 ||
842 off + len > b.length
843 ) {
844 throw new IndexOutOfBoundsException("offset "+off+", length "+len+", b.length "+b.length);
845 } else if ( 0 == len ) {
846 return 0;
847 }
848 final long totalRem = remaining();
849 if ( 0 == totalRem ) {
850 return -1;
851 }
852 final int maxLen = (int)Math.min( totalRem, len );
853 int read = 0;
854 while( read < maxLen ) {
855 ByteBuffer slice = currentSlice();
856 int currRem = slice.remaining();
857 if ( 0 == currRem ) {
858 if ( null == ( slice = nextSlice() ) ) {
859 throw new InternalError("Unexpected EOT");
860 }
861 currRem = slice.remaining();
862 }
863 final int currLen = Math.min( maxLen - read, currRem );
864 slice.get( b, off + read, currLen );
865 read += currLen;
866 }
867 return maxLen;
868 }
869
870 /**
871 * Perform similar to {@link #read(byte[], int, int)}
872 * with {@link ByteBuffer} instead of byte array.
873 * @param b the {@link ByteBuffer} sink, data is written at current {@link ByteBuffer#position()}
874 * @param len the number of bytes to read
875 * @return the number of bytes read, -1 for EOS
876 * @throws IOException if a buffer slice operation failed or stream has been {@link #close() closed}.
877 */
878 // @Override
879 public final synchronized int read(final ByteBuffer b, final int len) throws IOException {
880 checkOpen();
881 if (b == null) {
882 throw new NullPointerException();
883 } else if (len < 0 || len > b.remaining()) {
884 throw new IndexOutOfBoundsException("length "+len+", b "+b);
885 } else if ( 0 == len ) {
886 return 0;
887 }
888 final long totalRem = remaining();
889 if ( 0 == totalRem ) {
890 return -1;
891 }
892 final int maxLen = (int)Math.min( totalRem, len );
893 int read = 0;
894 while( read < maxLen ) {
895 ByteBuffer slice = currentSlice();
896 int currRem = slice.remaining();
897 if ( 0 == currRem ) {
898 if ( null == ( slice = nextSlice() ) ) {
899 throw new InternalError("Unexpected EOT");
900 }
901 currRem = slice.remaining();
902 }
903 final int currLen = Math.min( maxLen - read, currRem );
904 if( slice.hasArray() && b.hasArray() ) {
905 System.arraycopy(slice.array(), slice.arrayOffset() + slice.position(),
906 b.array(), b.arrayOffset() + b.position(),
907 currLen);
908 slice.position( slice.position() + currLen );
909 b.position( b.position() + currLen );
910 } else if( currLen == currRem ) {
911 b.put(slice);
912 } else {
913 final int _limit = slice.limit();
914 slice.limit(currLen);
915 try {
916 b.put(slice);
917 } finally {
918 slice.limit(_limit);
919 }
920 }
921 read += currLen;
922 }
923 return maxLen;
924 }
925}
An InputStream implementation based on an underlying FileChannel's memory mapped ByteBuffer,...
final synchronized void mark(final int readlimit)
final synchronized long remaining()
Returns the number of remaining available bytes of the InputStream, i.e.
final synchronized void setFileResizeOp(final FileResizeOp fileResizeOp)
final synchronized void notifyLengthChange(final long newTotalSize)
Notify this instance that the underlying FileChannel's size has been changed and adjusting this insta...
MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode, final CacheMode cmode)
Creates a new instance using the given FileChannel, given mapping-mode, given cache-mode and the DEFA...
final synchronized long skip(final long n)
final synchronized ByteBuffer currentSlice()
Return the mapped ByteBuffer slice at the current position().
final synchronized long length()
Returns the total size in bytes of the InputStream.
final synchronized MappedByteBufferInputStream position(final long newPosition)
Sets the absolute position of the InputStream to newPosition.
final synchronized ByteBuffer nextSlice()
Return the next mapped ByteBuffer slice from the current position(), implicitly setting position(long...
final synchronized long position()
Returns the absolute position of the InputStream.
final synchronized int available()
See remaining() for an accurate variant.
final synchronized void flushSlices()
Releases the mapped ByteBuffer slices.
static final int DEFAULT_SLICE_SHIFT
Default slice shift, i.e.
final synchronized CacheMode getCacheMode()
Return the used CacheMode.
final synchronized void setLength(final long newTotalSize)
Resize the underlying FileChannel's size and adjusting this instance via accordingly.
final synchronized boolean getSynchronous()
Return synchronous mode.
final synchronized int read(final byte[] b, final int off, final int len)
final synchronized void setSynchronous(final boolean s)
Enable or disable synchronous mode.
MappedByteBufferInputStream(final FileChannel fileChannel)
Creates a new instance using the given FileChannel, read-only mapping mode, CacheMode#FLUSH_PRE_HARD ...
final synchronized MappedByteBufferOutputStream getOutputStream(final FileResizeOp fileResizeOp)
Returns a new MappedByteBufferOutputStream instance sharing all resources of this input stream,...
MappedByteBufferInputStream(final FileChannel fileChannel, final FileChannel.MapMode mmode, final CacheMode cmode, final int sliceShift)
Creates a new instance using the given FileChannel.
final synchronized int read(final ByteBuffer b, final int len)
Perform similar to read(byte[], int, int) with ByteBuffer instead of byte array.
final synchronized void flush(final boolean metaData)
Similar to OutputStream#flush(), synchronizes all mapped buffers from local storage via MappedByteBuf...
An OutputStream implementation based on an underlying FileChannel's memory mapped ByteBuffer.
Helper routines for logging and debugging.
Definition: Debug.java:35
static final boolean debug(final String subcomponent)
Definition: Debug.java:63
Platform Properties derived from Java properties.
static final OSType OS
static final CPUType CPU
Exposing types describing the underlying platform.
FLUSH_NONE
Keep all previous lazily cached buffer slices alive, useful for hopping readers, i....
FLUSH_PRE_HARD
Hard flush the previous lazily cached buffer slice when caching the next buffer slice,...
FLUSH_PRE_SOFT
Soft flush the previous lazily cached buffer slice when caching the next buffer slice,...
File resize interface allowing a file to change its size, e.g.