Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F2708251
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Size
77 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 43730ee..a39980a 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,49 +1,49 @@
stages: [build, release]
image: 'themackabu/rust:zigbuild-1.75.0'
before_script:
- mkdir binary
- apt-get update -yqq
- - apt-get install -yqq zip clang llvm
+ - apt-get install -yqq zip clang llvm pkg-config libssl-dev
- export CC="/usr/bin/clang"
- export CXX="/usr/bin/clang++"
build_linux_amd64:
stage: build
tags: [fedora]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r -vv --color always
- zip binary/pmc_${CI_COMMIT_TAG}_linux_amd64.zip target/release/pmc -j
artifacts:
paths: [binary/]
build_linux_aarch64:
stage: build
tags: [fedora]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r -vv --target aarch64-unknown-linux-gnu --color always
- zip binary/pmc_${CI_COMMIT_TAG}_linux_aarch64.zip target/aarch64-unknown-linux-gnu/release/pmc -j
artifacts:
paths: [binary/]
build_darwin_amd64:
stage: build
tags: [fedora]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r -vv --target x86_64-apple-darwin --color always
- zip binary/pmc_${CI_COMMIT_TAG}_darwin_amd64.zip target/x86_64-apple-darwin/release/pmc -j
artifacts:
paths: [binary/]
build_darwin_aarch64:
stage: build
tags: [fedora]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r -vv --target aarch64-apple-darwin --color always
- zip binary/pmc_${CI_COMMIT_TAG}_darwin_arm.zip target/aarch64-apple-darwin/release/pmc -j
artifacts:
paths: [binary/]
diff --git a/lib/process.cc b/lib/process.cc
index 5052c9f..2d9e036 100644
--- a/lib/process.cc
+++ b/lib/process.cc
@@ -1,94 +1,101 @@
#include <process.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
#include <iostream>
+#include <algorithm>
using namespace std;
namespace process {
volatile sig_atomic_t childExitStatus = 0;
+std::string format(std::string text) {
+ std::replace(text.begin(), text.end(), ' ', '_');
+ return text;
+}
+
// Cuts `str` at its midpoint (length / 2, rounded down) and returns the two
// pieces as {front, back}. For odd lengths the extra character ends up in
// the back piece; an empty input yields two empty strings.
std::pair<std::string, std::string> split(const std::string& str) {
    const size_t cut = str.size() / 2;
    return std::make_pair(str.substr(0, cut), str.substr(cut));
}
// SIGCHLD handler: reaps every child that has already terminated, without
// blocking (WNOHANG), and stores the wait status of the most recently reaped
// child in childExitStatus.
// NOTE(review): waitpid() may overwrite errno for the interrupted code path;
// confirm whether anything relies on errno surviving signal delivery.
void sigchld_handler(int signo) {
    (void)signo;  // the signal number is not needed
    int status;
    for (;;) {
        if (waitpid(-1, &status, WNOHANG) <= 0) {
            break;  // nothing left to reap (or an error occurred)
        }
        childExitStatus = status;
    }
}
void Runner::New(const std::string &name, const std::string &logPath) {
- std::string stdoutFileName = logPath + "/" + name + "-out.log";
- std::string stderrFileName = logPath + "/" + name + "-error.log";
+ std::string formattedName = format(name);
+ std::string stdoutFileName = logPath + "/" + formattedName + "-out.log";
+ std::string stderrFileName = logPath + "/" + formattedName + "-error.log";
stdout_fd = open(stdoutFileName.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
stderr_fd = open(stderrFileName.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
struct sigaction sa;
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
std::cerr << "[PMC] (cc) Error setting up SIGCHLD handler\n";
}
}
// Releases the two log descriptors held by the runner. A value of -1 is
// treated as "not open" and skipped. stdout is closed before stderr.
Runner::~Runner() {
    const int descriptors[] = {stdout_fd, stderr_fd};
    for (int fd : descriptors) {
        if (fd != -1) {
            close(fd);
        }
    }
}
// Forks and executes `shell <args...> <command>` in a new session, with the
// child's stdout/stderr redirected to stdout_fd / stderr_fd.
//
// Parameters:
//   command - the command string, passed to the shell as its last argument
//   shell   - the program to exec (looked up via PATH by execvp)
//   args    - extra arguments placed between the shell name and the command
//
// Returns the child's pid in the parent, or -1 when fork() fails.
int64_t Runner::Run(const std::string &command, const std::string &shell, Vec<String> args) {
    pid_t pid = fork();

    if (pid == -1) {
        std::cerr << "[PMC] (cc) Unable to fork\n";
        return -1;
    } else if (pid == 0) {
        // Child: detach from the parent's session and swap the standard
        // streams for the log descriptors.
        setsid();

        close(STDIN_FILENO);
        close(STDOUT_FILENO);
        close(STDERR_FILENO);

        dup2(stdout_fd, STDOUT_FILENO);
        dup2(stderr_fd, STDERR_FILENO);

        // argv layout: shell, args..., command, NULL
        std::vector<const char*> argsArray;
        argsArray.push_back(shell.c_str());
        transform(args.begin(), args.end(), std::back_inserter(argsArray),
            [](rust::String& arg) { return arg.c_str(); });
        argsArray.push_back(command.c_str());
        argsArray.push_back((char *)nullptr);

        if (execvp(shell.c_str(), const_cast<char* const*>(argsArray.data())) == -1) {
            std::cerr << "[PMC] (cc) Unable to execute the command\n";
            // _exit, not exit: the forked child must not run the parent's
            // atexit handlers or static destructors.
            _exit(EXIT_FAILURE);
        }
    } else {
        // Parent: the child owns its own copies now. Reset the members after
        // closing so the destructor does not close the same descriptor number
        // a second time (it may have been reused for something else by then).
        if (stdout_fd != -1) {
            close(stdout_fd);
            stdout_fd = -1;
        }
        if (stderr_fd != -1) {
            close(stderr_fd);
            stderr_fd = -1;
        }
        return pid;
    }

    return -1;
}
} // namespace process
diff --git a/src/cli.rs b/src/cli.rs
index 9c99466..698c560 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -1,594 +1,594 @@
use colored::Colorize;
-use global_placeholders::global;
use macros_rs::{crashln, string, ternary};
use psutil::process::{MemoryInfo, Process};
use regex::Regex;
use serde::Serialize;
use serde_json::json;
use std::env;
use pmc::{
- config,
- file::{self, Exists},
+ config, file,
helpers::{self, ColoredString},
log,
- process::{
- http::{self, LogResponse},
- ItemSingle, Runner,
- },
+ process::{http, ItemSingle, Runner},
};
use tabled::{
settings::{
object::{Columns, Rows},
style::{BorderColor, Style},
themes::Colorization,
Color, Modify, Rotate, Width,
},
Table, Tabled,
};
/// Target of the `start` command.
#[derive(Clone, Debug)]
pub enum Args {
/// Id of an existing process; `start` restarts it.
Id(usize),
/// Script/command line; `start` creates a new process from it.
Script(String),
}
fn format(server_name: &String) -> (String, String) {
- let kind = ternary!(server_name == "internal", "", "remote ").to_string();
+ let kind = ternary!(matches!(&**server_name, "internal" | "local"), "", "remote ").to_string();
return (kind, server_name.to_string());
}
/// Builds the version banner from values baked in at compile time.
///
/// With `short == true` the result is `"<pkg name> <pkg version>"`; otherwise
/// it is `"<pkg version> (<git hash> <build date>) [<profile>]"`.
pub fn get_version(short: bool) -> String {
    if short {
        format!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
    } else {
        format!("{} ({} {}) [{}]", env!("CARGO_PKG_VERSION"), env!("GIT_HASH"), env!("BUILD_DATE"), env!("PROFILE"))
    }
}
pub fn start(name: &Option<String>, args: &Option<Args>, watch: &Option<String>, server_name: &String) {
let mut runner = Runner::new();
let config = config::read();
let (kind, list_name) = format(server_name);
match args {
Some(Args::Id(id)) => {
let runner: Runner = Runner::new();
println!("{} Applying {kind}action restartProcess on ({id})", *helpers::SUCCESS);
- if *server_name == "internal" {
+ if matches!(&**server_name, "internal" | "local") {
let mut item = runner.get(*id);
match watch {
Some(path) => item.watch(path),
None => item.disable_watch(),
}
name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
item.restart();
log!("process started (id={id})");
} else {
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.get(), false) {
Some(remote) => {
let mut item = remote.get(*id);
name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
item.restart();
}
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
- };
- }
+ }
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
}
println!("{} restarted {kind}({id}) ✓", *helpers::SUCCESS);
list(&string!("default"), &list_name);
}
Some(Args::Script(script)) => {
let name = match name {
Some(name) => string!(name),
None => string!(script.split_whitespace().next().unwrap_or_default()),
};
- if *server_name == "internal" {
+ if matches!(&**server_name, "internal" | "local") {
let pattern = Regex::new(r"(?m)^[a-zA-Z0-9]+(/[a-zA-Z0-9]+)*(\.js|\.ts)?$").unwrap();
if pattern.is_match(script) {
let script = format!("{} {script}", config.runner.node);
runner.start(&name, &script, file::cwd(), watch).save();
} else {
runner.start(&name, script, file::cwd(), watch).save();
}
log!("process created (name={name})");
} else {
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.get(), false) {
Some(mut remote) => remote.start(&name, script, file::cwd(), watch),
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
- }
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
}
println!("{} Creating {kind}process with ({name})", *helpers::SUCCESS);
println!("{} {kind}created ({name}) ✓", *helpers::SUCCESS);
list(&string!("default"), &list_name);
}
None => {}
}
}
pub fn stop(id: &usize, server_name: &String) {
let mut runner: Runner = Runner::new();
let (kind, list_name) = format(server_name);
println!("{} Applying {kind}action stopProcess on ({id})", *helpers::SUCCESS);
- if *server_name != "internal" {
+ if !matches!(&**server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
runner = match Runner::connect(server_name.clone(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
- }
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
}
runner.get(*id).stop();
println!("{} stopped {kind}({id}) ✓", *helpers::SUCCESS);
log!("process stopped {kind}(id={id})");
list(&string!("default"), &list_name);
}
pub fn remove(id: &usize, server_name: &String) {
let mut runner: Runner = Runner::new();
let (kind, _) = format(server_name);
println!("{} Applying {kind}action removeProcess on ({id})", *helpers::SUCCESS);
- if *server_name != "internal" {
+ if !matches!(&**server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
runner = match Runner::connect(server_name.clone(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to remove (name={server_name}, address={})", *helpers::FAIL, server.address),
};
- }
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
}
runner.remove(*id);
println!("{} removed {kind}({id}) ✓", *helpers::SUCCESS);
log!("process removed (id={id})");
}
pub fn info(id: &usize, format: &String, server_name: &String) {
#[derive(Clone, Debug, Tabled)]
struct Info {
#[tabled(rename = "error log path ")]
log_error: String,
#[tabled(rename = "out log path")]
log_out: String,
#[tabled(rename = "cpu percent")]
cpu_percent: String,
#[tabled(rename = "memory usage")]
memory_usage: String,
#[tabled(rename = "path hash")]
hash: String,
#[tabled(rename = "watching")]
watch: String,
#[tabled(rename = "exec cwd")]
path: String,
#[tabled(rename = "script command ")]
command: String,
#[tabled(rename = "script id")]
id: String,
restarts: u64,
uptime: String,
pid: String,
name: String,
status: ColoredString,
}
impl Serialize for Info {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"id": &self.id.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"path": &self.path.trim(),
"restarts": &self.restarts,
"watch": &self.watch.trim(),
"watch": &self.hash.trim(),
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"log_out": &self.log_out.trim(),
"cpu": &self.cpu_percent.trim(),
"command": &self.command.trim(),
"mem": &self.memory_usage.trim(),
"log_error": &self.log_error.trim(),
});
trimmed_json.serialize(serializer)
}
}
let render_info = |data: Vec<Info>| {
let table = Table::new(data.clone())
.with(Rotate::Left)
.with(Style::rounded().remove_horizontals())
.with(Colorization::exact([Color::FG_CYAN], Columns::first()))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.to_string();
if let Ok(json) = serde_json::to_string(&data[0]) {
match format.as_str() {
"raw" => println!("{:?}", data[0]),
"json" => println!("{json}"),
_ => {
println!("{}\n{table}\n", format!("Describing process with id ({id})").on_bright_white().black());
println!(" {}", format!("Use `pmc logs {id} [--lines <num>]` to display logs").white());
println!(" {}", format!("Use `pmc env {id}` to display environment variables").white());
}
};
};
};
- if *server_name == "internal" {
+ if matches!(&**server_name, "internal" | "local") {
if let Some(home) = home::home_dir() {
let config = config::read().runner;
let mut runner = Runner::new();
let item = runner.process(*id);
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
let path = file::make_relative(&item.path, &home).to_string_lossy().into_owned();
if let Ok(mut process) = Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
let data = vec![Info {
cpu_percent,
memory_usage,
id: string!(id),
restarts: item.restarts,
name: item.name.clone(),
+ log_out: item.logs().out,
path: format!("{} ", path),
+ log_error: item.logs().error,
status: ColoredString(status),
- log_out: global!("pmc.logs.out", item.name.as_str()),
- log_error: global!("pmc.logs.error", item.name.as_str()),
pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script),
hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
}];
render_info(data)
} else {
crashln!("{} Impossible to get your home directory", *helpers::FAIL);
}
} else {
- let mut item: Option<(pmc::process::Process, Runner)> = None;
-
+ let data: (pmc::process::Process, Runner);
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
- item = match Runner::connect(server_name.clone(), server.get(), false) {
- Some(mut remote) => Some((remote.process(*id).clone(), remote)),
+ data = match Runner::connect(server_name.clone(), server.get(), false) {
+ Some(mut remote) => (remote.process(*id).clone(), remote),
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
- }
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
- if let Some((item, remote)) = item {
- let remote = remote.remote.unwrap();
- let info = http::info(&remote, *id);
- let path = item.path.to_string_lossy().into_owned();
+ let (item, remote) = data;
+ let remote = remote.remote.unwrap();
+ let info = http::info(&remote, *id);
+ let path = item.path.to_string_lossy().into_owned();
- let status = if item.running {
- "online ".green().bold()
- } else {
- match item.crash.crashed {
- true => "crashed ",
- false => "stopped ",
- }
- .red()
- .bold()
- };
+ let status = if item.running {
+ "online ".green().bold()
+ } else {
+ match item.crash.crashed {
+ true => "crashed ",
+ false => "stopped ",
+ }
+ .red()
+ .bold()
+ };
- if let Ok(info) = info {
- let stats = info.json::<ItemSingle>().unwrap().stats;
+ if let Ok(info) = info {
+ let stats = info.json::<ItemSingle>().unwrap().stats;
- let cpu_percent = match stats.cpu_percent {
- Some(percent) => format!("{:.2}%", percent),
- None => string!("0%"),
- };
+ let cpu_percent = match stats.cpu_percent {
+ Some(percent) => format!("{percent:.2}%"),
+ None => string!("0%"),
+ };
- let memory_usage = match stats.memory_usage {
- Some(usage) => helpers::format_memory(usage.rss),
- None => string!("0b"),
- };
+ let memory_usage = match stats.memory_usage {
+ Some(usage) => helpers::format_memory(usage.rss),
+ None => string!("0b"),
+ };
- let data = vec![Info {
- cpu_percent,
- memory_usage,
- id: string!(id),
- path: path.clone(),
- restarts: item.restarts,
- name: item.name.clone(),
- status: ColoredString(status),
- log_out: format!("{}/{{}}-out.log", remote.config.log_path),
- log_error: format!("{}/{{}}-error.log", remote.config.log_path),
- pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
- hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
- command: format!("{} {} '{}'", remote.config.shell, remote.config.args.join(" "), item.script),
- watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
- uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
- }];
-
- render_info(data)
- }
+ let data = vec![Info {
+ cpu_percent,
+ memory_usage,
+ id: string!(id),
+ path: path.clone(),
+ status: status.into(),
+ restarts: item.restarts,
+ name: item.name.clone(),
+ pid: ternary!(item.running, format!("{pid}", pid = item.pid), string!("n/a")),
+ log_out: format!("{}/{}-out.log", remote.config.log_path, item.name),
+ log_error: format!("{}/{}-error.log", remote.config.log_path, item.name),
+ hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
+ command: format!("{} {} '{}'", remote.config.shell, remote.config.args.join(" "), item.script),
+ watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
+ uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
+ }];
+
+ render_info(data)
}
}
}
pub fn logs(id: &usize, lines: &usize, server_name: &String) {
let mut runner: Runner = Runner::new();
- if *server_name != "internal" {
+ if !matches!(&**server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
runner = match Runner::connect(server_name.clone(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
- }
-
- let item = runner.clone().process(*id).clone();
- let log_out = http::logs(&runner.remote.as_ref().unwrap(), *id, "out");
- let log_error = http::logs(&runner.remote.as_ref().unwrap(), *id, "error");
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
+ let item = runner.info(*id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL));
println!("{}", format!("Showing last {lines} lines for process [{id}] (change the value with --lines option)").yellow());
- if let Ok(logs) = log_error {
- let logs = logs.json::<LogResponse>().unwrap().logs;
- file::logs_internal(logs, *lines, &item.name, *id, "error", &item.name)
- }
+ for kind in vec!["error", "out"] {
+ let logs = http::logs(&runner.remote.as_ref().unwrap(), *id, kind);
+
+ if let Ok(log) = logs {
+ if log.lines.is_empty() {
+ println!("{} No logs found for {}/{kind}", *helpers::FAIL, item.name);
+ continue;
+ }
- if let Ok(logs) = log_out {
- let logs = logs.json::<LogResponse>().unwrap().logs;
- file::logs_internal(logs, *lines, &item.name, *id, "out", &item.name)
+ file::logs_internal(log.lines, *lines, log.path, *id, kind, &item.name)
+ }
}
} else {
- let item = runner.process(*id);
- let log_out = global!("pmc.logs.out", item.name.as_str());
- let log_error = global!("pmc.logs.error", item.name.as_str());
-
- if Exists::check(&log_error).file() && Exists::check(&log_out).file() {
- println!("{}", format!("Showing last {lines} lines for process [{id}] (change the value with --lines option)").yellow());
+ let item = runner.info(*id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL));
+ println!("{}", format!("Showing last {lines} lines for process [{id}] (change the value with --lines option)").yellow());
- file::logs(*lines, &log_error, *id, "error", &item.name);
- file::logs(*lines, &log_out, *id, "out", &item.name);
- } else {
- crashln!("{} Logs for process ({id}) not found", *helpers::FAIL);
- }
+ file::logs(item, *lines, "error");
+ file::logs(item, *lines, "out");
}
}
pub fn env(id: &usize, server_name: &String) {
let mut runner: Runner = Runner::new();
- if *server_name != "internal" {
+ if !matches!(&**server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
- crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(server_name) {
runner = match Runner::connect(server_name.clone(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to connect (name={server_name}, address={})", *helpers::FAIL, server.address),
};
- }
+ } else {
+ crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL)
+ };
}
let item = runner.process(*id);
item.env.iter().for_each(|(key, value)| println!("{}: {}", key, value.green()));
}
pub fn list(format: &String, server_name: &String) {
let render_list = |runner: &mut Runner, internal: bool| {
let mut processes: Vec<ProcessItem> = Vec::new();
#[derive(Tabled, Debug)]
struct ProcessItem {
id: ColoredString,
name: String,
pid: String,
uptime: String,
#[tabled(rename = "↺")]
restarts: String,
status: ColoredString,
cpu: String,
mem: String,
#[tabled(rename = "watching")]
watch: String,
}
impl serde::Serialize for ProcessItem {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"cpu": &self.cpu.trim(),
"mem": &self.mem.trim(),
"id": &self.id.0.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"watch": &self.watch.trim(),
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"restarts": &self.restarts.trim(),
});
trimmed_json.serialize(serializer)
}
}
if runner.is_empty() {
println!("{} Process table empty", *helpers::SUCCESS);
} else {
for (id, item) in runner.items() {
let mut cpu_percent: String = string!("0%");
let mut memory_usage: String = string!("0b");
if internal {
let mut usage_internals: (Option<f32>, Option<MemoryInfo>) = (None, None);
if let Ok(mut process) = Process::new(item.pid as u32) {
usage_internals = (process.cpu_percent().ok(), process.memory_info().ok());
}
cpu_percent = match usage_internals.0 {
Some(percent) => format!("{:.0}%", percent),
None => string!("0%"),
};
memory_usage = match usage_internals.1 {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
} else {
let info = http::info(&runner.remote.as_ref().unwrap(), id);
if let Ok(info) = info {
let stats = info.json::<ItemSingle>().unwrap().stats;
cpu_percent = match stats.cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0%"),
};
memory_usage = match stats.memory_usage {
Some(usage) => helpers::format_memory(usage.rss),
None => string!("0b"),
};
}
}
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
processes.push(ProcessItem {
- status: ColoredString(status),
+ status: status.into(),
cpu: format!("{cpu_percent} "),
mem: format!("{memory_usage} "),
+ id: id.to_string().cyan().bold().into(),
restarts: format!("{} ", item.restarts),
name: format!("{} ", item.name.clone()),
- id: ColoredString(id.to_string().cyan().bold()),
pid: ternary!(item.running, format!("{} ", item.pid), string!("n/a ")),
watch: ternary!(item.watch.enabled, format!("{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{} ", helpers::format_duration(item.started)), string!("none ")),
});
}
let table = Table::new(&processes)
.with(Style::rounded().remove_verticals())
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.with(Colorization::exact([Color::FG_BRIGHT_CYAN], Rows::first()))
.with(Modify::new(Columns::single(1)).with(Width::truncate(35).suffix("... ")))
.to_string();
if let Ok(json) = serde_json::to_string(&processes) {
match format.as_str() {
"raw" => println!("{:?}", processes),
"json" => println!("{json}"),
"default" => println!("{table}"),
_ => {}
};
};
}
};
if let Some(servers) = config::servers().servers {
let mut failed: Vec<(String, String)> = vec![];
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.get(), true) {
Some(mut remote) => render_list(&mut remote, false),
None => println!("{} Failed to fetch (name={server_name}, address={})", *helpers::FAIL, server.address),
}
} else {
- if matches!(&**server_name, "internal" | "all") {
+ if matches!(&**server_name, "internal" | "all" | "local") {
println!("{} Internal daemon", *helpers::SUCCESS);
render_list(&mut Runner::new(), true);
} else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL);
}
}
if *server_name == "all" {
for (name, server) in servers {
match Runner::connect(name.clone(), server.get(), true) {
Some(mut remote) => render_list(&mut remote, false),
None => failed.push((name, server.address)),
}
}
}
if !failed.is_empty() {
println!("{} Failed servers:", *helpers::FAIL);
failed
.iter()
.for_each(|server| println!(" {} {} {}", "-".yellow(), format!("{}", server.0), format!("[{}]", server.1).white()));
}
} else {
render_list(&mut Runner::new(), true);
}
}
diff --git a/src/daemon/api/routes.rs b/src/daemon/api/routes.rs
index 06634bd..07b1c6d 100644
--- a/src/daemon/api/routes.rs
+++ b/src/daemon/api/routes.rs
@@ -1,502 +1,502 @@
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{string, ternary, then};
use prometheus::{Encoder, TextEncoder};
use psutil::process::{MemoryInfo, Process};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::convert::Infallible;
use tera::{Context, Tera};
use utoipa::ToSchema;
use pmc::{
config, file, helpers,
process::{dump, Runner},
};
use crate::daemon::{
api::{HTTP_COUNTER, HTTP_REQ_HISTOGRAM},
pid,
};
use warp::{
hyper::body::Body,
reject,
reply::{self, json, Response},
Rejection, Reply,
};
use std::{
env,
fs::{self, File},
io::{self, BufRead, BufReader},
path::PathBuf,
};
// Schema-only type: exposed to the OpenAPI docs under the name `MemoryInfo`
// (see `schema(as = ...)`). Its fields are never read, hence `dead_code`.
// NOTE(review): presumably mirrors psutil's `MemoryInfo` layout — confirm.
#[allow(dead_code)]
#[derive(ToSchema)]
#[schema(as = MemoryInfo)]
pub(crate) struct DocMemoryInfo {
rss: u64,
vms: u64,
// Fields below exist only when compiling for Linux.
#[cfg(target_os = "linux")]
shared: u64,
#[cfg(target_os = "linux")]
text: u64,
#[cfg(target_os = "linux")]
data: u64,
// Fields below exist only when compiling for macOS.
#[cfg(target_os = "macos")]
page_faults: u64,
#[cfg(target_os = "macos")]
pageins: u64,
}
// Deserialized request body carrying the name of an action.
// NOTE(review): the consuming handler is outside this view — confirm usage.
#[derive(Deserialize, ToSchema)]
pub(crate) struct ActionBody {
// Action name; the schema example is "restart".
#[schema(example = "restart")]
method: String,
}
// Response body of `config_handler`, filled from the runner section of the
// daemon configuration.
#[derive(Serialize, ToSchema)]
pub(crate) struct ConfigBody {
// Runner shell.
#[schema(example = "bash")]
shell: String,
// Arguments configured for the shell.
#[schema(min_items = 1, example = json!(["-c"]))]
args: Vec<String>,
// Configured log directory.
#[schema(example = "/home/user/.pmc/logs")]
log_path: String,
}
// Deserialized request body describing a process to create.
// NOTE(review): the consuming handler is outside this view — confirm usage.
#[derive(Deserialize, ToSchema)]
pub(crate) struct CreateBody {
// Optional process name.
#[schema(example = "app")]
name: Option<String>,
// Command line to run.
#[schema(example = "node index.js")]
script: String,
// Path for the process; documented as a plain string in the schema.
#[schema(value_type = String, example = "/projects/app")]
path: PathBuf,
// Optional watch path.
#[schema(example = "src")]
watch: Option<String>,
}
// Response body produced by `attempt`.
#[derive(Serialize, ToSchema)]
pub(crate) struct ActionResponse<'a> {
// Whether the action was carried out.
#[schema(example = true)]
done: bool,
// The action name; `attempt` sets it to "DOES_NOT_EXIST" when `done` is false.
#[schema(example = "name")]
action: &'a str,
}
// Response body of `log_handler`.
#[derive(Serialize, ToSchema)]
pub(crate) struct LogResponse {
// One entry per line of the log file; empty when the file cannot be opened.
logs: Vec<String>,
}
// Serializable pairing of build/version details with daemon status.
// NOTE(review): the producing handler is outside this view — confirm usage.
#[derive(Serialize, ToSchema)]
pub struct MetricsRoot<'a> {
pub version: Version<'a>,
pub daemon: Daemon,
}
// Build/version details embedded in `MetricsRoot`.
#[derive(Serialize, ToSchema)]
pub struct Version<'a> {
// Package version string; the schema example is "v1.0.0".
#[schema(example = "v1.0.0")]
pub pkg: String,
// NOTE(review): presumably the git commit hash — confirm at the construction site.
pub hash: &'a str,
// Build date; the schema example is "2000-01-01".
#[schema(example = "2000-01-01")]
pub build_date: &'a str,
// Build target/profile label; the schema example is "release".
#[schema(example = "release")]
pub target: &'a str,
}
// Daemon status embedded in `MetricsRoot`.
#[derive(Serialize, ToSchema)]
pub struct Daemon {
// Daemon pid, when one is available.
pub pid: Option<i32>,
#[schema(example = true)]
pub running: bool,
// Uptime, already formatted as text.
pub uptime: String,
pub process_count: usize,
// Daemon type label; the schema example is "default".
#[schema(example = "default")]
pub daemon_type: String,
pub stats: Stats,
}
// Resource usage figures embedded in `Daemon`; both values are carried as
// text rather than numbers.
#[derive(Serialize, ToSchema)]
pub struct Stats {
pub memory_usage: String,
pub cpu_percent: String,
}
#[inline]
fn attempt(done: bool, method: &str) -> reply::Json {
let data = json!(ActionResponse {
done,
action: ternary!(done, method, "DOES_NOT_EXIST")
});
json(&data)
}
// Renders template `name` with `ctx`; any template error is turned into a
// not-found rejection.
#[inline]
fn render(name: &str, tmpl: &Tera, ctx: &Context) -> Result<String, Rejection> {
    match tmpl.render(name, ctx) {
        Ok(html) => Ok(html),
        Err(_) => Err(reject::not_found()),
    }
}
// Serves the "login" template as HTML. `store` is (templates, base path);
// the base path is exposed to the template as `base_path`.
#[inline]
pub async fn login(store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
    let (templates, base_path) = store;

    let mut context = Context::new();
    context.insert("base_path", &base_path);

    let html = render("login", &templates, &context)?;
    Ok(Box::new(reply::html(html)))
}
// Serves the "dashboard" template as HTML. `store` is (templates, base path);
// the base path is exposed to the template as `base_path`.
#[inline]
pub async fn dashboard(store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
    let (templates, base_path) = store;

    let mut context = Context::new();
    context.insert("base_path", &base_path);

    let html = render("dashboard", &templates, &context)?;
    Ok(Box::new(reply::html(html)))
}
// Serves the "view" template as HTML for process `id`. `store` is
// (templates, base path); the template receives `base_path` and `process_id`.
#[inline]
pub async fn view_process(id: usize, store: (Tera, String)) -> Result<Box<dyn Reply>, Rejection> {
    let (templates, base_path) = store;

    let mut context = Context::new();
    context.insert("base_path", &base_path);
    context.insert("process_id", &id);

    let html = render("view", &templates, &context)?;
    Ok(Box::new(reply::html(html)))
}
// Returns all gathered Prometheus metrics in the text exposition format.
// Panics if encoding fails or the encoded output is not valid UTF-8.
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/prometheus",
    responses((status = 200, description = "Get prometheus metrics", body = String))
)]
pub async fn prometheus_handler() -> Result<impl Reply, Infallible> {
    let encoder = TextEncoder::new();
    let mut buffer = Vec::<u8>::new();
    let metric_families = prometheus::gather();

    encoder.encode(&metric_families, &mut buffer).unwrap();
    // The buffer is consumed directly: no clone and no extra format! copy.
    Ok(String::from_utf8(buffer).unwrap())
}
// Returns the raw process dump. The request is counted in HTTP_COUNTER and
// timed in HTTP_REQ_HISTOGRAM under the "dump" label.
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/dump", security(()),
    responses((status = 200, description = "Dump processes successfully", body = [u8]))
)]
pub async fn dump_handler() -> Result<impl Reply, Infallible> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["dump"]).start_timer();

    HTTP_COUNTER.inc();
    // Produce the dump before stopping the timer so the histogram measures
    // the actual work instead of an empty interval.
    let data = dump::raw();
    timer.observe_duration();

    Ok(data)
}
// Returns the runner section of the daemon configuration (shell, args,
// log path) as JSON. The request is counted in HTTP_COUNTER and timed in
// HTTP_REQ_HISTOGRAM.
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/config",
    responses((status = 200, description = "Get daemon config successfully", body = ConfigBody))
)]
pub async fn config_handler() -> Result<impl Reply, Infallible> {
    // Labelled "config": the previous "dump" label was shared with
    // dump_handler, which mixed the two endpoints in one histogram series.
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["config"]).start_timer();
    let config = config::read().runner;

    HTTP_COUNTER.inc();

    let response = json!(ConfigBody {
        shell: config.shell,
        args: config.args,
        log_path: config.log_path,
    });

    timer.observe_duration();
    Ok(json(&response))
}
// Returns the process list as JSON. The request is counted in HTTP_COUNTER
// and timed in HTTP_REQ_HISTOGRAM under the "list" label.
#[inline]
#[utoipa::path(get, path = "/list", tag = "Process",
    responses((status = 200, description = "List processes successfully", body = [ProcessItem]))
)]
pub async fn list_handler() -> Result<impl Reply, Infallible> {
    let stopwatch = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();
    let processes = Runner::new().json();

    HTTP_COUNTER.inc();
    stopwatch.observe_duration();

    let body = json(&processes);
    Ok(body)
}
// Handler for `GET /process/{id}/logs/{kind}`: replies with the lines of one
// of the process's log files as `LogResponse { logs }`.
//
// `kind` selects the file: "error"/"stderr" pick the error log; every other
// value (including "out"/"stdout") falls back to the stdout log. If the file
// cannot be opened the reply carries an empty list. Rejects with not-found
// when `Runner::info` has no process for `id`.
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}",
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched", body = LogResponse),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn log_handler(id: usize, kind: String) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
// Unknown kinds deliberately fall back to the stdout log.
let log_file = match kind.as_str() {
- "out" | "stdout" => global!("pmc.logs.out", item.name.as_str()),
- "error" | "stderr" => global!("pmc.logs.error", item.name.as_str()),
- _ => global!("pmc.logs.out", item.name.as_str()),
+ "out" | "stdout" => item.logs().out,
+ "error" | "stderr" => item.logs().error,
+ _ => item.logs().out,
};
match File::open(log_file) {
Ok(data) => {
let reader = BufReader::new(data);
// NOTE(review): this unwrap panics if any line fails to read (e.g. invalid UTF-8 in the log).
let logs: Vec<String> = reader.lines().collect::<io::Result<_>>().unwrap();
timer.observe_duration();
Ok(json(&json!(LogResponse { logs })))
}
// An unreadable log file is reported as "no log lines", not as an error.
Err(_) => Ok(json(&json!(LogResponse { logs: vec![] }))),
}
}
None => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
// Handler for `GET /process/{id}/logs/{kind}/raw`: replies with the log file
// contents as a plain response body.
//
// `kind` is resolved like in `log_handler` ("error"/"stderr" -> error log,
// anything else -> stdout log). After this change a successful body starts
// with a `# PATH <log_file>` header line followed by the file contents; when
// the file cannot be read, the body is the error's text instead. Rejects with
// not-found when `Runner::info` has no process for `id`.
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}/raw",
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched raw", body = String),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn log_handler_raw(id: usize, kind: String) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
// Unknown kinds deliberately fall back to the stdout log.
let log_file = match kind.as_str() {
- "out" | "stdout" => global!("pmc.logs.out", item.name.as_str()),
- "error" | "stderr" => global!("pmc.logs.error", item.name.as_str()),
- _ => global!("pmc.logs.out", item.name.as_str()),
+ "out" | "stdout" => item.logs().out,
+ "error" | "stderr" => item.logs().error,
+ _ => item.logs().out,
};
- let data = match fs::read_to_string(log_file) {
- Ok(data) => data,
+ let data = match fs::read_to_string(&log_file) {
+ Ok(data) => format!("# PATH {log_file}\n{data}"),
// A read failure is returned as the body text; note it carries no `# PATH` header line.
Err(err) => err.to_string(),
};
timer.observe_duration();
Ok(Response::new(Body::from(data)))
}
None => {
timer.observe_duration();
Err(reject::not_found())
}
}
}
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/info",
params(("id" = usize, Path, description = "Process id to get information for", example = 0)),
responses(
(status = 200, description = "Current process info retrieved", body = ItemSingle),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
)
)]
pub async fn info_handler(id: usize) -> Result<impl Reply, Rejection> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
let runner = Runner::new();
let mut item = runner.get(id);
HTTP_COUNTER.inc();
timer.observe_duration();
Ok(json(&item.json()))
}
// Handler for `POST /process/create`: starts a new process from the request
// body and persists the runner. When no name is supplied, the first
// whitespace-separated word of the script is used as the name.
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/create", request_body(content = CreateBody),
    responses(
        (status = 200, description = "Create process successful", body = ActionResponse),
        (status = INTERNAL_SERVER_ERROR, description = "Failed to create process", body = ErrorMessage)
    )
)]
pub async fn create_handler(body: CreateBody) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["create"]).start_timer();
    let mut runner = Runner::new();
    HTTP_COUNTER.inc();

    let name = if let Some(given) = body.name {
        string!(given)
    } else {
        let first_word = body.script.split_whitespace().next();
        string!(first_word.unwrap_or_default())
    };

    runner.start(&name, &body.script, body.path, &body.watch).save();

    timer.observe_duration();
    Ok(attempt(true, "create"))
}
// Handler for `POST /process/{id}/rename`: renames a process to the request
// body (trimmed, newlines removed) and restarts it if it was running.
// Rejects with not-found for an unknown id.
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/rename", request_body(content = String),
    params(("id" = usize, Path, description = "Process id to rename", example = 0)),
    responses(
        (status = 200, description = "Rename process successful", body = ActionResponse),
        (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
    )
)]
pub async fn rename_handler(id: usize, body: String) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
    let mut runner = Runner::new();
    HTTP_COUNTER.inc();

    if runner.exists(id) {
        // Read the running flag only after the existence check:
        // `Runner::process` aborts via `crashln!` on a missing id, so looking
        // it up first (as before) made the 404 branch unreachable.
        let was_running = runner.process(id).running;

        let mut item = runner.get(id);
        item.rename(body.trim().replace("\n", ""));
        then!(was_running, item.restart());

        timer.observe_duration();
        Ok(attempt(true, "rename"))
    } else {
        timer.observe_duration();
        Err(reject::not_found())
    }
}
// Handler for `GET /process/{id}/env`: replies with the environment map
// stored for the process, or rejects with not-found for an unknown id.
#[inline]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/env",
    params(("id" = usize, Path, description = "Process id to fetch env from", example = 0)),
    responses(
        (status = 200, description = "Current process env", body = HashMap<String, String>),
        (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage)
    )
)]
pub async fn env_handler(id: usize) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["env"]).start_timer();
    HTTP_COUNTER.inc();

    // Copy the environment out so the temporary runner can be dropped.
    let environment = Runner::new().info(id).map(|item| item.env.clone());
    timer.observe_duration();

    match environment {
        Some(vars) => Ok(json(&vars)),
        None => Err(reject::not_found()),
    }
}
// Handler for `POST /process/{id}/action`: runs the action named by
// `body.method` on a process.
//
// Supported methods: "start"/"restart", "stop"/"kill", "remove"/"delete".
// Rejects with not-found when either the process id or the method is unknown.
#[inline]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/action", request_body = ActionBody,
    params(("id" = usize, Path, description = "Process id to run action on", example = 0)),
    responses(
        (status = 200, description = "Run action on process successful", body = ActionResponse),
        (status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage)
    )
)]
pub async fn action_handler(id: usize, body: ActionBody) -> Result<impl Reply, Rejection> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
    let mut runner = Runner::new();
    let method = body.method.as_str();
    HTTP_COUNTER.inc();

    // Every action below ends up in `Runner::process`, which aborts via
    // `crashln!` on a missing id. Answer with the documented 404 instead.
    if !runner.exists(id) {
        timer.observe_duration();
        return Err(reject::not_found());
    }

    match method {
        "start" | "restart" => runner.get(id).restart(),
        "stop" | "kill" => runner.get(id).stop(),
        "remove" | "delete" => runner.remove(id),
        _ => {
            timer.observe_duration();
            return Err(reject::not_found());
        }
    }

    timer.observe_duration();
    Ok(attempt(true, method))
}
// Handler for `GET /daemon/metrics`: replies with build/version information
// and, when the daemon pid file exists and the process can be inspected,
// the daemon's pid, uptime, memory and CPU usage. Missing values are reported
// as "0b", "0%" and "none".
#[inline]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/metrics",
    responses((status = 200, description = "Get daemon metrics", body = MetricsRoot))
)]
pub async fn metrics_handler() -> Result<impl Reply, Infallible> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["metrics"]).start_timer();

    let mut daemon_pid: Option<i32> = None;
    let mut cpu: Option<f32> = None;
    let mut started: Option<DateTime<Utc>> = None;
    let mut memory: Option<MemoryInfo> = None;
    let mut runner: Runner = file::read_rmp(global!("pmc.dump"));
    HTTP_COUNTER.inc();

    // Only probe the daemon when its pid file exists, is readable and the
    // process can be opened.
    if pid::exists() {
        if let Ok(process_id) = pid::read() {
            if let Ok(mut process) = Process::new(process_id as u32) {
                daemon_pid = Some(process_id);
                started = Some(pid::uptime().unwrap());
                memory = process.memory_info().ok();
                cpu = process.cpu_percent().ok();
            }
        }
    }

    let memory_usage = memory.map_or_else(|| string!("0b"), |usage| helpers::format_memory(usage.rss()));
    let cpu_percent = cpu.map_or_else(|| string!("0%"), |percent| format!("{:.2}%", percent));
    let uptime = started.map_or_else(|| string!("none"), |since| helpers::format_duration(since));

    let response = json!(MetricsRoot {
        version: Version {
            pkg: format!("v{}", env!("CARGO_PKG_VERSION")),
            hash: env!("GIT_HASH_FULL"),
            build_date: env!("BUILD_DATE"),
            target: env!("PROFILE"),
        },
        daemon: Daemon {
            pid: daemon_pid,
            running: pid::exists(),
            uptime,
            process_count: runner.count(),
            daemon_type: global!("pmc.daemon.kind"),
            stats: Stats { memory_usage, cpu_percent }
        }
    });

    timer.observe_duration();
    Ok(json(&response))
}
diff --git a/src/file.rs b/src/file.rs
index ea27250..0e236d4 100644
--- a/src/file.rs
+++ b/src/file.rs
@@ -1,117 +1,129 @@
-use crate::{helpers, log};
+use crate::{helpers, log, process::Process};
use colored::Colorize;
use macros_rs::{crashln, string, ternary};
use std::{
env,
fs::{self, File},
- io::{self, BufRead, BufReader},
+ io::{BufRead, BufReader},
path::{Path, PathBuf},
thread::sleep,
time::Duration,
};
// Prints the tail of one of a process's log files.
//
// New signature: takes the process itself and resolves the log path from
// `item.logs()`; `kind` "error" selects the error log, anything else the
// stdout log. When the file is missing or empty (see `Exists::empty`) a
// "No logs found" message is printed instead of opening the file.
-pub fn logs(lines_to_tail: usize, log_file: &str, id: usize, log_type: &str, item_name: &str) {
- let file = File::open(log_file).unwrap();
- let reader = BufReader::new(file);
- let lines: Vec<String> = reader.lines().collect::<io::Result<_>>().unwrap();
+pub fn logs(item: &Process, lines_to_tail: usize, kind: &str) {
+ let log_file = match kind {
+ "out" => item.logs().out,
+ "error" => item.logs().error,
+ _ => item.logs().out,
+ };
+
+ if !Exists::check(&log_file).empty() {
+ let file = File::open(&log_file).unwrap();
+ let reader = BufReader::new(file);
// Unreadable lines are replaced by an "error reading line" placeholder rather than aborting.
+ let lines = reader.lines().map(|line| line.unwrap_or_else(|err| format!("error reading line: {err}"))).collect();
- logs_internal(lines, lines_to_tail, log_file, id, log_type, item_name)
+ logs_internal(lines, lines_to_tail, &log_file, item.id, kind, &item.name)
+ } else {
+ println!("{} No logs found in {log_file}", *helpers::FAIL)
+ }
}
// Prints a header naming `log_file`, then the last `lines_to_tail` entries of
// `lines`, each prefixed with `id|item_name |`. The prefix is green when
// `log_type` is "out" and red otherwise.
pub fn logs_internal(lines: Vec<String>, lines_to_tail: usize, log_file: &str, id: usize, log_type: &str, item_name: &str) {
- let color = ternary!(log_type == "out", "green", "red");
println!("{}", format!("\n{log_file} last {lines_to_tail} lines:").bright_black());
+ let color = ternary!(log_type == "out", "green", "red");
// Start at 0 when there are fewer lines than requested, so everything is printed.
let start_index = if lines.len() > lines_to_tail { lines.len() - lines_to_tail } else { 0 };
+
// The index produced by `enumerate` is unused.
for (_, line) in lines.iter().skip(start_index).enumerate() {
println!("{} {}", format!("{}|{} |", id, item_name).color(color), line);
}
}
/// Returns the current working directory, aborting through `crashln!` with
/// an error message when it cannot be determined.
pub fn cwd() -> PathBuf {
    env::current_dir().unwrap_or_else(|_| crashln!("{} Unable to find current working directory", *helpers::FAIL))
}
/// Rewrites `current` relative to `home`.
///
/// When `current` lies under `home`, the prefix is replaced with `~`;
/// otherwise `current` is joined onto `home` (an absolute `current` therefore
/// comes back unchanged, as `Path::join` replaces the base).
pub fn make_relative(current: &Path, home: &Path) -> PathBuf {
    if let Ok(inside_home) = current.strip_prefix(home) {
        PathBuf::from("~").join(inside_home)
    } else {
        home.join(current)
    }
}
/// Small helper for filesystem checks on a borrowed path string.
pub struct Exists<'p> {
path: &'p str,
}
impl<'p> Exists<'p> {
/// Wraps `path` without touching the filesystem.
pub fn check(path: &'p str) -> Self { Self { path } }
/// True when the path is an existing directory.
pub fn folder(&self) -> bool { Path::new(self.path).is_dir() }
/// True when the path exists; uses `Path::exists`, so directories also count.
pub fn file(&self) -> bool { Path::new(self.path).exists() }
// True when the file has length 0, or when its metadata cannot be read (e.g. the path is missing).
+ pub fn empty(&self) -> bool { fs::metadata(Path::new(self.path)).map(|m| m.len() == 0).unwrap_or(true) }
}
pub fn raw(path: String) -> Vec<u8> {
match fs::read(&path) {
Ok(contents) => contents,
Err(err) => crashln!("{} Cannot find dumpfile.\n{}", *helpers::FAIL, string!(err).white()),
}
}
pub fn read<T: serde::de::DeserializeOwned>(path: String) -> T {
let contents = match fs::read_to_string(&path) {
Ok(contents) => contents,
Err(err) => crashln!("{} Cannot find dumpfile.\n{}", *helpers::FAIL, string!(err).white()),
};
match toml::from_str(&contents).map_err(|err| string!(err)) {
Ok(parsed) => parsed,
Err(err) => crashln!("{} Cannot parse dumpfile.\n{}", *helpers::FAIL, err.white()),
}
}
/// Deserializes `bytes` with `rmp_serde` into `T`, aborting through
/// `crashln!` with a "Cannot parse file" message on failure.
pub fn from_rmp<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> T {
    rmp_serde::from_slice(bytes).unwrap_or_else(|err| crashln!("{} Cannot parse file.\n{}", *helpers::FAIL, string!(err).white()))
}
pub fn read_rmp<T: serde::de::DeserializeOwned>(path: String) -> T {
let mut retry_count = 0;
let max_retries = 5;
let bytes = loop {
match fs::read(&path) {
Ok(contents) => break contents,
Err(err) => {
retry_count += 1;
if retry_count >= max_retries {
log!("{} Cannot find file.\n{}", *helpers::FAIL, string!(err).white());
} else {
log!("{} Error reading file. Retrying... (Attempt {}/{})", *helpers::FAIL, retry_count, max_retries);
}
}
}
sleep(Duration::from_secs(1));
};
retry_count = 0;
loop {
match rmp_serde::from_slice(&bytes) {
Ok(parsed) => break parsed,
Err(err) => {
retry_count += 1;
if retry_count >= max_retries {
log!("{} Cannot parse file.\n{}", *helpers::FAIL, string!(err).white());
} else {
log!("{} Error parsing file. Retrying... (Attempt {}/{})", *helpers::FAIL, retry_count, max_retries);
}
}
}
sleep(Duration::from_secs(1));
}
}
diff --git a/src/helpers.rs b/src/helpers.rs
index 29eb38c..58fa211 100644
--- a/src/helpers.rs
+++ b/src/helpers.rs
@@ -1,53 +1,57 @@
use chrono::{DateTime, Utc};
use colored::Colorize;
use core::fmt;
use once_cell::sync::Lazy;
use regex::Regex;
pub static SUCCESS: Lazy<colored::ColoredString> = Lazy::new(|| "[PMC]".green());
pub static FAIL: Lazy<colored::ColoredString> = Lazy::new(|| "[PMC]".red());
#[derive(Clone, Debug)]
pub struct ColoredString(pub colored::ColoredString);
impl serde::Serialize for ColoredString {
fn serialize<S: serde::ser::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let re = Regex::new(r"\x1B\[([0-9;]+)m").unwrap();
let colored_string = &self.0;
let stripped_string = re.replace_all(colored_string, "").to_string();
serializer.serialize_str(&stripped_string)
}
}
// Added conversion: wraps a `colored::ColoredString` in the crate's `ColoredString` newtype.
+impl From<colored::ColoredString> for ColoredString {
+ fn from(cs: colored::ColoredString) -> Self { ColoredString(cs) }
+}
+
impl fmt::Display for ColoredString {
    /// Writes the wrapped colored string using its own `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.0))
    }
}
/// Formats the time elapsed since `datetime` as a single coarse unit:
/// whole days (`"3d"`), hours (`"5h"`), minutes (`"12m"`) or seconds (`"42s"`),
/// picking the largest unit that fits.
pub fn format_duration(datetime: DateTime<Utc>) -> String {
    let elapsed = Utc::now().signed_duration_since(datetime).num_seconds();

    if elapsed >= 86400 {
        format!("{}d", elapsed / 86400)
    } else if elapsed >= 3600 {
        format!("{}h", elapsed / 3600)
    } else if elapsed >= 60 {
        format!("{}m", elapsed / 60)
    } else {
        format!("{}s", elapsed)
    }
}
/// Formats a byte count with a binary unit suffix (`b`, `kb`, `mb`, `gb`),
/// rounded to one decimal place with a trailing `.0` omitted
/// (`1536` -> `"1.5kb"`, `1024` -> `"1kb"`, `0` -> `"0b"`).
///
/// Values of 1 TiB and above are expressed in `gb`. The previous
/// implementation indexed past the end of the suffix table for such values
/// and panicked, and its logarithm-based unit selection could pick the wrong
/// unit right at a unit boundary.
pub fn format_memory(bytes: u64) -> String {
    const UNIT: f64 = 1024.0;
    const SUFFIX: [&str; 4] = ["b", "kb", "mb", "gb"];
    const LARGEST: usize = SUFFIX.len() - 1;

    if bytes == 0 {
        return "0b".to_string();
    }

    // Scale down by exact divisions instead of logarithms, stopping at the
    // largest known suffix.
    let mut value = bytes as f64;
    let mut unit = 0;
    while value >= UNIT && unit < LARGEST {
        value /= UNIT;
        unit += 1;
    }

    // Rounding can push a value such as 1023.97 up to 1024; promote it to the
    // next unit so the output reads "1mb" rather than "1024kb".
    let mut rounded = (value * 10.0).round() / 10.0;
    if rounded >= UNIT && unit < LARGEST {
        rounded /= UNIT;
        unit += 1;
    }

    // `Display` for f64 prints the shortest round-trip form without a
    // trailing ".0", matching the previous output format.
    format!("{rounded}{}", SUFFIX[unit])
}
diff --git a/src/process/http.rs b/src/process/http.rs
index 1e63271..6c53d5a 100644
--- a/src/process/http.rs
+++ b/src/process/http.rs
@@ -1,78 +1,84 @@
use crate::process::Remote;
use macros_rs::{fmtstr, string};
use reqwest::blocking::{Client, Response};
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
-use serde::{Deserialize, Serialize};
+use serde::Serialize;
use std::path::PathBuf;
/// JSON body sent to `/process/{id}/action`; `method` names the action
/// (this module sends "restart", "stop" and "remove").
#[derive(Serialize)]
struct ActionBody {
pub method: String,
}
-#[derive(Deserialize)]
// Result of fetching a remote log: after this change it is built by hand in
// `logs` (no longer deserialized), holding the remote log file path and the
// log lines.
pub struct LogResponse {
- pub logs: Vec<String>,
+ pub path: &'static str,
+ pub lines: Vec<String>,
}
/// JSON body sent to `/process/create`; borrows its fields from the caller
/// of `create` for the duration of the request.
#[derive(Serialize)]
struct CreateBody<'c> {
pub name: &'c String,
pub script: &'c String,
pub path: PathBuf,
pub watch: &'c Option<String>,
}
/// Builds a blocking HTTP client together with the headers every request
/// should carry: an `Authorization: token <token>` header when a token is
/// configured, nothing otherwise.
///
/// # Panics
/// Panics when the token contains bytes that are not valid in an HTTP header
/// value (the previous `from_static` call panicked in that case as well).
fn client(token: &Option<String>) -> (Client, HeaderMap) {
    let client = Client::new();
    let mut headers = HeaderMap::new();

    if let Some(token) = token {
        // Build the value from an owned string. `from_static` needs a
        // `'static` string, which for a runtime token meant leaking a fresh
        // allocation (containing the secret) on every call.
        let value = HeaderValue::from_str(&format!("token {token}")).expect("authorization token contains characters that are not valid in an HTTP header");
        headers.insert(AUTHORIZATION, value);
    }

    (client, headers)
}
/// Sends `GET {address}/process/{id}/info` to the remote and returns the raw response.
pub fn info(Remote { address, token, .. }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, headers) = client(token);
    let request = http.get(fmtstr!("{address}/process/{id}/info")).headers(headers);
    let response = request.send()?;
    Ok(response)
}
// Fetches a process log from the remote.
//
// After this change the `/raw` endpoint is used and the text body is split
// here: the first line is treated as a header whose last
// whitespace-separated token is the log path, and the remaining lines become
// `LogResponse::lines`.
-pub fn logs(Remote { address, token, .. }: &Remote, id: usize, kind: &str) -> Result<Response, anyhow::Error> {
+pub fn logs(Remote { address, token, .. }: &Remote, id: usize, kind: &str) -> Result<LogResponse, anyhow::Error> {
let (client, headers) = client(token);
- Ok(client.get(fmtstr!("{address}/process/{id}/logs/{kind}")).headers(headers).send()?)
+ let response = client.get(fmtstr!("{address}/process/{id}/logs/{kind}/raw")).headers(headers).send()?;
+ let log = response.text()?;
+
+ Ok(LogResponse {
// NOTE(review): the first line is always skipped, even when the body has no header line (the server returns the bare error text when the read fails).
+ lines: log.lines().skip(1).map(|line| line.to_string()).collect::<Vec<String>>(),
// NOTE(review): `Box::leak` makes the path `'static` by never freeing it; one small allocation is leaked per call.
+ path: Box::leak(Box::from(log.lines().next().unwrap_or("").split_whitespace().last().unwrap_or(""))),
+ })
}
/// Sends `POST {address}/process/create` with the process definition as JSON
/// and returns the raw response.
pub fn create(Remote { address, token, .. }: &Remote, name: &String, script: &String, path: PathBuf, watch: &Option<String>) -> Result<Response, anyhow::Error> {
    let (http, headers) = client(token);
    let payload = CreateBody { name, script, path, watch };
    let request = http.post(fmtstr!("{address}/process/create")).json(&payload).headers(headers);
    let response = request.send()?;
    Ok(response)
}
/// Asks the remote to restart process `id` via `POST {address}/process/{id}/action`.
pub fn restart(Remote { address, token, .. }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, headers) = client(token);
    let payload = ActionBody { method: string!("restart") };
    let request = http.post(fmtstr!("{address}/process/{id}/action")).json(&payload).headers(headers);
    let response = request.send()?;
    Ok(response)
}
/// Asks the remote to rename process `id`; the new name is sent as the plain
/// request body of `POST {address}/process/{id}/rename`.
pub fn rename(Remote { address, token, .. }: &Remote, id: usize, name: String) -> Result<Response, anyhow::Error> {
    let (http, headers) = client(token);
    let request = http.post(fmtstr!("{address}/process/{id}/rename")).body(name).headers(headers);
    let response = request.send()?;
    Ok(response)
}
/// Asks the remote to stop process `id` via `POST {address}/process/{id}/action`.
pub fn stop(Remote { address, token, .. }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, headers) = client(token);
    let payload = ActionBody { method: string!("stop") };
    let request = http.post(fmtstr!("{address}/process/{id}/action")).json(&payload).headers(headers);
    let response = request.send()?;
    Ok(response)
}
/// Asks the remote to remove process `id` via `POST {address}/process/{id}/action`.
pub fn remove(Remote { address, token, .. }: &Remote, id: usize) -> Result<Response, anyhow::Error> {
    let (http, headers) = client(token);
    let payload = ActionBody { method: string!("remove") };
    let request = http.post(fmtstr!("{address}/process/{id}/action")).json(&payload).headers(headers);
    let response = request.send()?;
    Ok(response)
}
diff --git a/src/process/mod.rs b/src/process/mod.rs
index 4b50cd3..de2ebb3 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -1,523 +1,535 @@
use crate::{
config,
config::structs::Server,
file, helpers,
service::{run, stop, ProcessMetadata},
};
use std::{
env,
path::PathBuf,
sync::{Arc, Mutex},
};
use chrono::serde::ts_milliseconds;
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{crashln, string, ternary, then};
use psutil::process;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::{BTreeMap, HashMap};
use utoipa::ToSchema;
// Detailed view of a single process, as assembled by `ProcessWrapper::json`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct ItemSingle {
pub info: Info,
pub stats: Stats,
pub watch: Watch,
pub log: Log,
pub raw: Raw,
}
// Identity and state section of `ItemSingle`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Info {
pub id: usize,
pub pid: i64,
pub name: String,
// "online", "crashed" or "stopped" (see `ProcessWrapper::json`).
pub status: String,
#[schema(value_type = String, example = "/path")]
pub path: PathBuf,
// Human-readable uptime produced by `helpers::format_duration`.
pub uptime: String,
// Rendered as `<shell> <args> '<script>'`.
pub command: String,
}
// Resource and restart statistics section of `ItemSingle`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Stats {
pub restarts: u64,
// Start time as a millisecond timestamp (`timestamp_millis`).
pub start_time: i64,
// `None` when the OS process could not be inspected.
pub cpu_percent: Option<f32>,
pub memory_usage: Option<MemoryInfo>,
}
// Memory figures copied from psutil's `rss()` and `vms()` values.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct MemoryInfo {
pub rss: u64,
pub vms: u64,
}
// Log file locations section of `ItemSingle` (stdout log and error log).
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Log {
pub out: String,
pub error: String,
}
// Raw state flags section of `ItemSingle`; `crashes` mirrors `Crash::value`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Raw {
pub running: bool,
pub crashed: bool,
pub crashes: u64,
}
// Added type: the pair of log file paths returned by `Process::logs`.
+#[derive(Clone)]
+pub struct LogInfo {
+ pub out: String,
+ pub error: String,
+}
+
// One row of the process list produced by `Runner::json`.
#[derive(Serialize, ToSchema)]
pub struct ProcessItem {
pid: i64,
id: usize,
// Formatted as a percentage with two decimals, e.g. "0.00%".
cpu: String,
// Formatted by `helpers::format_memory`, "0b" when unavailable.
mem: String,
name: String,
restarts: u64,
// "online", "crashed" or "stopped".
status: String,
uptime: String,
#[schema(example = "/path")]
watch_path: String,
#[schema(value_type = String, example = "2000-01-01T01:00:00.000Z")]
start_time: DateTime<Utc>,
}
/// Handle to one process inside a shared, mutex-guarded [`Runner`];
/// created by `Runner::get`.
#[derive(Clone)]
pub struct ProcessWrapper {
pub id: usize,
pub runner: Arc<Mutex<Runner>>,
}
/// A managed process as stored in `Runner::list`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Process {
pub id: usize,
pub pid: i64,
pub name: String,
/// Working directory; `Runner::restart` switches into it before relaunching.
pub path: PathBuf,
/// Command handed to `run` as `ProcessMetadata::command`.
pub script: String,
/// Environment captured with `env::vars()` when the process was started.
pub env: HashMap<String, String>,
/// Serialized as a millisecond timestamp.
#[serde(with = "ts_milliseconds")]
pub started: DateTime<Utc>,
pub restarts: u64,
pub running: bool,
pub crash: Crash,
pub watch: Watch,
}
/// Crash bookkeeping for a process: `crashed` is set by `Runner::set_crashed`,
/// `value` is incremented by `Runner::new_crash` and reset by `Runner::stop`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Crash {
pub crashed: bool,
pub value: u64,
}
// File-watch settings of a process; `path` and `hash` are empty strings when watching is disabled.
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct Watch {
pub enabled: bool,
#[schema(example = "/path")]
pub path: String,
// Produced by `hash::create` for the watched path.
pub hash: String,
}
/// The set of managed processes plus the id generator used for new ones.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Runner {
pub id: id::Id,
/// Set only for runners created by `Runner::connect`; never serialized.
/// When present, mutating operations are forwarded over HTTP.
#[serde(skip)]
pub remote: Option<Remote>,
pub list: BTreeMap<usize, Process>,
}
/// Connection details for a remote daemon: base address, optional auth token
/// and the runner configuration fetched from it.
#[derive(Clone, Debug)]
pub struct Remote {
address: String,
token: Option<String>,
pub config: RemoteConfig,
}
/// Runner configuration of a remote daemon (shell, shell arguments, log path).
#[derive(Clone, Debug, Deserialize)]
pub struct RemoteConfig {
pub shell: String,
pub args: Vec<String>,
pub log_path: String,
}
/// Run state that can be assigned to a process through `Runner::set_status`.
pub enum Status {
    Offline,
    Running,
}

impl Status {
    /// Maps the status onto the `running` flag: `true` only for `Running`.
    pub fn to_bool(&self) -> bool {
        matches!(self, Status::Running)
    }
}
// Added helper: locks the given mutex and yields the guard, aborting through
// `crashln!` when `lock()` returns an error.
+macro_rules! lock {
+ ($runner:expr) => {{
+ match $runner.lock() {
+ Ok(runner) => runner,
+ Err(err) => crashln!("Unable to lock mutex: {err}"),
+ }
+ }};
+}
+
impl Runner {
/// Loads the runner through `dump::read()`.
pub fn new() -> Self { dump::read() }
/// Connects to a remote daemon: fetches its runner config and its dump, and
/// returns a runner whose `remote` field is set. Returns `None` when either
/// fetch fails (a config failure is logged). With `verbose`, prints a
/// confirmation line on success.
pub fn connect(name: String, Server { address, token }: Server, verbose: bool) -> Option<Self> {
let remote_config = match config::from(&address, token.as_deref()) {
Ok(config) => config,
Err(err) => {
log::error!("{err}");
return None;
}
};
if let Ok(dump) = dump::from(&address, token.as_deref()) {
then!(verbose, println!("{} Fetched remote (name={name}, address={address})", *helpers::SUCCESS));
Some(Runner {
remote: Some(Remote {
token,
address: string!(address),
config: remote_config,
}),
..dump
})
} else {
None
}
}
/// Starts a new process.
///
/// Remote runners forward the request with `http::create` and abort on
/// failure. Local runners take the next id, spawn the command through `run`
/// with the configured shell, and insert the new `Process` into `list`.
/// Does not persist by itself; callers chain `.save()`.
pub fn start(&mut self, name: &String, command: &String, path: PathBuf, watch: &Option<String>) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::create(remote, name, command, path, watch) {
crashln!("{} Failed to start create {name}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let id = self.id.next();
let config = config::read().runner;
let crash = Crash { crashed: false, value: 0 };
// The watch hash is computed for the path resolved against the current working directory.
let watch = match watch {
Some(watch) => Watch {
enabled: true,
path: string!(watch),
hash: hash::create(file::cwd().join(watch)),
},
None => Watch {
enabled: false,
path: string!(""),
hash: string!(""),
},
};
let pid = run(ProcessMetadata {
args: config.args,
name: name.clone(),
shell: config.shell,
command: command.clone(),
log_path: config.log_path,
});
self.list.insert(
id,
Process {
id,
pid,
path,
watch,
crash,
restarts: 0,
running: true,
name: name.clone(),
started: Utc::now(),
script: command.clone(),
env: env::vars().collect(),
},
);
}
return self;
}
/// Restarts process `id`.
///
/// Remote runners forward the request with `http::restart`. Local runners
/// switch into the process's working directory, stop the old pid and spawn
/// the script again. With `dead == false` the crash counter is reset; with
/// `dead == true` the restart counter is incremented instead.
pub fn restart(&mut self, id: usize, dead: bool) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::restart(remote, id) {
crashln!("{} Failed to start process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let process = self.process(id);
let config = config::read().runner;
let Process { path, script, name, .. } = process.clone();
// NOTE(review): this changes the working directory of the calling process itself.
if let Err(err) = std::env::set_current_dir(&process.path) {
crashln!("{} Failed to set working directory {:?}\nError: {:#?}", *helpers::FAIL, path, err);
};
stop(process.pid);
process.running = false;
process.crash.crashed = false;
process.pid = run(ProcessMetadata {
args: config.args,
name: name.clone(),
shell: config.shell,
log_path: config.log_path,
command: script.to_string(),
});
process.running = true;
process.started = Utc::now();
then!(!dead, process.crash.value = 0);
then!(dead, process.restarts += 1);
}
return self;
}
/// Removes process `id`: forwarded with `http::remove` for remote runners;
/// locally the process is stopped, dropped from `list` and the dump is written.
pub fn remove(&mut self, id: usize) {
if let Some(remote) = &self.remote {
if let Err(err) = http::remove(remote, id) {
crashln!("{} Failed to stop remove {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
self.stop(id);
self.list.remove(&id);
dump::write(&self);
}
}
/// Replaces the id generator, calls `next()` on it once, and writes the dump.
pub fn set_id(&mut self, id: id::Id) {
self.id = id;
self.id.next();
dump::write(&self);
}
/// Sets the `running` flag of process `id` from `status` and writes the dump.
pub fn set_status(&mut self, id: usize, status: Status) {
self.process(id).running = status.to_bool();
dump::write(&self);
}
/// Returns a clone of the whole process map.
pub fn items(&mut self) -> BTreeMap<usize, Process> { self.list.clone() }
/// Returns the process map for in-place modification.
pub fn items_mut(&mut self) -> &mut BTreeMap<usize, Process> { &mut self.list }
/// Writes the dump, but only for local (non-remote) runners.
pub fn save(&self) { then!(self.remote.is_none(), dump::write(&self)) }
/// Number of processes in the list.
pub fn count(&mut self) -> usize { self.list().count() }
/// True when no processes are registered.
pub fn is_empty(&self) -> bool { self.list.is_empty() }
/// True when a process with `id` is registered.
pub fn exists(&mut self, id: usize) -> bool { self.list.contains_key(&id) }
// Non-aborting lookup: `None` for an unknown id. The change relaxes the receiver from `&mut self` to `&self`.
- pub fn info(&mut self, id: usize) -> Option<&Process> { self.list.get(&id) }
+ pub fn info(&self, id: usize) -> Option<&Process> { self.list.get(&id) }
/// Iterates over `(id, process)` pairs with mutable access to each process.
pub fn list<'l>(&'l mut self) -> impl Iterator<Item = (&'l usize, &'l mut Process)> { self.list.iter_mut().map(|(k, v)| (k, v)) }
/// Aborting lookup: returns the process or exits through `crashln!` when `id` is unknown.
pub fn process(&mut self, id: usize) -> &mut Process { self.list.get_mut(&id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL)) }
/// Consumes the runner and wraps it, together with `id`, in a `ProcessWrapper`.
/// The id is not validated here.
pub fn get(self, id: usize) -> ProcessWrapper {
ProcessWrapper {
id,
runner: Arc::new(Mutex::new(self)),
}
}
/// Marks process `id` as crashed.
pub fn set_crashed(&mut self, id: usize) -> &mut Self {
self.process(id).crash.crashed = true;
return self;
}
/// Increments the crash counter of process `id`.
pub fn new_crash(&mut self, id: usize) -> &mut Self {
self.process(id).crash.value += 1;
return self;
}
/// Stops process `id`: forwarded with `http::stop` for remote runners; locally
/// the pid is stopped and the running/crash state is cleared.
pub fn stop(&mut self, id: usize) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::stop(remote, id) {
crashln!("{} Failed to stop process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let process = self.process(id);
stop(process.pid);
process.running = false;
process.crash.crashed = false;
process.crash.value = 0;
}
return self;
}
/// Renames process `id`: forwarded with `http::rename` for remote runners,
/// otherwise the stored name is replaced.
pub fn rename(&mut self, id: usize, name: String) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::rename(remote, id, name) {
crashln!("{} Failed to rename process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
self.process(id).name = name;
}
return self;
}
/// Replaces the watch settings of process `id`. When enabled, the hash is
/// computed for `path` joined onto the process's working directory.
pub fn watch(&mut self, id: usize, path: &str, enabled: bool) -> &mut Self {
let process = self.process(id);
process.watch = Watch {
enabled,
path: string!(path),
hash: ternary!(enabled, hash::create(process.path.join(path)), string!("")),
};
return self;
}
/// Builds the JSON process list (`ProcessItem` per process), including
/// formatted CPU and memory usage where the OS process can be inspected.
pub fn json(&mut self) -> Value {
let mut processes: Vec<ProcessItem> = Vec::new();
for (id, item) in self.items() {
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
if let Ok(mut process) = process::Process::new(item.pid as u32) {
let mem_info_psutil = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
// NOTE(review): these unwraps panic when `memory_info()` failed above and left `None`.
memory_usage = Some(MemoryInfo {
rss: mem_info_psutil.as_ref().unwrap().rss(),
vms: mem_info_psutil.as_ref().unwrap().vms(),
});
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
None => string!("0.00%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss),
None => string!("0b"),
};
// A process that is not running is "crashed" or "stopped" depending on its crash flag.
let status = if item.running {
string!("online")
} else {
match item.crash.crashed {
true => string!("crashed"),
false => string!("stopped"),
}
};
processes.push(ProcessItem {
id,
status,
pid: item.pid,
cpu: cpu_percent,
mem: memory_usage,
restarts: item.restarts,
name: item.name.clone(),
start_time: item.started,
watch_path: item.watch.path.clone(),
uptime: helpers::format_duration(item.started),
});
}
json!(processes)
}
}
// This hunk introduces `impl Process` with a `logs()` accessor for the
// process's log paths, and rewrites the `ProcessWrapper` methods as
// one-liners that use the `lock!` macro instead of `lock().unwrap()`.
-impl ProcessWrapper {
- pub fn stop(&mut self) {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
- runner.stop(self.id).save();
- }
+impl Process {
+ /// Get a log paths of the process item
+ pub fn logs(&self) -> LogInfo {
// Spaces in the process name are replaced with underscores before the log paths are built.
+ let name = self.name.replace(" ", "_");
- pub fn watch(&mut self, path: &str) {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
- runner.watch(self.id, path, true).save();
+ LogInfo {
+ out: global!("pmc.logs.out", name.as_str()),
+ error: global!("pmc.logs.error", name.as_str()),
+ }
}
+}
- pub fn disable_watch(&mut self) {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
- runner.watch(self.id, "", false).save();
- }
+impl ProcessWrapper {
+ /// Stop the process item
+ pub fn stop(&mut self) { lock!(self.runner).stop(self.id).save(); }
- pub fn rename(&mut self, name: String) {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
- runner.rename(self.id, name).save();
- }
+ /// Restart the process item
+ pub fn restart(&mut self) { lock!(self.runner).restart(self.id, false).save(); }
- pub fn restart(&mut self) {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
- runner.restart(self.id, false).save();
- }
+ /// Rename the process item
+ pub fn rename(&mut self, name: String) { lock!(self.runner).rename(self.id, name).save(); }
+
+ /// Enable watching a path on the process item
+ pub fn watch(&mut self, path: &str) { lock!(self.runner).watch(self.id, path, true).save(); }
+
+ /// Disable watching on the process item
+ pub fn disable_watch(&mut self) { lock!(self.runner).watch(self.id, "", false).save(); }
+ /// Set the process item as crashed
pub fn crashed(&mut self) {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
+ let mut runner = lock!(self.runner);
// Record the crash, then restart with `dead = true` so the restart counter is incremented.
runner.new_crash(self.id).save();
runner.restart(self.id, true).save();
}
+ /// Get a json dump of the process item
pub fn json(&mut self) -> Value {
- let runner_arc = Arc::clone(&self.runner);
- let mut runner = runner_arc.lock().unwrap();
+ let mut runner = lock!(self.runner);
// `Runner::process` aborts through `crashln!` when the id is unknown.
let item = runner.process(self.id);
let config = config::read().runner;
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f32> = None;
if let Ok(mut process) = process::Process::new(item.pid as u32) {
let mem_info_psutil = process.memory_info().ok();
cpu_percent = process.cpu_percent().ok();
// NOTE(review): these unwraps panic when `memory_info()` failed above and left `None`.
memory_usage = Some(MemoryInfo {
rss: mem_info_psutil.as_ref().unwrap().rss(),
vms: mem_info_psutil.as_ref().unwrap().vms(),
});
}
// A process that is not running is "crashed" or "stopped" depending on its crash flag.
let status = if item.running {
string!("online")
} else {
match item.crash.crashed {
true => string!("crashed"),
false => string!("stopped"),
}
};
json!(ItemSingle {
info: Info {
status,
id: item.id,
pid: item.pid,
name: item.name.clone(),
path: item.path.clone(),
uptime: helpers::format_duration(item.started),
command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script.clone()),
},
stats: Stats {
cpu_percent,
memory_usage,
restarts: item.restarts,
start_time: item.started.timestamp_millis(),
},
watch: Watch {
enabled: item.watch.enabled,
hash: item.watch.hash.clone(),
path: item.watch.path.clone(),
},
log: Log {
- out: global!("pmc.logs.out", item.name.as_str()),
- error: global!("pmc.logs.error", item.name.as_str()),
+ out: item.logs().out,
+ error: item.logs().error,
},
raw: Raw {
running: item.running,
crashed: item.crash.crashed,
crashes: item.crash.value,
}
})
}
}
pub mod dump;
pub mod hash;
pub mod http;
pub mod id;
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sun, Feb 1, 9:17 PM (1 d, 22 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
494927
Default Alt Text
(77 KB)
Attached To
Mode
rPMC Process Management Controller
Attached
Detach File
Event Timeline
Log In to Comment