37 #include <sys/types.h>
43 #include <curl/curl.h>
60#if defined(__FreeBSD__)
61 typedef off_t off64_t;
62 #define __posix_openat64 ::openat
63 #define __posix_lseek64 ::lseek
65 #define __posix_openat64 ::openat64
66 #define __posix_lseek64 ::lseek64
75inline constexpr void copy_mem(
void* out,
const void* in,
size_t n)
noexcept {
76 if(in !=
nullptr && out !=
nullptr && n > 0) {
77 std::memmove(out, in,
sizeof(uint8_t)*n);
82 return 1 ==
read(&out, 1);
86 return 1 ==
peek(&out, 1, 0);
90 uint8_t buf[1024] = { 0 };
95 const size_t got =
read(buf, std::min(n,
sizeof(buf)));
107 if( 0 == length || !
good() ) {
110 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
111 copy_mem(out, m_source.data() + m_offset, got);
113 if( m_source.size() == m_offset ) {
120 return m_source.size() - m_offset >= n;
124 const size_t bytes_left = m_source.size() - m_offset;
125 if(peek_offset >= bytes_left) {
128 const size_t got = std::min(bytes_left - peek_offset, length);
129 copy_mem(out, &m_source[m_offset + peek_offset], got);
146 return "ByteInStream_SecMemory[content size "+
jau::to_decstring(m_source.size())+
149 ", iostate["+jau::io::to_string(
rdstate())+
154 if( 0 == length || !
good() ) {
157 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
159 while( total < length ) {
161 while ( ( len =
::read(m_fd, out_u8+total, length-total) ) < 0 ) {
162 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
171 DBG_PRINT(
"ByteInStream_File::read: Error occurred in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
174 total +=
static_cast<size_t>(len);
175 if( 0 == len || ( m_has_content_length && m_bytes_consumed + total >= m_content_size ) ) {
180 m_bytes_consumed += total;
185 if( 0 == length || !
good() ||
186 (
sizeof(
size_t) >=
sizeof(off64_t) && offset > std::numeric_limits<off64_t>::max() ) ||
187 ( m_has_content_length && m_content_size - m_bytes_consumed < offset + 1 ) ) {
194 abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(offset), SEEK_CUR);
197 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
198 offset,
to_string().c_str(), errno, strerror(errno));
202 if( abs_pos ==
static_cast<off64_t
>(offset) ) {
204 while ( ( len =
::read(m_fd, out, length) ) < 0 ) {
205 if ( errno == EAGAIN || errno == EINTR ) {
211 DBG_PRINT(
"ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
216 if(
__posix_lseek64(m_fd,
static_cast<off64_t
>(m_bytes_consumed), SEEK_SET) < 0 ) {
219 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
220 offset,
to_string().c_str(), errno, strerror(errno));
227 return is_open() &&
good() && ( !m_has_content_length || m_content_size - m_bytes_consumed >= (uint64_t)n );
232 stats(
fd), m_fd(-1), m_has_content_length(
false), m_content_size(0), m_bytes_consumed(0)
234 if( !stats.exists() || !stats.has_access() ) {
236 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
239 m_content_size = m_has_content_length ? stats.size() : 0;
243 DBG_PRINT(
"ByteInStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(),
to_string().c_str());
250 stats(), m_fd(-1), m_has_content_length(
false), m_content_size(0), m_bytes_consumed(0)
254 std::string path2 = path.substr(7);
259 if( !stats.exists() || !stats.has_access() ) {
261 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
264 m_has_content_length =
true;
265 m_content_size = stats.size();
267 m_has_content_length =
false;
271 const int src_flags = O_RDONLY|
O_BINARY|O_NOCTTY;
272 if( stats.has_fd() ) {
273 m_fd = ::dup( stats.fd() );
279 DBG_PRINT(
"ByteInStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(),
to_string().c_str());
299 ", fd "+std::to_string(m_fd)+
300 ", iostate["+jau::io::to_string(
rdstate())+
301 "], "+stats.to_string()+
314 DBG_PRINT(
"ByteInStream_URL: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
316 if( m_stream_resp->processing() ) {
320 m_buffer.close(
true );
321 if( m_stream_resp->thread.joinable() ) {
322 DBG_PRINT(
"ByteInStream_URL: close.1 %s, %s",
id().c_str(), m_buffer.toString().c_str());
323 m_stream_resp->thread.join();
326 m_stream_resp->thread.swap(
none);
327 DBG_PRINT(
"ByteInStream_URL: close.X %s, %s",
id().c_str(), to_string_int().c_str());
331 if( !
good() || !m_stream_resp->processing() ) {
333 return m_buffer.size() >= n;
335 m_stream_resp->header_resp.wait_until_completion(m_timeout);
336 if( m_stream_resp->has_content_length && m_stream_resp->content_length - m_bytes_consumed < n ) {
341 bool timeout_occured;
342 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
344 if( timeout_occured ) {
346 if( m_stream_resp->processing() ) {
349 m_buffer.interruptWriter();
359 return m_stream_resp->processing() || m_buffer.size() > 0;
363 m_stream_resp->header_resp.wait_until_completion(m_timeout);
364 if( 0 == length || !
good() ) {
367 bool timeout_occured =
false;
369 const size_t got = m_buffer.getBlocking(
static_cast<uint8_t*
>(out), length, 1, m_timeout, timeout_occured);
370 m_bytes_consumed += got;
371 if( timeout_occured ) {
373 if( m_stream_resp->processing() ) {
376 m_buffer.interruptWriter();
386 ERR_PRINT(
"ByteInStream_URL::peek not implemented");
391 if ( ( !m_stream_resp->processing() && m_buffer.isEmpty() ) ||
392 ( m_stream_resp->has_content_length && m_bytes_consumed >= m_stream_resp->content_length ) )
396 if( m_stream_resp->failed() ) {
402std::string ByteInStream_URL::to_string_int() const noexcept {
405 ", result "+
std::
to_string((int8_t)m_stream_resp->result.load())+
412 return "ByteInStream_URL["+to_string_int()+
"]";
419 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_URL>(path_or_uri,
timeout);
420 if(
nullptr != res && !res->fail() ) {
424 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_File>(path_or_uri);
425 if(
nullptr != res && !res->fail() ) {
433 m_has_content_length(
false ), m_content_size( 0 ), m_total_xfered( 0 ), m_result(
io::io_result_t::NONE ),
438 DBG_PRINT(
"ByteInStream_Feed: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
443 m_buffer.close(
true );
444 DBG_PRINT(
"ByteInStream_Feed: close.X %s, %s",
id().c_str(), to_string_int().c_str());
450 return m_buffer.size() >= n;
452 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
457 bool timeout_occured;
458 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
460 if( timeout_occured ) {
465 m_buffer.interruptWriter();
479 if( 0 == length || !
good() ) {
482 bool timeout_occured =
false;
484 const size_t got = m_buffer.getBlocking(
static_cast<uint8_t*
>(out), length, 1, m_timeout, timeout_occured);
485 m_bytes_consumed += got;
486 if( timeout_occured ) {
491 m_buffer.interruptWriter();
501 ERR_PRINT(
"ByteInStream_Feed::peek not implemented");
507 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
519 bool timeout_occured;
520 if( m_buffer.putBlocking(in, in+length,
timeout, timeout_occured) ) {
521 m_total_xfered.fetch_add(length);
524 if( timeout_occured ) {
526 m_buffer.interruptWriter();
542 m_buffer.set_end_of_input(
true);
545std::string ByteInStream_Feed::to_string_int() const noexcept {
556 return "ByteInStream_Feed["+to_string_int()+
"]";
562 DBG_PRINT(
"ByteInStream_Recorder: close.X %s",
id().c_str());
567 m_rec_offset = m_bytes_consumed;
568 m_is_recording =
true;
572 m_is_recording =
false;
576 m_is_recording =
false;
582 const size_t consumed_bytes = m_parent.read(out, length);
583 m_bytes_consumed += consumed_bytes;
585 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
586 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
588 return consumed_bytes;
592 return "ByteInStream_Recorder[parent "+m_parent.id()+
", recording[on "+std::to_string(m_is_recording)+
595 ", iostate["+jau::io::to_string(
rdstate())+
"]]";
599 return 1 ==
write(&in, 1);
603 if( 0 == length ||
fail() ) {
606 const uint8_t* out_u8 =
static_cast<const uint8_t*
>(out);
608 while( total < length ) {
610 while ( ( len =
::write(m_fd, out_u8+total, length-total) ) < 0 ) {
611 if ( errno == EAGAIN || errno == EINTR ) {
620 DBG_PRINT(
"ByteOutStream_File::write: Error occurred in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
623 total +=
static_cast<size_t>(len);
629 m_bytes_consumed += total;
634: stats(
fd), m_fd(-1),
637 if( !stats.exists() || !stats.has_access() ) {
639 DBG_PRINT(
"ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
644 DBG_PRINT(
"ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(),
to_string().c_str());
656 std::string path2 = path.substr(7);
661 if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
663 DBG_PRINT(
"ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(),
to_string().c_str());
666 if( stats.has_fd() ) {
667 m_fd = ::dup( stats.fd() );
669 const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|
O_BINARY|O_NOCTTY;
674 DBG_PRINT(
"ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(),
to_string().c_str());
676 if( stats.is_file() ) {
680 ERR_PRINT(
"Failed to position existing file to end %s, errno %d %s",
681 to_string().c_str(), errno, strerror(errno));
700 ", fd "+std::to_string(m_fd)+
701 ", iostate["+jau::io::to_string(
rdstate())+
702 "], "+stats.to_string()+
constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
iostate rdstate() const noexcept override
Returns the current state flags.
bool write(uint8_t in[], size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
void set_eof(const io_result_t result) noexcept
Set end-of-data (EOS), i.e.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
std::string to_string() const noexcept override
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
ByteInStream_File(const std::string &path) noexcept
Construct a stream based byte input stream from filesystem path.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void clear_recording() noexcept
Clears the recording.
iostate rdstate() const noexcept override
Returns the current state flags.
void start_recording() noexcept
Starts the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
bool is_recording() noexcept
size_t read(void *, size_t) noexcept override
Read from the source.
std::string to_string() const noexcept override
void stop_recording() noexcept
Stops the recording.
ByteInStream_SecMemory(const std::string &in)
Construct a secure memory source that reads from a string.
std::string to_string() const noexcept override
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
size_t read(void *, size_t) noexcept override
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate rdstate() const noexcept override
Returns the current state flags.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool is_open() const noexcept override
Checks if the stream has an associated file.
virtual size_t peek(void *out, size_t length, size_t peek_offset) noexcept=0
Read from the source but do not modify the internal offset.
size_t discard(size_t N) noexcept
Discard the next N bytes of the data.
virtual size_t read(void *out, size_t length) noexcept=0
Read from the source.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
ByteOutStream_File(const std::string &path, const jau::fs::fmode_t mode=jau::fs::fmode_t::def_file_prot) noexcept
Construct a stream based byte output stream from filesystem path, either an existing or new file.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
int fd() const noexcept
Returns the file descriptor if is_open(), otherwise -1 for no file descriptor.
std::string to_string() const noexcept override
virtual size_t write(const void *in, size_t length) noexcept=0
Write to the data sink.
bool fail() const noexcept
Checks if an error has occurred.
constexpr iostate rdstate_impl() const noexcept
bool good() const noexcept
Checks if no error nor eof() has occurred i.e.
bool timeout() const noexcept
Checks if a timeout (non-recoverable) has occurred.
virtual iostate rdstate() const noexcept
Returns the current state flags.
constexpr void setstate_impl(iostate state) const noexcept
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
std::string to_string(const endian_t v) noexcept
Return std::string representation of the given endian.
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
constexpr E & write(E &store, const E bits, bool set) noexcept
If set==true, sets the bits in store, i.e.
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
fraction< int64_t > fraction_i64
fraction using int64_t as integral type
AsyncStreamResponseRef read_url_stream_async(net_tk_handle handle, const std::string &url, http::PostRequestPtr httpPostReq, ByteRingbuffer *buffer, const AsyncStreamConsumerFunc &consumer_fn) noexcept
Asynchronous URL stream reader using the given AsyncStreamConsumerFunc consumer_fn.
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
io_result_t
I/O operation result value.
jau::function< bool(AsyncStreamResponse &, const uint8_t *, size_t, bool)> AsyncStreamConsumerFunc
Asynchronous stream consumer function.
std::string toString(io_result_t v) noexcept
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of the given uri matches a network protocol supported by libcurl, otherwis...
@ none
No error occurred nor has EOS been reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
relaxed_atomic_uint64 content_length
content_length tracking the content_length
CXX_ALWAYS_INLINE _Tp load() const noexcept