Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2707875
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
65 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/cli.rs b/src/cli.rs
index b0ecd41..6e1e050 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,426 +1,455 @@
use colored::Colorize;
use global_placeholders::global;
use macros_rs::{crashln, string, ternary};
use psutil::process::{MemoryInfo, Process};
+use regex::Regex;
use serde::Serialize;
use serde_json::json;
use std::env;
use pmc::{
config,
file::{self, Exists},
helpers::{self, ColoredString},
log,
process::Runner,
};
use tabled::{
settings::{
object::{Columns, Rows},
style::{BorderColor, Style},
themes::Colorization,
Color, Modify, Rotate, Width,
},
Table, Tabled,
};
#[derive(Clone, Debug)]
pub enum Args {
Id(usize),
Script(String),
}
fn format(server_name: &String) -> (String, String) {
let kind = ternary!(server_name == "internal", "", "remote ").to_string();
- let list_name = ternary!(*server_name == "internal", "all", &*server_name).to_string();
-
- return (kind, list_name);
+ return (kind, server_name.to_string());
}
pub fn get_version(short: bool) -> String {
return match short {
true => format!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")),
false => format!("{} ({} {}) [{}]", env!("CARGO_PKG_VERSION"), env!("GIT_HASH"), env!("BUILD_DATE"), env!("PROFILE")),
};
}
pub fn start(name: &Option<String>, args: &Option<Args>, watch: &Option<String>, server_name: &String) {
let mut runner = Runner::new();
let config = config::read();
+ let (kind, list_name) = format(server_name);
match args {
Some(Args::Id(id)) => {
- let (kind, list_name) = format(server_name);
println!("{} Applying {kind}action restartProcess on ({id})", *helpers::SUCCESS);
if *server_name == "internal" {
let item = runner.get(*id);
match watch {
Some(path) => item.watch(path),
None => item.disable_watch(),
}
name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
item.restart();
log!("process started (id={id})");
} else {
let Some(servers) = config::servers().servers else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.clone(), false) {
- Some(mut runner) => runner.get(*id).restart(),
- None => println!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
- }
+ Some(mut remote) => remote.get(*id).restart(),
+ None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
+ };
}
}
println!("{} restarted {kind}({id}) ✓", *helpers::SUCCESS);
list(&string!("default"), &list_name);
}
Some(Args::Script(script)) => {
let name = match name {
Some(name) => string!(name),
None => string!(script.split_whitespace().next().unwrap_or_default()),
};
+ if *server_name == "internal" {
+ let pattern = Regex::new(r"(?m)^[a-zA-Z0-9]+(/[a-zA-Z0-9]+)*(\.js|\.ts)?$").unwrap();
- // fix
- println!("{} Creating process with ({name})", *helpers::SUCCESS);
- if name.ends_with(".ts") || name.ends_with(".js") {
- let script = format!("{} {script}", config.runner.node);
- runner.start(&name, &script, watch).save();
+ if pattern.is_match(script) {
+ let script = format!("{} {script}", config.runner.node);
+ runner.start(&name, &script, file::cwd(), watch).save();
+ } else {
+ runner.start(&name, script, file::cwd(), watch).save();
+ }
+
+ log!("process created (name={name})");
} else {
- runner.start(&name, script, watch).save();
+ let Some(servers) = config::servers().servers else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
+
+ if let Some(server) = servers.get(server_name) {
+ match Runner::connect(server_name.clone(), server.clone(), false) {
+ Some(mut remote) => remote.start(&name, script, file::cwd(), watch),
+ None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
+ };
+ }
}
- println!("{} created ({name}) ✓", *helpers::SUCCESS);
- log!("process created (name={name})");
- list(&string!("default"), &string!("all"));
+ println!("{} Creating {kind}process with ({name})", *helpers::SUCCESS);
+
+ println!("{} {kind}created ({name}) ✓", *helpers::SUCCESS);
+ list(&string!("default"), &list_name);
}
None => {}
}
}
pub fn stop(id: &usize, server_name: &String) {
let (kind, list_name) = format(server_name);
println!("{} Applying {kind}action stopProcess on ({id})", *helpers::SUCCESS);
if *server_name == "internal" {
let mut runner = Runner::new();
runner.get(*id).stop();
log!("process stopped (id={id})");
} else {
let Some(servers) = config::servers().servers else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.clone(), false) {
- Some(mut runner) => runner.get(*id).stop(),
- None => println!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
- }
+ Some(mut remote) => remote.get(*id).stop(),
+ None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
+ };
}
}
println!("{} stopped {kind}({id}) ✓", *helpers::SUCCESS);
list(&string!("default"), &list_name);
}
-pub fn remove(id: &usize) {
- println!("{} Applying action removeProcess on ({id})", *helpers::SUCCESS);
- Runner::new().remove(*id);
+pub fn remove(id: &usize, server_name: &String) {
+ let (kind, _) = format(server_name);
+ println!("{} Applying {kind}action removeProcess on ({id})", *helpers::SUCCESS);
+
+ if *server_name == "internal" {
+ Runner::new().remove(*id);
+ } else {
+ let Some(servers) = config::servers().servers else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
+
+ if let Some(server) = servers.get(server_name) {
+ match Runner::connect(server_name.clone(), server.clone(), false) {
+ Some(mut remote) => remote.remove(*id),
+ None => crashln!("{} Failed to remove (name={server_name}, address={})", *helpers::FAIL, server.address),
+ };
+ }
+ }
- println!("{} removed ({id}) ✓", *helpers::SUCCESS);
+ println!("{} removed {kind}({id}) ✓", *helpers::SUCCESS);
log!("process removed (id={id})");
}
pub fn info(id: &usize, format: &String) {
#[derive(Clone, Debug, Tabled)]
struct Info {
#[tabled(rename = "error log path ")]
log_error: String,
#[tabled(rename = "out log path")]
log_out: String,
#[tabled(rename = "cpu percent")]
cpu_percent: String,
#[tabled(rename = "memory usage")]
memory_usage: String,
#[tabled(rename = "path hash")]
hash: String,
#[tabled(rename = "watching")]
watch: String,
#[tabled(rename = "exec cwd")]
path: String,
#[tabled(rename = "script command ")]
command: String,
#[tabled(rename = "script id")]
id: String,
restarts: u64,
uptime: String,
pid: String,
name: String,
status: ColoredString,
}
impl Serialize for Info {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"id": &self.id.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"path": &self.path.trim(),
"restarts": &self.restarts,
"watch": &self.watch.trim(),
"watch": &self.hash.trim(),
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"log_out": &self.log_out.trim(),
"cpu": &self.cpu_percent.trim(),
"command": &self.command.trim(),
"mem": &self.memory_usage.trim(),
"log_error": &self.log_error.trim(),
});
trimmed_json.serialize(serializer)
}
}
if let Some(home) = home::home_dir() {
let config = config::read().runner;
let item = Runner::new().get(*id).clone();
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
if let Ok(mut process) = Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
let cpu_percent =
match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let status =
if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
let path = file::make_relative(&item.path, &home)
.map(|relative_path| relative_path.to_string_lossy().into_owned())
.unwrap_or_else(|| crashln!("{} Unable to get your current directory", *helpers::FAIL));
let data = vec![Info {
cpu_percent,
memory_usage,
id: string!(id),
restarts: item.restarts,
name: item.name.clone(),
path: format!("{} ", path),
status: ColoredString(status),
log_out: global!("pmc.logs.out", item.name.as_str()),
log_error: global!("pmc.logs.error", item.name.as_str()),
pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script),
hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
}];
let table = Table::new(data.clone())
.with(Rotate::Left)
.with(Style::rounded().remove_horizontals())
.with(Colorization::exact([Color::FG_CYAN], Columns::first()))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.to_string();
if let Ok(json) = serde_json::to_string(&data[0]) {
match format.as_str() {
"raw" => println!("{:?}", data[0]),
"json" => println!("{json}"),
_ => {
println!("{}\n{table}\n", format!("Describing process with id ({id})").on_bright_white().black());
println!(" {}", format!("Use `pmc logs {id} [--lines <num>]` to display logs").white());
println!(" {}", format!("Use `pmc env {id}` to display environment variables").white());
}
};
};
} else {
crashln!("{} Impossible to get your home directory", *helpers::FAIL);
}
}
pub fn logs(id: &usize, lines: &usize) {
let item = Runner::new().get(*id).clone();
let log_error = global!("pmc.logs.error", item.name.as_str());
let log_out = global!("pmc.logs.out", item.name.as_str());
if Exists::file(log_error.clone()).unwrap() && Exists::file(log_out.clone()).unwrap() {
println!("{}", format!("Showing last {lines} lines for process [{id}] (change the value with --lines option)").yellow());
file::logs(*lines, &log_error, *id, "error", &item.name);
file::logs(*lines, &log_out, *id, "out", &item.name);
} else {
crashln!("{} Logs for process ({id}) not found", *helpers::FAIL);
}
}
pub fn env(id: &usize) {
let item = Runner::new().get(*id).clone();
for (key, value) in item.env.iter() {
println!("{}: {}", key, value.green());
}
}
pub fn list(format: &String, server_name: &String) {
let render_list = |runner: &mut Runner| {
let mut processes: Vec<ProcessItem> = Vec::new();
#[derive(Tabled, Debug)]
struct ProcessItem {
id: ColoredString,
name: String,
pid: String,
uptime: String,
#[tabled(rename = "↺")]
restarts: String,
status: ColoredString,
cpu: String,
mem: String,
#[tabled(rename = "watching")]
watch: String,
}
impl serde::Serialize for ProcessItem {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"cpu": &self.cpu.trim(),
"mem": &self.mem.trim(),
"id": &self.id.0.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"watch": &self.watch.trim(),
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"restarts": &self.restarts.trim(),
});
trimmed_json.serialize(serializer)
}
}
if runner.is_empty() {
println!("{} Process table empty", *helpers::SUCCESS);
} else {
for (id, item) in runner.items() {
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
if let Ok(mut process) = Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.0}%", percent),
None => string!("0%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
processes.push(ProcessItem {
status: ColoredString(status),
cpu: format!("{cpu_percent} "),
mem: format!("{memory_usage} "),
restarts: format!("{} ", item.restarts),
name: format!("{} ", item.name.clone()),
id: ColoredString(id.to_string().cyan().bold()),
pid: ternary!(item.running, format!("{} ", item.pid), string!("n/a ")),
watch: ternary!(item.watch.enabled, format!("{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{} ", helpers::format_duration(item.started)), string!("none ")),
});
}
let table = Table::new(&processes)
.with(Style::rounded().remove_verticals())
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.with(Colorization::exact([Color::FG_BRIGHT_CYAN], Rows::first()))
.with(Modify::new(Columns::single(1)).with(Width::truncate(35).suffix("... ")))
.to_string();
if let Ok(json) = serde_json::to_string(&processes) {
match format.as_str() {
"raw" => println!("{:?}", processes),
"json" => println!("{json}"),
"default" => println!("{table}"),
_ => {}
};
};
}
};
if let Some(servers) = config::servers().servers {
let mut failed: Vec<(String, String)> = vec![];
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.clone(), true) {
- Some(mut runner) => render_list(&mut runner),
+ Some(mut remote) => render_list(&mut remote),
None => println!("{} Failed to fetch (name={server_name}, address={})", *helpers::FAIL, server.address),
}
} else {
if matches!(&**server_name, "internal" | "all") {
println!("{} Internal daemon", *helpers::SUCCESS);
render_list(&mut Runner::new());
} else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL);
}
}
if *server_name == "all" {
for (name, server) in servers {
match Runner::connect(name.clone(), server.clone(), true) {
- Some(mut runner) => render_list(&mut runner),
+ Some(mut remote) => render_list(&mut remote),
None => failed.push((name, server.address)),
}
}
}
if !failed.is_empty() {
println!("{} Failed servers:", *helpers::FAIL);
failed
.iter()
.for_each(|server| println!(" {} {} {}", "-".yellow(), format!("{}", server.0), format!("[{}]", server.1).white()));
}
} else {
render_list(&mut Runner::new());
}
}
diff --git a/src/daemon/api/mod.rs b/src/daemon/api/mod.rs
index 1cbabc9..1cb5737 100644
--- a/src/daemon/api/mod.rs
+++ b/src/daemon/api/mod.rs
@@ -1,219 +1,219 @@
mod routes;
use crate::webui;
use bytes::Bytes;
use lazy_static::lazy_static;
use macros_rs::{crashln, fmtstr};
use pmc::{config, process};
use prometheus::{opts, register_counter, register_gauge, register_histogram, register_histogram_vec};
use prometheus::{Counter, Gauge, Histogram, HistogramVec};
use serde::Serialize;
use serde_json::json;
use static_dir::static_dir;
use std::{convert::Infallible, str::FromStr};
use utoipa::{OpenApi, ToSchema};
use utoipa_rapidoc::RapiDoc;
use warp::{
body, filters, get, header,
http::{StatusCode, Uri},
path, post, redirect, reject,
reply::{self, html, json},
serve, Filter, Rejection, Reply,
};
-use routes::{
- action_handler, dashboard, dump_handler, env_handler, info_handler, list_handler, log_handler, log_handler_raw, login, metrics_handler, prometheus_handler, rename_handler, view_process,
-};
-
#[derive(Serialize, ToSchema)]
struct ErrorMessage {
#[schema(example = 404)]
code: u16,
#[schema(example = "NOT_FOUND")]
message: String,
}
#[inline]
async fn convert_to_string(bytes: Bytes) -> Result<String, Rejection> { String::from_utf8(bytes.to_vec()).map_err(|_| reject()) }
#[inline]
fn string_filter(limit: u64) -> impl Filter<Extract = (String,), Error = Rejection> + Clone { body::content_length_limit(limit).and(body::bytes()).and_then(convert_to_string) }
lazy_static! {
pub static ref HTTP_COUNTER: Counter = register_counter!(opts!("http_requests_total", "Number of HTTP requests made.")).unwrap();
pub static ref DAEMON_START_TIME: Gauge = register_gauge!(opts!("process_start_time_seconds", "The uptime of the daemon.")).unwrap();
pub static ref DAEMON_MEM_USAGE: Histogram = register_histogram!("daemon_memory_usage", "The memory usage graph of the daemon.").unwrap();
pub static ref DAEMON_CPU_PERCENTAGE: Histogram = register_histogram!("daemon_cpu_percentage", "The cpu usage graph of the daemon.").unwrap();
pub static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!("http_request_duration_seconds", "The HTTP request latencies in seconds.", &["route"]).unwrap();
}
pub async fn start(webui: bool) {
const DOCS: &str = include_str!("docs/index.html");
let config = config::read().daemon.web;
let s_path = config::read().get_path();
let docs_path = fmtstr!("{}/docs.json", s_path.trim_end_matches('/').to_string());
let auth = header::exact("authorization", fmtstr!("token {}", config.secure.token));
let tmpl =
match webui::create_template_filter() {
Ok(template) => template,
Err(err) => crashln!("{err}"),
};
#[derive(OpenApi)]
#[openapi(
paths(
routes::action_handler,
routes::env_handler,
routes::info_handler,
routes::dump_handler,
routes::list_handler,
routes::log_handler,
routes::log_handler_raw,
routes::metrics_handler,
routes::prometheus_handler,
+ routes::create_handler,
routes::rename_handler
),
components(schemas(
ErrorMessage,
process::Log,
process::Raw,
process::Info,
process::Stats,
process::Watch,
process::ItemSingle,
process::ProcessItem,
routes::Stats,
routes::Daemon,
routes::Version,
routes::ActionBody,
+ routes::CreateBody,
routes::MetricsRoot,
routes::LogResponse,
routes::DocMemoryInfo,
routes::ActionResponse,
))
)]
struct ApiDoc;
- let app_dump = path!("dump").and(get()).and_then(dump_handler);
- let app_metrics = path!("metrics").and(get()).and_then(metrics_handler);
- let app_prometheus = path!("prometheus").and(get()).and_then(prometheus_handler);
+ let app_dump = path!("dump").and(get()).and_then(routes::dump_handler);
+ let app_metrics = path!("metrics").and(get()).and_then(routes::metrics_handler);
+ let app_prometheus = path!("prometheus").and(get()).and_then(routes::prometheus_handler);
let app_docs_json = path!("docs.json").and(get()).map(|| json(&ApiDoc::openapi()));
let app_docs = path!("docs").and(get()).map(|| html(RapiDoc::new(docs_path).custom_html(DOCS).to_html()));
- let process_list = path!("list").and(get()).and_then(list_handler);
- let process_env = path!("process" / usize / "env").and(get()).and_then(env_handler);
- let process_info = path!("process" / usize / "info").and(get()).and_then(info_handler);
- let process_logs = path!("process" / usize / "logs" / String).and(get()).and_then(log_handler);
- let process_raw_logs = path!("process" / usize / "logs" / String / "raw").and(get()).and_then(log_handler_raw);
- let process_action = path!("process" / usize / "action").and(post()).and(body::json()).and_then(action_handler);
- let process_rename = path!("process" / usize / "rename").and(post()).and(string_filter(1024 * 16)).and_then(rename_handler);
+ let process_list = path!("list").and(get()).and_then(routes::list_handler);
+ let process_env = path!("process" / usize / "env").and(get()).and_then(routes::env_handler);
+ let process_info = path!("process" / usize / "info").and(get()).and_then(routes::info_handler);
+ let process_logs = path!("process" / usize / "logs" / String).and(get()).and_then(routes::log_handler);
+ let process_raw_logs = path!("process" / usize / "logs" / String / "raw").and(get()).and_then(routes::log_handler_raw);
+ let process_create = path!("process" / "create").and(post()).and(body::json()).and_then(routes::create_handler);
+ let process_action = path!("process" / usize / "action").and(post()).and(body::json()).and_then(routes::action_handler);
+ let process_rename = path!("process" / usize / "rename").and(post()).and(string_filter(1024 * 16)).and_then(routes::rename_handler);
- let web_login = warp::get().and(path!("login")).and(tmpl.clone()).and_then(login);
- let web_dashboard = warp::get().and(path::end()).and(tmpl.clone()).and_then(dashboard);
- let web_view_process = warp::get().and(path!("view" / usize)).and(tmpl.clone()).and_then(view_process);
+ let web_login = warp::get().and(path!("login")).and(tmpl.clone()).and_then(routes::login);
+ let web_dashboard = warp::get().and(path::end()).and(tmpl.clone()).and_then(routes::dashboard);
+ let web_view_process = warp::get().and(path!("view" / usize)).and(tmpl.clone()).and_then(routes::view_process);
let log = warp::log::custom(|info| {
log!(
"[api] {} (method={}, status={}, ms={:?}, ver={:?})",
info.path(),
info.method(),
info.status().as_u16(),
info.elapsed(),
info.version()
)
});
let base = s_path
.split('/')
.enumerate()
.filter(|(_, p)| !p.is_empty() || *p == s_path)
.fold(warp::any().boxed(), |f, (_, path)| f.and(warp::path(path.to_owned())).boxed());
let routes = process_list
.or(process_env)
.or(process_info)
.or(process_logs)
.or(process_raw_logs)
+ .or(process_create)
.or(process_action)
.or(process_rename)
.or(app_metrics)
.or(app_prometheus)
.or(app_dump);
let use_routes_basic = || async {
let base_route = path::end().map(|| json(&json!({"healthy": true})).into_response());
let internal = match config.secure.enabled {
true => routes.clone().and(auth).or(root_redirect()).or(base_route).or(app_docs_json).or(app_docs).boxed(),
false => routes.clone().or(root_redirect()).or(base_route).or(app_docs_json).or(app_docs).boxed(),
};
serve(base.clone().and(internal).recover(handle_rejection).with(log)).run(config::read().get_address()).await
};
let use_routes_web = || async {
let web_routes = web_login.or(web_dashboard).or(web_view_process).or(static_dir!("src/webui/assets"));
let internal = match config.secure.enabled {
true => routes.clone().and(auth).or(root_redirect()).or(web_routes).or(app_docs_json).or(app_docs).boxed(),
false => routes.clone().or(root_redirect()).or(web_routes).or(app_docs_json).or(app_docs).boxed(),
};
serve(base.clone().and(internal).recover(handle_rejection).with(log)).run(config::read().get_address()).await
};
match webui {
true => use_routes_web().await,
false => use_routes_basic().await,
}
}
async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
let code;
let message;
HTTP_COUNTER.inc();
if err.is_not_found() {
code = StatusCode::NOT_FOUND;
message = "NOT_FOUND";
} else if let Some(_) = err.find::<reject::MissingHeader>() {
code = StatusCode::UNAUTHORIZED;
message = "UNAUTHORIZED";
} else if let Some(_) = err.find::<reject::InvalidHeader>() {
code = StatusCode::UNAUTHORIZED;
message = "UNAUTHORIZED";
} else if let Some(_) = err.find::<reject::MethodNotAllowed>() {
code = StatusCode::METHOD_NOT_ALLOWED;
message = "METHOD_NOT_ALLOWED";
} else {
log!("[api] unhandled rejection (err={:?})", err);
code = StatusCode::INTERNAL_SERVER_ERROR;
message = "INTERNAL_SERVER_ERROR";
}
let json = json(&ErrorMessage {
code: code.as_u16(),
message: message.into(),
});
Ok(reply::with_status(json, code))
}
fn root_redirect() -> filters::BoxedFilter<(impl Reply,)> {
warp::path::full()
.and_then(move |path: path::FullPath| async move {
let path = path.as_str();
if path.ends_with("/") || path.contains(".") {
return Err(warp::reject());
}
Ok(redirect::redirect(Uri::from_str(&[path, "/"].concat()).unwrap()))
})
.boxed()
}
diff --git a/src/daemon/api/routes.rs b/src/daemon/api/routes.rs
index 939e64e..1ff5e00 100644
--- a/src/daemon/api/routes.rs
+++ b/src/daemon/api/routes.rs
@@ -1,432 +1,469 @@
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{string, ternary, then};
use prometheus::{Encoder, TextEncoder};
use psutil::process::{MemoryInfo, Process};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::convert::Infallible;
use tera::{Context, Tera};
use utoipa::ToSchema;
use pmc::{
file, helpers,
process::{dump, Runner},
};
use crate::daemon::{
api::{HTTP_COUNTER, HTTP_REQ_HISTOGRAM},
pid,
};
use warp::{
hyper::body::Body,
reject,
reply::{self, json, Response},
Rejection, Reply,
};
use std::{
env,
fs::{self, File},
io::{self, BufRead, BufReader},
+ path::PathBuf,
};
#[allow(dead_code)]
#[derive(ToSchema)]
#[schema(as = MemoryInfo)]
pub(crate) struct DocMemoryInfo {
rss: u64,
vms: u64,
#[cfg(target_os = "linux")]
shared: u64,
#[cfg(target_os = "linux")]
text: u64,
#[cfg(target_os = "linux")]
data: u64,
#[cfg(target_os = "macos")]
page_faults: u64,
#[cfg(target_os = "macos")]
pageins: u64,
}
#[derive(Deserialize, ToSchema)]
pub(crate) struct ActionBody {
#[schema(example = "restart")]
method: String,
}
+#[derive(Deserialize, ToSchema)]
+pub(crate) struct CreateBody {
+ #[schema(example = "app")]
+ name: Option<String>,
+ #[schema(example = "node index.js")]
+ script: String,
+ #[schema(example = "/projects/app")]
+ path: PathBuf,
+ #[schema(example = "src")]
+ watch: Option<String>,
+}
+
#[derive(Serialize, ToSchema)]
pub(crate) struct ActionResponse<'a> {
#[schema(example = true)]
done: bool,
#[schema(example = "name")]
action: &'a str,
}
#[derive(Serialize, ToSchema)]
pub(crate) struct LogResponse {
logs: Vec<String>,
}
#[derive(Serialize, ToSchema)]
pub struct MetricsRoot<'a> {
pub version: Version<'a>,
pub daemon: Daemon,
}
#[derive(Serialize, ToSchema)]
pub struct Version<'a> {
#[schema(example = "v1.0.0")]
pub pkg: String,
pub hash: &'a str,
#[schema(example = "2000-01-01")]
pub build_date: &'a str,
#[schema(example = "release")]
pub target: &'a str,
}
#[derive(Serialize, ToSchema)]
pub struct Daemon {
pub pid: Option<i32>,
#[schema(example = true)]
pub running: bool,
pub uptime: String,
pub process_count: usize,
#[schema(example = "default")]
pub daemon_type: String,
pub stats: Stats,
}
#[derive(Serialize, ToSchema)]
pub struct Stats {
pub memory_usage: String,
pub cpu_percent: String,
}
#[inline]
fn attempt(done: bool, method: &str) -> reply::Json {
let data = json!(ActionResponse {
done,
action: ternary!(done, method, "DOES_NOT_EXIST")
});
json(&data)
}
#[inline]
fn render(name: &str, tmpl: &Tera, ctx: &Context) -> Result<String, Rejection> { tmpl.render(name, &ctx).or(Err(reject::not_found())) }
#[inline]
pub async fn login(store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
let mut ctx = Context::new();
let (tmpl, path) = store;
ctx.insert("base_path", &path);
let payload = render("login", &tmpl, &ctx)?;
Ok(Box::new(reply::html(payload)))
}
#[inline]
pub async fn dashboard(store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
let mut ctx = Context::new();
let (tmpl, path) = store;
ctx.insert("base_path", &path);
let payload = render("dashboard", &tmpl, &ctx)?;
Ok(Box::new(reply::html(payload)))
}
#[inline]
pub async fn view_process(id: usize, store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
let mut ctx = Context::new();
let (tmpl, path) = store;
ctx.insert("base_path", &path);
ctx.insert("process_id", &id);
let payload = render("view", &tmpl, &ctx)?;
Ok(Box::new(reply::html(payload)))
}
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/prometheus", responses((status = 200, description = "Get prometheus metrics", body = String)))]
pub async fn prometheus_handler() -> Result<impl Reply, Infallible> {
let encoder = TextEncoder::new();
let mut buffer = Vec::<u8>::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
Ok(format!("{}", String::from_utf8(buffer.clone()).unwrap()))
}
#[inline]
#[utoipa::path(get, path = "/dump", tag = "Process", responses((status = 200, description = "Dump processes successfully", body = [u8])))]
pub async fn dump_handler() -> Result<impl Reply, Infallible> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["dump"]).start_timer();
HTTP_COUNTER.inc();
timer.observe_duration();
Ok(dump::raw())
}
#[inline]
#[utoipa::path(get, path = "/list", tag = "Process", responses((status = 200, description = "List processes successfully", body = [ProcessItem])))]
pub async fn list_handler() -> Result<impl Reply, Infallible> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();
let data = Runner::new().json();
HTTP_COUNTER.inc();
timer.observe_duration();
Ok(json(&data))
}
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}",
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched", body = LogResponse),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn log_handler(id: usize, kind: String) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
let log_file = match kind.as_str() {
"out" | "stdout" => global!("pmc.logs.out", item.name.as_str()),
"error" | "stderr" => global!("pmc.logs.error", item.name.as_str()),
_ => global!("pmc.logs.out", item.name.as_str()),
};
match File::open(log_file) {
Ok(data) => {
let reader = BufReader::new(data);
let logs: Vec<String> = reader.lines().collect::<io::Result<_>>().unwrap();
timer.observe_duration();
Ok(json(&json!(LogResponse { logs })))
}
Err(_) => Ok(json(&json!(LogResponse { logs: vec![] }))),
}
}
None => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}/raw",
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched raw", body = String),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn log_handler_raw(id: usize, kind: String) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
let log_file = match kind.as_str() {
"out" | "stdout" => global!("pmc.logs.out", item.name.as_str()),
"error" | "stderr" => global!("pmc.logs.error", item.name.as_str()),
_ => global!("pmc.logs.out", item.name.as_str()),
};
let data = match fs::read_to_string(log_file) {
Ok(data) => data,
Err(err) => err.to_string(),
};
timer.observe_duration();
Ok(Response::new(Body::from(data)))
}
None => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/info",
params(("id" = usize, Path, description = "Process id to get information for", example = 0)),
responses(
(status = 200, description = "Current process info retrieved", body = ItemSingle),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn info_handler(id: usize) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
timer.observe_duration();
Ok(json(&item.clone().json()))
}
None => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
+#[inline]
+#[utoipa::path(post, tag = "Process", path = "/process/create", request_body(content = String),
+ responses(
+ (status = 200, description = "Create process successful", body = ActionResponse),
+ (status = INTERNAL_SERVER_ERROR, description = "Failed to create process", body = ErrorMessage)
+ )
+)]
+pub async fn create_handler(body: CreateBody) -> Result<impl Reply, Rejection> {
+ let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["create"]).start_timer();
+ let mut runner = Runner::new();
+
+ HTTP_COUNTER.inc();
+
+ let name = match body.name {
+ Some(name) => string!(name),
+ None => string!(body.script.split_whitespace().next().unwrap_or_default()),
+ };
+
+ runner.start(&name, &body.script, body.path, &body.watch).save();
+ timer.observe_duration();
+
+ Ok(attempt(true, "create"))
+}
+
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/rename", request_body(content = String),
params(("id" = usize, Path, description = "Process id to rename", example = 0)),
responses(
(status = 200, description = "Rename process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn rename_handler(id: usize, body: String) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
let mut runner = Runner::new();
HTTP_COUNTER.inc();
if runner.exists(id) {
let item = runner.get(id);
item.rename(body.trim().replace("\n", ""));
then!(item.running, item.restart());
timer.observe_duration();
Ok(attempt(true, "rename"))
} else {
timer.observe_duration();
Err(reject::not_found())
}
}
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/env",
params(("id" = usize, Path, description = "Process id to fetch env from", example = 0)),
responses(
(status = 200, description = "Current process env", body = HashMap<String, String>),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn env_handler(id: usize) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["env"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
timer.observe_duration();
Ok(json(&item.clone().env))
}
None => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/action", request_body = ActionBody,
params(("id" = usize, Path, description = "Process id to run action on", example = 0)),
responses(
(status = 200, description = "Run action on process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage)
)
)]
pub async fn action_handler(id: usize, body: ActionBody) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
let mut runner = Runner::new();
let method = body.method.as_str();
HTTP_COUNTER.inc();
match method {
"start" | "restart" => {
runner.get(id).restart();
timer.observe_duration();
Ok(attempt(true, method))
}
"stop" | "kill" => {
runner.get(id).stop();
timer.observe_duration();
Ok(attempt(true, method))
}
"remove" | "delete" => {
runner.remove(id);
timer.observe_duration();
Ok(attempt(true, method))
}
_ => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/metrics", responses((status = 200, description = "Get daemon metrics", body = MetricsRoot)))]
pub async fn metrics_handler() -> Result<impl Reply, Infallible> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["metrics"]).start_timer();
let mut pid: Option<i32> = None;
let mut cpu_percent: Option<f32> = None;
let mut uptime: Option<DateTime<Utc>> = None;
let mut memory_usage: Option<MemoryInfo> = None;
let mut runner: Runner = file::read_rmp(global!("pmc.dump"));
HTTP_COUNTER.inc();
if pid::exists() {
if let Ok(process_id) = pid::read() {
if let Ok(mut process) = Process::new(process_id as u32) {
pid = Some(process_id);
uptime = Some(pid::uptime().unwrap());
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
}
}
let memory_usage =
match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0%"),
};
let uptime = match uptime {
Some(uptime) => helpers::format_duration(uptime),
None => string!("none"),
};
let response = json!(MetricsRoot {
version: Version {
pkg: format!("v{}", env!("CARGO_PKG_VERSION")),
hash: env!("GIT_HASH_FULL"),
build_date: env!("BUILD_DATE"),
target: env!("PROFILE"),
},
daemon: Daemon {
pid: pid,
running: pid::exists(),
uptime: uptime,
process_count: runner.count(),
daemon_type: global!("pmc.daemon.kind"),
stats: Stats { memory_usage, cpu_percent }
}
});
timer.observe_duration();
Ok(json(&response))
}
diff --git a/src/main.rs b/src/main.rs
index ebe1f69..5eb3021 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,169 +1,169 @@
mod cli;
mod daemon;
mod globals;
mod webui;
use crate::cli::Args;
use clap::{Parser, Subcommand};
use clap_verbosity_flag::Verbosity;
use macros_rs::{str, string, then};
fn validate_id_script(s: &str) -> Result<Args, String> {
if let Ok(id) = s.parse::<usize>() {
Ok(Args::Id(id))
} else {
Ok(Args::Script(s.to_owned()))
}
}
#[derive(Parser)]
#[command(version = str!(cli::get_version(false)))]
struct Cli {
#[command(subcommand)]
command: Commands,
#[clap(flatten)]
verbose: Verbosity,
}
#[derive(Subcommand)]
enum Daemon {
/// Reset process index
#[command(alias = "clean")]
Reset,
/// Stop daemon
#[command(alias = "kill")]
Stop,
/// Restart daemon
#[command(alias = "restart", alias = "start")]
Restore {
/// Daemon api
#[arg(long)]
api: bool,
/// WebUI using api
#[arg(long)]
webui: bool,
},
/// Check daemon
#[command(alias = "info", alias = "status")]
Health {
/// Format output
#[arg(long, default_value_t = string!("default"))]
format: String,
},
}
// add pmc restore command
#[derive(Subcommand)]
enum Commands {
/// Start/Restart a process
#[command(alias = "restart")]
Start {
/// Process name
#[arg(long)]
name: Option<String>,
#[clap(value_parser = validate_id_script)]
args: Option<Args>,
/// Watch to reload path
#[arg(long)]
watch: Option<String>,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Stop/Kill a process
#[command(alias = "kill")]
Stop {
id: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Stop then remove a process
#[command(alias = "rm")]
Remove {
id: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Get env of a process
#[command(alias = "cmdline")]
Env {
id: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Get information of a process
#[command(alias = "info")]
Details {
id: usize,
/// Format output
#[arg(long, default_value_t = string!("default"))]
format: String,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// List all processes
#[command(alias = "ls")]
List {
/// Format output
#[arg(long, default_value_t = string!("default"))]
format: String,
/// Server
#[arg(short, long, default_value_t = string!("all"))]
server: String,
},
/// Get logs from a process
Logs {
id: usize,
#[arg(long, default_value_t = 15, help = "")]
lines: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Daemon management
Daemon {
#[command(subcommand)]
command: Daemon,
},
}
fn main() {
let cli = Cli::parse();
let mut env = env_logger::Builder::new();
let level = cli.verbose.log_level_filter();
globals::init();
env.filter_level(level).init();
match &cli.command {
Commands::Start { name, args, watch, server } => cli::start(name, args, watch, server),
Commands::Stop { id, server } => cli::stop(id, server),
- Commands::Remove { id, server } => cli::remove(id),
+ Commands::Remove { id, server } => cli::remove(id, server),
Commands::Env { id, server } => cli::env(id),
Commands::Details { id, format, server } => cli::info(id, format),
Commands::List { format, server } => cli::list(format, server),
Commands::Logs { id, lines, server } => cli::logs(id, lines),
Commands::Daemon { command } => match command {
Daemon::Stop => daemon::stop(),
Daemon::Reset => daemon::reset(),
Daemon::Health { format } => daemon::health(format),
Daemon::Restore { api, webui } => daemon::restart(api, webui, level.as_str() != "ERROR"),
},
};
if !matches!(&cli.command, Commands::Daemon { .. }) {
then!(!daemon::pid::exists(), daemon::start(false));
}
}
diff --git a/src/process/http.rs b/src/process/http.rs
index d47ab91..83f9dd0 100644
--- a/src/process/http.rs
+++ b/src/process/http.rs
@@ -1,43 +1,58 @@
use crate::process::{Process, Remote};
use macros_rs::{fmtstr, string};
use reqwest::blocking::{Client, Response};
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use serde::Serialize;
-use std::collections::BTreeMap;
+use std::{collections::BTreeMap, path::PathBuf};
#[derive(Serialize)]
struct ActionBody {
pub method: String,
}
+#[derive(Serialize)]
+struct CreateBody<'c> {
+ pub name: &'c String,
+ pub script: &'c String,
+ pub path: PathBuf,
+ pub watch: &'c Option<String>,
+}
+
fn client(token: &Option<String>) -> (Client, HeaderMap) {
let client = Client::new();
let mut headers = HeaderMap::new();
if let Some(token) = token {
headers.insert(AUTHORIZATION, HeaderValue::from_static(fmtstr!("token {token}")));
}
return (client, headers);
}
+pub fn create(Remote { address, token }: &Remote, name: &String, script: &String, path: PathBuf, watch: &Option<String>) -> Result<Response, anyhow::Error> {
+ let (client, headers) = client(token);
+ let content = CreateBody { name, script, path, watch };
+
+ Ok(client.post(fmtstr!("{address}/process/create")).json(&content).headers(headers).send()?)
+}
+
pub fn restart(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
let (client, headers) = client(token);
let content = ActionBody { method: string!("restart") };
Ok(client.post(fmtstr!("{address}/process/{id}/action")).json(&content).headers(headers).send()?)
}
pub fn stop(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
let (client, headers) = client(token);
let content = ActionBody { method: string!("stop") };
Ok(client.post(fmtstr!("{address}/process/{id}/action")).json(&content).headers(headers).send()?)
}
pub fn remove(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
let (client, headers) = client(token);
let content = ActionBody { method: string!("remove") };
Ok(client.post(fmtstr!("{address}/process/{id}/action")).json(&content).headers(headers).send()?)
}
diff --git a/src/process/mod.rs b/src/process/mod.rs
index 7ef1d6a..4ac10ba 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -1,434 +1,438 @@
mod http;
use crate::{
config,
config::structs::Server,
file, helpers,
service::{run, stop, ProcessMetadata},
};
use chrono::serde::ts_milliseconds;
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{crashln, string, ternary, then};
use psutil::process::{self, MemoryInfo};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::{BTreeMap, HashMap};
use std::{env, path::PathBuf};
use utoipa::ToSchema;
#[derive(Serialize, ToSchema)]
pub struct ItemSingle {
info: Info,
stats: Stats,
watch: Watch,
log: Log,
raw: Raw,
}
#[derive(Serialize, ToSchema)]
pub struct Info {
id: usize,
pid: i64,
name: String,
status: String,
#[schema(value_type = String, example = "/path")]
path: PathBuf,
uptime: String,
command: String,
}
#[derive(Serialize, ToSchema)]
pub struct Stats {
restarts: u64,
start_time: i64,
cpu_percent: Option<f32>,
memory_usage: Option<MemoryInfo>,
}
#[derive(Serialize, ToSchema)]
pub struct Log {
out: String,
error: String,
}
#[derive(Serialize, ToSchema)]
pub struct Raw {
running: bool,
crashed: bool,
crashes: u64,
}
#[derive(Serialize, ToSchema)]
pub struct ProcessItem {
pid: i64,
id: usize,
cpu: String,
mem: String,
name: String,
restarts: u64,
status: String,
uptime: String,
#[schema(example = "/path")]
watch_path: String,
#[schema(value_type = String, example = "2000-01-01T01:00:00.000Z")]
start_time: DateTime<Utc>,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Process {
pub id: usize,
pub pid: i64,
pub name: String,
pub path: PathBuf,
pub script: String,
pub env: HashMap<String, String>,
#[serde(with = "ts_milliseconds")]
pub started: DateTime<Utc>,
pub restarts: u64,
pub running: bool,
pub crash: Crash,
pub watch: Watch,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Crash {
pub crashed: bool,
pub value: u64,
}
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct Watch {
pub enabled: bool,
#[schema(example = "/path")]
pub path: String,
pub hash: String,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Runner {
pub id: id::Id,
#[serde(skip)]
pub remote: Option<Remote>,
pub list: BTreeMap<usize, Process>,
}
#[derive(Clone, Debug)]
pub struct Remote {
address: String,
token: Option<String>,
}
pub enum Status {
Offline,
Running,
}
impl Status {
pub fn to_bool(&self) -> bool {
match self {
Status::Offline => false,
Status::Running => true,
}
}
}
impl Runner {
pub fn new() -> Self { dump::read() }
pub fn connect(name: String, Server { address, token }: Server, verbose: bool) -> Option<Self> {
match dump::from(&address, token.as_deref()) {
Ok(dump) => {
then!(verbose, println!("{} Fetched remote (name={name}, address={address})", *helpers::SUCCESS));
return Some(Runner {
remote: Some(Remote { token, address: string!(address) }),
..dump
});
}
Err(err) => {
log::debug!("{err}");
return None;
}
}
}
- pub fn start(&mut self, name: &String, command: &String, watch: &Option<String>) -> &mut Self {
- let id = self.id.next();
- let config = config::read().runner;
- let crash = Crash { crashed: false, value: 0 };
-
- let watch = match watch {
- Some(watch) => Watch {
- enabled: true,
- path: string!(watch),
- hash: hash::create(file::cwd().join(watch)),
- },
- None => {
- Watch {
+ pub fn start(&mut self, name: &String, command: &String, path: PathBuf, watch: &Option<String>) -> &mut Self {
+ if let Some(remote) = &self.remote {
+ if let Err(err) = http::create(remote, name, command, path, watch) {
+ crashln!("{} Failed to start create {name}\nError: {:#?}", *helpers::FAIL, err);
+ };
+ } else {
+ let id = self.id.next();
+ let config = config::read().runner;
+ let crash = Crash { crashed: false, value: 0 };
+
+ let watch = match watch {
+ Some(watch) => Watch {
+ enabled: true,
+ path: string!(watch),
+ hash: hash::create(file::cwd().join(watch)),
+ },
+ None => Watch {
enabled: false,
path: string!(""),
hash: string!(""),
- }
- }
- };
+ },
+ };
- let pid = run(ProcessMetadata {
- args: config.args,
- name: name.clone(),
- shell: config.shell,
- command: command.clone(),
- log_path: config.log_path,
- });
-
- self.list.insert(
- id,
- Process {
- id,
- pid,
- watch,
- crash,
- restarts: 0,
- running: true,
- path: file::cwd(),
+ let pid = run(ProcessMetadata {
+ args: config.args,
name: name.clone(),
- started: Utc::now(),
- script: command.clone(),
- env: env::vars().collect(),
- },
- );
+ shell: config.shell,
+ command: command.clone(),
+ log_path: config.log_path,
+ });
+
+ self.list.insert(
+ id,
+ Process {
+ id,
+ pid,
+ path,
+ watch,
+ crash,
+ restarts: 0,
+ running: true,
+ name: name.clone(),
+ started: Utc::now(),
+ script: command.clone(),
+ env: env::vars().collect(),
+ },
+ );
+ }
return self;
}
pub fn restart(&mut self, id: usize, dead: bool) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::restart(remote, id) {
crashln!("{} Failed to start process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let item = self.get(id);
let Process { path, script, name, .. } = item.clone();
if let Err(err) = std::env::set_current_dir(&item.path) {
crashln!("{} Failed to set working directory {:?}\nError: {:#?}", *helpers::FAIL, path, err);
};
item.stop();
let config = config::read().runner;
item.crash.crashed = false;
item.pid = run(ProcessMetadata {
command: script,
args: config.args,
name: name.clone(),
shell: config.shell,
log_path: config.log_path,
});
item.running = true;
item.started = Utc::now();
then!(dead, item.restarts += 1);
}
return self;
}
pub fn remove(&mut self, id: usize) {
if let Some(remote) = &self.remote {
if let Err(err) = http::remove(remote, id) {
crashln!("{} Failed to stop remove {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
self.stop(id);
self.list.remove(&id);
dump::write(&self);
}
}
pub fn set_id(&mut self, id: id::Id) {
self.id = id;
self.id.next();
dump::write(&self);
}
pub fn set_status(&mut self, id: usize, status: Status) {
self.get(id).running = status.to_bool();
dump::write(&self);
}
pub fn items(&mut self) -> BTreeMap<usize, Process> { self.list.clone() }
pub fn items_mut(&mut self) -> &mut BTreeMap<usize, Process> { &mut self.list }
pub fn save(&self) { then!(self.remote.is_none(), dump::write(&self)) }
pub fn count(&mut self) -> usize { self.list().count() }
pub fn is_empty(&self) -> bool { self.list.is_empty() }
pub fn exists(&mut self, id: usize) -> bool { self.list.contains_key(&id) }
pub fn info(&mut self, id: usize) -> Option<&Process> { self.list.get(&id) }
pub fn list<'l>(&'l mut self) -> impl Iterator<Item = (&'l usize, &'l mut Process)> { self.list.iter_mut().map(|(k, v)| (k, v)) }
pub fn get(&mut self, id: usize) -> &mut Process { self.list.get_mut(&id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL)) }
pub fn set_crashed(&mut self, id: usize) -> &mut Self {
self.get(id).crash.crashed = true;
return self;
}
pub fn new_crash(&mut self, id: usize) -> &mut Self {
self.get(id).crash.value += 1;
return self;
}
pub fn stop(&mut self, id: usize) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::stop(remote, id) {
crashln!("{} Failed to stop process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let item = self.get(id);
stop(item.pid);
item.running = false;
item.crash.crashed = false;
item.crash.value = 0;
}
return self;
}
pub fn rename(&mut self, id: usize, name: String) -> &mut Self {
self.get(id).name = name;
return self;
}
pub fn watch(&mut self, id: usize, path: &str, enabled: bool) -> &mut Self {
let item = self.get(id);
item.watch = Watch {
enabled,
path: string!(path),
hash: ternary!(enabled, hash::create(item.path.join(path)), string!("")),
};
return self;
}
pub fn json(&mut self) -> Value {
let mut processes: Vec<ProcessItem> = Vec::new();
for (id, item) in self.items() {
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
if let Ok(mut process) = process::Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0.00%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let status =
if item.running {
string!("online")
} else {
match item.crash.crashed {
true => string!("crashed"),
false => string!("stopped"),
}
};
processes.push(ProcessItem {
id,
status,
pid: item.pid,
cpu: cpu_percent,
mem: memory_usage,
restarts: item.restarts,
name: item.name.clone(),
start_time: item.started,
watch_path: item.watch.path.clone(),
uptime: helpers::format_duration(item.started),
});
}
json!(processes)
}
}
impl Process {
pub fn stop(&mut self) { Runner::new().stop(self.id).save(); }
pub fn watch(&mut self, path: &str) { Runner::new().watch(self.id, path, true).save(); }
pub fn disable_watch(&mut self) { Runner::new().watch(self.id, "", false).save(); }
pub fn rename(&mut self, name: String) { Runner::new().rename(self.id, name).save(); }
pub fn restart(&mut self) { Runner::new().restart(self.id, false).save(); }
pub fn crashed(&mut self) -> &mut Process {
Runner::new().new_crash(self.id).save();
Runner::new().restart(self.id, true).save();
return self;
}
pub fn json(&mut self) -> Value {
let config = config::read().runner;
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
if let Ok(mut process) = process::Process::new(self.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
let status = if self.running {
string!("online")
} else {
match self.crash.crashed {
true => string!("crashed"),
false => string!("stopped"),
}
};
json!(ItemSingle {
info: Info {
status,
id: self.id,
pid: self.pid,
name: self.name.clone(),
path: self.path.clone(),
uptime: helpers::format_duration(self.started),
command: format!("{} {} '{}'", config.shell, config.args.join(" "), self.script.clone()),
},
stats: Stats {
cpu_percent,
memory_usage,
restarts: self.restarts,
start_time: self.started.timestamp_millis(),
},
watch: Watch {
enabled: self.watch.enabled,
hash: self.watch.hash.clone(),
path: self.watch.path.clone(),
},
log: Log {
out: global!("pmc.logs.out", self.name.as_str()),
error: global!("pmc.logs.error", self.name.as_str()),
},
raw: Raw {
running: self.running,
crashed: self.crash.crashed,
crashes: self.crash.value,
}
})
}
}
pub mod dump;
pub mod hash;
pub mod id;
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Feb 1, 6:05 PM (1 d, 17 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
493978
Default Alt Text
(65 KB)
Attached To
Mode
rPMC Process Management Controller
Attached
Detach File
Event Timeline
Log In to Comment