feat(server)!: Move all media serving functionality to its own module
This commit is contained in:
parent
0a60079a3f
commit
37d047c22f
|
@ -1,20 +1,12 @@
|
|||
pub mod error;
|
||||
pub mod media_server;
|
||||
|
||||
use error::{handle_rejection, InternalServerError, NotFoundError};
|
||||
use hyper::{body, Body};
|
||||
use error::handle_rejection;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use bytes::Bytes;
|
||||
use clap::Parser as _;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::{debug, span, warn, Instrument, Level};
|
||||
use warp::{
|
||||
filters::{any::any, path::param},
|
||||
http::HeaderMap,
|
||||
hyper,
|
||||
reject::Rejection,
|
||||
Filter,
|
||||
};
|
||||
use tracing::Level;
|
||||
use warp::Filter;
|
||||
|
||||
#[derive(clap::Parser, Clone)]
|
||||
pub struct Args {
|
||||
|
@ -27,86 +19,9 @@ async fn main() {
|
|||
|
||||
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
|
||||
|
||||
let route = warp::get()
|
||||
.and(warp::path("track"))
|
||||
.and(serve_local_tracks(config));
|
||||
let route = warp::path("media").and(media_server::serve_local_tracks(&config.music_dir));
|
||||
|
||||
warp::serve(route.recover(handle_rejection))
|
||||
.run(([127, 0, 0, 1], 8080))
|
||||
.await;
|
||||
}
|
||||
|
||||
pub fn serve_local_tracks(
|
||||
config: &'static Args,
|
||||
) -> impl Filter<Extract = (warp::reply::Response,), Error = Rejection> + Clone {
|
||||
any()
|
||||
.map(move || config)
|
||||
.and(warp::header::headers_cloned())
|
||||
.and(param())
|
||||
.and_then(handle_file_request)
|
||||
}
|
||||
|
||||
pub async fn handle_file_request(
|
||||
config: &Args,
|
||||
_headers: HeaderMap,
|
||||
// TODO: request files based off of their MusicBrainz Identifier instead
|
||||
track_name: String,
|
||||
) -> Result<warp::reply::Response, Rejection> {
|
||||
let mut location = config.music_dir.clone();
|
||||
// FIXME: for now, file paths need to be URL safe to be able to be requested
|
||||
location.push(PathBuf::from(track_name));
|
||||
|
||||
let (Ok(file), Some(file_name)) = (
|
||||
tokio::fs::File::options().read(true).open(&location).await,
|
||||
location.file_name().map(|name| name.to_string_lossy()),
|
||||
) else {
|
||||
return Err(NotFoundError(
|
||||
format!("The requested song could not be found on disk. Tried loading {location:?}")
|
||||
.into(),
|
||||
)
|
||||
.into());
|
||||
};
|
||||
|
||||
// TODO: handle range requests
|
||||
|
||||
let (sender, body) = Body::channel();
|
||||
|
||||
tokio::task::spawn(stream_file(sender, file).instrument(span!(
|
||||
Level::DEBUG,
|
||||
"stream_file",
|
||||
file_name = file_name.as_ref()
|
||||
)));
|
||||
|
||||
if let Ok(response) = hyper::Response::builder()
|
||||
.header("Content-Type", "audio/mpeg") // TODO: Infer from filetype
|
||||
// .header("Accept-Ranges", "bytes") // TODO: handle range requests
|
||||
.body(body)
|
||||
{
|
||||
Ok(response)
|
||||
} else {
|
||||
Err(InternalServerError("Failed to build Response".into()).into())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_file(mut dest: body::Sender, mut file: tokio::fs::File) {
|
||||
debug!("Starting stream");
|
||||
|
||||
let mut buf = [0u8; 512];
|
||||
|
||||
loop {
|
||||
let bytes_read = match file.read(&mut buf).await {
|
||||
Ok(0) => break debug!("Done streaming the file"),
|
||||
Ok(bytes_read) => bytes_read,
|
||||
Err(err) => break warn!("Couldn't read part of the file. Got err: {err}"),
|
||||
};
|
||||
|
||||
let res = dest
|
||||
.send_data(Bytes::copy_from_slice(&buf[..bytes_read]))
|
||||
.await;
|
||||
|
||||
if res.is_err() {
|
||||
warn!("Connection dropped by peer");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
89
server/src/media_server/mod.rs
Normal file
89
server/src/media_server/mod.rs
Normal file
|
@ -0,0 +1,89 @@
|
|||
use crate::error::{InternalServerError, NotFoundError};
|
||||
use hyper::{body, Body};
|
||||
use std::path::Path;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tracing::{debug, span, warn, Instrument, Level};
|
||||
use warp::{
|
||||
filters::{any::any, path::param},
|
||||
http::HeaderMap,
|
||||
hyper,
|
||||
reject::Rejection,
|
||||
Filter,
|
||||
};
|
||||
|
||||
pub fn serve_local_tracks(
|
||||
music_dir: &'_ Path,
|
||||
) -> impl Filter<Extract = (warp::reply::Response,), Error = Rejection> + Clone + '_ {
|
||||
any()
|
||||
.map(move || music_dir)
|
||||
.and(warp::header::headers_cloned())
|
||||
.and(param())
|
||||
.and_then(handle_file_request)
|
||||
}
|
||||
|
||||
pub async fn handle_file_request(
|
||||
music_dir: &Path,
|
||||
_headers: HeaderMap,
|
||||
// TODO: request files based off of their MusicBrainz Identifier instead
|
||||
track_name: String,
|
||||
) -> Result<warp::reply::Response, Rejection> {
|
||||
let mut location = music_dir.to_owned();
|
||||
// FIXME: for now, file paths need to be URL safe to be able to be requested
|
||||
location.push(track_name);
|
||||
|
||||
let (Ok(file), Some(file_name)) = (
|
||||
tokio::fs::File::options().read(true).open(&location).await,
|
||||
location.file_name().map(|name| name.to_string_lossy()),
|
||||
) else {
|
||||
return Err(NotFoundError(
|
||||
format!("The requested song could not be found on disk. Tried loading {location:?}")
|
||||
.into(),
|
||||
)
|
||||
.into());
|
||||
};
|
||||
|
||||
// TODO: handle range requests
|
||||
|
||||
let (sender, body) = Body::channel();
|
||||
|
||||
tokio::task::spawn(stream_file(sender, file).instrument(span!(
|
||||
Level::DEBUG,
|
||||
"stream_file",
|
||||
file_name = file_name.as_ref()
|
||||
)));
|
||||
|
||||
if let Ok(response) = hyper::Response::builder()
|
||||
.header("Content-Type", "audio/mpeg") // TODO: Infer from filetype
|
||||
// .header("Accept-Ranges", "bytes") // TODO: handle range requests
|
||||
.body(body)
|
||||
{
|
||||
Ok(response)
|
||||
} else {
|
||||
Err(InternalServerError("Failed to build Response".into()).into())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stream_file(mut dest: body::Sender, mut file: tokio::fs::File) {
|
||||
debug!("Starting stream");
|
||||
|
||||
let mut buf = [0u8; 512];
|
||||
|
||||
loop {
|
||||
let bytes_read = match file.read(&mut buf).await {
|
||||
Ok(0) => break debug!("Done streaming the file"),
|
||||
Ok(bytes_read) => bytes_read,
|
||||
Err(err) => break warn!("Couldn't read part of the file. Got err: {err}"),
|
||||
};
|
||||
|
||||
let res = dest
|
||||
.send_data(Bytes::copy_from_slice(&buf[..bytes_read]))
|
||||
.await;
|
||||
|
||||
if res.is_err() {
|
||||
warn!("Connection dropped by peer");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue