jaulib v1.3.0
Jau Support Library (C++, Java, ..)
ByteInStreamUtil.java
Go to the documentation of this file.
1/**
2 * Author: Sven Gothel <sgothel@jausoft.com>
3 * Copyright (c) 2022 Gothel Software e.K.
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining
6 * a copy of this software and associated documentation files (the
7 * "Software"), to deal in the Software without restriction, including
8 * without limitation the rights to use, copy, modify, merge, publish,
9 * distribute, sublicense, and/or sell copies of the Software, and to
10 * permit persons to whom the Software is furnished to do so, subject to
11 * the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be
14 * included in all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 */
24package org.jau.io;
25
26import java.nio.ByteBuffer;
27
28/**
29 * This class represents an abstract byte input stream object.
30 *
31 * @anchor byte_in_stream_properties
32 * ### ByteInStream Properties
33 * The byte input stream can originate from a local source w/o delay,
34 * remote URL like http connection or even from another thread feeding the input buffer.<br />
35 * Both latter asynchronous resources may expose blocking properties
36 * in check_available().
37 *
38 * Asynchronous resources benefit from knowing their content size,
39 * as their check_available() implementation may avoid
40 * blocking and waiting for requested bytes available
41 * if the stream is already beyond its scope.
42 *
43 * One may use error() to detect whether an error has occurred,
44 * while end_of_data() not only covered the EOS case but includes error().
45 *
46 * @see @ref byte_in_stream_properties "ByteInStream Properties"
47 */
48public interface ByteInStreamUtil {
49 /**
50 * Stream consumer using a byte array
51 */
52 public static interface StreamConsumer1 {
53 /**
54 *
55 * @param data
56 * @param data_len
57 * @param is_final
58 * @return true to signal continuation, false to end streaming.
59 */
60 boolean consume(byte[] data, int data_len, boolean is_final);
61 }
62
63 /**
64 * Synchronous byte array input stream reader using the given {@link StreamConsumer1}.
65 *
66 * To abort streaming, user may return `false` from the given {@link StreamConsumer1#consume(byte[], long, boolean)}.
67 *
68 * @param in the input byte stream to read from
69 * @param buffer byte buffer passed down to {@link StreamConsumer1#consume(byte[], long, boolean)}
70 * @param consumer StreamConsumer consumer for each received heap of bytes, returning true to continue stream of false to abort.
71 * @return total bytes read or 0 if error
72 */
73 public static long read_stream(final ByteInStream in,
74 final byte buffer[],
75 final StreamConsumer1 consumer)
76 {
77 long total = 0;
78 boolean has_more = in.good();
79 while( has_more ) {
80 if( in.available(1) ) { // at least one byte to stream ..
81 final int got = in.read(buffer, 0, buffer.length);
82
83 total += got;
84 has_more = 1 <= got && in.good() && ( !in.has_content_size() || total < in.content_size() );
85 try {
86 if( !consumer.consume(buffer, got, !has_more) ) {
87 break; // end streaming
88 }
89 } catch (final Throwable e) {
90 PrintUtil.fprintf_td(System.err, "org.jau.nio.read_stream: Caught exception: %s", e.getMessage());
91 break; // end streaming
92 }
93 } else {
94 has_more = false;
95 consumer.consume(buffer, 0, true); // forced final, zero size
96 }
97 }
98 return total;
99 }
100
101 /**
102 * Stream consumer using a direct ByteBuffer
103 */
104 public static interface StreamConsumer2 {
105 /**
106 *
107 * @param data
108 * @param is_final
109 * @return true to signal continuation, false to end streaming.
110 */
111 boolean consume(ByteBuffer data, boolean is_final);
112 }
113
114 /**
115 * Synchronous direct ByteBuffer input stream reader using the given {@link StreamConsumer2}.
116 *
117 * To abort streaming, user may return `false` from the given {@link StreamConsumer2#consume(ByteBuffer, boolean)}.
118 *
119 * @param in the input byte stream to read from
120 * @param buffer byte buffer passed down to {@link StreamConsumer2#consume(ByteBuffer, boolean)}
121 * @param consumer StreamConsumer2 consumer for each received heap of bytes, returning true to continue stream of false to abort.
122 * @return total bytes read or 0 if error
123 */
124 public static long read_stream(final ByteInStream in,
125 final ByteBuffer buffer,
126 final StreamConsumer2 consumer)
127 {
128 long total = 0;
129 boolean has_more = in.good();
130 while( has_more ) {
131 if( in.available(1) ) { // at least one byte to stream ..
132 final int got = in.read(buffer);
133
134 total += got;
135 has_more = 1 <= got && in.good() && ( !in.has_content_size() || total < in.content_size() );
136 try {
137 if( !consumer.consume(buffer, !has_more) ) {
138 break; // end streaming
139 }
140 } catch (final Throwable e) {
141 PrintUtil.fprintf_td(System.err, "org.jau.nio.read_stream: Caught exception: %s", e.getMessage());
142 break; // end streaming
143 }
144 } else {
145 has_more = false;
146 }
147 }
148 return total;
149 }
150
151 /**
152 * Parses the given path_or_uri, if it matches a supported protocol, see {@link org.jau.io.UriTk#protocol_supported(String)},
153 * but is not a local file, see {@link org.jau.io.UriTk#is_local_file_protocol(String)}, ByteInStream_URL is being attempted.
154 *
155 * If the above fails, ByteInStream_File is attempted.
156 *
157 * If non of the above leads to a ByteInStream without {@link ByteInStreamUtil#error()}, null is returned.
158 *
159 * @param path_or_uri given path or uri for with a ByteInStream instance shall be established.
160 * @param timeoutMS a timeout in case ByteInStream_URL is being used as maximum duration in milliseconds to wait for next bytes at {@link ByteInStream_URL#available(long)}, defaults to 20_s
161 * @return a working ByteInStream w/o {@link ByteInStreamUtil#error()} or nullptr
162 */
163 public static ByteInStream to_ByteInStream(final String path_or_uri, final long timeoutMS) {
164 if( !org.jau.io.UriTk.is_local_file_protocol(path_or_uri) &&
165 org.jau.io.UriTk.protocol_supported(path_or_uri) )
166 {
167 final ByteInStream res = new ByteInStream_URL(path_or_uri, timeoutMS);
168 if( null != res && !res.fail() ) {
169 return res;
170 }
171 }
172 final ByteInStream res = new ByteInStream_File(path_or_uri);
173 if( null != res && !res.fail() ) {
174 return res;
175 }
176 return null;
177 }
178 /**
179 * Parses the given path_or_uri, if it matches a supported protocol, see {@link org.jau.io.UriTk#protocol_supported(String)},
180 * but is not a local file, see {@link org.jau.io.UriTk#is_local_file_protocol(String)}, ByteInStream_URL is being attempted.
181 *
182 * If the above fails, ByteInStream_File is attempted.
183 *
184 * If non of the above leads to a ByteInStream without {@link ByteInStreamUtil#error()}, null is returned.
185 *
186 * Method uses a timeout of 20_s for maximum duration to wait for next bytes at {@link ByteInStream_URL#available(long)}
187 *
188 * @param path_or_uri given path or uri for with a ByteInStream instance shall be established.
189 * @return a working ByteInStream w/o {@link ByteInStreamUtil#error()} or nullptr
190 */
191 public static ByteInStream to_ByteInStream(final String path_or_uri) {
192 return to_ByteInStream(path_or_uri, 20000);
193 }
194
195 public static void print_stats(final String prefix, final long out_bytes_total, final long td_ms) {
196 PrintUtil.fprintf_td(System.err, "%s: Duration %,d ms\n", prefix, td_ms);
197
198 if( out_bytes_total >= 100000000 ) {
199
200 PrintUtil.fprintf_td(System.err, "%s: Size %,d MB%n", prefix, Math.round(out_bytes_total/1000000.0));
201 } else if( out_bytes_total >= 100000 ) {
202 PrintUtil.fprintf_td(System.err, "%s: Size %,d KB%n", prefix, Math.round(out_bytes_total/1000.0));
203 } else {
204 PrintUtil.fprintf_td(System.err, "%s: Size %,d B%n", prefix, out_bytes_total);
205 }
206
207 final long _rate_bps = Math.round( out_bytes_total / ( td_ms / 1000.0 )); // bytes per second
208 final long _rate_bitps = Math.round( ( out_bytes_total * 8.0 ) / ( td_ms / 1000.0 ) ); // bits per second
209
210 if( _rate_bitps >= 100000000 ) {
211 PrintUtil.fprintf_td(System.err, "%s: Bitrate %,d Mbit/s, %,d MB/s%n", prefix,
212 Math.round(_rate_bitps/1000000.0),
213 Math.round(_rate_bps/1000000.0));
214 } else if( _rate_bitps >= 100000 ) {
215 PrintUtil.fprintf_td(System.err, "%s: Bitrate %,d kbit/s, %,d kB/s%n", prefix,
216 Math.round(_rate_bitps/10000), Math.round(_rate_bps/10000));
217 } else {
218 PrintUtil.fprintf_td(System.err, "%s: Bitrate %,d bit/s, %,d B/s%n", prefix,
219 _rate_bitps, _rate_bps);
220 }
221 }
222}
File based byte input stream, including named file descriptor.
Ringbuffer-Based byte input stream with a URL connection provisioned data feed.
static void fprintf_td(final PrintStream out, final String format, final Object ... args)
Convenient PrintStream#printf(String, Object...) invocation, prepending the elapsedTimeMillis() times...
Definition: PrintUtil.java:37
Stream consumer using a byte array.
boolean consume(byte[] data, int data_len, boolean is_final)
Stream consumer using a direct ByteBuffer.
boolean consume(ByteBuffer data, boolean is_final)
This class represents an abstract byte input stream object.
static ByteInStream to_ByteInStream(final String path_or_uri)
Parses the given path_or_uri, if it matches a supported protocol, see org.jau.io.UriTk#protocol_suppo...
static long read_stream(final ByteInStream in, final byte buffer[], final StreamConsumer1 consumer)
Synchronous byte array input stream reader using the given StreamConsumer1.
static void print_stats(final String prefix, final long out_bytes_total, final long td_ms)
static long read_stream(final ByteInStream in, final ByteBuffer buffer, final StreamConsumer2 consumer)
Synchronous direct ByteBuffer input stream reader using the given StreamConsumer2.
static ByteInStream to_ByteInStream(final String path_or_uri, final long timeoutMS)
Parses the given path_or_uri, if it matches a supported protocol, see org.jau.io.UriTk#protocol_suppo...
Abstract byte input stream object.
long content_size()
Returns the content_size if known.
boolean available(long n)
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
int read(byte out[], final int offset, final int length)
Read from the source.
boolean has_content_size()
Returns true if implementation is aware of content_size(), otherwise false.
boolean fail()
Checks if an error has occurred.
boolean good()
Checks if no error nor eof() has occurred i.e.