Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2707257
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
80 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/src/cli.rs b/src/cli.rs
index 6e1e050..9b7ffb4 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,455 +1,541 @@
use colored::Colorize;
use global_placeholders::global;
use macros_rs::{crashln, string, ternary};
use psutil::process::{MemoryInfo, Process};
use regex::Regex;
use serde::Serialize;
use serde_json::json;
use std::env;
use pmc::{
config,
file::{self, Exists},
helpers::{self, ColoredString},
log,
- process::Runner,
+ process::{http, ItemSingle, Runner},
};
use tabled::{
settings::{
object::{Columns, Rows},
style::{BorderColor, Style},
themes::Colorization,
Color, Modify, Rotate, Width,
},
Table, Tabled,
};
/// Target selector for `start`.
#[derive(Clone, Debug)]
pub enum Args {
/// Id of an existing process; `start` applies a restart to it.
Id(usize),
/// Script or command line; `start` creates a new process from it.
Script(String),
}
/// Builds the message prefix and list target for a server name.
///
/// Returns `(kind, name)` where `kind` is empty for the `"internal"` server and
/// `"remote "` for anything else, and `name` is an owned copy of `server_name`.
fn format(server_name: &String) -> (String, String) {
    let kind = if server_name == "internal" {
        String::new()
    } else {
        String::from("remote ")
    };

    (kind, server_name.clone())
}
/// Returns the version banner baked in at compile time.
///
/// With `short` set the result is `"<package name> <version>"`; otherwise it is
/// `"<version> (<git hash> <build date>) [<profile>]"`.
pub fn get_version(short: bool) -> String {
    if short {
        format!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
    } else {
        format!("{} ({} {}) [{}]", env!("CARGO_PKG_VERSION"), env!("GIT_HASH"), env!("BUILD_DATE"), env!("PROFILE"))
    }
}
/// Restarts an existing process or creates a new one, locally or on a remote server,
/// then prints the default process list for that server.
///
/// * `name` - optional process name; for `Args::Script` it falls back to the first
///   whitespace-separated token of the script.
/// * `args` - `Args::Id` restarts that id, `Args::Script` creates a process, `None` does nothing.
/// * `watch` - optional watch path (applied on local restart and on start).
/// * `server_name` - `"internal"` for the local runner, otherwise a key in the configured servers.
///
/// Diverges through `crashln!` when no servers are configured or the connection fails.
/// NOTE(review): when `server_name` is not a key of `servers`, nothing is done but the
/// success lines below are still printed — confirm this is intended.
pub fn start(name: &Option<String>, args: &Option<Args>, watch: &Option<String>, server_name: &String) {
let mut runner = Runner::new();
let config = config::read();
let (kind, list_name) = format(server_name);
match args {
Some(Args::Id(id)) => {
+ let runner: Runner = Runner::new();
println!("{} Applying {kind}action restartProcess on ({id})", *helpers::SUCCESS);
if *server_name == "internal" {
- let item = runner.get(*id);
+ let mut item = runner.get(*id);
match watch {
Some(path) => item.watch(path),
None => item.disable_watch(),
}
// Rename only when a name was supplied; newlines are stripped from it.
name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
item.restart();
log!("process started (id={id})");
} else {
let Some(servers) = config::servers().servers else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.clone(), false) {
- Some(mut remote) => remote.get(*id).restart(),
+ Some(remote) => {
+ let mut item = remote.get(*id);
+
+ name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
+ item.restart();
+ }
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
}
}
println!("{} restarted {kind}({id}) ✓", *helpers::SUCCESS);
list(&string!("default"), &list_name);
}
Some(Args::Script(script)) => {
let name = match name {
Some(name) => string!(name),
None => string!(script.split_whitespace().next().unwrap_or_default()),
};
if *server_name == "internal" {
// A bare path of alphanumeric segments (optionally ending in .js/.ts) is prefixed
// with the configured node binary; anything else is run as given.
let pattern = Regex::new(r"(?m)^[a-zA-Z0-9]+(/[a-zA-Z0-9]+)*(\.js|\.ts)?$").unwrap();
if pattern.is_match(script) {
let script = format!("{} {script}", config.runner.node);
runner.start(&name, &script, file::cwd(), watch).save();
} else {
runner.start(&name, script, file::cwd(), watch).save();
}
log!("process created (name={name})");
} else {
let Some(servers) = config::servers().servers else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.clone(), false) {
Some(mut remote) => remote.start(&name, script, file::cwd(), watch),
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
}
}
println!("{} Creating {kind}process with ({name})", *helpers::SUCCESS);
println!("{} {kind}created ({name}) ✓", *helpers::SUCCESS);
list(&string!("default"), &list_name);
}
None => {}
}
}
/// Stops process `id` and prints the default process list for the server.
///
/// * `id` - process id to stop.
/// * `server_name` - `"internal"` for the local runner, otherwise a configured server name.
///
/// Diverges through `crashln!` when no servers are configured or the connection fails.
pub fn stop(id: &usize, server_name: &String) {
+ let mut runner: Runner = Runner::new();
let (kind, list_name) = format(server_name);
println!("{} Applying {kind}action stopProcess on ({id})", *helpers::SUCCESS);
- if *server_name == "internal" {
- let mut runner = Runner::new();
- runner.get(*id).stop();
- log!("process stopped (id={id})");
- } else {
+ if *server_name != "internal" {
let Some(servers) = config::servers().servers else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
};
// NOTE(review): in the added version, a `server_name` missing from `servers` leaves
// `runner` as the local runner, so the local process with this id is stopped — confirm.
if let Some(server) = servers.get(server_name) {
- match Runner::connect(server_name.clone(), server.clone(), false) {
- Some(mut remote) => remote.get(*id).stop(),
+ runner = match Runner::connect(server_name.clone(), server.clone(), false) {
+ Some(remote) => remote,
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
}
}
+ runner.get(*id).stop();
println!("{} stopped {kind}({id}) ✓", *helpers::SUCCESS);
+ log!("process stopped {kind}(id={id})");
+
list(&string!("default"), &list_name);
}
/// Removes process `id` from the internal runner or from a remote server.
///
/// * `id` - process id to remove.
/// * `server_name` - `"internal"` for the local runner, otherwise a configured server name.
///
/// Diverges through `crashln!` when no servers are configured or the connection fails.
pub fn remove(id: &usize, server_name: &String) {
+ let mut runner: Runner = Runner::new();
let (kind, _) = format(server_name);
println!("{} Applying {kind}action removeProcess on ({id})", *helpers::SUCCESS);
- if *server_name == "internal" {
- Runner::new().remove(*id);
- } else {
+ if *server_name != "internal" {
let Some(servers) = config::servers().servers else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
};
// NOTE(review): in the added version, a `server_name` missing from `servers` leaves
// `runner` as the local runner, so the local process with this id is removed — confirm.
if let Some(server) = servers.get(server_name) {
- match Runner::connect(server_name.clone(), server.clone(), false) {
- Some(mut remote) => remote.remove(*id),
+ runner = match Runner::connect(server_name.clone(), server.clone(), false) {
+ Some(remote) => remote,
None => crashln!("{} Failed to remove (name={server_name}, address={})", *helpers::FAIL, server.address),
};
}
}
+ runner.remove(*id);
println!("{} removed {kind}({id}) ✓", *helpers::SUCCESS);
log!("process removed (id={id})");
}
-pub fn info(id: &usize, format: &String) {
+pub fn info(id: &usize, format: &String, server_name: &String) {
/// Display model for one process's details: rendered as a `tabled` table and
/// serialized for the `json` output format. Every value is pre-formatted text
/// except `restarts`.
#[derive(Clone, Debug, Tabled)]
struct Info {
// Path of the process's error log.
#[tabled(rename = "error log path ")]
log_error: String,
// Path of the process's out log.
#[tabled(rename = "out log path")]
log_out: String,
#[tabled(rename = "cpu percent")]
cpu_percent: String,
#[tabled(rename = "memory usage")]
memory_usage: String,
// Watch hash, or a "none" placeholder when watching is disabled.
#[tabled(rename = "path hash")]
hash: String,
// Watched path, or a "disabled" placeholder.
#[tabled(rename = "watching")]
watch: String,
#[tabled(rename = "exec cwd")]
path: String,
#[tabled(rename = "script command ")]
command: String,
#[tabled(rename = "script id")]
id: String,
restarts: u64,
uptime: String,
pid: String,
name: String,
status: ColoredString,
}
impl Serialize for Info {
    /// Serializes the row as a flat JSON object with padding trimmed from every
    /// string field.
    ///
    /// The path hash is emitted under its own `"hash"` key. It was previously
    /// written under a second `"watch"` key, which overwrote the watch path and
    /// dropped the hash from the output.
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let trimmed_json = json!({
            "id": &self.id.trim(),
            "pid": &self.pid.trim(),
            "name": &self.name.trim(),
            "path": &self.path.trim(),
            "restarts": &self.restarts,
            "watch": &self.watch.trim(),
            "hash": &self.hash.trim(),
            "uptime": &self.uptime.trim(),
            // `status` wraps a colored string; `.0` is the inner value.
            "status": &self.status.0.trim(),
            "log_out": &self.log_out.trim(),
            "cpu": &self.cpu_percent.trim(),
            "command": &self.command.trim(),
            "mem": &self.memory_usage.trim(),
            "log_error": &self.log_error.trim(),
        });

        trimmed_json.serialize(serializer)
    }
}
- if let Some(home) = home::home_dir() {
- let config = config::read().runner;
- let item = Runner::new().get(*id).clone();
- let mut memory_usage: Option<MemoryInfo> = None;
- let mut cpu_percent: Option<f32> = None;
+ let render_info = |data: Vec<Info>| {
+ let table = Table::new(data.clone())
+ .with(Rotate::Left)
+ .with(Style::rounded().remove_horizontals())
+ .with(Colorization::exact([Color::FG_CYAN], Columns::first()))
+ .with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
+ .to_string();
- if let Ok(mut process) = Process::new(item.pid as u32) {
- memory_usage = process.memory_info().ok();
- cpu_percent = process.cpu_percent().ok();
- }
+ if let Ok(json) = serde_json::to_string(&data[0]) {
+ match format.as_str() {
+ "raw" => println!("{:?}", data[0]),
+ "json" => println!("{json}"),
+ _ => {
+ println!("{}\n{table}\n", format!("Describing process with id ({id})").on_bright_white().black());
+ println!(" {}", format!("Use `pmc logs {id} [--lines <num>]` to display logs").white());
+ println!(" {}", format!("Use `pmc env {id}` to display environment variables").white());
+ }
+ };
+ };
+ };
+
+ if *server_name == "internal" {
+ if let Some(home) = home::home_dir() {
+ let config = config::read().runner;
+ let mut runner = Runner::new();
+ let item = runner.process(*id);
+
+ let mut memory_usage: Option<MemoryInfo> = None;
+ let mut cpu_percent: Option<f32> = None;
+
+ let path = file::make_relative(&item.path, &home).to_string_lossy().into_owned();
- let cpu_percent =
- match cpu_percent {
+ if let Ok(mut process) = Process::new(item.pid as u32) {
+ memory_usage = process.memory_info().ok();
+ cpu_percent = process.cpu_percent().ok();
+ }
+
+ let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0%"),
};
- let memory_usage = match memory_usage {
- Some(usage) => helpers::format_memory(usage.rss()),
- None => string!("0b"),
- };
+ let memory_usage = match memory_usage {
+ Some(usage) => helpers::format_memory(usage.rss()),
+ None => string!("0b"),
+ };
- let status =
- if item.running {
+ let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
- let path = file::make_relative(&item.path, &home)
- .map(|relative_path| relative_path.to_string_lossy().into_owned())
- .unwrap_or_else(|| crashln!("{} Unable to get your current directory", *helpers::FAIL));
-
- let data = vec![Info {
- cpu_percent,
- memory_usage,
- id: string!(id),
- restarts: item.restarts,
- name: item.name.clone(),
- path: format!("{} ", path),
- status: ColoredString(status),
- log_out: global!("pmc.logs.out", item.name.as_str()),
- log_error: global!("pmc.logs.error", item.name.as_str()),
- pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
- command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script),
- hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
- watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
- uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
- }];
+ let data = vec![Info {
+ cpu_percent,
+ memory_usage,
+ id: string!(id),
+ restarts: item.restarts,
+ name: item.name.clone(),
+ path: format!("{} ", path),
+ status: ColoredString(status),
+ log_out: global!("pmc.logs.out", item.name.as_str()),
+ log_error: global!("pmc.logs.error", item.name.as_str()),
+ pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
+ command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script),
+ hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
+ watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
+ uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
+ }];
+
+ render_info(data)
+ } else {
+ crashln!("{} Impossible to get your home directory", *helpers::FAIL);
+ }
+ } else {
+ let mut item: Option<(pmc::process::Process, Runner)> = None;
- let table = Table::new(data.clone())
- .with(Rotate::Left)
- .with(Style::rounded().remove_horizontals())
- .with(Colorization::exact([Color::FG_CYAN], Columns::first()))
- .with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
- .to_string();
+ let Some(servers) = config::servers().servers else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
- if let Ok(json) = serde_json::to_string(&data[0]) {
- match format.as_str() {
- "raw" => println!("{:?}", data[0]),
- "json" => println!("{json}"),
- _ => {
- println!("{}\n{table}\n", format!("Describing process with id ({id})").on_bright_white().black());
- println!(" {}", format!("Use `pmc logs {id} [--lines <num>]` to display logs").white());
- println!(" {}", format!("Use `pmc env {id}` to display environment variables").white());
+ if let Some(server) = servers.get(server_name) {
+ item = match Runner::connect(server_name.clone(), server.clone(), false) {
+ Some(mut remote) => Some((remote.process(*id).clone(), remote)),
+ None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
+ };
+ }
+
+ if let Some((item, remote)) = item {
+ let info = http::info(&remote.remote.unwrap(), *id);
+ let path = item.path.to_string_lossy().into_owned();
+
+ let status = if item.running {
+ "online ".green().bold()
+ } else {
+ match item.crash.crashed {
+ true => "crashed ",
+ false => "stopped ",
}
+ .red()
+ .bold()
};
- };
- } else {
- crashln!("{} Impossible to get your home directory", *helpers::FAIL);
+
+ if let Ok(info) = info {
+ let stats = info.json::<ItemSingle>().unwrap().stats;
+
+ let cpu_percent = match stats.cpu_percent {
+ Some(percent) => format!("{:.2}%", percent),
+ None => string!("0%"),
+ };
+
+ let memory_usage = match stats.memory_usage {
+ Some(usage) => helpers::format_memory(usage.rss()),
+ None => string!("0b"),
+ };
+
+ let data = vec![Info {
+ cpu_percent,
+ memory_usage,
+ id: string!(id),
+ path: path.clone(),
+ command: item.script,
+ restarts: item.restarts,
+ name: item.name.clone(),
+ status: ColoredString(status),
+ log_out: global!("pmc.logs.out", item.name.as_str()),
+ log_error: global!("pmc.logs.error", item.name.as_str()),
+ pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
+ hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
+ watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
+ uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
+ }];
+
+ render_info(data)
+ }
+ }
}
}
/// Prints the last `lines` lines of the error log and then the out log of local process `id`.
///
/// Both log files must exist; otherwise the failure is reported through `crashln!`.
pub fn logs(id: &usize, lines: &usize) {
- let item = Runner::new().get(*id).clone();
+ let mut runner = Runner::new();
+ let item = runner.process(*id);
// Log paths are derived from the process name.
let log_error = global!("pmc.logs.error", item.name.as_str());
let log_out = global!("pmc.logs.out", item.name.as_str());
if Exists::file(log_error.clone()).unwrap() && Exists::file(log_out.clone()).unwrap() {
println!("{}", format!("Showing last {lines} lines for process [{id}] (change the value with --lines option)").yellow());
file::logs(*lines, &log_error, *id, "error", &item.name);
file::logs(*lines, &log_out, *id, "out", &item.name);
} else {
crashln!("{} Logs for process ({id}) not found", *helpers::FAIL);
}
}
/// Prints the environment of process `id`, one `key: value` line per variable with the
/// value in green. The added version also accepts `server_name` to read from a remote server.
///
/// Diverges through `crashln!` when no servers are configured or the connection fails.
-pub fn env(id: &usize) {
- let item = Runner::new().get(*id).clone();
- for (key, value) in item.env.iter() {
- println!("{}: {}", key, value.green());
+pub fn env(id: &usize, server_name: &String) {
+ let mut runner: Runner = Runner::new();
+
+ if *server_name != "internal" {
+ let Some(servers) = config::servers().servers else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
+
// NOTE(review): a `server_name` missing from `servers` leaves `runner` as the local
// runner, so the local process's environment is printed — confirm.
+ if let Some(server) = servers.get(server_name) {
+ runner = match Runner::connect(server_name.clone(), server.clone(), false) {
+ Some(remote) => remote,
+ None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
+ };
+ }
}
+
+ let item = runner.process(*id);
+ item.env.iter().for_each(|(key, value)| println!("{}: {}", key, value.green()));
}
/// Prints the process table for the internal daemon, one configured server, or all servers.
///
/// * `format` - `"raw"` prints the debug form, `"json"` the serialized rows, `"default"`
///   the table; any other value prints nothing for a non-empty runner.
/// * `server_name` - a configured server name, `"internal"`, or `"all"`.
///
/// Reports an unknown server name through `crashln!`. Servers that cannot be reached while
/// listing `"all"` are collected and printed at the end.
pub fn list(format: &String, server_name: &String) {
// Renders every process held by `runner` in the requested format.
let render_list = |runner: &mut Runner| {
let mut processes: Vec<ProcessItem> = Vec::new();
// One table row; string fields carry trailing padding that `serialize` trims.
#[derive(Tabled, Debug)]
struct ProcessItem {
id: ColoredString,
name: String,
pid: String,
uptime: String,
#[tabled(rename = "↺")]
restarts: String,
status: ColoredString,
cpu: String,
mem: String,
#[tabled(rename = "watching")]
watch: String,
}
impl serde::Serialize for ProcessItem {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"cpu": &self.cpu.trim(),
"mem": &self.mem.trim(),
"id": &self.id.0.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"watch": &self.watch.trim(),
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"restarts": &self.restarts.trim(),
});
trimmed_json.serialize(serializer)
}
}
if runner.is_empty() {
println!("{} Process table empty", *helpers::SUCCESS);
} else {
for (id, item) in runner.items() {
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
// NOTE(review): stats are looked up by pid through `Process::new` even when `runner`
// is a remote connection — confirm this is intended for remote listings.
if let Ok(mut process) = Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.0}%", percent),
None => string!("0%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
// Green "online" when running; otherwise red "crashed" or "stopped".
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
processes.push(ProcessItem {
status: ColoredString(status),
cpu: format!("{cpu_percent} "),
mem: format!("{memory_usage} "),
restarts: format!("{} ", item.restarts),
name: format!("{} ", item.name.clone()),
id: ColoredString(id.to_string().cyan().bold()),
pid: ternary!(item.running, format!("{} ", item.pid), string!("n/a ")),
watch: ternary!(item.watch.enabled, format!("{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{} ", helpers::format_duration(item.started)), string!("none ")),
});
}
// Column index 1 (the name column) is truncated to 35 characters.
let table = Table::new(&processes)
.with(Style::rounded().remove_verticals())
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.with(Colorization::exact([Color::FG_BRIGHT_CYAN], Rows::first()))
.with(Modify::new(Columns::single(1)).with(Width::truncate(35).suffix("... ")))
.to_string();
if let Ok(json) = serde_json::to_string(&processes) {
match format.as_str() {
"raw" => println!("{:?}", processes),
"json" => println!("{json}"),
"default" => println!("{table}"),
_ => {}
};
};
}
};
if let Some(servers) = config::servers().servers {
let mut failed: Vec<(String, String)> = vec![];
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.clone(), true) {
Some(mut remote) => render_list(&mut remote),
None => println!("{} Failed to fetch (name={server_name}, address={})", *helpers::FAIL, server.address),
}
} else {
// Not a configured server: only the pseudo-names "internal" and "all" are accepted.
if matches!(&**server_name, "internal" | "all") {
println!("{} Internal daemon", *helpers::SUCCESS);
render_list(&mut Runner::new());
} else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL);
}
}
if *server_name == "all" {
for (name, server) in servers {
match Runner::connect(name.clone(), server.clone(), true) {
Some(mut remote) => render_list(&mut remote),
None => failed.push((name, server.address)),
}
}
}
if !failed.is_empty() {
println!("{} Failed servers:", *helpers::FAIL);
failed
.iter()
.for_each(|server| println!(" {} {} {}", "-".yellow(), format!("{}", server.0), format!("[{}]", server.1).white()));
}
} else {
// No servers configured: the internal runner is listed regardless of `server_name`.
render_list(&mut Runner::new());
}
}
diff --git a/src/daemon/api/routes.rs b/src/daemon/api/routes.rs
index 1ff5e00..8f58016 100644
--- a/src/daemon/api/routes.rs
+++ b/src/daemon/api/routes.rs
@@ -1,469 +1,465 @@
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{string, ternary, then};
use prometheus::{Encoder, TextEncoder};
use psutil::process::{MemoryInfo, Process};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::convert::Infallible;
use tera::{Context, Tera};
use utoipa::ToSchema;
use pmc::{
file, helpers,
process::{dump, Runner},
};
use crate::daemon::{
api::{HTTP_COUNTER, HTTP_REQ_HISTOGRAM},
pid,
};
use warp::{
hyper::body::Body,
reject,
reply::{self, json, Response},
Rejection, Reply,
};
use std::{
env,
fs::{self, File},
io::{self, BufRead, BufReader},
path::PathBuf,
};
/// Schema description published under the name `MemoryInfo` (see `#[schema(as = ...)]`).
/// The `dead_code` allowance suggests it exists only for documentation and is not
/// constructed at runtime — TODO confirm.
#[allow(dead_code)]
#[derive(ToSchema)]
#[schema(as = MemoryInfo)]
pub(crate) struct DocMemoryInfo {
rss: u64,
vms: u64,
// Linux-only fields.
#[cfg(target_os = "linux")]
shared: u64,
#[cfg(target_os = "linux")]
text: u64,
#[cfg(target_os = "linux")]
data: u64,
// macOS-only fields.
#[cfg(target_os = "macos")]
page_faults: u64,
#[cfg(target_os = "macos")]
pageins: u64,
}
/// Request body for `action_handler`.
#[derive(Deserialize, ToSchema)]
pub(crate) struct ActionBody {
// Action name; `action_handler` accepts start/restart, stop/kill and remove/delete.
#[schema(example = "restart")]
method: String,
}
/// Request body for `create_handler`.
#[derive(Deserialize, ToSchema)]
pub(crate) struct CreateBody {
// Optional process name; `create_handler` falls back to the first token of `script`.
#[schema(example = "app")]
name: Option<String>,
// Command line to run.
#[schema(example = "node index.js")]
script: String,
// Path passed to `Runner::start`; presumably the working directory — TODO confirm.
- #[schema(example = "/projects/app")]
+ #[schema(value_type = String, example = "/projects/app")]
path: PathBuf,
// Optional watch path passed to `Runner::start`.
#[schema(example = "src")]
watch: Option<String>,
}
/// Reply payload built by `attempt` for process actions.
#[derive(Serialize, ToSchema)]
pub(crate) struct ActionResponse<'a> {
// Whether the action was carried out.
#[schema(example = true)]
done: bool,
// Action name, or "DOES_NOT_EXIST" when `done` is false (see `attempt`).
#[schema(example = "name")]
action: &'a str,
}
/// Reply payload of `log_handler`.
#[derive(Serialize, ToSchema)]
pub(crate) struct LogResponse {
// Lines of the requested log file; empty when the file cannot be opened.
logs: Vec<String>,
}
/// Top-level reply payload of `metrics_handler`.
#[derive(Serialize, ToSchema)]
pub struct MetricsRoot<'a> {
// Build information.
pub version: Version<'a>,
// Daemon state and resource usage.
pub daemon: Daemon,
}
/// Build information reported by `metrics_handler`, filled from compile-time `env!` values.
#[derive(Serialize, ToSchema)]
pub struct Version<'a> {
// "v" followed by CARGO_PKG_VERSION.
#[schema(example = "v1.0.0")]
pub pkg: String,
// GIT_HASH_FULL.
pub hash: &'a str,
// BUILD_DATE.
#[schema(example = "2000-01-01")]
pub build_date: &'a str,
// PROFILE.
#[schema(example = "release")]
pub target: &'a str,
}
/// Daemon state reported by `metrics_handler`.
#[derive(Serialize, ToSchema)]
pub struct Daemon {
// Daemon pid; `None` unless the pid record exists and the process could be inspected.
pub pid: Option<i32>,
// Result of `pid::exists()`.
#[schema(example = true)]
pub running: bool,
// Formatted uptime, or "none" when unknown.
pub uptime: String,
// Number of processes in the runner read from the dump file.
pub process_count: usize,
#[schema(example = "default")]
pub daemon_type: String,
pub stats: Stats,
}
/// Pre-formatted resource usage of the daemon process.
#[derive(Serialize, ToSchema)]
pub struct Stats {
// Formatted resident memory, or "0b" when unavailable.
pub memory_usage: String,
// CPU usage with two decimals and a "%" suffix, or "0%" when unavailable.
pub cpu_percent: String,
}
/// Builds the JSON reply for a process action.
///
/// The reported action is `method` when `done` is true and `"DOES_NOT_EXIST"` otherwise.
#[inline]
fn attempt(done: bool, method: &str) -> reply::Json {
    let action = if done { method } else { "DOES_NOT_EXIST" };
    let payload = json!(ActionResponse { done, action });

    json(&payload)
}
/// Renders template `name` with `ctx`; any template error becomes a not-found rejection.
#[inline]
fn render(name: &str, tmpl: &Tera, ctx: &Context) -> Result<String, Rejection> {
    match tmpl.render(name, ctx) {
        Ok(html) => Ok(html),
        Err(_) => Err(reject::not_found()),
    }
}
/// Serves the `login` template; `store` holds the template engine and the base path.
#[inline]
pub async fn login(store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
    let (templates, base_path) = store;

    let mut context = Context::new();
    context.insert("base_path", &base_path);

    let html = render("login", &templates, &context)?;
    Ok(Box::new(reply::html(html)))
}
/// Serves the `dashboard` template; `store` holds the template engine and the base path.
#[inline]
pub async fn dashboard(store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
    let (templates, base_path) = store;

    let mut context = Context::new();
    context.insert("base_path", &base_path);

    let html = render("dashboard", &templates, &context)?;
    Ok(Box::new(reply::html(html)))
}
/// Serves the `view` template for process `id`; `store` holds the template engine and
/// the base path.
#[inline]
pub async fn view_process(id: usize, store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
    let (templates, base_path) = store;

    let mut context = Context::new();
    context.insert("base_path", &base_path);
    context.insert("process_id", &id);

    let html = render("view", &templates, &context)?;
    Ok(Box::new(reply::html(html)))
}
/// Returns every registered Prometheus metric in the text exposition format.
///
/// The encoded buffer is moved straight into the reply `String`; the previous version
/// cloned the buffer and re-formatted the resulting string for no effect.
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/prometheus", responses((status = 200, description = "Get prometheus metrics", body = String)))]
pub async fn prometheus_handler() -> Result<impl Reply, Infallible> {
    let encoder = TextEncoder::new();
    let mut buffer = Vec::<u8>::new();
    let metric_families = prometheus::gather();

    encoder.encode(&metric_families, &mut buffer).expect("encoding metrics into an in-memory buffer failed");

    Ok(String::from_utf8(buffer).expect("text-encoded metrics are not valid UTF-8"))
}
/// Returns the raw process dump.
///
/// The dump is produced before the request timer is observed, so the `dump` histogram
/// covers the actual work (as `list_handler` does). Previously the timer was stopped
/// before `dump::raw()` ran.
#[inline]
#[utoipa::path(get, path = "/dump", tag = "Process", responses((status = 200, description = "Dump processes successfully", body = [u8])))]
pub async fn dump_handler() -> Result<impl Reply, Infallible> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["dump"]).start_timer();

    HTTP_COUNTER.inc();
    let payload = dump::raw();
    timer.observe_duration();

    Ok(payload)
}
/// Returns the JSON form of every process known to a freshly loaded runner.
#[inline]
#[utoipa::path(get, path = "/list", tag = "Process", responses((status = 200, description = "List processes successfully", body = [ProcessItem])))]
pub async fn list_handler() -> Result<impl Reply, Infallible> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();
    let processes = Runner::new().json();

    HTTP_COUNTER.inc();
    timer.observe_duration();

    let reply = json(&processes);
    Ok(reply)
}
/// Returns the lines of a process log file as JSON.
///
/// * `id` - process id; an unknown id yields a not-found rejection.
/// * `kind` - `"error"`/`"stderr"` selects the error log; any other value selects the out log.
///
/// A log file that cannot be opened or read yields an empty list. Lines are decoded
/// lossily, so bytes that are not valid UTF-8 no longer panic the handler (the previous
/// `lines()` + `unwrap()` did).
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}",
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched", body = LogResponse),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn log_handler(id: usize, kind: String) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();

    HTTP_COUNTER.inc();
    match Runner::new().info(id) {
        Some(item) => {
            let log_file = match kind.as_str() {
                "error" | "stderr" => global!("pmc.logs.error", item.name.as_str()),
                _ => global!("pmc.logs.out", item.name.as_str()),
            };

            let logs: Vec<String> = match File::open(log_file) {
                Ok(data) => {
                    // Split on raw bytes so invalid UTF-8 cannot fail the read.
                    let raw: io::Result<Vec<Vec<u8>>> = BufReader::new(data).split(b'\n').collect();

                    raw.unwrap_or_default()
                        .into_iter()
                        .map(|mut line| {
                            // Match `BufRead::lines`, which also drops the `\r` of a CRLF ending.
                            if line.last() == Some(&b'\r') {
                                line.pop();
                            }
                            String::from_utf8_lossy(&line).into_owned()
                        })
                        .collect()
                }
                Err(_) => vec![],
            };

            timer.observe_duration();
            Ok(json(&json!(LogResponse { logs })))
        }
        None => {
            timer.observe_duration();
            Err(reject::not_found())
        }
    }
}
/// Returns the whole content of a process log file as the response body.
///
/// * `id` - process id; an unknown id yields a not-found rejection.
/// * `kind` - `"error"`/`"stderr"` selects the error log; any other value selects the out log.
///
/// When the file cannot be read, the I/O error text is returned as the body. Content is
/// decoded lossily, so a log containing bytes that are not valid UTF-8 is still returned
/// instead of being replaced by a decoding error message.
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}/raw",
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched raw", body = String),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn log_handler_raw(id: usize, kind: String) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();

    HTTP_COUNTER.inc();
    match Runner::new().info(id) {
        Some(item) => {
            let log_file = match kind.as_str() {
                "error" | "stderr" => global!("pmc.logs.error", item.name.as_str()),
                _ => global!("pmc.logs.out", item.name.as_str()),
            };

            let data = match fs::read(log_file) {
                Ok(bytes) => String::from_utf8_lossy(&bytes).into_owned(),
                Err(err) => err.to_string(),
            };

            timer.observe_duration();
            Ok(Response::new(Body::from(data)))
        }
        None => {
            timer.observe_duration();
            Err(reject::not_found())
        }
    }
}
/// Returns the JSON description of process `id`.
///
/// NOTE(review): the added version no longer has a not-found branch although the 404
/// response is still documented below; behaviour for an unknown id depends on
/// `Runner::get` — confirm.
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/info",
params(("id" = usize, Path, description = "Process id to get information for", example = 0)),
responses(
(status = 200, description = "Current process info retrieved", body = ItemSingle),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn info_handler(id: usize) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
+ let runner = Runner::new();
+ let mut item = runner.get(id);
HTTP_COUNTER.inc();
- match Runner::new().info(id) {
- Some(item) => {
- timer.observe_duration();
- Ok(json(&item.clone().json()))
- }
- None => {
- timer.observe_duration();
- Err(reject::not_found())
- }
- }
+ timer.observe_duration();
+ Ok(json(&item.json()))
}
/// Creates a process from the request body, saves the runner, and replies with a
/// successful `create` action.
///
/// The process name defaults to the first whitespace-separated token of `body.script`
/// when `body.name` is absent.
#[inline]
-#[utoipa::path(post, tag = "Process", path = "/process/create", request_body(content = String),
+#[utoipa::path(post, tag = "Process", path = "/process/create", request_body(content = CreateBody),
responses(
(status = 200, description = "Create process successful", body = ActionResponse),
(status = INTERNAL_SERVER_ERROR, description = "Failed to create process", body = ErrorMessage)
)
)]
pub async fn create_handler(body: CreateBody) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["create"]).start_timer();
let mut runner = Runner::new();
HTTP_COUNTER.inc();
let name = match body.name {
Some(name) => string!(name),
None => string!(body.script.split_whitespace().next().unwrap_or_default()),
};
runner.start(&name, &body.script, body.path, &body.watch).save();
timer.observe_duration();
Ok(attempt(true, "create"))
}
/// Renames process `id` to the trimmed request body (newlines removed) and restarts it
/// if it was running; an unknown id yields a not-found rejection.
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/rename", request_body(content = String),
params(("id" = usize, Path, description = "Process id to rename", example = 0)),
responses(
(status = 200, description = "Rename process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn rename_handler(id: usize, body: String) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
+
let mut runner = Runner::new();
// NOTE(review): the added line below reads the process from a second runner before
// `exists(id)` is checked — confirm `process` is safe for an unknown id.
+ let process = Runner::new().process(id).clone();
HTTP_COUNTER.inc();
if runner.exists(id) {
- let item = runner.get(id);
+ let mut item = runner.get(id);
item.rename(body.trim().replace("\n", ""));
- then!(item.running, item.restart());
+ then!(process.running, item.restart());
timer.observe_duration();
Ok(attempt(true, "rename"))
} else {
timer.observe_duration();
Err(reject::not_found())
}
}
/// Returns the environment of process `id` as JSON, or a not-found rejection when the
/// id is unknown.
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/env",
params(("id" = usize, Path, description = "Process id to fetch env from", example = 0)),
responses(
(status = 200, description = "Current process env", body = HashMap<String, String>),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn env_handler(id: usize) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["env"]).start_timer();

    HTTP_COUNTER.inc();

    if let Some(item) = Runner::new().info(id) {
        timer.observe_duration();
        Ok(json(&item.clone().env))
    } else {
        timer.observe_duration();
        Err(reject::not_found())
    }
}
/// Runs an action on process `id`.
///
/// `start`/`restart` restart the process, `stop`/`kill` stop it, and `remove`/`delete`
/// remove it. Any other method yields a not-found rejection.
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/action", request_body = ActionBody,
params(("id" = usize, Path, description = "Process id to run action on", example = 0)),
responses(
(status = 200, description = "Run action on process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage)
)
)]
pub async fn action_handler(id: usize, body: ActionBody) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
    let mut runner = Runner::new();
    let method = body.method.as_str();

    HTTP_COUNTER.inc();

    // Perform the action first; the reply is built once the timer has been observed.
    let recognised = match method {
        "start" | "restart" => {
            runner.get(id).restart();
            true
        }
        "stop" | "kill" => {
            runner.get(id).stop();
            true
        }
        "remove" | "delete" => {
            runner.remove(id);
            true
        }
        _ => false,
    };

    timer.observe_duration();

    if recognised {
        Ok(attempt(true, method))
    } else {
        Err(reject::not_found())
    }
}
/// Reports build information plus the daemon's pid, uptime, process count and resource usage.
///
/// Resource figures are gathered only when the pid record exists, can be read, and the
/// process can be inspected; otherwise the placeholders `"0b"`, `"0%"` and `"none"` are used.
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/metrics", responses((status = 200, description = "Get daemon metrics", body = MetricsRoot)))]
pub async fn metrics_handler() -> Result<impl Reply, Infallible> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["metrics"]).start_timer();

    let mut daemon_pid: Option<i32> = None;
    let mut cpu: Option<f32> = None;
    let mut started: Option<DateTime<Utc>> = None;
    let mut memory: Option<MemoryInfo> = None;
    let mut runner: Runner = file::read_rmp(global!("pmc.dump"));

    HTTP_COUNTER.inc();

    if pid::exists() {
        if let Ok(process_id) = pid::read() {
            if let Ok(mut process) = Process::new(process_id as u32) {
                daemon_pid = Some(process_id);
                started = Some(pid::uptime().unwrap());
                memory = process.memory_info().ok();
                cpu = process.cpu_percent().ok();
            }
        }
    }

    let memory_usage = memory.map_or_else(|| string!("0b"), |usage| helpers::format_memory(usage.rss()));
    let cpu_percent = cpu.map_or_else(|| string!("0%"), |percent| format!("{:.2}%", percent));
    let uptime = started.map_or_else(|| string!("none"), |since| helpers::format_duration(since));

    let response = json!(MetricsRoot {
        version: Version {
            pkg: format!("v{}", env!("CARGO_PKG_VERSION")),
            hash: env!("GIT_HASH_FULL"),
            build_date: env!("BUILD_DATE"),
            target: env!("PROFILE"),
        },
        daemon: Daemon {
            pid: daemon_pid,
            running: pid::exists(),
            uptime,
            process_count: runner.count(),
            daemon_type: global!("pmc.daemon.kind"),
            stats: Stats { memory_usage, cpu_percent },
        },
    });

    timer.observe_duration();
    Ok(json(&response))
}
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index cc8f4a1..b8fdc58 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -1,307 +1,309 @@
#[macro_use]
mod log;
mod api;
mod fork;
use api::{DAEMON_CPU_PERCENTAGE, DAEMON_MEM_USAGE, DAEMON_START_TIME};
use chrono::{DateTime, Utc};
use colored::Colorize;
use fork::{daemon, Fork};
use global_placeholders::global;
use macros_rs::{crashln, str, string, ternary, then};
use psutil::process::{MemoryInfo, Process};
use serde::Serialize;
use serde_json::json;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{process, thread::sleep, time::Duration};
use pmc::{
config, file,
helpers::{self, ColoredString},
process::{hash, id::Id, Runner, Status},
};
use tabled::{
settings::{
object::Columns,
style::{BorderColor, Style},
themes::Colorization,
Color, Rotate,
},
Table, Tabled,
};
// Whether the HTTP API should be served. Written by `restart` (from config and CLI flags)
// and read by `start` and its `init` routine.
static ENABLE_API: AtomicBool = AtomicBool::new(false);
// Whether the web UI was requested. Written by `restart`; `init` forwards it to `api::start`.
static ENABLE_WEBUI: AtomicBool = AtomicBool::new(false);
// SIGTERM handler; `init` registers it through `libc::signal`.
// Calls `pid::remove()`, logs the kill together with the current pid, then ends the
// process with `_exit(0)`.
// NOTE(review): `pid::remove()` and `log!` are presumably not async-signal-safe — confirm
// that running them inside a signal handler is acceptable here.
extern "C" fn handle_termination_signal(_: libc::c_int) {
pid::remove();
log!("[daemon] killed (pid={})", process::id());
// The signal argument is ignored; the exit status is always 0.
unsafe { libc::_exit(0) }
}
// One daemon tick: walks every process of a freshly loaded `Runner` and reconciles it.
// For each item, in order:
//   1. running + watch enabled: hash the watched path and restart when the hash changed;
//   2. recorded as not running but `pid::running` says the pid is alive: set status to Running;
//   3. skip items that are not running, or whose pid is still alive;
//   4. otherwise the process is gone: when `crash.value` equals `daemon.restarts` from the
//      config it is stopped and marked crashed, else the crash is counted and it is restarted.
// NOTE(review): in the added lines a second `Runner::new()` is loaded on every iteration
// while `item` comes from the `Runner::new()` being iterated, so `item` looks like a
// snapshot that is not updated by the calls on `runner` — confirm this is intended.
fn restart_process() {
for (id, item) in Runner::new().items_mut() {
+ let mut runner = Runner::new();
+
if item.running && item.watch.enabled {
let path = item.path.join(item.watch.path.clone());
let hash = hash::create(path);
if hash != item.watch.hash {
- item.restart();
+ runner.restart(item.id, false);
log!("[daemon] watch reload {} (id={id}, hash={hash})", item.name);
continue;
}
}
if !item.running && pid::running(item.pid as i32) {
Runner::new().set_status(*id, Status::Running);
log!("[daemon] fix status {} (id={id})", item.name);
continue;
}
then!(!item.running || pid::running(item.pid as i32), continue);
if item.running && item.crash.value == config::read().daemon.restarts {
log!("[daemon] {} has crashed (id={id})", item.name);
- item.stop();
- Runner::new().set_crashed(*id).save();
+ runner.stop(item.id);
+ runner.set_crashed(*id).save();
continue;
} else {
- item.crashed();
+ runner.get(item.id).crashed();
log!("[daemon] restarted {} (id={id}, crashes={})", item.name, item.crash.value);
}
}
}
/// Prints daemon health information in the requested `format`.
///
/// `"default"` renders a table plus usage hints, `"json"` prints the serialized row,
/// `"raw"` prints the `Debug` form of the row; any other value prints nothing.
pub fn health(format: &String) {
    // One row of daemon facts. The field order is kept as-is because it drives the
    // row order of the rendered table.
    #[derive(Clone, Debug, Tabled)]
    struct Info {
        #[tabled(rename = "pid file")]
        pid_file: String,
        #[tabled(rename = "fork path")]
        path: String,
        #[tabled(rename = "cpu percent")]
        cpu_percent: String,
        #[tabled(rename = "memory usage")]
        memory_usage: String,
        #[tabled(rename = "daemon type")]
        external: String,
        #[tabled(rename = "process count")]
        process_count: usize,
        uptime: String,
        pid: String,
        status: ColoredString,
    }

    // The JSON view trims padding and omits the daemon type column.
    impl Serialize for Info {
        fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
            json!({
                "pid_file": self.pid_file.trim(),
                "path": self.path.trim(),
                "cpu": self.cpu_percent.trim(),
                "mem": self.memory_usage.trim(),
                "process_count": self.process_count.to_string(),
                "uptime": self.uptime.trim(),
                "pid": self.pid.trim(),
                "status": self.status.0.trim(),
            })
            .serialize(serializer)
        }
    }

    let mut runner: Runner = file::read_rmp(global!("pmc.dump"));

    // Sample the daemon process only when its pid file exists, is readable and the
    // process can be opened.
    let probe = if pid::exists() {
        pid::read().ok().and_then(|process_id| {
            Process::new(process_id as u32).ok().map(|mut process| {
                let started = pid::uptime().unwrap();
                let memory = process.memory_info().ok();
                let cpu = process.cpu_percent().ok();
                (process_id, started, memory, cpu)
            })
        })
    } else {
        None
    };

    let (pid, uptime, memory_usage, cpu_percent) = match probe {
        Some((process_id, started, memory, cpu)) => (Some(process_id), Some(started), memory, cpu),
        None => (None, None, None, None),
    };

    let cpu_percent = cpu_percent.map_or_else(|| string!("0%"), |percent| format!("{:.2}%", percent));
    let memory_usage = memory_usage.map_or_else(|| string!("0b"), |usage| helpers::format_memory(usage.rss()));
    let uptime = uptime.map_or_else(|| string!("none"), |started| helpers::format_duration(started));
    let pid = pid.map_or_else(|| string!("n/a"), |value| string!(value));

    let data = vec![Info {
        pid,
        cpu_percent,
        memory_usage,
        uptime,
        path: global!("pmc.base"),
        external: global!("pmc.daemon.kind"),
        process_count: runner.count(),
        pid_file: format!("{} ", global!("pmc.pid")),
        status: ColoredString(ternary!(pid::exists(), "online".green().bold(), "stopped".red().bold())),
    }];

    let table = Table::new(data.clone())
        .with(Rotate::Left)
        .with(Style::rounded().remove_horizontals())
        .with(Colorization::exact([Color::FG_CYAN], Columns::first()))
        .with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
        .to_string();

    // Nothing is printed when the row cannot be serialized.
    if let Ok(json) = serde_json::to_string(&data[0]) {
        match format.as_str() {
            "raw" => println!("{:?}", data[0]),
            "json" => println!("{json}"),
            "default" => {
                println!("{}\n{table}\n", "PMC daemon information".on_bright_white().black());
                println!(" {}", "Use `pmc daemon restart` to restart the daemon".white());
                println!(" {}", "Use `pmc daemon reset` to clean process id values".white());
            }
            _ => {}
        }
    }
}
/// Stops the running daemon and removes its pid file.
///
/// Exits through `crashln!` when no pid file exists or when it cannot be read.
pub fn stop() {
    if !pid::exists() {
        crashln!("{} The daemon is not running", *helpers::FAIL)
    }

    println!("{} Stopping PMC daemon", *helpers::SUCCESS);

    let pid = match pid::read() {
        Ok(pid) => pid,
        Err(err) => crashln!("{} Failed to read PID file: {}", *helpers::FAIL, err),
    };

    pmc::service::stop(pid as i64);
    pid::remove();
    log!("[daemon] stopped (pid={pid})");
    println!("{} PMC daemon stopped", *helpers::SUCCESS);
}
pub fn start(verbose: bool) {
let external =
match global!("pmc.daemon.kind").as_str() {
"external" => true,
"default" => false,
"rust" => false,
"cc" => true,
_ => false,
};
println!("{} Spawning PMC daemon (pmc_base={})", *helpers::SUCCESS, global!("pmc.base"));
if ENABLE_API.load(Ordering::Acquire) {
println!(
"{} API server started (address={:?}, webui={})",
*helpers::SUCCESS,
config::read().get_address(),
ENABLE_WEBUI.load(Ordering::Acquire)
);
}
if pid::exists() {
match pid::read() {
Ok(pid) => then!(!pid::running(pid), pid::remove()),
Err(_) => crashln!("{} The daemon is already running", *helpers::FAIL),
}
}
#[inline]
#[tokio::main]
async extern "C" fn init() {
pid::name("PMC Restart Handler Daemon");
let config = config::read().daemon;
let api_enabled = ENABLE_API.load(Ordering::Acquire);
let ui_enabled = ENABLE_WEBUI.load(Ordering::Acquire);
unsafe { libc::signal(libc::SIGTERM, handle_termination_signal as usize) };
DAEMON_START_TIME.set(Utc::now().timestamp_millis() as f64);
pid::write(process::id());
log!("[daemon] new fork (pid={})", process::id());
if api_enabled {
log!("[api] server started (address={:?})", config::read().get_address());
tokio::spawn(async move { api::start(ui_enabled).await });
}
loop {
if api_enabled {
if let Ok(mut process) = Process::new(process::id()) {
DAEMON_CPU_PERCENTAGE.observe(process.cpu_percent().ok().unwrap() as f64);
DAEMON_MEM_USAGE.observe(process.memory_info().ok().unwrap().rss() as f64);
}
}
then!(!Runner::new().is_empty(), restart_process());
sleep(Duration::from_millis(config.interval));
}
}
println!("{} PMC Successfully daemonized (type={})", *helpers::SUCCESS, global!("pmc.daemon.kind"));
if external {
let callback = pmc::Callback(init);
pmc::service::try_fork(false, verbose, callback);
} else {
match daemon(false, verbose) {
Ok(Fork::Parent(_)) => {}
Ok(Fork::Child) => init(),
Err(err) => crashln!("{} Daemon creation failed with code {err}", *helpers::FAIL),
}
}
}
/// Restarts the daemon: stops a running instance, resolves the API / web UI flags,
/// then starts again.
///
/// The web UI (config `web.ui` or `webui`) implies the API. Otherwise the API is
/// enabled by config `web.api` or by `api`. The web UI flag is only ever set, never cleared.
pub fn restart(api: &bool, webui: &bool, verbose: bool) {
    if pid::exists() {
        stop();
    }

    let daemon_config = config::read().daemon;
    let ui_requested = daemon_config.web.ui || *webui;
    let api_requested = ui_requested || daemon_config.web.api || *api;

    ENABLE_API.store(api_requested, Ordering::Release);
    if ui_requested {
        ENABLE_WEBUI.store(true, Ordering::Release);
    }

    start(verbose);
}
/// Resets the process id counter from the largest id currently in the runner.
///
/// When the runner holds no process, only the failure line is printed. Previously the
/// success line was printed as well, contradicting the failure reported just before it.
pub fn reset() {
    let mut runner = Runner::new();
    let largest = runner.list().map(|(key, _)| *key).max();

    match largest {
        Some(id) => runner.set_id(Id::from(str!(id.to_string()))),
        None => {
            println!("{} Cannot reset index, no ID found", *helpers::FAIL);
            return;
        }
    }

    println!("{} PMC Successfully reset (index={})", *helpers::SUCCESS, runner.id);
}
pub mod pid;
diff --git a/src/file.rs b/src/file.rs
index e50fc24..e53ccf3 100644
--- a/src/file.rs
+++ b/src/file.rs
@@ -1,111 +1,110 @@
use crate::{helpers, log};
use anyhow::Error;
use colored::Colorize;
use macros_rs::{crashln, str, string, ternary};
use std::{
env,
fs::{self, File},
io::{self, BufRead, BufReader},
- path::{Path, PathBuf, StripPrefixError},
+ path::{Path, PathBuf},
thread::sleep,
time::Duration,
};
pub fn logs(lines_to_tail: usize, log_file: &str, id: usize, log_type: &str, item_name: &str) {
let file = File::open(log_file).unwrap();
let reader = BufReader::new(file);
let lines: Vec<String> = reader.lines().collect::<io::Result<_>>().unwrap();
let color = ternary!(log_type == "out", "green", "red");
println!("{}", format!("\n{log_file} last {lines_to_tail} lines:").bright_black());
let start_index = if lines.len() > lines_to_tail { lines.len() - lines_to_tail } else { 0 };
for (_, line) in lines.iter().skip(start_index).enumerate() {
println!("{} {}", format!("{}|{} |", id, item_name).color(color), line);
}
}
/// Returns the current working directory, exiting through `crashln!` when it cannot
/// be determined.
pub fn cwd() -> PathBuf {
    env::current_dir().unwrap_or_else(|_| crashln!("{} Unable to find current working directory", *helpers::FAIL))
}
// Rewrites `current` relative to `home`: when `current` starts with `home`, the prefix
// is replaced by `~`.
// The removed version returned `None` when `current` is not under `home`; the added
// version returns `home` joined with `current` instead and no longer returns an `Option`.
// NOTE(review): `Path::join` with an absolute `current` yields `current` unchanged —
// confirm that is the intended fallback.
-// fix
-pub fn make_relative(current: &Path, home: &Path) -> Option<std::path::PathBuf> {
+pub fn make_relative(current: &Path, home: &Path) -> PathBuf {
match current.strip_prefix(home) {
- Ok(relative_path) => Some(Path::new("~").join(relative_path)),
- Err(StripPrefixError { .. }) => None,
+ Err(_) => Path::new(home).join(current),
+ Ok(relative_path) => Path::new("~").join(relative_path),
}
}
pub struct Exists;
impl Exists {
pub fn folder(dir_name: String) -> Result<bool, Error> { Ok(Path::new(str!(dir_name)).is_dir()) }
pub fn file(file_name: String) -> Result<bool, Error> { Ok(Path::new(str!(file_name)).exists()) }
}
pub fn raw(path: String) -> Vec<u8> {
match fs::read(&path) {
Ok(contents) => contents,
Err(err) => crashln!("{} Cannot find dumpfile.\n{}", *helpers::FAIL, string!(err).white()),
}
}
pub fn read<T: serde::de::DeserializeOwned>(path: String) -> T {
let contents = match fs::read_to_string(&path) {
Ok(contents) => contents,
Err(err) => crashln!("{} Cannot find dumpfile.\n{}", *helpers::FAIL, string!(err).white()),
};
match toml::from_str(&contents).map_err(|err| string!(err)) {
Ok(parsed) => parsed,
Err(err) => crashln!("{} Cannot parse dumpfile.\n{}", *helpers::FAIL, err.white()),
}
}
/// Decodes MessagePack `bytes` into `T`, exiting through `crashln!` when decoding fails.
pub fn from_rmp<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> T {
    rmp_serde::from_slice(bytes).unwrap_or_else(|err| crashln!("{} Cannot parse file.\n{}", *helpers::FAIL, string!(err).white()))
}
pub fn read_rmp<T: serde::de::DeserializeOwned>(path: String) -> T {
let mut retry_count = 0;
let max_retries = 5;
let bytes = loop {
match fs::read(&path) {
Ok(contents) => break contents,
Err(err) => {
retry_count += 1;
if retry_count >= max_retries {
log!("{} Cannot find file.\n{}", *helpers::FAIL, string!(err).white());
} else {
log!("{} Error reading file. Retrying... (Attempt {}/{})", *helpers::FAIL, retry_count, max_retries);
}
}
}
sleep(Duration::from_secs(1));
};
retry_count = 0;
loop {
match rmp_serde::from_slice(&bytes) {
Ok(parsed) => break parsed,
Err(err) => {
retry_count += 1;
if retry_count >= max_retries {
log!("{} Cannot parse file.\n{}", *helpers::FAIL, string!(err).white());
} else {
log!("{} Error parsing file. Retrying... (Attempt {}/{})", *helpers::FAIL, retry_count, max_retries);
}
}
}
sleep(Duration::from_secs(1));
}
}
diff --git a/src/main.rs b/src/main.rs
index 5eb3021..0c4fc3c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,169 +1,169 @@
mod cli;
mod daemon;
mod globals;
mod webui;
use crate::cli::Args;
use clap::{Parser, Subcommand};
use clap_verbosity_flag::Verbosity;
use macros_rs::{str, string, then};
/// clap value parser for the positional `start` argument.
///
/// Text that parses as `usize` becomes `Args::Id`; anything else is kept verbatim as
/// `Args::Script`. Never returns `Err`.
fn validate_id_script(s: &str) -> Result<Args, String> {
    let parsed = match s.parse::<usize>() {
        Ok(id) => Args::Id(id),
        Err(_) => Args::Script(s.to_owned()),
    };
    Ok(parsed)
}
// Top-level command-line definition: one subcommand plus the shared verbosity flags.
// The reported version string comes from `cli::get_version(false)`.
// (Plain `//` comments are used here because clap turns `///` comments into help text.)
#[derive(Parser)]
#[command(version = str!(cli::get_version(false)))]
struct Cli {
#[command(subcommand)]
command: Commands,
// Flattened verbosity flags; `main` turns them into the logger's level filter.
#[clap(flatten)]
verbose: Verbosity,
}
// Subcommands of `pmc daemon`; `main` dispatches them to the `daemon` module.
// The `///` comments below are the help text shown by clap, not internal documentation.
#[derive(Subcommand)]
enum Daemon {
/// Reset process index
#[command(alias = "clean")]
Reset,
/// Stop daemon
#[command(alias = "kill")]
Stop,
/// Restart daemon
#[command(alias = "restart", alias = "start")]
Restore {
/// Daemon api
#[arg(long)]
api: bool,
/// WebUI using api
#[arg(long)]
webui: bool,
},
/// Check daemon
#[command(alias = "info", alias = "status")]
Health {
/// Format output
#[arg(long, default_value_t = string!("default"))]
format: String,
},
}
// Top-level `pmc` subcommands; `main` dispatches each variant to the `cli` module
// (or to `daemon` for `Daemon`). Most variants take a `server` option that defaults
// to "internal" ("all" for `List`).
// The `///` comments below are the help text shown by clap, not internal documentation.
// TODO: add a `pmc restore` command.
#[derive(Subcommand)]
enum Commands {
/// Start/Restart a process
#[command(alias = "restart")]
Start {
/// Process name
#[arg(long)]
name: Option<String>,
// Parsed by `validate_id_script`: a numeric value is an id, anything else a script.
#[clap(value_parser = validate_id_script)]
args: Option<Args>,
/// Watch to reload path
#[arg(long)]
watch: Option<String>,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Stop/Kill a process
#[command(alias = "kill")]
Stop {
id: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Stop then remove a process
#[command(alias = "rm")]
Remove {
id: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Get env of a process
#[command(alias = "cmdline")]
Env {
id: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Get information of a process
#[command(alias = "info")]
Details {
id: usize,
/// Format output
#[arg(long, default_value_t = string!("default"))]
format: String,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// List all processes
#[command(alias = "ls")]
List {
/// Format output
#[arg(long, default_value_t = string!("default"))]
format: String,
/// Server
#[arg(short, long, default_value_t = string!("all"))]
server: String,
},
/// Get logs from a process
Logs {
id: usize,
// Number of trailing lines to print; the help text is deliberately empty.
#[arg(long, default_value_t = 15, help = "")]
lines: usize,
/// Server
#[arg(short, long, default_value_t = string!("internal"))]
server: String,
},
/// Daemon management
Daemon {
#[command(subcommand)]
command: Daemon,
},
}
// Entry point: parses the command line, initialises globals and the logger at the
// requested verbosity, then dispatches the chosen subcommand.
// After any non-daemon subcommand the daemon is started (non-verbose) when no pid file exists.
// The changed lines forward `server` to `cli::env` and `cli::info`.
// NOTE(review): `Commands::Logs` binds `server` but does not pass it to `cli::logs` — confirm
// whether remote log access is intentionally unsupported.
fn main() {
let cli = Cli::parse();
let mut env = env_logger::Builder::new();
let level = cli.verbose.log_level_filter();
globals::init();
env.filter_level(level).init();
match &cli.command {
Commands::Start { name, args, watch, server } => cli::start(name, args, watch, server),
Commands::Stop { id, server } => cli::stop(id, server),
Commands::Remove { id, server } => cli::remove(id, server),
- Commands::Env { id, server } => cli::env(id),
- Commands::Details { id, format, server } => cli::info(id, format),
+ Commands::Env { id, server } => cli::env(id, server),
+ Commands::Details { id, format, server } => cli::info(id, format, server),
Commands::List { format, server } => cli::list(format, server),
Commands::Logs { id, lines, server } => cli::logs(id, lines),
Commands::Daemon { command } => match command {
Daemon::Stop => daemon::stop(),
Daemon::Reset => daemon::reset(),
// The daemon restart is verbose for every log level except "ERROR".
Daemon::Restore { api, webui } => daemon::restart(api, webui, level.as_str() != "ERROR"),
},
};
if !matches!(&cli.command, Commands::Daemon { .. }) {
then!(!daemon::pid::exists(), daemon::start(false));
}
}
diff --git a/src/process/http.rs b/src/process/http.rs
index 83f9dd0..f537104 100644
--- a/src/process/http.rs
+++ b/src/process/http.rs
@@ -1,58 +1,68 @@
-use crate::process::{Process, Remote};
+use crate::process::Remote;
use macros_rs::{fmtstr, string};
use reqwest::blocking::{Client, Response};
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use serde::Serialize;
-use std::{collections::BTreeMap, path::PathBuf};
+use std::path::PathBuf;
/// JSON body posted to `{address}/process/{id}/action`; `method` names the action
/// (`restart`, `stop` and `remove` are the values sent from this module).
#[derive(Serialize)]
struct ActionBody {
pub method: String,
}
/// JSON body posted to `{address}/process/create` by `create`.
/// Borrows the caller's strings for the duration of the request; `path` is owned.
#[derive(Serialize)]
struct CreateBody<'c> {
pub name: &'c String,
pub script: &'c String,
pub path: PathBuf,
pub watch: &'c Option<String>,
}
/// Builds a blocking HTTP client together with the headers for a remote call.
///
/// When `token` is present, an `Authorization: token <token>` header is included.
fn client(token: &Option<String>) -> (Client, HeaderMap) {
    let http = Client::new();
    let mut headers = HeaderMap::new();

    match token {
        Some(token) => {
            headers.insert(AUTHORIZATION, HeaderValue::from_static(fmtstr!("token {token}")));
        }
        None => {}
    }

    (http, headers)
}
/// Added in this change: GETs `{address}/process/{id}/info` from the remote, sending
/// the token header when one is configured. Returns the raw response; a transport
/// error is converted into `anyhow::Error`.
+pub fn info(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
+ let (client, headers) = client(token);
+ Ok(client.get(fmtstr!("{address}/process/{id}/info")).headers(headers).send()?)
+}
+
/// Asks the remote at `address` to create a process by POSTing a `CreateBody` as JSON
/// to `{address}/process/create`.
///
/// Returns the raw response; a transport error is converted into `anyhow::Error`.
pub fn create(Remote { address, token }: &Remote, name: &String, script: &String, path: PathBuf, watch: &Option<String>) -> Result<Response, anyhow::Error> {
    let (http, auth_headers) = client(token);
    let payload = CreateBody { name, script, path, watch };

    let request = http.post(fmtstr!("{address}/process/create")).json(&payload).headers(auth_headers);
    let response = request.send()?;
    Ok(response)
}
/// POSTs the `restart` action for process `id` to `{address}/process/{id}/action`.
///
/// Returns the raw response; a transport error is converted into `anyhow::Error`.
pub fn restart(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, auth_headers) = client(token);
    let payload = ActionBody { method: string!("restart") };

    let request = http.post(fmtstr!("{address}/process/{id}/action")).json(&payload).headers(auth_headers);
    let response = request.send()?;
    Ok(response)
}
/// Added in this change: POSTs `name` as the plain request body to
/// `{address}/process/{id}/rename`. Returns the raw response; a transport error is
/// converted into `anyhow::Error`.
+pub fn rename(Remote { address, token }: &Remote, id: usize, name: String) -> Result<Response, anyhow::Error> {
+ let (client, headers) = client(token);
+ Ok(client.post(fmtstr!("{address}/process/{id}/rename")).body(name).headers(headers).send()?)
+}
+
/// POSTs the `stop` action for process `id` to `{address}/process/{id}/action`.
///
/// Returns the raw response; a transport error is converted into `anyhow::Error`.
pub fn stop(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, auth_headers) = client(token);
    let payload = ActionBody { method: string!("stop") };

    let request = http.post(fmtstr!("{address}/process/{id}/action")).json(&payload).headers(auth_headers);
    let response = request.send()?;
    Ok(response)
}
/// POSTs the `remove` action for process `id` to `{address}/process/{id}/action`.
///
/// Returns the raw response; a transport error is converted into `anyhow::Error`.
pub fn remove(Remote { address, token }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, auth_headers) = client(token);
    let payload = ActionBody { method: string!("remove") };

    let request = http.post(fmtstr!("{address}/process/{id}/action")).json(&payload).headers(auth_headers);
    let response = request.send()?;
    Ok(response)
}
diff --git a/src/process/mod.rs b/src/process/mod.rs
index 4ac10ba..307f745 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -1,438 +1,491 @@
-mod http;
-
use crate::{
config,
config::structs::Server,
file, helpers,
service::{run, stop, ProcessMetadata},
};
+use std::{
+ env,
+ path::PathBuf,
+ sync::{Arc, Mutex},
+};
+
use chrono::serde::ts_milliseconds;
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{crashln, string, ternary, then};
use psutil::process::{self, MemoryInfo};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::{BTreeMap, HashMap};
-use std::{env, path::PathBuf};
use utoipa::ToSchema;
// Detailed view of one process, as built by the per-process `json` method.
// This change adds `Deserialize` and makes every field public.
// (`//` is used because utoipa would pick `///` up as a schema description.)
-#[derive(Serialize, ToSchema)]
+#[derive(Serialize, Deserialize, ToSchema)]
pub struct ItemSingle {
- info: Info,
- stats: Stats,
- watch: Watch,
- log: Log,
- raw: Raw,
+ pub info: Info,
+ pub stats: Stats,
+ pub watch: Watch,
+ pub log: Log,
+ pub raw: Raw,
}
// Identity and display fields of one process inside `ItemSingle`.
// `status` is one of "online", "crashed" or "stopped"; `uptime` and `command` are
// pre-formatted strings. This change adds `Deserialize` and makes every field public.
-#[derive(Serialize, ToSchema)]
+#[derive(Serialize, Deserialize, ToSchema)]
pub struct Info {
- id: usize,
- pid: i64,
- name: String,
- status: String,
+ pub id: usize,
+ pub pid: i64,
+ pub name: String,
+ pub status: String,
#[schema(value_type = String, example = "/path")]
- path: PathBuf,
- uptime: String,
- command: String,
+ pub path: PathBuf,
+ pub uptime: String,
+ pub command: String,
}
// Runtime statistics of one process inside `ItemSingle`.
// `start_time` is filled from `started.timestamp_millis()`; `cpu_percent` and
// `memory_usage` are `None` when the OS process could not be sampled.
// This change adds `Deserialize` and makes every field public.
-#[derive(Serialize, ToSchema)]
+#[derive(Serialize, Deserialize, ToSchema)]
pub struct Stats {
- restarts: u64,
- start_time: i64,
- cpu_percent: Option<f32>,
- memory_usage: Option<MemoryInfo>,
+ pub restarts: u64,
+ pub start_time: i64,
+ pub cpu_percent: Option<f32>,
+ pub memory_usage: Option<MemoryInfo>,
}
// Log locations of one process inside `ItemSingle`, filled from the `pmc.logs.out` and
// `pmc.logs.error` globals with the process name.
// This change adds `Deserialize` and makes both fields public.
-#[derive(Serialize, ToSchema)]
+#[derive(Serialize, Deserialize, ToSchema)]
pub struct Log {
- out: String,
- error: String,
+ pub out: String,
+ pub error: String,
}
// Unformatted state flags of one process inside `ItemSingle`: the `running` flag plus
// the `crash.crashed` flag and `crash.value` counter.
// This change adds `Deserialize` and makes every field public.
-#[derive(Serialize, ToSchema)]
+#[derive(Serialize, Deserialize, ToSchema)]
pub struct Raw {
- running: bool,
- crashed: bool,
- crashes: u64,
+ pub running: bool,
+ pub crashed: bool,
+ pub crashes: u64,
}
// One row of the process list produced by `Runner::json`.
// `cpu`, `mem` and `uptime` are pre-formatted strings; `status` is one of
// "online", "crashed" or "stopped".
#[derive(Serialize, ToSchema)]
pub struct ProcessItem {
pid: i64,
id: usize,
cpu: String,
mem: String,
name: String,
restarts: u64,
status: String,
uptime: String,
#[schema(example = "/path")]
watch_path: String,
#[schema(value_type = String, example = "2000-01-01T01:00:00.000Z")]
start_time: DateTime<Utc>,
}
// Added in this change: a handle to one process (`id`) together with the `Runner`
// that owns it. `Runner::get` builds it by moving the runner into a fresh `Arc<Mutex<_>>`.
+#[derive(Clone)]
+pub struct ProcessWrapper {
+ pub id: usize,
+ pub runner: Arc<Mutex<Runner>>,
+}
+
/// Persisted record of one managed process, stored in `Runner::list` under its `id`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Process {
pub id: usize,
// OS process id returned by `run`.
pub pid: i64,
pub name: String,
// Working directory; `Runner::restart` switches into it before relaunching.
pub path: PathBuf,
// Command handed to `run` as `ProcessMetadata::command`.
pub script: String,
// Environment captured with `env::vars()` when the process was first started.
pub env: HashMap<String, String>,
// Serialized as a millisecond timestamp.
#[serde(with = "ts_milliseconds")]
pub started: DateTime<Utc>,
pub restarts: u64,
pub running: bool,
pub crash: Crash,
pub watch: Watch,
}
/// Crash bookkeeping of a process: `value` is incremented by `Runner::new_crash` and
/// reset to 0 by `Runner::stop`; `crashed` is set by `Runner::set_crashed`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Crash {
pub crashed: bool,
pub value: u64,
}
// File-watch settings of a process. `hash` holds the result of `hash::create` for the
// watched path and is the empty string while watching is disabled.
// (`//` is used because utoipa would pick `///` up as a schema description.)
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct Watch {
pub enabled: bool,
#[schema(example = "/path")]
pub path: String,
pub hash: String,
}
/// The set of managed processes plus the id counter used for new ones.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Runner {
pub id: id::Id,
// Present only for runners obtained through `connect`; never serialized.
#[serde(skip)]
pub remote: Option<Remote>,
pub list: BTreeMap<usize, Process>,
}
/// Connection details of a remote daemon: its base address and an optional token that
/// the `http` module sends in the `Authorization` header.
#[derive(Clone, Debug)]
pub struct Remote {
address: String,
token: Option<String>,
}
/// Running state that `Runner::set_status` can assign to a process.
pub enum Status {
    Offline,
    Running,
}

impl Status {
    /// `true` for `Running`, `false` for `Offline`.
    pub fn to_bool(&self) -> bool {
        matches!(self, Status::Running)
    }
}
impl Runner {
/// Loads the runner through `dump::read()`.
pub fn new() -> Self { dump::read() }
/// Fetches the dump of a remote daemon and wraps it in a `Runner` bound to that remote.
///
/// Returns `None` (after a debug log of the error) when the dump cannot be fetched.
/// With `verbose`, a confirmation line naming the server is printed on success.
pub fn connect(name: String, Server { address, token }: Server, verbose: bool) -> Option<Self> {
    let fetched = match dump::from(&address, token.as_deref()) {
        Ok(dump) => dump,
        Err(err) => {
            log::debug!("{err}");
            return None;
        }
    };

    then!(verbose, println!("{} Fetched remote (name={name}, address={address})", *helpers::SUCCESS));

    let remote = Remote { token, address: string!(address) };
    Some(Runner { remote: Some(remote), ..fetched })
}
/// Creates and launches a new process.
///
/// For a remote runner the request is forwarded over HTTP, and a failure exits through
/// `crashln!`. Locally, the next id is taken, the command is launched with the shell
/// settings from the runner config, and the new record is inserted into `list`.
/// `watch`, when given, enables watching and stores the hash of that path relative to
/// the current working directory.
pub fn start(&mut self, name: &String, command: &String, path: PathBuf, watch: &Option<String>) -> &mut Self {
    if let Some(remote) = &self.remote {
        if let Err(err) = http::create(remote, name, command, path, watch) {
            crashln!("{} Failed to start create {name}\nError: {:#?}", *helpers::FAIL, err);
        }
    } else {
        let id = self.id.next();
        let config = config::read().runner;
        let crash = Crash { crashed: false, value: 0 };

        let watch = if let Some(target) = watch {
            Watch {
                enabled: true,
                path: string!(target),
                hash: hash::create(file::cwd().join(target)),
            }
        } else {
            Watch {
                enabled: false,
                path: string!(""),
                hash: string!(""),
            }
        };

        let metadata = ProcessMetadata {
            args: config.args,
            name: name.clone(),
            shell: config.shell,
            command: command.clone(),
            log_path: config.log_path,
        };
        let pid = run(metadata);

        let process = Process {
            id,
            pid,
            path,
            watch,
            crash,
            restarts: 0,
            running: true,
            name: name.clone(),
            started: Utc::now(),
            script: command.clone(),
            env: env::vars().collect(),
        };
        self.list.insert(id, process);
    }

    self
}
/// Restarts process `id`.
///
/// For a remote runner the request is forwarded over HTTP. Locally the working
/// directory is switched to the process path, the old pid is stopped, the crash state
/// is cleared, and the script is launched again with the shell settings from the
/// runner config. `dead` marks a restart after a crash and increments `restarts`.
/// Failures exit through `crashln!`.
// In this change the lookup moves from `self.get` to `self.process`, and the stop is
// done inline (`stop(pid)` plus flag resets) instead of through `item.stop()`.
pub fn restart(&mut self, id: usize, dead: bool) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::restart(remote, id) {
crashln!("{} Failed to start process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
- let item = self.get(id);
- let Process { path, script, name, .. } = item.clone();
+ let process = self.process(id);
+ let config = config::read().runner;
+ let Process { path, script, name, .. } = process.clone();
- if let Err(err) = std::env::set_current_dir(&item.path) {
+ if let Err(err) = std::env::set_current_dir(&process.path) {
crashln!("{} Failed to set working directory {:?}\nError: {:#?}", *helpers::FAIL, path, err);
};
- item.stop();
+ stop(process.pid);
+ process.running = false;
+ process.crash.crashed = false;
+ process.crash.value = 0;
- let config = config::read().runner;
-
- item.crash.crashed = false;
- item.pid = run(ProcessMetadata {
- command: script,
+ process.pid = run(ProcessMetadata {
args: config.args,
name: name.clone(),
shell: config.shell,
log_path: config.log_path,
+ command: script.to_string(),
});
- item.running = true;
- item.started = Utc::now();
- then!(dead, item.restarts += 1);
+ process.running = true;
+ process.started = Utc::now();
+ then!(dead, process.restarts += 1);
}
return self;
}
/// Removes process `id`.
///
/// For a remote runner the request is forwarded over HTTP, and a failure exits through
/// `crashln!`. Locally the process is stopped, dropped from `list`, and the dump is
/// written immediately.
pub fn remove(&mut self, id: usize) {
    match &self.remote {
        Some(remote) => {
            if let Err(err) = http::remove(remote, id) {
                crashln!("{} Failed to stop remove {id}\nError: {:#?}", *helpers::FAIL, err);
            }
        }
        None => {
            self.stop(id);
            self.list.remove(&id);
            dump::write(&self);
        }
    }
}
/// Replaces the id counter with `id`, advances it once with `next()`, and writes the dump.
pub fn set_id(&mut self, id: id::Id) {
self.id = id;
// The value returned by `next()` is discarded; only the counter advance matters here.
self.id.next();
dump::write(&self);
}
/// Sets the `running` flag of process `id` from `status` and writes the dump.
/// Exits through `crashln!` (inside the lookup) when the id is unknown.
pub fn set_status(&mut self, id: usize, status: Status) {
- self.get(id).running = status.to_bool();
+ self.process(id).running = status.to_bool();
dump::write(&self);
}
/// Returns a clone of the whole process map.
pub fn items(&mut self) -> BTreeMap<usize, Process> { self.list.clone() }
/// Returns the process map itself for in-place mutation.
pub fn items_mut(&mut self) -> &mut BTreeMap<usize, Process> { &mut self.list }
/// Writes the dump, but only for a local runner (no `remote`).
pub fn save(&self) { then!(self.remote.is_none(), dump::write(&self)) }
/// Number of processes in the list.
pub fn count(&mut self) -> usize { self.list().count() }
/// `true` when no process is registered.
pub fn is_empty(&self) -> bool { self.list.is_empty() }
/// `true` when a process with `id` is registered.
pub fn exists(&mut self, id: usize) -> bool { self.list.contains_key(&id) }
/// Read-only lookup of process `id`; `None` when it is not registered.
pub fn info(&mut self, id: usize) -> Option<&Process> { self.list.get(&id) }
/// Iterates over `(id, process)` pairs with mutable access to each process.
pub fn list<'l>(&'l mut self) -> impl Iterator<Item = (&'l usize, &'l mut Process)> { self.list.iter_mut().map(|(k, v)| (k, v)) }
// This change renames the mutable lookup from `get` to `process`; it still exits
// through `crashln!` when `id` is not registered.
// The new `get` consumes the runner and returns a `ProcessWrapper` that owns it behind
// a fresh `Arc<Mutex<_>>`. It does not check that `id` exists.
- pub fn get(&mut self, id: usize) -> &mut Process { self.list.get_mut(&id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL)) }
+ pub fn process(&mut self, id: usize) -> &mut Process { self.list.get_mut(&id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL)) }
+
+ pub fn get(self, id: usize) -> ProcessWrapper {
+ ProcessWrapper {
+ id,
+ runner: Arc::new(Mutex::new(self)),
+ }
+ }
/// Marks process `id` as crashed. Does not write the dump; callers chain `.save()`.
pub fn set_crashed(&mut self, id: usize) -> &mut Self {
- self.get(id).crash.crashed = true;
+ self.process(id).crash.crashed = true;
return self;
}
/// Increments the crash counter of process `id`. Does not write the dump.
pub fn new_crash(&mut self, id: usize) -> &mut Self {
- self.get(id).crash.value += 1;
+ self.process(id).crash.value += 1;
return self;
}
/// Stops process `id`.
///
/// For a remote runner the request is forwarded over HTTP, and a failure exits through
/// `crashln!`. Locally the pid is stopped, `running` is cleared and the crash state is
/// reset. Does not write the dump; callers chain `.save()`.
pub fn stop(&mut self, id: usize) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::stop(remote, id) {
crashln!("{} Failed to stop process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
- let item = self.get(id);
- stop(item.pid);
-
- item.running = false;
- item.crash.crashed = false;
- item.crash.value = 0;
+ let process = self.process(id);
+ stop(process.pid);
+ process.running = false;
+ process.crash.crashed = false;
+ process.crash.value = 0;
}
return self;
}
/// Renames process `id` to `name`.
// This change adds the remote branch: a remote runner forwards the rename over HTTP and
// exits through `crashln!` on failure; a local runner updates the name in place.
pub fn rename(&mut self, id: usize, name: String) -> &mut Self {
- self.get(id).name = name;
+ if let Some(remote) = &self.remote {
+ if let Err(err) = http::rename(remote, id, name) {
+ crashln!("{} Failed to rename process {id}\nError: {:#?}", *helpers::FAIL, err);
+ };
+ } else {
+ self.process(id).name = name;
+ }
+
return self;
}
/// Replaces the watch settings of process `id`.
///
/// When `enabled`, the stored hash is computed from `path` joined onto the process
/// path; otherwise the hash is the empty string. There is no remote branch here.
pub fn watch(&mut self, id: usize, path: &str, enabled: bool) -> &mut Self {
- let item = self.get(id);
- item.watch = Watch {
+ let process = self.process(id);
+ process.watch = Watch {
enabled,
path: string!(path),
- hash: ternary!(enabled, hash::create(item.path.join(path)), string!("")),
+ hash: ternary!(enabled, hash::create(process.path.join(path)), string!("")),
};
return self;
}
/// Builds the JSON process list: one `ProcessItem` per registered process.
///
/// CPU and memory are sampled from the OS process and fall back to `"0.00%"` / `"0b"`
/// when the pid cannot be opened or a sample fails.
pub fn json(&mut self) -> Value {
    let processes: Vec<ProcessItem> = self
        .items()
        .into_iter()
        .map(|(id, item)| {
            let (memory_usage, cpu_percent) = match process::Process::new(item.pid as u32) {
                Ok(mut process) => (process.memory_info().ok(), process.cpu_percent().ok()),
                Err(_) => (None, None),
            };

            let cpu = cpu_percent.map_or_else(|| string!("0.00%"), |percent| format!("{:.2}%", percent));
            let mem = memory_usage.map_or_else(|| string!("0b"), |usage| helpers::format_memory(usage.rss()));

            let status = match (item.running, item.crash.crashed) {
                (true, _) => string!("online"),
                (false, true) => string!("crashed"),
                (false, false) => string!("stopped"),
            };

            ProcessItem {
                id,
                status,
                pid: item.pid,
                cpu,
                mem,
                restarts: item.restarts,
                name: item.name.clone(),
                start_time: item.started,
                watch_path: item.watch.path.clone(),
                uptime: helpers::format_duration(item.started),
            }
        })
        .collect();

    json!(processes)
}
}
// This change replaces `impl Process` with `impl ProcessWrapper`.
// The removed methods loaded a fresh `Runner::new()` for every call. The added methods
// lock the runner held by the wrapper, call the matching `Runner` method for `self.id`,
// and then `save()`.
// `crashed` counts a crash and restarts with `dead = true`. `json` builds the detailed
// `ItemSingle` view from the process record inside the locked runner.
// NOTE(review): every method clones the `Arc` before locking and unwraps the lock
// result; the clone looks unnecessary and the unwrap panics on a poisoned mutex — confirm.
-impl Process {
- pub fn stop(&mut self) { Runner::new().stop(self.id).save(); }
- pub fn watch(&mut self, path: &str) { Runner::new().watch(self.id, path, true).save(); }
- pub fn disable_watch(&mut self) { Runner::new().watch(self.id, "", false).save(); }
- pub fn rename(&mut self, name: String) { Runner::new().rename(self.id, name).save(); }
- pub fn restart(&mut self) { Runner::new().restart(self.id, false).save(); }
+impl ProcessWrapper {
+ pub fn stop(&mut self) {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+ runner.stop(self.id).save();
+ }
- pub fn crashed(&mut self) -> &mut Process {
- Runner::new().new_crash(self.id).save();
- Runner::new().restart(self.id, true).save();
- return self;
+ pub fn watch(&mut self, path: &str) {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+ runner.watch(self.id, path, true).save();
+ }
+
+ pub fn disable_watch(&mut self) {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+ runner.watch(self.id, "", false).save();
+ }
+
+ pub fn rename(&mut self, name: String) {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+ runner.rename(self.id, name).save();
+ }
+
+ pub fn restart(&mut self) {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+ runner.restart(self.id, false).save();
+ }
+
+ pub fn crashed(&mut self) {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+ runner.new_crash(self.id).save();
+ runner.restart(self.id, true).save();
}
pub fn json(&mut self) -> Value {
+ let runner_arc = Arc::clone(&self.runner);
+ let mut runner = runner_arc.lock().unwrap();
+
+ let item = runner.process(self.id);
let config = config::read().runner;
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
- if let Ok(mut process) = process::Process::new(self.pid as u32) {
+ if let Ok(mut process) = process::Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
- let status = if self.running {
+ let status = if item.running {
string!("online")
} else {
- match self.crash.crashed {
+ match item.crash.crashed {
true => string!("crashed"),
false => string!("stopped"),
}
};
json!(ItemSingle {
info: Info {
status,
- id: self.id,
- pid: self.pid,
- name: self.name.clone(),
- path: self.path.clone(),
- uptime: helpers::format_duration(self.started),
- command: format!("{} {} '{}'", config.shell, config.args.join(" "), self.script.clone()),
+ id: item.id,
+ pid: item.pid,
+ name: item.name.clone(),
+ path: item.path.clone(),
+ uptime: helpers::format_duration(item.started),
+ command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script.clone()),
},
stats: Stats {
cpu_percent,
memory_usage,
- restarts: self.restarts,
- start_time: self.started.timestamp_millis(),
+ restarts: item.restarts,
+ start_time: item.started.timestamp_millis(),
},
watch: Watch {
- enabled: self.watch.enabled,
- hash: self.watch.hash.clone(),
- path: self.watch.path.clone(),
+ enabled: item.watch.enabled,
+ hash: item.watch.hash.clone(),
+ path: item.watch.path.clone(),
},
log: Log {
- out: global!("pmc.logs.out", self.name.as_str()),
- error: global!("pmc.logs.error", self.name.as_str()),
+ out: global!("pmc.logs.out", item.name.as_str()),
+ error: global!("pmc.logs.error", item.name.as_str()),
},
raw: Raw {
- running: self.running,
- crashed: self.crash.crashed,
- crashes: self.crash.value,
+ running: item.running,
+ crashed: item.crash.crashed,
+ crashes: item.crash.value,
}
})
}
}
pub mod dump;
pub mod hash;
+pub mod http;
pub mod id;
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Feb 1, 2:22 PM (1 d, 2 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
494784
Default Alt Text
(80 KB)
Attached To
Mode
rPMC Process Management Controller
Attached
Detach File
Event Timeline
Log In to Comment