summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohn Preston <johnprestonmail@gmail.com>2019-12-19 18:14:05 +0300
committerJohn Preston <johnprestonmail@gmail.com>2019-12-19 18:14:05 +0300
commitf51f133832ee2024ff0f30560bf4c944860307df (patch)
tree27cc4189e51d4636a73cae7584171f248f2cf75a
parentc4319a73701eb1c606165e601c11fb47429810e2 (diff)
Send packets for processing in batches.
-rw-r--r--Telegram/SourceFiles/history/view/media/history_view_gif.cpp2
-rw-r--r--Telegram/SourceFiles/media/audio/media_audio_loaders.cpp6
-rw-r--r--Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h2
-rw-r--r--Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp5
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp25
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h4
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_file.cpp42
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_file.h6
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h7
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_player.cpp96
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_player.h4
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp45
-rw-r--r--Telegram/SourceFiles/media/streaming/media_streaming_video_track.h2
13 files changed, 168 insertions, 78 deletions
diff --git a/Telegram/SourceFiles/history/view/media/history_view_gif.cpp b/Telegram/SourceFiles/history/view/media/history_view_gif.cpp
index af8b57c1a1..554621b7c6 100644
--- a/Telegram/SourceFiles/history/view/media/history_view_gif.cpp
+++ b/Telegram/SourceFiles/history/view/media/history_view_gif.cpp
@@ -1188,7 +1188,7 @@ void Gif::createStreamedPlayer() {
if (_streamed->instance.ready()) {
streamingReady(base::duplicate(_streamed->instance.info()));
}
- startStreamedPlayer();
+ checkStreamedIsStarted();
}
void Gif::startStreamedPlayer() const {
diff --git a/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp b/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp
index 44b3d534b9..82c0041a35 100644
--- a/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp
+++ b/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp
@@ -33,7 +33,11 @@ void Loaders::feedFromExternal(ExternalSoundPart &&part) {
QMutexLocker lock(&_fromExternalMutex);
invoke = _fromExternalQueues.empty()
&& _fromExternalForceToBuffer.empty();
- _fromExternalQueues[part.audio].push_back(std::move(part.packet));
+ auto &queue = _fromExternalQueues[part.audio];
+ queue.insert(
+ end(queue),
+ std::make_move_iterator(part.packets.begin()),
+ std::make_move_iterator(part.packets.end()));
}
if (invoke) {
_fromExternalNotify.call();
diff --git a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h
index 7b9e18a8af..95fb9a3207 100644
--- a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h
+++ b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h
@@ -22,7 +22,7 @@ struct ExternalSoundData {
struct ExternalSoundPart {
AudioMsgId audio;
- FFmpeg::Packet packet;
+ gsl::span<FFmpeg::Packet> packets;
};
class ChildFFMpegLoader : public AbstractAudioFFMpegLoader {
diff --git a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp
index e4dfbe18ec..68b56a3888 100644
--- a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp
+++ b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp
@@ -490,9 +490,10 @@ FFMpegReaderImplementation::PacketResult FFMpegReaderImplementation::readPacket(
if (res == AVERROR_EOF) {
if (_audioStreamId >= 0) {
// queue terminating packet to audio player
+ auto empty = FFmpeg::Packet();
Player::mixer()->feedFromExternal({
_audioMsgId,
- FFmpeg::Packet()
+ gsl::make_span(&empty, 1)
});
}
return PacketResult::EndOfFile;
@@ -519,7 +520,7 @@ void FFMpegReaderImplementation::processPacket(FFmpeg::Packet &&packet) {
// queue packet to audio player
Player::mixer()->feedFromExternal({
_audioMsgId,
- std::move(packet)
+ gsl::make_span(&packet, 1)
});
}
}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp
index 00f21d856e..dc573c06c7 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp
@@ -47,14 +47,21 @@ crl::time AudioTrack::streamDuration() const {
return _stream.duration;
}
-void AudioTrack::process(FFmpeg::Packet &&packet) {
- if (packet.empty()) {
+void AudioTrack::process(std::vector<FFmpeg::Packet> &&packets) {
+ if (packets.empty()) {
+ return;
+ } else if (packets.front().empty()) {
+ Assert(packets.size() == 1);
_readTillEnd = true;
}
- if (initialized()) {
- mixerEnqueue(std::move(packet));
- } else if (!tryReadFirstFrame(std::move(packet))) {
- _error(Error::InvalidData);
+ for (auto i = begin(packets), e = end(packets); i != e; ++i) {
+ if (initialized()) {
+ mixerEnqueue(gsl::make_span(&*i, (e - i)));
+ break;
+ } else if (!tryReadFirstFrame(std::move(*i))) {
+ _error(Error::InvalidData);
+ break;
+ }
}
}
@@ -148,10 +155,10 @@ void AudioTrack::callReady() {
base::take(_ready)({ VideoInformation(), data });
}
-void AudioTrack::mixerEnqueue(FFmpeg::Packet &&packet) {
+void AudioTrack::mixerEnqueue(gsl::span<FFmpeg::Packet> packets) {
Media::Player::mixer()->feedFromExternal({
_audioId,
- std::move(packet)
+ packets
});
}
@@ -172,8 +179,6 @@ void AudioTrack::resume(crl::time time) {
}
void AudioTrack::stop() {
- Expects(initialized());
-
if (_audioId.externalPlayId()) {
Media::Player::mixer()->stop(_audioId);
}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h
index 5d05c58b2c..e0ecedfc04 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h
@@ -46,7 +46,7 @@ public:
[[nodiscard]] crl::time streamDuration() const;
// Called from the same unspecified thread.
- void process(FFmpeg::Packet &&packet);
+ void process(std::vector<FFmpeg::Packet> &&packets);
void waitForData();
// Called from the main thread.
@@ -59,7 +59,7 @@ private:
[[nodiscard]] bool fillStateFromFrame();
[[nodiscard]] bool processFirstFrame();
void mixerInit();
- void mixerEnqueue(FFmpeg::Packet &&packet);
+ void mixerEnqueue(gsl::span<FFmpeg::Packet> packets);
void mixerForceToBuffer();
void callReady();
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
index cec4f762f0..19fd90eb59 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
@@ -16,6 +16,7 @@ namespace Streaming {
namespace {
constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
+constexpr auto kMaxQueuedPackets = 1024;
} // namespace
@@ -54,6 +55,7 @@ int File::Context::read(bytes::span buffer) {
buffer = buffer.subspan(0, amount);
while (!_reader->fill(_offset, buffer, &_semaphore)) {
+ processQueuedPackets(SleepPolicy::Disallowed);
_delegate->fileWaitingForData();
_semaphore.acquire();
if (_interrupted) {
@@ -264,6 +266,13 @@ void File::Context::start(crl::time position) {
return;
}
+ if (video.codec) {
+ _queuedPackets[video.index].reserve(kMaxQueuedPackets);
+ }
+ if (audio.codec) {
+ _queuedPackets[audio.index].reserve(kMaxQueuedPackets);
+ }
+
const auto header = _reader->headerSize();
if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
return fail(Error::OpenFailed);
@@ -287,24 +296,28 @@ void File::Context::readNextPacket() {
if (unroll()) {
return;
} else if (const auto packet = base::get_if<FFmpeg::Packet>(&result)) {
- const auto more = _delegate->fileProcessPacket(std::move(*packet));
- if (!more) {
- do {
- _reader->startSleep(&_semaphore);
- _semaphore.acquire();
- _reader->stopSleep();
- } while (!unroll() && !_delegate->fileReadMore());
+ const auto index = packet->fields().stream_index;
+ const auto i = _queuedPackets.find(index);
+ if (i == end(_queuedPackets)) {
+ return;
+ }
+ i->second.push_back(std::move(*packet));
+ if (i->second.size() == kMaxQueuedPackets) {
+ processQueuedPackets(SleepPolicy::Allowed);
}
} else {
// Still trying to read by drain.
Assert(result.is<FFmpeg::AvErrorWrap>());
Assert(result.get<FFmpeg::AvErrorWrap>().code() == AVERROR_EOF);
- handleEndOfFile();
+ processQueuedPackets(SleepPolicy::Allowed);
+ if (!finished()) {
+ handleEndOfFile();
+ }
}
}
void File::Context::handleEndOfFile() {
- const auto more = _delegate->fileProcessPacket(FFmpeg::Packet());
+ const auto more = _delegate->fileProcessEndOfFile();
if (_delegate->fileReadMore()) {
_readTillEnd = false;
auto error = FFmpeg::AvErrorWrap(av_seek_frame(
@@ -320,6 +333,17 @@ void File::Context::handleEndOfFile() {
}
}
+void File::Context::processQueuedPackets(SleepPolicy policy) {
+ const auto more = _delegate->fileProcessPackets(_queuedPackets);
+ if (!more && policy == SleepPolicy::Allowed) {
+ do {
+ _reader->startSleep(&_semaphore);
+ _semaphore.acquire();
+ _reader->stopSleep();
+ } while (!unroll() && !_delegate->fileReadMore());
+ }
+}
+
void File::Context::interrupt() {
_interrupted = true;
_semaphore.release();
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h
index 773cbe9266..63450e89ee 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h
@@ -58,6 +58,10 @@ private:
void waitTillInterrupted();
private:
+ enum class SleepPolicy {
+ Allowed,
+ Disallowed,
+ };
static int Read(void *opaque, uint8_t *buffer, int bufferSize);
static int64_t Seek(void *opaque, int64_t offset, int whence);
@@ -82,6 +86,7 @@ private:
// TODO base::expected.
[[nodiscard]] auto readPacket()
-> base::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap>;
+ void processQueuedPackets(SleepPolicy policy);
void handleEndOfFile();
void sendFullInCache(bool force = false);
@@ -89,6 +94,7 @@ private:
const not_null<FileDelegate*> _delegate;
const not_null<Reader*> _reader;
+ base::flat_map<int, std::vector<FFmpeg::Packet>> _queuedPackets;
int _offset = 0;
int _size = 0;
bool _failed = false;
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h b/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h
index 9b001b2ded..2facdbe74b 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h
@@ -29,9 +29,10 @@ public:
// Return true if reading and processing more packets is desired.
// Return false if sleeping until 'wake()' is called is desired.
- // Return true after the EOF packet if looping is desired.
- [[nodiscard]] virtual bool fileProcessPacket(
- FFmpeg::Packet &&packet) = 0;
+ [[nodiscard]] virtual bool fileProcessPackets(
+ base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) = 0;
+ // Return true if looping is desired.
+ [[nodiscard]] virtual bool fileProcessEndOfFile() = 0;
[[nodiscard]] virtual bool fileReadMore() = 0;
};
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
index 56c8aa8873..96f85e17fc 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
@@ -343,54 +343,82 @@ void Player::fileWaitingForData() {
}
}
-bool Player::fileProcessPacket(FFmpeg::Packet &&packet) {
+bool Player::fileProcessPackets(
+ base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) {
_waitingForData = false;
-
- const auto &native = packet.fields();
- const auto index = native.stream_index;
- if (packet.empty()) {
- _readTillEnd = true;
- setDurationByPackets();
- if (_audio) {
- const auto till = _loopingShift + computeAudioDuration();
+ auto audioTill = kTimeUnknown;
+ auto videoTill = kTimeUnknown;
+ for (auto &[index, list] : packets) {
+ if (list.empty()) {
+ continue;
+ }
+ if (_audio && _audio->streamIndex() == index) {
+ //for (const auto &packet : list) {
+ // // Maybe it is enough to count by list.back()?.. hope so.
+ // accumulate_max(
+ // _durationByLastAudioPacket,
+ // durationByPacket(*_audio, packet));
+ //}
+ accumulate_max(
+ _durationByLastAudioPacket,
+ durationByPacket(*_audio, list.back()));
+ const auto till = _loopingShift + std::clamp(
+ FFmpeg::PacketPosition(
+ list.back(),
+ _audio->streamTimeBase()),
+ crl::time(0),
+ computeAudioDuration() - 1);
crl::on_main(&_sessionGuard, [=] {
audioReceivedTill(till);
});
- _audio->process(FFmpeg::Packet());
- }
- if (_video) {
- const auto till = _loopingShift + computeVideoDuration();
+ _audio->process(base::take(list));
+ } else if (_video && _video->streamIndex() == index) {
+ //for (const auto &packet : list) {
+ // // Maybe it is enough to count by list.back()?.. hope so.
+ // accumulate_max(
+ // _durationByLastVideoPacket,
+ // durationByPacket(*_video, packet));
+ //}
+ accumulate_max(
+ _durationByLastVideoPacket,
+ durationByPacket(*_video, list.back()));
+ const auto till = _loopingShift + std::clamp(
+ FFmpeg::PacketPosition(
+ list.back(),
+ _video->streamTimeBase()),
+ crl::time(0),
+ computeVideoDuration() - 1);
crl::on_main(&_sessionGuard, [=] {
videoReceivedTill(till);
});
- _video->process(FFmpeg::Packet());
+ _video->process(base::take(list));
}
- } else if (_audio && _audio->streamIndex() == native.stream_index) {
- accumulate_max(
- _durationByLastAudioPacket,
- durationByPacket(*_audio, packet));
-
- const auto till = _loopingShift + std::clamp(
- FFmpeg::PacketPosition(packet, _audio->streamTimeBase()),
- crl::time(0),
- computeAudioDuration() - 1);
+ }
+ return fileReadMore();
+}
+
+bool Player::fileProcessEndOfFile() {
+ _waitingForData = false;
+ _readTillEnd = true;
+ setDurationByPackets();
+ const auto generateEmptyQueue = [] {
+ auto result = std::vector<FFmpeg::Packet>();
+ result.emplace_back();
+ return result;
+ };
+ if (_audio) {
+ const auto till = _loopingShift + computeAudioDuration();
crl::on_main(&_sessionGuard, [=] {
audioReceivedTill(till);
});
- _audio->process(std::move(packet));
- } else if (_video && _video->streamIndex() == native.stream_index) {
- accumulate_max(
- _durationByLastVideoPacket,
- durationByPacket(*_video, packet));
-
- const auto till = _loopingShift + std::clamp(
- FFmpeg::PacketPosition(packet, _video->streamTimeBase()),
- crl::time(0),
- computeVideoDuration() - 1);
+ _audio->process(generateEmptyQueue());
+ }
+ if (_video) {
+ const auto till = _loopingShift + computeVideoDuration();
crl::on_main(&_sessionGuard, [=] {
videoReceivedTill(till);
});
- _video->process(std::move(packet));
+ _video->process(generateEmptyQueue());
}
return fileReadMore();
}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.h b/Telegram/SourceFiles/media/streaming/media_streaming_player.h
index 6182af4fb7..73eecb7d33 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_player.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.h
@@ -97,7 +97,9 @@ private:
void fileError(Error error) override;
void fileWaitingForData() override;
void fileFullInCache(bool fullInCache) override;
- bool fileProcessPacket(FFmpeg::Packet &&packet) override;
+ bool fileProcessPackets(
+ base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) override;
+ bool fileProcessEndOfFile() override;
bool fileReadMore() override;
// Called from the main thread.
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp
index a1e1e4b2bd..0587394420 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp
@@ -34,7 +34,7 @@ public:
FnMut<void(const Information &)> ready,
Fn<void(Error)> error);
- void process(FFmpeg::Packet &&packet);
+ void process(std::vector<FFmpeg::Packet> &&packets);
[[nodisacrd]] rpl::producer<> checkNextFrame() const;
[[nodisacrd]] rpl::producer<> waitingForData() const;
@@ -149,25 +149,42 @@ rpl::producer<> VideoTrackObject::waitingForData() const {
: _waitingForData.events();
}
-void VideoTrackObject::process(FFmpeg::Packet &&packet) {
- if (interrupted()) {
+void VideoTrackObject::process(std::vector<FFmpeg::Packet> &&packets) {
+ if (interrupted() || packets.empty()) {
return;
}
- if (packet.empty()) {
+ if (packets.front().empty()) {
+ Assert(packets.size() == 1);
_readTillEnd = true;
} else if (!_readTillEnd) {
+ //for (const auto &packet : packets) {
+ // // Maybe it is enough to count by list.back()?.. hope so.
+ // accumulate_max(
+ // _durationByLastPacket,
+ // durationByPacket(packet));
+ // if (interrupted()) {
+ // return;
+ // }
+ //}
accumulate_max(
_durationByLastPacket,
- durationByPacket(packet));
+ durationByPacket(packets.back()));
if (interrupted()) {
return;
}
}
- if (_shared->initialized()) {
- _stream.queue.push_back(std::move(packet));
- queueReadFrames();
- } else if (!tryReadFirstFrame(std::move(packet))) {
- fail(Error::InvalidData);
+ for (auto i = begin(packets), e = end(packets); i != e; ++i) {
+ if (_shared->initialized()) {
+ _stream.queue.insert(
+ end(_stream.queue),
+ std::make_move_iterator(i),
+ std::make_move_iterator(e));
+ queueReadFrames();
+ break;
+ } else if (!tryReadFirstFrame(std::move(*i))) {
+ fail(Error::InvalidData);
+ break;
+ }
}
}
@@ -547,6 +564,7 @@ void VideoTrackObject::callReady() {
? _stream.duration
: _syncTimePoint.trackTime;
base::take(_ready)({ data });
+ LOG(("READY CALLED!"));
}
TimePoint VideoTrackObject::trackTime() const {
@@ -887,11 +905,12 @@ crl::time VideoTrack::streamDuration() const {
return _streamDuration;
}
-void VideoTrack::process(FFmpeg::Packet &&packet) {
+void VideoTrack::process(std::vector<FFmpeg::Packet> &&packets) {
+ LOG(("PACKETS! (%1)").arg(packets.size()));
_wrapped.with([
- packet = std::move(packet)
+ packets = std::move(packets)
](Implementation &unwrapped) mutable {
- unwrapped.process(std::move(packet));
+ unwrapped.process(std::move(packets));
});
}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h
index 3ed55c5ea5..434e061ee7 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h
@@ -37,7 +37,7 @@ public:
[[nodiscard]] crl::time streamDuration() const;
// Called from the same unspecified thread.
- void process(FFmpeg::Packet &&packet);
+ void process(std::vector<FFmpeg::Packet> &&packets);
void waitForData();
// Called from the main thread.