feat: Streaming files
This commit is contained in:
parent
dcf4aeb913
commit
0ac7b54a49
|
@ -1,12 +1,13 @@
|
|||
pub mod error;
|
||||
|
||||
use error::{handle_rejection, InternalServerError, NotFoundError};
|
||||
use hyper::Body;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use bytes::Bytes;
|
||||
use clap::Parser as _;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::Level;
|
||||
use tracing::{debug, warn, Level};
|
||||
use warp::{
|
||||
filters::{any::any, path::param},
|
||||
http::HeaderMap,
|
||||
|
@ -24,9 +25,7 @@ pub struct Args {
|
|||
async fn main() {
|
||||
let config: &'static Args = Box::leak(Box::new(Args::parse()));
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(Level::TRACE)
|
||||
.init();
|
||||
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
|
||||
|
||||
let route = warp::get()
|
||||
.and(warp::path("track"))
|
||||
|
@ -50,10 +49,7 @@ pub async fn serve_local_tracks(
|
|||
// FIXME: for now, file paths need to be URL safe to be able to be requested
|
||||
location.push(PathBuf::from(track_name));
|
||||
|
||||
let (Ok(mut file), Some(filename)) = (
|
||||
tokio::fs::File::options().read(true).open(&location).await,
|
||||
location.file_name().map(|f| f.to_string_lossy()),
|
||||
) else {
|
||||
let Ok(mut file) = tokio::fs::File::options().read(true).open(&location).await else {
|
||||
return Err(NotFoundError(
|
||||
format!("The requested song could not be found on disk. Tried loading {location:?}")
|
||||
.into(),
|
||||
|
@ -62,17 +58,35 @@ pub async fn serve_local_tracks(
|
|||
};
|
||||
|
||||
// TODO: handle range requests
|
||||
// TODO: stream the file instead of fully reading it
|
||||
|
||||
let mut bytes = Vec::new();
|
||||
let _ = file.read_to_end(&mut bytes).await.map_err(|_| {
|
||||
InternalServerError(format!("Failed to read music file \"{filename}\" to the end").into())
|
||||
let (mut sender, body) = Body::channel();
|
||||
|
||||
debug!("Starting to stream file");
|
||||
tokio::task::spawn(async move {
|
||||
let mut buf = [0u8; 512];
|
||||
|
||||
loop {
|
||||
let bytes_read = match file.read(&mut buf).await {
|
||||
Ok(0) => break debug!("Done streaming a file"),
|
||||
Ok(bytes_read) => bytes_read,
|
||||
Err(err) => break warn!("Couldn't read part of a file. Got err: {err}"),
|
||||
};
|
||||
|
||||
let res = sender
|
||||
.send_data(Bytes::copy_from_slice(&buf[..bytes_read]))
|
||||
.await;
|
||||
|
||||
if res.is_err() {
|
||||
warn!("Failed to send some data to a client");
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if let Ok(response) = hyper::Response::builder()
|
||||
.header("Content-Type", "audio/mpeg")
|
||||
// .header("Accept-Ranges", "bytes") // TODO: handle range requests
|
||||
.body(Bytes::from(bytes).into())
|
||||
.body(body)
|
||||
{
|
||||
Ok(response)
|
||||
} else {
|
||||
|
|
Loading…
Reference in a new issue