From 37d047c22f7a19ce558a29bd35c48c01de7a51e8 Mon Sep 17 00:00:00 2001 From: kale Date: Mon, 5 Aug 2024 00:19:19 +0200 Subject: [PATCH] feat(server)!: Move all media serving functionality to its own module --- server/src/main.rs | 95 ++-------------------------------- server/src/media_server/mod.rs | 89 +++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 90 deletions(-) create mode 100644 server/src/media_server/mod.rs diff --git a/server/src/main.rs b/server/src/main.rs index c1d0c91..ad929c3 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,20 +1,12 @@ pub mod error; +pub mod media_server; -use error::{handle_rejection, InternalServerError, NotFoundError}; -use hyper::{body, Body}; +use error::handle_rejection; use std::path::PathBuf; -use bytes::Bytes; use clap::Parser as _; -use tokio::io::AsyncReadExt; -use tracing::{debug, span, warn, Instrument, Level}; -use warp::{ - filters::{any::any, path::param}, - http::HeaderMap, - hyper, - reject::Rejection, - Filter, -}; +use tracing::Level; +use warp::Filter; #[derive(clap::Parser, Clone)] pub struct Args { @@ -27,86 +19,9 @@ async fn main() { tracing_subscriber::fmt().with_max_level(Level::INFO).init(); - let route = warp::get() - .and(warp::path("track")) - .and(serve_local_tracks(config)); + let route = warp::path("media").and(media_server::serve_local_tracks(&config.music_dir)); warp::serve(route.recover(handle_rejection)) .run(([127, 0, 0, 1], 8080)) .await; } - -pub fn serve_local_tracks( - config: &'static Args, -) -> impl Filter + Clone { - any() - .map(move || config) - .and(warp::header::headers_cloned()) - .and(param()) - .and_then(handle_file_request) -} - -pub async fn handle_file_request( - config: &Args, - _headers: HeaderMap, - // TODO: request files based off of their MusicBrainz Identifier instead - track_name: String, -) -> Result { - let mut location = config.music_dir.clone(); - // FIXME: for now, file paths need to be URL safe to be able to be requested - location.push(PathBuf::from(track_name)); - - let (Ok(file), Some(file_name)) = ( - tokio::fs::File::options().read(true).open(&location).await, - location.file_name().map(|name| name.to_string_lossy()), - ) else { - return Err(NotFoundError( - format!("The requested song could not be found on disk. Tried loading {location:?}") - .into(), - ) - .into()); - }; - - // TODO: handle range requests - - let (sender, body) = Body::channel(); - - tokio::task::spawn(stream_file(sender, file).instrument(span!( - Level::DEBUG, - "stream_file", - file_name = file_name.as_ref() - ))); - - if let Ok(response) = hyper::Response::builder() - .header("Content-Type", "audio/mpeg") // TODO: Infer from filetype - // .header("Accept-Ranges", "bytes") // TODO: handle range requests - .body(body) - { - Ok(response) - } else { - Err(InternalServerError("Failed to build Response".into()).into()) - } -} - -pub async fn stream_file(mut dest: body::Sender, mut file: tokio::fs::File) { - debug!("Starting stream"); - - let mut buf = [0u8; 512]; - - loop { - let bytes_read = match file.read(&mut buf).await { - Ok(0) => break debug!("Done streaming the file"), - Ok(bytes_read) => bytes_read, - Err(err) => break warn!("Couldn't read part of the file. Got err: {err}"), - }; - - let res = dest - .send_data(Bytes::copy_from_slice(&buf[..bytes_read])) - .await; - - if res.is_err() { - warn!("Connection dropped by peer"); - break; - } - } -} diff --git a/server/src/media_server/mod.rs b/server/src/media_server/mod.rs new file mode 100644 index 0000000..5e2f80c --- /dev/null +++ b/server/src/media_server/mod.rs @@ -0,0 +1,89 @@ +use crate::error::{InternalServerError, NotFoundError}; +use hyper::{body, Body}; +use std::path::Path; + +use bytes::Bytes; +use tokio::io::AsyncReadExt; +use tracing::{debug, span, warn, Instrument, Level}; +use warp::{ + filters::{any::any, path::param}, + http::HeaderMap, + hyper, + reject::Rejection, + Filter, +}; + +pub fn serve_local_tracks( + music_dir: &'_ Path, +) -> impl Filter + Clone + '_ { + any() + .map(move || music_dir) + .and(warp::header::headers_cloned()) + .and(param()) + .and_then(handle_file_request) +} + +pub async fn handle_file_request( + music_dir: &Path, + _headers: HeaderMap, + // TODO: request files based off of their MusicBrainz Identifier instead + track_name: String, +) -> Result { + let mut location = music_dir.to_owned(); + // FIXME: for now, file paths need to be URL safe to be able to be requested + location.push(track_name); + + let (Ok(file), Some(file_name)) = ( + tokio::fs::File::options().read(true).open(&location).await, + location.file_name().map(|name| name.to_string_lossy()), + ) else { + return Err(NotFoundError( + format!("The requested song could not be found on disk. Tried loading {location:?}") + .into(), + ) + .into()); + }; + + // TODO: handle range requests + + let (sender, body) = Body::channel(); + + tokio::task::spawn(stream_file(sender, file).instrument(span!( + Level::DEBUG, + "stream_file", + file_name = file_name.as_ref() + ))); + + if let Ok(response) = hyper::Response::builder() + .header("Content-Type", "audio/mpeg") // TODO: Infer from filetype + // .header("Accept-Ranges", "bytes") // TODO: handle range requests + .body(body) + { + Ok(response) + } else { + Err(InternalServerError("Failed to build Response".into()).into()) + } +} + +pub async fn stream_file(mut dest: body::Sender, mut file: tokio::fs::File) { + debug!("Starting stream"); + + let mut buf = [0u8; 512]; + + loop { + let bytes_read = match file.read(&mut buf).await { + Ok(0) => break debug!("Done streaming the file"), + Ok(bytes_read) => bytes_read, + Err(err) => break warn!("Couldn't read part of the file. Got err: {err}"), + }; + + let res = dest + .send_data(Bytes::copy_from_slice(&buf[..bytes_read])) + .await; + + if res.is_err() { + warn!("Connection dropped by peer"); + break; + } + } +}