38 #include <sys/types.h>
44 #include <curl/curl.h>
61#if defined(__FreeBSD__)
62 typedef off_t off64_t;
63 #define __posix_openat64 ::openat
64 #define __posix_lseek64 ::lseek
66 #define __posix_openat64 ::openat64
67 #define __posix_lseek64 ::lseek64
76inline constexpr void copy_mem(
void* out,
const void* in,
size_t n)
noexcept {
77 if(in !=
nullptr && out !=
nullptr && n > 0) {
78 std::memmove(out, in,
sizeof(uint8_t)*n);
82bool ByteInStream::read(uint8_t& out)
noexcept {
83 return 1 == read(&out, 1);
86bool ByteInStream::peek(uint8_t& out)
noexcept {
87 return 1 == peek(&out, 1, 0);
90size_t ByteInStream::discard(
size_t n)
noexcept {
91 uint8_t buf[1024] = { 0 };
96 const size_t got = read(buf,
std::min(n,
sizeof(buf)));
107size_t ByteInStream_SecMemory::read(
void* out,
size_t length)
noexcept {
108 if( 0 == length || !good() ) {
111 const size_t got = std::min<size_t>(m_source.size() - m_offset, length);
112 copy_mem(out, m_source.data() + m_offset, got);
114 if( m_source.size() == m_offset ) {
115 setstate_impl( iostate::eofbit );
120bool ByteInStream_SecMemory::available(
size_t n)
noexcept {
121 return m_source.size() - m_offset >= n;
124size_t ByteInStream_SecMemory::peek(
void* out,
size_t length,
size_t peek_offset)
noexcept {
125 const size_t bytes_left = m_source.size() - m_offset;
126 if(peek_offset >= bytes_left) {
129 const size_t got =
std::min(bytes_left - peek_offset, length);
130 copy_mem(out, &m_source[m_offset + peek_offset], got);
134ByteInStream_SecMemory::ByteInStream_SecMemory(
const std::string& in)
147 return "ByteInStream_SecMemory[content size "+
jau::to_decstring(m_source.size())+
155static void append_bitstr(std::string& out, T mask, T bit,
const std::string& bitstr,
bool& comma) {
156 if( bit == ( mask & bit ) ) {
157 if( comma ) { out.append(
", "); }
158 out.append(bitstr); comma =
true;
162#define APPEND_BITSTR(U,V,W,M) append_bitstr(out, M, U::V, #W, comma);
164#define IOSTATE_ENUM(X,M) \
165 X(iostate,badbit,bad,M) \
166 X(iostate,eofbit,eof,M) \
167 X(iostate,failbit,fail,M) \
168 X(iostate,timeout,timeout,M)
181 if( 0 == length || !good() ) {
184 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
186 while( total < length ) {
188 while ( ( len = ::read(m_fd, out_u8+total, length-total) ) < 0 ) {
189 if ( errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR ) {
198 DBG_PRINT(
"ByteInStream_File::read: Error occurred in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
201 total +=
static_cast<size_t>(len);
202 if( 0 == len || ( m_has_content_length && m_bytes_consumed + total >= m_content_size ) ) {
207 m_bytes_consumed += total;
213 ( m_has_content_length && m_content_size - m_bytes_consumed < offset + 1 ) ) {
220 abs_pos =
__posix_lseek64(m_fd,
static_cast<off64_t
>(offset), SEEK_CUR);
223 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset1 %zd) in %s, errno %d %s",
224 offset,
to_string().c_str(), errno, strerror(errno));
228 if( abs_pos ==
static_cast<off64_t
>(offset) ) {
230 while ( ( len = ::read(m_fd, out, length) ) < 0 ) {
231 if ( errno == EAGAIN || errno == EINTR ) {
237 DBG_PRINT(
"ByteInStream_File::peak: Error occurred (read) in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
242 if(
__posix_lseek64(m_fd,
static_cast<off64_t
>(m_bytes_consumed), SEEK_SET) < 0 ) {
245 DBG_PRINT(
"ByteInStream_File::peek: Error occurred (offset2 %zd) in %s, errno %d %s",
246 offset,
to_string().c_str(), errno, strerror(errno));
253 return is_open() && good() && ( !m_has_content_length || m_content_size - m_bytes_consumed >= (uint64_t)n );
258 stats(fd), m_fd(-1), m_has_content_length(
false), m_content_size(0), m_bytes_consumed(0)
260 if( !stats.exists() || !stats.has_access() ) {
262 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
265 m_content_size = m_has_content_length ? stats.size() : 0;
269 DBG_PRINT(
"ByteInStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(),
to_string().c_str());
276 stats(), m_fd(-1), m_has_content_length(
false), m_content_size(0), m_bytes_consumed(0)
280 std::string path2 = path.substr(7);
285 if( !stats.exists() || !stats.has_access() ) {
287 DBG_PRINT(
"ByteInStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
290 m_has_content_length =
true;
291 m_content_size = stats.size();
293 m_has_content_length =
false;
297 const int src_flags = O_RDONLY|
O_BINARY|O_NOCTTY;
298 if( stats.has_fd() ) {
299 m_fd = ::dup( stats.fd() );
305 DBG_PRINT(
"ByteInStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(),
to_string().c_str());
334 m_header_sync(), m_has_content_length(
false ), m_content_size( 0 ),
339 m_url_thread =
read_url_stream(m_url, m_buffer, m_header_sync, m_has_content_length, m_content_size, m_total_xfered, m_result);
340 if(
nullptr == m_url_thread ) {
347 DBG_PRINT(
"ByteInStream_URL: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
353 m_buffer.
close(
true );
354 if(
nullptr != m_url_thread && m_url_thread->joinable() ) {
355 DBG_PRINT(
"ByteInStream_URL: close.1 %s, %s",
id().c_str(), m_buffer.
toString().c_str());
356 m_url_thread->join();
358 m_url_thread =
nullptr;
359 DBG_PRINT(
"ByteInStream_URL: close.X %s, %s",
id().c_str(), to_string_int().c_str());
365 return m_buffer.size() >= n;
367 m_header_sync.wait_until_completion(m_timeout);
368 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
373 bool timeout_occured;
374 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
376 if( timeout_occured ) {
381 m_buffer.interruptWriter();
395 m_header_sync.wait_until_completion(m_timeout);
396 if( 0 == length || !good() ) {
399 bool timeout_occured =
false;
401 const size_t got = m_buffer.getBlocking(
static_cast<uint8_t*
>(out), length, 1, m_timeout, timeout_occured);
402 m_bytes_consumed += got;
403 if( timeout_occured ) {
408 m_buffer.interruptWriter();
418 ERR_PRINT(
"ByteInStream_URL::peek not implemented");
424 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
434std::string ByteInStream_URL::to_string_int() const noexcept {
441 "], "+m_buffer.toString();
444 return "ByteInStream_URL["+to_string_int()+
"]";
451 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_URL>(path_or_uri,
timeout);
452 if(
nullptr != res && !res->fail() ) {
456 std::unique_ptr<ByteInStream> res = std::make_unique<ByteInStream_File>(path_or_uri);
457 if(
nullptr != res && !res->fail() ) {
470 DBG_PRINT(
"ByteInStream_Feed: close.0 %s, %s",
id().c_str(), to_string_int().c_str());
475 m_buffer.
close(
true );
476 DBG_PRINT(
"ByteInStream_Feed: close.X %s, %s",
id().c_str(), to_string_int().c_str());
482 return m_buffer.size() >= n;
484 if( m_has_content_length && m_content_size - m_bytes_consumed < n ) {
489 bool timeout_occured;
490 const size_t avail = m_buffer.waitForElements(n, m_timeout, timeout_occured);
492 if( timeout_occured ) {
497 m_buffer.interruptWriter();
511 if( 0 == length || !good() ) {
514 bool timeout_occured =
false;
516 const size_t got = m_buffer.getBlocking(
static_cast<uint8_t*
>(out), length, 1, m_timeout, timeout_occured);
517 m_bytes_consumed += got;
518 if( timeout_occured ) {
523 m_buffer.interruptWriter();
533 ERR_PRINT(
"ByteInStream_Feed::peek not implemented");
539 ( m_has_content_length && m_bytes_consumed >= m_content_size ) )
551 bool timeout_occured;
552 if( m_buffer.putBlocking(in, in+length,
timeout, timeout_occured) ) {
553 m_total_xfered.fetch_add(length);
556 if( timeout_occured ) {
558 m_buffer.interruptWriter();
574 m_buffer.set_end_of_input(
true);
577std::string ByteInStream_Feed::to_string_int() const noexcept {
584 "], "+m_buffer.toString();
588 return "ByteInStream_Feed["+to_string_int()+
"]";
594 DBG_PRINT(
"ByteInStream_Recorder: close.X %s",
id().c_str());
599 m_rec_offset = m_bytes_consumed;
600 m_is_recording =
true;
604 m_is_recording =
false;
608 m_is_recording =
false;
614 const size_t consumed_bytes = m_parent.read(out, length);
615 m_bytes_consumed += consumed_bytes;
616 if( is_recording() ) {
617 uint8_t* out_u8 =
static_cast<uint8_t*
>(out);
618 m_buffer.insert(m_buffer.end(), out_u8, out_u8+consumed_bytes);
620 return consumed_bytes;
624 return "ByteInStream_Recorder[parent "+m_parent.
id()+
", recording[on "+
std::to_string(m_is_recording)+
631 return 1 == write(&in, 1);
635 if( 0 == length || fail() ) {
638 const uint8_t* out_u8 =
static_cast<const uint8_t*
>(out);
640 while( total < length ) {
642 while ( ( len = ::write(m_fd, out_u8+total, length-total) ) < 0 ) {
643 if ( errno == EAGAIN || errno == EINTR ) {
652 DBG_PRINT(
"ByteOutStream_File::write: Error occurred in %s, errno %d %s",
to_string().c_str(), errno, strerror(errno));
655 total +=
static_cast<size_t>(len);
661 m_bytes_consumed += total;
666: stats(fd), m_fd(-1),
669 if( !stats.exists() || !stats.has_access() ) {
671 DBG_PRINT(
"ByteOutStream_File::ctor: Error, not an existing or accessible file in %s, %s", stats.to_string().c_str(),
to_string().c_str());
676 DBG_PRINT(
"ByteOutStream_File::ctor: Error occurred in %s, %s", stats.to_string().c_str(),
to_string().c_str());
688 std::string path2 = path.substr(7);
693 if( ( stats.exists() && !stats.is_file() && !stats.has_fd() ) || !stats.has_access() ) {
695 DBG_PRINT(
"ByteOutStream_File::ctor: Error, an existing non[file, fd] or not accessible element in %s, %s", stats.to_string().c_str(),
to_string().c_str());
698 if( stats.has_fd() ) {
699 m_fd = ::dup( stats.fd() );
701 const int dst_flags = ( stats.exists() ? 0 : O_CREAT|O_EXCL ) | O_WRONLY|
O_BINARY|O_NOCTTY;
706 DBG_PRINT(
"ByteOutStream_File::ctor: Error while opening %s, %s", stats.to_string().c_str(),
to_string().c_str());
708 if( stats.is_file() ) {
712 ERR_PRINT(
"Failed to position existing file to end %s, errno %d %s",
713 to_string().c_str(), errno, strerror(errno));
static void append_bitstr(std::string &out, T mask, T bit, const std::string &bitstr, bool &comma)
#define IOSTATE_ENUM(X, M)
constexpr void copy_mem(void *out, const void *in, size_t n) noexcept
#define APPEND_BITSTR(U, V, W, M)
Platform agnostic representation of POSIX ::lstat() and ::stat() for a given pathname.
std::string to_string() const noexcept
Returns a comprehensive string representation of this element.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool is_open() const noexcept override
Checks if the stream has an associated file.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void set_eof(const async_io_result_t result) noexcept
Set end-of-data (EOS), i.e.
ByteInStream_Feed(std::string id_name, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed externally provisioned byte input stream.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
iostate rdstate() const noexcept override
Returns the current state flags.
bool write(uint8_t in[], size_t length, const jau::fraction_i64 &timeout) noexcept
Write given bytes to the async ringbuffer using explicit given timeout.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
File based byte input stream, including named file descriptor.
std::string to_string() const noexcept override
size_t peek(void *, size_t, size_t) noexcept override
Read from the source but do not modify the internal offset.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
ByteInStream_File(const std::string &path) noexcept
Construct a stream based byte input stream from filesystem path.
bool available(size_t n) noexcept override
Return whether n bytes are available in the input stream, if has_content_size() or using an asynchron...
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void clear_recording() noexcept
Clears the recording.
iostate rdstate() const noexcept override
Returns the current state flags.
void start_recording() noexcept
Starts the recording.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t read(void *, size_t) noexcept override
Read from the source.
std::string to_string() const noexcept override
void stop_recording() noexcept
Stops the recording.
std::string to_string() const noexcept override
void close() noexcept override
Close the stream if supported by the underlying mechanism.
std::string to_string() const noexcept override
size_t read(void *out, size_t length) noexcept override
Read from the source.
ByteInStream_URL(std::string url, const jau::fraction_i64 &timeout) noexcept
Construct a ringbuffer backed Http byte input stream.
iostate rdstate() const noexcept override
Returns the current state flags.
bool available(size_t n) noexcept override
Check whether n bytes are available in the input stream.
bool has_content_size() const noexcept override
Returns true if implementation is aware of content_size(), otherwise false.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
size_t peek(void *out, size_t length, size_t peek_offset) noexcept override
Read from the source but do not modify the internal offset.
bool is_open() const noexcept override
Checks if the stream has an associated file.
Abstract byte input stream object.
virtual void close() noexcept=0
Close the stream if supported by the underlying mechanism.
virtual std::string id() const noexcept
return the id of this data source
File based byte output stream, including named file descriptor.
void close() noexcept override
Close the stream if supported by the underlying mechanism.
ByteOutStream_File(const std::string &path, const jau::fs::fmode_t mode=jau::fs::fmode_t::def_file_prot) noexcept
Construct a stream based byte output stream from filesystem path, either an existing or new file.
size_t write(const void *, size_t) noexcept override
Write to the data sink.
std::string to_string() const noexcept override
virtual size_t write(const void *in, size_t length) noexcept=0
Write to the data sink.
constexpr iostate rdstate_impl() const noexcept
virtual iostate rdstate() const noexcept
Returns the current state flags.
constexpr void setstate_impl(iostate state) const noexcept
Size_type size() const noexcept
Returns the number of elements in this ring buffer.
void close(const bool zeromem=false) noexcept
Close this ringbuffer by releasing all elements available and resizing capacity to zero.
bool isEmpty() const noexcept
Returns true if this ring buffer is empty, otherwise false.
std::string toString() const noexcept
Returns a short string representation incl.
#define ERR_PRINT(...)
Use for unconditional error messages, prefix '[elapsed_time] Error @ FILE:LINE FUNC: '.
#define DBG_PRINT(...)
Use for environment-variable environment::DEBUG conditional debug messages, prefix '[elapsed_time] De...
const uint8_t * cast_char_ptr_to_uint8(const char *s) noexcept
std::string to_string(const alphabet &v) noexcept
fmode_t
Generic file type and POSIX protection mode bits as used in file_stats, touch(), mkdir() etc.
constexpr ::mode_t posix_protection_bits(const fmode_t mask) noexcept
Returns the POSIX protection bits: rwx_all | set_uid | set_gid | sticky, i.e.
iostate
Mimic std::ios_base::iostate for state functionality, see iostate_func.
std::unique_ptr< ByteInStream > to_ByteInStream(const std::string &path_or_uri, jau::fraction_i64 timeout=20_s) noexcept
Parses the given path_or_uri, if it matches a supported protocol, see jau::io::uri::protocol_supporte...
bool is_local_file_protocol(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches the local file protocol, i.e.
uint64_t read_url_stream(const std::string &url, secure_vector< uint8_t > &buffer, const StreamConsumerFunc &consumer_fn) noexcept
Synchronous URL stream reader using the given StreamConsumerFunc consumer_fn.
async_io_result_t
Asynchronous I/O operation result value.
const size_t BEST_URLSTREAM_RINGBUFFER_SIZE
std::string to_string(const iostate mask) noexcept
bool protocol_supported(const std::string_view &uri) noexcept
Returns true if the uri-scheme of given uri matches a supported by libcurl network protocols otherwis...
@ goodbit
No error occurred nor has EOS being reached.
@ failbit
Input or output operation failed (formatting or extraction error).
@ eofbit
An input operation reached the end of its stream.
@ timeout
Input or output operation failed due to timeout.
@ NONE
Operation still in progress.
@ FAILED
Operation failed.
@ SUCCESS
Operation succeeded.
constexpr T min(const T x, const T y) noexcept
Returns the minimum of two integrals (w/ branching) in O(1)
constexpr T max(const T x, const T y) noexcept
Returns the maximum of two integrals (w/ branching) in O(1)
std::string to_decstring(const value_type &v, const char separator=',', const nsize_t width=0) noexcept
Produce a decimal string representation of an integral integer value.
__pack(...): Produces MSVC, clang and gcc compatible lead-in and -out macros.
CXX_ALWAYS_INLINE _Tp load() const noexcept