Page MenuHomePhorge

No OneTemporary

Size
100 KB
Referenced Files
None
Subscribers
None
diff --git a/lib/bridge.cc b/lib/bridge.cc
index f53737a..9b8ca52 100644
--- a/lib/bridge.cc
+++ b/lib/bridge.cc
@@ -1,130 +1,129 @@
-#include <bridge.h>
-#include <process.h>
+#include "include/bridge.h"
+#include "include/process.h"
+
#include <iostream>
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>
#ifdef __linux__
-#include <cstring>
-#include <string>
#include <cstdlib>
+#include <cstring>
#include <dirent.h>
+#include <string>
#include <sys/prctl.h>
#elif __APPLE__
+#include <crt_externs.h>
+#include <iostream>
#include <libproc.h>
#include <sys/proc_info.h>
-#include <libproc.h>
-#include <sys/proc_info.h>
-#include <iostream>
-#include <crt_externs.h>
#endif
using namespace std;
-void set_program_name(String name) {
- #ifdef __linux__
+extern "C++" void set_program_name(String name) {
+#ifdef __linux__
prctl(PR_SET_NAME, name.c_str());
- #elif __APPLE__
+#elif __APPLE__
setprogname(name.c_str());
- #endif
+#endif
}
int64_t get_child_pid(int64_t parentPID) {
#ifdef __linux__
DIR *dir = opendir("/proc");
if (!dir) {
std::cerr << "[PMC] (cc) Error opening /proc directory.\n";
perror("get_child_pid");
return -1;
}
int targetPID = -1;
dirent *entry;
while ((entry = readdir(dir)) != nullptr) {
if (entry->d_type == DT_DIR && isdigit(entry->d_name[0])) {
int pid = atoi(entry->d_name);
char statusPath[256];
snprintf(statusPath, sizeof(statusPath), "/proc/%d/status", pid);
FILE *statusFile = fopen(statusPath, "r");
if (statusFile) {
char buffer[256];
while (fgets(buffer, sizeof(buffer), statusFile) != nullptr) {
if (strncmp(buffer, "PPid:", 5) == 0) {
int parentID;
if (sscanf(buffer + 5, "%d", &parentID) == 1 && parentID == parentPID) {
targetPID = pid; break;
} break;
}
}
fclose(statusFile);
}
}
}
closedir(dir);
return targetPID;
#elif __APPLE__
pid_t pidList[1024];
int count = proc_listpids(PROC_ALL_PIDS, 0, pidList, sizeof(pidList));
if (count <= 0) {
std::cerr << "Error retrieving process list." << std::endl;
perror("get_child_pid");
return -1;
}
for (int i = 0; i < count; ++i) {
struct proc_bsdinfo procInfo;
if (proc_pidinfo(pidList[i], PROC_PIDTBSDINFO, 0, &procInfo, sizeof(procInfo)) > 0) {
if (procInfo.pbi_ppid == parentPID) {
return static_cast<int>(pidList[i]);
}
}
}
return -1;
#else
return -1;
#endif
}
rust::Vec<rust::i64> find_chidren(int64_t pid) {
rust::Vec<rust::i64> children;
int64_t child;
while ((child = get_child_pid(pid)) != -1) {
children.push_back(child);
pid = child;
}
return children;
}
int64_t stop(int64_t pid) {
vector<pid_t> children;
int64_t child;
while ((child = get_child_pid(pid)) != -1) {
children.push_back(child);
pid = child;
}
for (size_t i = 0; i < children.size(); i++) {
kill(children[i], SIGTERM);
}
return kill(pid, SIGTERM);
}
int64_t run(ProcessMetadata metadata) {
process::Runner runner;
runner.New(std::string(metadata.name), std::string(metadata.log_path));
return runner.Run(std::string(metadata.command), std::string(metadata.shell), metadata.args, metadata.env);
}
\ No newline at end of file
diff --git a/lib/fork.cc b/lib/fork.cc
index 91a0ca4..85fe6d5 100644
--- a/lib/fork.cc
+++ b/lib/fork.cc
@@ -1,93 +1,93 @@
-#include <fork.h>
-#include <stdexcept>
+#include "include/fork.h"
+
#include <cstdlib>
+#include <cstring>
#include <iostream>
+#include <stdexcept>
#include <unistd.h>
-#include <cstring>
#ifdef _WIN32
#include <windows.h>
#else
#include <pwd.h>
#include <unistd.h>
#endif
using namespace std;
std::string home() {
- #ifdef _WIN32
- const char* userProfile = std::getenv("USERPROFILE");
- if (userProfile) {
- return std::string(userProfile);
- } else {
- return "";
- }
- #else
- struct passwd* pw = getpwuid(getuid());
- if (pw && pw->pw_dir) {
- return std::string(pw->pw_dir);
- } else {
- return "";
- }
- #endif
+#ifdef _WIN32
+ const char *userProfile = std::getenv("USERPROFILE");
+ if (userProfile) {
+ return std::string(userProfile);
+ } else {
+ return "";
+ }
+#else
+ struct passwd *pw = getpwuid(getuid());
+ if (pw && pw->pw_dir) {
+ return std::string(pw->pw_dir);
+ } else {
+ return "";
+ }
+#endif
}
-
Fork fork_process() {
- pid_t res = ::fork();
- if (res == -1) {
- perror("fork_process");
- throw std::runtime_error("fork() failed");
- } else if (res == 0) {
- return Fork::Child;
- } else {
- return Fork::Parent;
- }
+ pid_t res = ::fork();
+ if (res == -1) {
+ perror("fork_process");
+ throw std::runtime_error("fork() failed");
+ } else if (res == 0) {
+ return Fork::Child;
+ } else {
+ return Fork::Parent;
+ }
}
pid_t set_sid() {
- pid_t res = ::setsid();
- if (res == -1) {
- perror("set_sid");
- throw std::runtime_error("setsid() failed");
- }
- return res;
+ pid_t res = ::setsid();
+ if (res == -1) {
+ perror("set_sid");
+ throw std::runtime_error("setsid() failed");
+ }
+ return res;
}
void close_fd() {
- bool res = false;
- for (int i = 0; i <= 2; ++i) {
- res |= (::close(i) == -1);
- }
+ bool res = false;
+ for (int i = 0; i <= 2; ++i) {
+ res |= (::close(i) == -1);
+ }
- if (res) {
- perror("close_fd");
- throw std::runtime_error("close_fd() failed");
- }
+ if (res) {
+ perror("close_fd");
+ throw std::runtime_error("close_fd() failed");
+ }
}
int32_t try_fork(bool nochdir, bool noclose, Callback callback) {
- try {
- Fork forkResult = fork_process();
- if (forkResult == Fork::Parent) {
- exit(0);
- } else if (forkResult == Fork::Child) {
- set_sid();
- if (!nochdir) {
- std::string home_dir = home() + ".pmc";
- chdir(home_dir.c_str());
- }
- if (!noclose) {
- close_fd();
- }
- forkResult = fork_process();
- }
- return static_cast<int32_t>(forkResult);
- } catch (const std::exception& e) {
- std::cerr << "[PMC] (cc) Error setting up daemon handler\n";
- perror("try_fork");
+ try {
+ Fork forkResult = fork_process();
+ if (forkResult == Fork::Parent) {
+ exit(0);
+ } else if (forkResult == Fork::Child) {
+ set_sid();
+ if (!nochdir) {
+ std::string home_dir = home() + ".pmc";
+ chdir(home_dir.c_str());
+ }
+ if (!noclose) {
+ close_fd();
+ }
+ forkResult = fork_process();
}
-
- callback();
- return -1;
+ return static_cast<int32_t>(forkResult);
+ } catch (const std::exception &e) {
+ std::cerr << "[PMC] (cc) Error setting up daemon handler\n";
+ perror("try_fork");
+ }
+
+ callback();
+ return -1;
}
\ No newline at end of file
diff --git a/lib/include/bridge.h b/lib/include/bridge.h
index 57fefb5..ed33639 100644
--- a/lib/include/bridge.h
+++ b/lib/include/bridge.h
@@ -1,25 +1,25 @@
#ifndef BRIDGE_H
#define BRIDGE_H
-#include <rust.h>
+#include "rust.h"
using namespace rust;
#ifndef CXXBRIDGE1_STRUCT_ProcessMetadata
#define CXXBRIDGE1_STRUCT_ProcessMetadata
struct ProcessMetadata final {
String name;
String shell;
String command;
String log_path;
Vec<String> args;
Vec<String> env;
using IsRelocatable = std::true_type;
};
#endif
extern "C++" int64_t stop(int64_t pid);
extern "C++" int64_t run(ProcessMetadata metadata);
extern "C++" void set_program_name(String name);
extern "C++" int64_t get_child_pid(int64_t parentPID);
extern "C++" Vec<i64> find_chidren(int64_t pid);
#endif
diff --git a/lib/include/fork.h b/lib/include/fork.h
index 7ce089a..fe1550e 100644
--- a/lib/include/fork.h
+++ b/lib/include/fork.h
@@ -1,20 +1,20 @@
#ifndef FORK_H
#define FORK_H
-#include <string>
#include <cstdint>
+#include <string>
#ifndef CXXBRIDGE1_ENUM_Fork
#define CXXBRIDGE1_ENUM_Fork
-enum class Fork: std::uint8_t {
- Parent,
- Child
+enum class Fork: std::uint8_t {
+ Parent,
+ Child
};
#endif
-using Callback = void(*)();
+using Callback = void (*)();
pid_t set_sid();
void close_fd();
Fork fork_process();
extern "C" int32_t try_fork(bool nochdir, bool noclose, Callback callback);
#endif
diff --git a/lib/include/process.h b/lib/include/process.h
index fe721c7..81c0c2b 100644
--- a/lib/include/process.h
+++ b/lib/include/process.h
@@ -1,20 +1,20 @@
#ifndef PROCESS_H
#define PROCESS_H
-#include <rust.h>
+#include "rust.h"
using namespace rust;
namespace process {
class Runner {
public:
void New(const std::string &name, const std::string &logPath);
int64_t Run(const std::string &command, const std::string &shell, Vec<String> args, Vec<String> env);
~Runner();
private:
int stdout_fd;
int stderr_fd;
};
}
#endif
diff --git a/lib/include/psutil.h b/lib/include/psutil.h
index abefd9e..f98c476 100644
--- a/lib/include/psutil.h
+++ b/lib/include/psutil.h
@@ -1,8 +1,8 @@
#ifndef PSUTIL_H
#define PSUTIL_H
-#include <rust.h>
+#include "rust.h"
using namespace rust;
extern "C++" double get_process_cpu_usage_percentage(int64_t pid);
#endif
diff --git a/lib/process.cc b/lib/process.cc
index 679844b..00deb88 100644
--- a/lib/process.cc
+++ b/lib/process.cc
@@ -1,133 +1,134 @@
-#include <process.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <sys/wait.h>
-#include <signal.h>
-#include <chrono>
-#include <thread>
-#include <iostream>
+#include "include/process.h"
+
#include <algorithm>
+#include <chrono>
+#include <fcntl.h>
#include <fstream>
+#include <iostream>
+#include <signal.h>
#include <sstream>
+#include <sys/wait.h>
+#include <thread>
+#include <unistd.h>
#ifdef __APPLE__
-#include <sys/types.h>
#include <sys/sysctl.h>
+#include <sys/types.h>
#endif
using namespace std;
namespace process {
volatile sig_atomic_t childExitStatus = 0;
std::string format(std::string text) {
- std::replace(text.begin(), text.end(), ' ', '_');
- return text;
+ std::replace(text.begin(), text.end(), ' ', '_');
+ return text;
}
-pair<std::string, std::string> split(const std::string& str) {
- size_t length = str.length();
- size_t midpoint = length / 2;
+pair<std::string, std::string> split(const std::string &str) {
+ size_t length = str.length();
+ size_t midpoint = length / 2;
- std::string firstHalf = str.substr(0, midpoint);
- std::string secondHalf = str.substr(midpoint);
+ std::string firstHalf = str.substr(0, midpoint);
+ std::string secondHalf = str.substr(midpoint);
- return make_pair(firstHalf, secondHalf);
+ return make_pair(firstHalf, secondHalf);
}
void sigchld_handler(int signo) {
(void)signo;
int status;
while (waitpid(-1, &status, WNOHANG) > 0) {
childExitStatus = status;
}
}
void Runner::New(const std::string &name, const std::string &logPath) {
std::string formattedName = format(name);
std::string stdoutFileName = logPath + "/" + formattedName + "-out.log";
std::string stderrFileName = logPath + "/" + formattedName + "-error.log";
-
+
stdout_fd = open(stdoutFileName.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
stderr_fd = open(stderrFileName.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
struct sigaction sa;
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
std::cerr << "[PMC] (cc) Error setting up SIGCHLD handler\n";
perror("Runner::New");
}
}
Runner::~Runner() {
if (stdout_fd != -1) {
close(stdout_fd);
}
if (stderr_fd != -1) {
close(stderr_fd);
}
}
int64_t Runner::Run(const std::string &command, const std::string &shell, Vec<String> args, Vec<String> env) {
pid_t pid = fork();
if (pid == -1) {
std::cerr << "[PMC] (cc) Unable to fork\n";
perror("Runner::Run");
return -1;
} else if (pid == 0) {
setsid();
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
dup2(stdout_fd, STDOUT_FILENO);
dup2(stderr_fd, STDERR_FILENO);
-
- std::vector<const char*> argsArray;
- std::vector<const char*> envArray;
+
+ std::vector<const char *> argsArray;
+ std::vector<const char *> envArray;
argsArray.push_back(shell.c_str());
transform(args.begin(), args.end(), std::back_inserter(argsArray),
[](rust::String& arg) { return arg.c_str(); });
transform(env.begin(), env.end(), std::back_inserter(envArray),
[](rust::String& env) { return env.c_str(); });
argsArray.push_back(command.c_str());
- argsArray.push_back(nullptr);
- envArray.push_back(nullptr);
+ argsArray.push_back(nullptr);
+ envArray.push_back(nullptr);
if (execve(shell.c_str(), const_cast<char* const*>(argsArray.data()), const_cast<char* const*>(envArray.data())) == -1) {
std::cerr << "[PMC] (cc) Unable to execute the command\n";
perror("execvp");
exit(EXIT_FAILURE);
}
} else {
close(stdout_fd);
close(stderr_fd);
-
+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::string proc_path = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(pid) + "/children";
std::ifstream proc_file(proc_path);
if (proc_file.is_open()) {
std::string line;
if (std::getline(proc_file, line)) {
std::istringstream iss(line);
pid_t child_pid;
if (iss >> child_pid) {
return child_pid;
}
}
}
-
+
return pid;
}
-
+
return -1;
}}
diff --git a/lib/psutil.cc b/lib/psutil.cc
index 8b3df3e..0528ca8 100644
--- a/lib/psutil.cc
+++ b/lib/psutil.cc
@@ -1,99 +1,109 @@
+#include "include/psutil.h"
+
#include <chrono>
#include <thread>
-#include <cmath>
#ifdef __APPLE__
#include <libproc.h>
#include <sys/proc_info.h>
#include <sys/sysctl.h>
#else
+#include <cmath>
+#include <cstdlib>
#include <fstream>
#include <sstream>
-#include <vector>
-#include <cstdlib>
#include <unistd.h>
+#include <vector>
+#include <iostream>
#endif
int get_num_cores() {
#ifdef __APPLE__
- int nm[2];
- size_t len = 4;
- uint32_t count;
-
- nm[0] = CTL_HW; nm[1] = HW_AVAILCPU;
- sysctl(nm, 2, &count, &len, NULL, 0);
-
- if(count < 1) {
- nm[1] = HW_NCPU;
- sysctl(nm, 2, &count, &len, NULL, 0);
- }
- return count > 0 ? static_cast<int>(count) : 1;
+ int nm[2];
+ size_t len = 4;
+ uint32_t count;
+
+ nm[0] = CTL_HW;
+ nm[1] = HW_AVAILCPU;
+ sysctl(nm, 2, &count, &len, NULL, 0);
+
+ if (count < 1) {
+ nm[1] = HW_NCPU;
+ sysctl(nm, 2, &count, &len, NULL, 0);
+ }
+
+ return count > 0 ? static_cast<int>(count) : 1;
#else
- return static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
+ return static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
#endif
}
double get_cpu_time(int64_t pid) {
#ifdef __APPLE__
- struct proc_taskinfo pti;
- int ret = proc_pidinfo(pid, PROC_PIDTASKINFO, 0, &pti, sizeof(pti));
- if (ret <= 0) {
- return 0.0;
- }
- return (pti.pti_total_user + pti.pti_total_system) / 1e9;
+ struct proc_taskinfo pti;
+ int ret = proc_pidinfo(pid, PROC_PIDTASKINFO, 0, &pti, sizeof(pti));
+
+ if (ret <= 0) {
+ return 0.0;
+ }
+
+ return (pti.pti_total_user + pti.pti_total_system) / 1e9;
#else
- std::string stat_path = "/proc/" + std::to_string(pid) + "/stat";
- std::ifstream stat_file(stat_path);
-
- if (!stat_file.is_open()) {
- return 0.0;
- }
-
- std::string line;
- std::getline(stat_file, line);
-
- std::istringstream iss(line);
- std::string token;
- std::vector<std::string> tokens;
-
- while (std::getline(iss, token, ' ')) {
- tokens.push_back(token);
- }
-
- if (tokens.size() < 15) {
- return 0.0;
- }
-
- unsigned long long utime = std::stoull(tokens[13]);
- unsigned long long stime = std::stoull(tokens[14]);
-
- return (utime + stime) / sysconf(_SC_CLK_TCK);
+ std::string stat_path = "/proc/" + std::to_string(pid) + "/stat";
+ std::ifstream stat_file(stat_path);
+
+ if (!stat_file.is_open()) {
+ std::cerr << "Failed to open " << stat_path << std::endl;
+ return -1.0;
+ }
+
+ std::string line;
+ std::getline(stat_file, line);
+
+ std::istringstream iss(line);
+ std::string token;
+ std::vector<std::string> tokens;
+
+ while (std::getline(iss, token, ' ')) {
+ tokens.push_back(token);
+ }
+
+ if (tokens.size() < 15) {
+ std::cerr << "Unexpected format in " << stat_path << std::endl;
+ return -1.0;
+ }
+
+ unsigned long long utime = std::stoull(tokens[13]);
+ unsigned long long stime = std::stoull(tokens[14]);
+ double ticks_per_second = static_cast<double>(sysconf(_SC_CLK_TCK));
+
+ return (utime + stime) / ticks_per_second;
#endif
}
-extern "C++" double get_process_cpu_usage_percentage(int64_t pid) {
- const std::chrono::milliseconds measurement_interval(300);
+double get_process_cpu_usage_percentage(int64_t pid) {
+ const std::chrono::milliseconds measurement_interval(100);
+ double cpu_time_start = get_cpu_time(pid);
- double cpu_time_start = get_cpu_time(pid);
- if (cpu_time_start < 0) {
- return 0.0;
- }
+ if (cpu_time_start < 0) {
+ return 0.0;
+ }
- auto start_time = std::chrono::steady_clock::now();
- std::this_thread::sleep_for(measurement_interval);
- auto end_time = std::chrono::steady_clock::now();
+ auto start_time = std::chrono::steady_clock::now();
+ std::this_thread::sleep_for(measurement_interval);
+ auto end_time = std::chrono::steady_clock::now();
- double cpu_time_end = get_cpu_time(pid);
- if (cpu_time_end < 0) {
- return 0.0;
- }
+ double cpu_time_end = get_cpu_time(pid);
+ if (cpu_time_end < 0) {
+ return 0.0;
+ }
- double cpu_time_diff = cpu_time_end - cpu_time_start;
- std::chrono::duration<double> elapsed = end_time - start_time;
- double elapsed_seconds = elapsed.count();
- double cpu_usage_percentage = (cpu_time_diff / elapsed_seconds) * 100.0;
+ long num_cores = get_num_cores();
+ double cpu_time_diff = cpu_time_end - cpu_time_start;
+ std::chrono::duration<double> elapsed = end_time - start_time;
- long num_cores = get_num_cores();
+ double elapsed_seconds = elapsed.count();
+ double cpu_usage_percentage = (cpu_time_diff / elapsed_seconds) * (100.0 * num_cores);
- return std::min(cpu_usage_percentage, 100.0 * num_cores);
+ return std::min(cpu_usage_percentage, 100.0 * num_cores);
}
\ No newline at end of file
diff --git a/src/cli/internal.rs b/src/cli/internal.rs
index 70718a3..cd428f7 100644
--- a/src/cli/internal.rs
+++ b/src/cli/internal.rs
@@ -1,652 +1,652 @@
use colored::Colorize;
use macros_rs::{crashln, string, ternary, then};
use psutil::process::{MemoryInfo, Process};
use regex::Regex;
use serde::Serialize;
use serde_json::json;
use pmc::{
config, file,
helpers::{self, ColoredString},
log,
process::{http, ItemSingle, Runner},
};
use tabled::{
settings::{
object::{Columns, Rows},
style::{BorderColor, Style},
themes::Colorization,
Color, Modify, Rotate, Width,
},
Table, Tabled,
};
pub struct Internal<'i> {
pub id: usize,
pub runner: Runner,
pub kind: String,
pub server_name: &'i str,
}
impl<'i> Internal<'i> {
pub fn create(mut self, script: &String, name: &Option<String>, watch: &Option<String>, silent: bool) -> Runner {
let config = config::read();
let name = match name {
Some(name) => string!(name),
None => string!(script.split_whitespace().next().unwrap_or_default()),
};
if matches!(self.server_name, "internal" | "local") {
let pattern = Regex::new(r"(?m)^[a-zA-Z0-9]+(/[a-zA-Z0-9]+)*(\.js|\.ts)?$").unwrap();
if pattern.is_match(script) {
let script = format!("{} {script}", config.runner.node);
self.runner.start(&name, &script, file::cwd(), watch).save();
} else {
self.runner.start(&name, script, file::cwd(), watch).save();
}
} else {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
match Runner::connect(self.server_name.into(), server.get(), false) {
Some(mut remote) => remote.start(&name, script, file::cwd(), watch),
None => crashln!("{} Failed to connect (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name,)
};
}
then!(!silent, println!("{} Creating {}process with ({name})", *helpers::SUCCESS, self.kind));
then!(!silent, println!("{} {}Created ({name}) ✓", *helpers::SUCCESS, self.kind));
return self.runner;
}
pub fn restart(mut self, name: &Option<String>, watch: &Option<String>, reset_env: bool, silent: bool) -> Runner {
then!(!silent, println!("{} Applying {}action restartProcess on ({})", *helpers::SUCCESS, self.kind, self.id));
if matches!(self.server_name, "internal" | "local") {
let mut item = self.runner.get(self.id);
match watch {
Some(path) => item.watch(path),
None => item.disable_watch(),
}
then!(reset_env, item.clear_env());
name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
item.restart();
self.runner = item.get_runner().clone();
} else {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
match Runner::connect(self.server_name.into(), server.get(), false) {
Some(remote) => {
let mut item = remote.get(self.id);
then!(reset_env, item.clear_env());
name.as_ref().map(|n| item.rename(n.trim().replace("\n", "")));
item.restart();
}
None => crashln!("{} Failed to connect (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
}
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
}
if !silent {
println!("{} Restarted {}({}) ✓", *helpers::SUCCESS, self.kind, self.id);
log!("process started (id={})", self.id);
}
return self.runner;
}
pub fn stop(mut self, silent: bool) -> Runner {
then!(!silent, println!("{} Applying {}action stopProcess on ({})", *helpers::SUCCESS, self.kind, self.id));
if !matches!(self.server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
self.runner = match Runner::connect(self.server_name.into(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to connect (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
}
let mut item = self.runner.get(self.id);
item.stop();
self.runner = item.get_runner().clone();
if !silent {
println!("{} Stopped {}({}) ✓", *helpers::SUCCESS, self.kind, self.id);
log!("process stopped {}(id={})", self.kind, self.id);
}
return self.runner;
}
pub fn remove(mut self) {
println!("{} Applying {}action removeProcess on ({})", *helpers::SUCCESS, self.kind, self.id);
if !matches!(self.server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
self.runner = match Runner::connect(self.server_name.into(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to remove (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
}
self.runner.remove(self.id);
println!("{} Removed {}({}) ✓", *helpers::SUCCESS, self.kind, self.id);
log!("process removed (id={})", self.id);
}
pub fn flush(&mut self) {
println!("{} Applying {}action flushLogs on ({})", *helpers::SUCCESS, self.kind, self.id);
if !matches!(self.server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
self.runner = match Runner::connect(self.server_name.into(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to remove (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
}
self.runner.flush(self.id);
println!("{} Flushed Logs {}({}) ✓", *helpers::SUCCESS, self.kind, self.id);
log!("process logs cleaned (id={})", self.id);
}
pub fn info(&self, format: &String) {
#[derive(Clone, Debug, Tabled)]
struct Info {
#[tabled(rename = "error log path ")]
log_error: String,
#[tabled(rename = "out log path")]
log_out: String,
#[tabled(rename = "cpu percent")]
cpu_percent: String,
#[tabled(rename = "memory usage")]
memory_usage: String,
#[tabled(rename = "path hash")]
hash: String,
#[tabled(rename = "watching")]
watch: String,
children: String,
#[tabled(rename = "exec cwd")]
path: String,
#[tabled(rename = "script command ")]
command: String,
#[tabled(rename = "script id")]
id: String,
restarts: u64,
uptime: String,
pid: String,
name: String,
status: ColoredString,
}
impl Serialize for Info {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"id": &self.id.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"path": &self.path.trim(),
"restarts": &self.restarts,
"hash": &self.hash.trim(),
"watch": &self.watch.trim(),
"children": &self.children,
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"log_out": &self.log_out.trim(),
"cpu": &self.cpu_percent.trim(),
"command": &self.command.trim(),
"mem": &self.memory_usage.trim(),
"log_error": &self.log_error.trim(),
});
trimmed_json.serialize(serializer)
}
}
let render_info = |data: Vec<Info>| {
let table = Table::new(data.clone())
.with(Rotate::Left)
.with(Style::rounded().remove_horizontals())
.with(Colorization::exact([Color::FG_CYAN], Columns::first()))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.to_string();
if let Ok(json) = serde_json::to_string(&data[0]) {
match format.as_str() {
"raw" => println!("{:?}", data[0]),
"json" => println!("{json}"),
_ => {
println!("{}\n{table}\n", format!("Describing {}process with id ({})", self.kind, self.id).on_bright_white().black());
println!(" {}", format!("Use `pmc logs {} [--lines <num>]` to display logs", self.id).white());
println!(" {}", format!("Use `pmc env {}` to display environment variables", self.id).white());
}
};
};
};
if matches!(self.server_name, "internal" | "local") {
if let Some(home) = home::home_dir() {
let config = config::read().runner;
let mut runner = Runner::new();
let item = runner.process(self.id);
let mut memory_usage: Option<MemoryInfo> = None;
let mut cpu_percent: Option<f64> = None;
let path = file::make_relative(&item.path, &home).to_string_lossy().into_owned();
let children = if item.children.is_empty() { "none".to_string() } else { format!("{:?}", item.children) };
if let Ok(process) = Process::new(item.pid as u32) {
memory_usage = process.memory_info().ok();
cpu_percent = Some(pmc::service::get_process_cpu_usage_percentage(item.pid as i64));
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
- None => string!("0%"),
+ None => string!("0.00%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
let data = vec![Info {
children,
cpu_percent,
memory_usage,
id: string!(self.id),
restarts: item.restarts,
name: item.name.clone(),
log_out: item.logs().out,
path: format!("{} ", path),
log_error: item.logs().error,
status: ColoredString(status),
pid: ternary!(item.running, format!("{}", item.pid), string!("n/a")),
command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script),
hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
}];
render_info(data)
} else {
crashln!("{} Impossible to get your home directory", *helpers::FAIL);
}
} else {
let data: (pmc::process::Process, Runner);
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
data = match Runner::connect(self.server_name.into(), server.get(), false) {
Some(mut remote) => (remote.process(self.id).clone(), remote),
None => crashln!("{} Failed to connect (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
let (item, remote) = data;
let remote = remote.remote.unwrap();
let info = http::info(&remote, self.id);
let path = item.path.to_string_lossy().into_owned();
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
if let Ok(info) = info {
let stats = info.json::<ItemSingle>().unwrap().stats;
let children = if item.children.is_empty() { "none".to_string() } else { format!("{:?}", item.children) };
let cpu_percent = match stats.cpu_percent {
Some(percent) => format!("{percent:.2}%"),
- None => string!("0%"),
+ None => string!("0.00%"),
};
let memory_usage = match stats.memory_usage {
Some(usage) => helpers::format_memory(usage.rss),
None => string!("0b"),
};
let data = vec![Info {
children,
cpu_percent,
memory_usage,
id: string!(self.id),
path: path.clone(),
status: status.into(),
restarts: item.restarts,
name: item.name.clone(),
pid: ternary!(item.running, format!("{pid}", pid = item.pid), string!("n/a")),
log_out: format!("{}/{}-out.log", remote.config.log_path, item.name),
log_error: format!("{}/{}-error.log", remote.config.log_path, item.name),
hash: ternary!(item.watch.enabled, format!("{} ", item.watch.hash), string!("none ")),
command: format!("{} {} '{}'", remote.config.shell, remote.config.args.join(" "), item.script),
watch: ternary!(item.watch.enabled, format!("{path}/{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{}", helpers::format_duration(item.started)), string!("none")),
}];
render_info(data)
}
}
}
pub fn logs(mut self, lines: &usize) {
if !matches!(self.server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
self.runner = match Runner::connect(self.server_name.into(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to connect (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
let item = self.runner.info(self.id).unwrap_or_else(|| crashln!("{} Process ({}) not found", *helpers::FAIL, self.id));
println!(
"{}",
format!("Showing last {lines} lines for {}process [{}] (change the value with --lines option)", self.kind, self.id).yellow()
);
for kind in vec!["error", "out"] {
let logs = http::logs(&self.runner.remote.as_ref().unwrap(), self.id, kind);
if let Ok(log) = logs {
if log.lines.is_empty() {
println!("{} No logs found for {}/{kind}", *helpers::FAIL, item.name);
continue;
}
file::logs_internal(log.lines, *lines, log.path, self.id, kind, &item.name)
}
}
} else {
let item = self.runner.info(self.id).unwrap_or_else(|| crashln!("{} Process ({}) not found", *helpers::FAIL, self.id));
println!(
"{}",
format!("Showing last {lines} lines for {}process [{}] (change the value with --lines option)", self.kind, self.id).yellow()
);
file::logs(item, *lines, "error");
file::logs(item, *lines, "out");
}
}
pub fn env(mut self) {
println!("{}", format!("Showing env for {}process {}:\n", self.kind, self.id).bright_yellow());
if !matches!(self.server_name, "internal" | "local") {
let Some(servers) = config::servers().servers else {
crashln!("{} Failed to read servers", *helpers::FAIL)
};
if let Some(server) = servers.get(self.server_name) {
self.runner = match Runner::connect(self.server_name.into(), server.get(), false) {
Some(remote) => remote,
None => crashln!("{} Failed to connect (name={}, address={})", *helpers::FAIL, self.server_name, server.address),
};
} else {
crashln!("{} Server '{}' does not exist", *helpers::FAIL, self.server_name)
};
}
let item = self.runner.process(self.id);
item.env.iter().for_each(|(key, value)| println!("{}: {}", key, value.green()));
}
pub fn save(server_name: &String) {
if !matches!(&**server_name, "internal" | "local") {
crashln!("{} Cannot force save on remote servers", *helpers::FAIL)
}
println!("{} Saved current processes to dumpfile", *helpers::SUCCESS);
Runner::new().save();
}
pub fn restore(server_name: &String) {
let mut runner = Runner::new();
let (kind, list_name) = super::format(server_name);
if !matches!(&**server_name, "internal" | "local") {
crashln!("{} Cannot restore on remote servers", *helpers::FAIL)
}
Runner::new().list().for_each(|(id, p)| {
if p.running == true {
runner = Internal {
id: *id,
server_name,
kind: kind.clone(),
runner: runner.clone(),
}
.restart(&None, &None, false, true);
}
});
println!("{} Restored process statuses from dumpfile", *helpers::SUCCESS);
Internal::list(&string!("default"), &list_name);
}
pub fn list(format: &String, server_name: &String) {
let render_list = |runner: &mut Runner, internal: bool| {
let mut processes: Vec<ProcessItem> = Vec::new();
#[derive(Tabled, Debug)]
struct ProcessItem {
id: ColoredString,
name: String,
pid: String,
uptime: String,
#[tabled(rename = "↺")]
restarts: String,
status: ColoredString,
cpu: String,
mem: String,
#[tabled(rename = "watching")]
watch: String,
}
impl serde::Serialize for ProcessItem {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"cpu": &self.cpu.trim(),
"mem": &self.mem.trim(),
"id": &self.id.0.trim(),
"pid": &self.pid.trim(),
"name": &self.name.trim(),
"watch": &self.watch.trim(),
"uptime": &self.uptime.trim(),
"status": &self.status.0.trim(),
"restarts": &self.restarts.trim(),
});
trimmed_json.serialize(serializer)
}
}
if runner.is_empty() {
println!("{} Process table empty", *helpers::SUCCESS);
} else {
for (id, item) in runner.items() {
let mut cpu_percent: String = string!("0%");
let mut memory_usage: String = string!("0b");
if internal {
let mut usage_internals: (Option<f64>, Option<MemoryInfo>) = (None, None);
if let Ok(process) = Process::new(item.pid as u32) {
usage_internals = (Some(pmc::service::get_process_cpu_usage_percentage(item.pid as i64)), process.memory_info().ok());
}
cpu_percent = match usage_internals.0 {
Some(percent) => format!("{:.2}%", percent),
- None => string!("0%"),
+ None => string!("0.00%"),
};
memory_usage = match usage_internals.1 {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
} else {
let info = http::info(&runner.remote.as_ref().unwrap(), id);
if let Ok(info) = info {
let stats = info.json::<ItemSingle>().unwrap().stats;
cpu_percent = match stats.cpu_percent {
Some(percent) => format!("{:.2}%", percent),
- None => string!("0%"),
+ None => string!("0.00%"),
};
memory_usage = match stats.memory_usage {
Some(usage) => helpers::format_memory(usage.rss),
None => string!("0b"),
};
}
}
let status = if item.running {
"online ".green().bold()
} else {
match item.crash.crashed {
true => "crashed ",
false => "stopped ",
}
.red()
.bold()
};
processes.push(ProcessItem {
status: status.into(),
cpu: format!("{cpu_percent} "),
mem: format!("{memory_usage} "),
id: id.to_string().cyan().bold().into(),
restarts: format!("{} ", item.restarts),
name: format!("{} ", item.name.clone()),
pid: ternary!(item.running, format!("{} ", item.pid), string!("n/a ")),
watch: ternary!(item.watch.enabled, format!("{} ", item.watch.path), string!("disabled ")),
uptime: ternary!(item.running, format!("{} ", helpers::format_duration(item.started)), string!("none ")),
});
}
let table = Table::new(&processes)
.with(Style::rounded().remove_verticals())
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.with(Colorization::exact([Color::FG_BRIGHT_CYAN], Rows::first()))
.with(Modify::new(Columns::single(1)).with(Width::truncate(35).suffix("... ")))
.to_string();
if let Ok(json) = serde_json::to_string(&processes) {
match format.as_str() {
"raw" => println!("{:?}", processes),
"json" => println!("{json}"),
"default" => println!("{table}"),
_ => {}
};
};
}
};
if let Some(servers) = config::servers().servers {
let mut failed: Vec<(String, String)> = vec![];
if let Some(server) = servers.get(server_name) {
match Runner::connect(server_name.clone(), server.get(), true) {
Some(mut remote) => render_list(&mut remote, false),
None => println!("{} Failed to fetch (name={server_name}, address={})", *helpers::FAIL, server.address),
}
} else {
if matches!(&**server_name, "internal" | "all" | "global" | "local") {
if *server_name == "all" || *server_name == "global" {
println!("{} Internal daemon", *helpers::SUCCESS);
}
render_list(&mut Runner::new(), true);
} else {
crashln!("{} Server '{server_name}' does not exist", *helpers::FAIL);
}
}
if *server_name == "all" || *server_name == "global" {
for (name, server) in servers {
match Runner::connect(name.clone(), server.get(), true) {
Some(mut remote) => render_list(&mut remote, false),
None => failed.push((name, server.address)),
}
}
}
if !failed.is_empty() {
println!("{} Failed servers:", *helpers::FAIL);
failed
.iter()
.for_each(|server| println!(" {} {} {}", "-".yellow(), format!("{}", server.0), format!("[{}]", server.1).white()));
}
} else {
render_list(&mut Runner::new(), true);
}
}
}
diff --git a/src/daemon/api/routes.rs b/src/daemon/api/routes.rs
index b284a44..e3258c3 100644
--- a/src/daemon/api/routes.rs
+++ b/src/daemon/api/routes.rs
@@ -1,964 +1,964 @@
#![allow(non_snake_case)]
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{fmtstr, string, ternary, then};
use prometheus::{Encoder, TextEncoder};
-use psutil::process::{MemoryInfo, Process};
+use psutil::process::Process;
use reqwest::header::HeaderValue;
use tera::Context;
use utoipa::ToSchema;
use rocket::{
get,
http::{ContentType, Status},
post,
response::stream::{Event, EventStream},
serde::{json::Json, Deserialize, Serialize},
State,
};
use super::{
helpers::{generic_error, not_found, GenericError, NotFound},
structs::ErrorMessage,
EnableWebUI, TeraState,
};
use pmc::{
config, file, helpers,
process::{dump, http::client, ItemSingle, ProcessItem, Runner},
};
use crate::daemon::{
api::{HTTP_COUNTER, HTTP_REQ_HISTOGRAM},
pid::{self, Pid},
};
use std::{
collections::BTreeMap,
env,
fs::{self, File},
io::{self, BufRead, BufReader},
path::PathBuf,
thread::sleep,
time::Duration,
};
pub(crate) struct Token;
type EnvList = Json<BTreeMap<String, String>>;
#[allow(dead_code)]
#[derive(ToSchema)]
#[schema(as = MemoryInfo)]
pub(crate) struct DocMemoryInfo {
rss: u64,
vms: u64,
#[cfg(target_os = "linux")]
shared: u64,
#[cfg(target_os = "linux")]
text: u64,
#[cfg(target_os = "linux")]
data: u64,
#[cfg(target_os = "macos")]
page_faults: u64,
#[cfg(target_os = "macos")]
pageins: u64,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct ActionBody {
#[schema(example = "restart")]
method: String,
}
#[derive(Serialize, ToSchema)]
pub(crate) struct ConfigBody {
#[schema(example = "bash")]
shell: String,
#[schema(min_items = 1, example = json!(["-c"]))]
args: Vec<String>,
#[schema(example = "/home/user/.pmc/logs")]
log_path: String,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct CreateBody {
#[schema(example = "app")]
name: Option<String>,
#[schema(example = "node index.js")]
script: String,
#[schema(value_type = String, example = "/projects/app")]
path: PathBuf,
#[schema(example = "src")]
watch: Option<String>,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct ActionResponse {
#[schema(example = true)]
done: bool,
#[schema(example = "name")]
action: String,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct LogResponse {
logs: Vec<String>,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub struct MetricsRoot {
pub raw: Raw,
pub version: Version,
pub os: crate::globals::Os,
pub daemon: Daemon,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Raw {
- pub memory_usage: Option<MemoryInfo>,
+ pub memory_usage: Option<u64>,
pub cpu_percent: Option<f64>,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Version {
#[schema(example = "v1.0.0")]
pub pkg: String,
pub hash: Option<String>,
#[schema(example = "2000-01-01")]
pub build_date: String,
#[schema(example = "release")]
pub target: String,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Daemon {
pub pid: Option<Pid>,
#[schema(example = true)]
pub running: bool,
pub uptime: String,
pub process_count: usize,
#[schema(example = "default")]
pub daemon_type: String,
pub stats: Stats,
}
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Stats {
pub memory_usage: String,
pub cpu_percent: String,
}
fn attempt(done: bool, method: &str) -> ActionResponse {
ActionResponse {
done,
action: ternary!(done, Box::leak(Box::from(method)), "DOES_NOT_EXIST").to_string(),
}
}
fn render(name: &str, state: &State<TeraState>, ctx: &mut Context) -> Result<String, NotFound> {
ctx.insert("base_path", &state.path);
ctx.insert("build_version", env!("CARGO_PKG_VERSION"));
state.tera.render(name, &ctx).or(Err(not_found("Page was not found")))
}
#[get("/")]
pub async fn dashboard(state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> { Ok((ContentType::HTML, render("dashboard", &state, &mut Context::new())?)) }
#[get("/servers")]
pub async fn servers(state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> { Ok((ContentType::HTML, render("servers", &state, &mut Context::new())?)) }
#[get("/login")]
pub async fn login(state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> { Ok((ContentType::HTML, render("login", &state, &mut Context::new())?)) }
#[get("/view/<id>")]
pub async fn view_process(id: usize, state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> {
let mut ctx = Context::new();
ctx.insert("process_id", &id);
Ok((ContentType::HTML, render("view", &state, &mut ctx)?))
}
#[get("/status/<name>")]
pub async fn server_status(name: String, state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> {
let mut ctx = Context::new();
ctx.insert("server_name", &name);
Ok((ContentType::HTML, render("status", &state, &mut ctx)?))
}
#[get("/daemon/prometheus")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/prometheus", security((), ("api_key" = [])),
responses(
(
description = "Get prometheus metrics", body = String, status = 200,
example = json!("# HELP daemon_cpu_percentage The cpu usage graph of the daemon.\n# TYPE daemon_cpu_percentage histogram\ndaemon_cpu_percentage_bucket{le=\"0.005\"} 0"),
),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn prometheus_handler(_t: Token) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::<u8>::new();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer.clone()).unwrap()
}
#[get("/daemon/servers")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/servers", security((), ("api_key" = [])),
responses(
(status = 200, description = "Get daemon servers successfully", body = [String]),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn servers_handler(_t: Token) -> Result<Json<Vec<String>>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["servers"]).start_timer();
if let Some(servers) = config::servers().servers {
HTTP_COUNTER.inc();
timer.observe_duration();
Ok(Json(servers.into_keys().collect()))
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/remote/<name>/list")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/list", security((), ("api_key" = [])),
params(("name" = String, Path, description = "Name of remote daemon", example = "example"),),
responses(
(status = 200, description = "Get list from remote daemon successfully", body = [ProcessItem]),
(status = NOT_FOUND, description = "Remote daemon does not exist", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_list(name: String, _t: Token) -> Result<Json<Vec<ProcessItem>>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/list")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<Vec<ProcessItem>>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/remote/<name>/info/<id>")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/info/{id}", security((), ("api_key" = [])),
params(
("name" = String, Path, description = "Name of remote daemon", example = "example"),
("id" = usize, Path, description = "Process id to get information for", example = 0)
),
responses(
(status = 200, description = "Get process info from remote daemon successfully", body = [ProcessItem]),
(status = NOT_FOUND, description = "Remote daemon does not exist", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_info(name: String, id: usize, _t: Token) -> Result<Json<ItemSingle>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/process/{id}/info")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<ItemSingle>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/remote/<name>/logs/<id>/<kind>")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/logs/{id}/{kind}", security((), ("api_key" = [])),
params(
("name" = String, Path, description = "Name of remote daemon", example = "example"),
("id" = usize, Path, description = "Process id to get information for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Remote process logs of {type} fetched", body = LogResponse),
(status = NOT_FOUND, description = "Remote daemon does not exist", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_logs(name: String, id: usize, kind: String, _t: Token) -> Result<Json<LogResponse>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/process/{id}/logs/{kind}")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<LogResponse>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[post("/remote/<name>/rename/<id>", format = "text", data = "<body>")]
#[utoipa::path(post, tag = "Remote", path = "/remote/{name}/rename/{id}",
security((), ("api_key" = [])),
request_body(content = String, example = json!("example_name")),
params(
("id" = usize, Path, description = "Process id to rename", example = 0),
("name" = String, Path, description = "Name of remote daemon", example = "example"),
),
responses(
(
description = "Remote rename process successful", body = ActionResponse,
example = json!({"action": "rename", "done": true }), status = 200,
),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_rename(name: String, id: usize, body: String, _t: Token) -> Result<Json<ActionResponse>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, mut headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
headers.insert("content-type", HeaderValue::from_static("text/plain"));
match client.post(fmtstr!("{address}/process/{id}/rename")).body(body).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<ActionResponse>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[post("/remote/<name>/action/<id>", format = "json", data = "<body>")]
#[utoipa::path(post, tag = "Remote", path = "/remote/{name}/action/{id}", request_body = ActionBody,
security((), ("api_key" = [])),
params(
("id" = usize, Path, description = "Process id to run action on", example = 0),
("name" = String, Path, description = "Name of remote daemon", example = "example")
),
responses(
(status = 200, description = "Run action on remote process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_action(name: String, id: usize, body: Json<ActionBody>, _t: Token) -> Result<Json<ActionResponse>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.post(fmtstr!("{address}/process/{id}/action")).json(&body.0).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<ActionResponse>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/daemon/dump")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/dump", security((), ("api_key" = [])),
responses(
(status = 200, description = "Dump processes successfully", body = [u8]),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn dump_handler(_t: Token) -> Vec<u8> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["dump"]).start_timer();
HTTP_COUNTER.inc();
timer.observe_duration();
dump::raw()
}
#[get("/daemon/config")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/config", security((), ("api_key" = [])),
responses(
(status = 200, description = "Get daemon config successfully", body = ConfigBody),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn config_handler(_t: Token) -> Json<ConfigBody> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["dump"]).start_timer();
let config = config::read().runner;
HTTP_COUNTER.inc();
timer.observe_duration();
Json(ConfigBody {
shell: config.shell,
args: config.args,
log_path: config.log_path,
})
}
#[get("/list")]
#[utoipa::path(get, path = "/list", tag = "Process", security((), ("api_key" = [])),
responses(
(status = 200, description = "List processes successfully", body = [ProcessItem]),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn list_handler(_t: Token) -> Json<Vec<ProcessItem>> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();
let data = Runner::new().fetch();
HTTP_COUNTER.inc();
timer.observe_duration();
Json(data)
}
#[get("/process/<id>/logs/<kind>")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}",
security((), ("api_key" = [])),
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Process logs of {type} fetched", body = LogResponse),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn logs_handler(id: usize, kind: String, _t: Token) -> Result<Json<LogResponse>, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
let log_file = match kind.as_str() {
"out" | "stdout" => item.logs().out,
"error" | "stderr" => item.logs().error,
_ => item.logs().out,
};
match File::open(log_file) {
Ok(data) => {
let reader = BufReader::new(data);
let logs: Vec<String> = reader.lines().collect::<io::Result<_>>().unwrap();
timer.observe_duration();
Ok(Json(LogResponse { logs }))
}
Err(_) => Ok(Json(LogResponse { logs: vec![] })),
}
}
None => {
timer.observe_duration();
Err(not_found("Process was not found"))
}
}
}
#[get("/process/<id>/logs/<kind>/raw")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}/raw",
security((), ("api_key" = [])),
params(
("id" = usize, Path, description = "Process id to get logs for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(
description = "Process logs of {type} fetched raw", body = String, status = 200,
example = json!("# PATH path/of/file.log\nserver started on port 3000")
),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn logs_raw_handler(id: usize, kind: String, _t: Token) -> Result<String, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
let log_file = match kind.as_str() {
"out" | "stdout" => item.logs().out,
"error" | "stderr" => item.logs().error,
_ => item.logs().out,
};
let data = match fs::read_to_string(&log_file) {
Ok(data) => format!("# PATH {log_file}\n{data}"),
Err(err) => err.to_string(),
};
timer.observe_duration();
Ok(data)
}
None => {
timer.observe_duration();
Err(not_found("Process was not found"))
}
}
}
#[get("/process/<id>/info")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/info", security((), ("api_key" = [])),
params(("id" = usize, Path, description = "Process id to get information for", example = 0)),
responses(
(status = 200, description = "Current process info retrieved", body = ItemSingle),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn info_handler(id: usize, _t: Token) -> Result<Json<ItemSingle>, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
let runner = Runner::new();
if runner.exists(id) {
let item = runner.get(id);
HTTP_COUNTER.inc();
timer.observe_duration();
Ok(Json(item.fetch()))
} else {
Err(not_found("Process was not found"))
}
}
#[post("/process/create", format = "json", data = "<body>")]
#[utoipa::path(post, tag = "Process", path = "/process/create", request_body(content = CreateBody),
security((), ("api_key" = [])),
responses(
(
description = "Create process successful", body = ActionResponse,
example = json!({"action": "create", "done": true }), status = 200,
),
(status = INTERNAL_SERVER_ERROR, description = "Failed to create process", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn create_handler(body: Json<CreateBody>, _t: Token) -> Result<Json<ActionResponse>, ()> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["create"]).start_timer();
let mut runner = Runner::new();
HTTP_COUNTER.inc();
let name = match &body.name {
Some(name) => string!(name),
None => string!(body.script.split_whitespace().next().unwrap_or_default()),
};
runner.start(&name, &body.script, body.path.clone(), &body.watch).save();
timer.observe_duration();
Ok(Json(attempt(true, "create")))
}
#[post("/process/<id>/rename", format = "text", data = "<body>")]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/rename",
security((), ("api_key" = [])),
request_body(content = String, example = json!("example_name")),
params(("id" = usize, Path, description = "Process id to rename", example = 0)),
responses(
(
description = "Rename process successful", body = ActionResponse,
example = json!({"action": "rename", "done": true }), status = 200,
),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn rename_handler(id: usize, body: String, _t: Token) -> Result<Json<ActionResponse>, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
let runner = Runner::new();
match runner.clone().info(id) {
Some(process) => {
HTTP_COUNTER.inc();
let mut item = runner.get(id);
item.rename(body.trim().replace("\n", ""));
then!(process.running, item.restart());
timer.observe_duration();
Ok(Json(attempt(true, "rename")))
}
None => {
timer.observe_duration();
Err(not_found("Process was not found"))
}
}
}
#[get("/process/<id>/env")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/env",
params(("id" = usize, Path, description = "Process id to fetch env from", example = 0)),
responses(
(
description = "Current process env", body = HashMap<String, String>,
example = json!({"ENV_TEST_VALUE": "example_value"}), status = 200
),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn env_handler(id: usize, _t: Token) -> Result<EnvList, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["env"]).start_timer();
HTTP_COUNTER.inc();
match Runner::new().info(id) {
Some(item) => {
timer.observe_duration();
Ok(Json(item.clone().env))
}
None => {
timer.observe_duration();
Err(not_found("Process was not found"))
}
}
}
#[post("/process/<id>/action", format = "json", data = "<body>")]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/action", request_body = ActionBody,
security((), ("api_key" = [])),
params(("id" = usize, Path, description = "Process id to run action on", example = 0)),
responses(
(status = 200, description = "Run action on process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn action_handler(id: usize, body: Json<ActionBody>, _t: Token) -> Result<Json<ActionResponse>, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
let mut runner = Runner::new();
let method = body.method.as_str();
if runner.exists(id) {
HTTP_COUNTER.inc();
match method {
"start" | "restart" => {
runner.get(id).restart();
timer.observe_duration();
Ok(Json(attempt(true, method)))
}
"stop" | "kill" => {
runner.get(id).stop();
timer.observe_duration();
Ok(Json(attempt(true, method)))
}
"reset_env" | "clear_env" => {
runner.get(id).clear_env();
timer.observe_duration();
Ok(Json(attempt(true, method)))
}
"remove" | "delete" => {
runner.remove(id);
timer.observe_duration();
Ok(Json(attempt(true, method)))
}
"flush" | "clean" => {
runner.flush(id);
timer.observe_duration();
Ok(Json(attempt(true, method)))
}
_ => {
timer.observe_duration();
Err(not_found("Invalid action attempt"))
}
}
} else {
Err(not_found("Process was not found"))
}
}
pub async fn get_metrics() -> MetricsRoot {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["metrics"]).start_timer();
let os_info = crate::globals::get_os_info();
let mut pid: Option<Pid> = None;
let mut cpu_percent: Option<f64> = None;
let mut uptime: Option<DateTime<Utc>> = None;
- let mut memory_usage: Option<MemoryInfo> = None;
+ let mut memory_usage: Option<u64> = None;
let mut runner: Runner = file::read_object(global!("pmc.dump"));
HTTP_COUNTER.inc();
if pid::exists() {
if let Ok(process_id) = pid::read() {
if let Ok(process) = Process::new(process_id.get()) {
pid = Some(process_id);
uptime = Some(pid::uptime().unwrap());
- memory_usage = process.memory_info().ok();
+ memory_usage = Some(process.memory_info().unwrap().rss());
cpu_percent = Some(pmc::service::get_process_cpu_usage_percentage(process_id.get::<i64>()));
}
}
}
- let memory_usage_fmt = match &memory_usage {
- Some(usage) => helpers::format_memory(usage.rss()),
+ let memory_usage_fmt = match memory_usage {
+ Some(usage) => helpers::format_memory(usage),
None => string!("0b"),
};
let cpu_percent_fmt = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
- None => string!("0%"),
+ None => string!("0.00%"),
};
let uptime_fmt = match uptime {
Some(uptime) => helpers::format_duration(uptime),
None => string!("none"),
};
timer.observe_duration();
MetricsRoot {
os: os_info.clone(),
raw: Raw { memory_usage, cpu_percent },
version: Version {
target: env!("PROFILE").into(),
build_date: env!("BUILD_DATE").into(),
pkg: format!("v{}", env!("CARGO_PKG_VERSION")),
hash: ternary!(env!("GIT_HASH_FULL") == "", None, Some(env!("GIT_HASH_FULL").into())),
},
daemon: Daemon {
pid,
uptime: uptime_fmt,
running: pid::exists(),
process_count: runner.count(),
daemon_type: global!("pmc.daemon.kind"),
stats: Stats {
memory_usage: memory_usage_fmt,
cpu_percent: cpu_percent_fmt,
},
},
}
}
#[get("/daemon/metrics")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/metrics", security((), ("api_key" = [])),
responses(
(status = 200, description = "Get daemon metrics", body = MetricsRoot),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn metrics_handler(_t: Token) -> Json<MetricsRoot> { Json(get_metrics().await) }
#[get("/remote/<name>/metrics")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/metrics", security((), ("api_key" = [])),
params(("name" = String, Path, description = "Name of remote daemon", example = "example")),
responses(
(status = 200, description = "Get remote metrics", body = MetricsRoot),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_metrics(name: String, _t: Token) -> Result<Json<MetricsRoot>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/daemon/metrics")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<MetricsRoot>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/live/daemon/<server>/metrics")]
pub async fn stream_metrics(server: String, _t: Token) -> EventStream![] {
EventStream! {
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&server) {
Some(server) => (&server.address, client(&server.token).await),
None => loop {
let response = get_metrics().await;
yield Event::data(serde_json::to_string(&response).unwrap());
sleep(Duration::from_millis(500))
},
};
loop {
match client.get(fmtstr!("{address}/daemon/metrics")).headers(headers.clone()).send().await {
Ok(data) => {
if data.status() != 200 {
break yield Event::data(data.text().await.unwrap());
} else {
yield Event::data(data.text().await.unwrap());
sleep(Duration::from_millis(1500));
}
}
Err(err) => break yield Event::data(format!("{{\"error\": \"{err}\"}}")),
}
}
};
}
}
#[get("/live/process/<server>/<id>")]
pub async fn stream_info(server: String, id: usize, _t: Token) -> EventStream![] {
EventStream! {
let runner = Runner::new();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&server) {
Some(server) => (&server.address, client(&server.token).await),
None => loop {
let item = runner.refresh().get(id);
yield Event::data(serde_json::to_string(&item.fetch()).unwrap());
sleep(Duration::from_millis(1000))
},
};
loop {
match client.get(fmtstr!("{address}/process/{id}/info")).headers(headers.clone()).send().await {
Ok(data) => {
if data.status() != 200 {
break yield Event::data(data.text().await.unwrap());
} else {
yield Event::data(data.text().await.unwrap());
sleep(Duration::from_millis(1500));
}
}
Err(err) => break yield Event::data(format!("{{\"error\": \"{err}\"}}")),
}
}
};
}
}
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index a01640c..82ec9f3 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -1,313 +1,313 @@
#[macro_use]
mod log;
mod api;
mod fork;
use api::{DAEMON_CPU_PERCENTAGE, DAEMON_MEM_USAGE, DAEMON_START_TIME};
use chrono::{DateTime, Utc};
use colored::Colorize;
use fork::{daemon, Fork};
use global_placeholders::global;
use macros_rs::{crashln, str, string, ternary, then};
use psutil::process::{MemoryInfo, Process};
use serde::Serialize;
use serde_json::json;
use std::sync::atomic::{AtomicBool, Ordering};
use std::{process, thread::sleep, time::Duration};
use pmc::{
config, file,
helpers::{self, ColoredString},
process::{hash, id::Id, Runner, Status},
};
use tabled::{
settings::{
object::Columns,
style::{BorderColor, Style},
themes::Colorization,
Color, Rotate,
},
Table, Tabled,
};
static ENABLE_API: AtomicBool = AtomicBool::new(false);
static ENABLE_WEBUI: AtomicBool = AtomicBool::new(false);
extern "C" fn handle_termination_signal(_: libc::c_int) {
pid::remove();
log!("[daemon] killed", "pid" => process::id());
unsafe { libc::_exit(0) }
}
fn restart_process() {
for (id, item) in Runner::new().items_mut() {
let mut runner = Runner::new();
let children = pmc::service::find_chidren(item.pid);
if !children.is_empty() && children != item.children {
log!("[daemon] added", "children" => format!("{children:?}"));
runner.set_children(*id, children).save();
}
if item.running && item.watch.enabled {
let path = item.path.join(item.watch.path.clone());
let hash = hash::create(path);
if hash != item.watch.hash {
runner.restart(item.id, false);
log!("[daemon] watch reload", "name" => item.name, "hash" => "hash");
continue;
}
}
if !item.running && pid::running(item.pid as i32) {
Runner::new().set_status(*id, Status::Running);
log!("[daemon] process fix status", "name" => item.name, "id" => id);
continue;
}
then!(!item.running || pid::running(item.pid as i32), continue);
if item.running && item.crash.value == config::read().daemon.restarts {
log!("[daemon] process has crashed", "name" => item.name, "id" => id);
runner.stop(item.id);
runner.set_crashed(*id).save();
continue;
} else {
runner.get(item.id).crashed();
log!("[daemon] restarted", "name" => item.name, "id" => id, "crashes" => item.crash.value);
}
}
}
pub fn health(format: &String) {
let mut pid: Option<i32> = None;
let mut cpu_percent: Option<f64> = None;
let mut uptime: Option<DateTime<Utc>> = None;
let mut memory_usage: Option<MemoryInfo> = None;
let mut runner: Runner = file::read_object(global!("pmc.dump"));
#[derive(Clone, Debug, Tabled)]
struct Info {
#[tabled(rename = "pid file")]
pid_file: String,
#[tabled(rename = "fork path")]
path: String,
#[tabled(rename = "cpu percent")]
cpu_percent: String,
#[tabled(rename = "memory usage")]
memory_usage: String,
#[tabled(rename = "daemon type")]
external: String,
#[tabled(rename = "process count")]
process_count: usize,
uptime: String,
pid: String,
status: ColoredString,
}
impl Serialize for Info {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let trimmed_json = json!({
"pid_file": &self.pid_file.trim(),
"path": &self.path.trim(),
"cpu": &self.cpu_percent.trim(),
"mem": &self.memory_usage.trim(),
"process_count": &self.process_count.to_string(),
"uptime": &self.uptime.trim(),
"pid": &self.pid.trim(),
"status": &self.status.0.trim(),
});
trimmed_json.serialize(serializer)
}
}
if pid::exists() {
if let Ok(process_id) = pid::read() {
if let Ok(process) = Process::new(process_id.get::<u32>()) {
pid = Some(process.pid() as i32);
uptime = Some(pid::uptime().unwrap());
memory_usage = process.memory_info().ok();
cpu_percent = Some(pmc::service::get_process_cpu_usage_percentage(process_id.get::<i64>()));
}
}
}
let cpu_percent = match cpu_percent {
Some(percent) => format!("{:.2}%", percent),
- None => string!("0%"),
+ None => string!("0.00%"),
};
let memory_usage = match memory_usage {
Some(usage) => helpers::format_memory(usage.rss()),
None => string!("0b"),
};
let uptime = match uptime {
Some(uptime) => helpers::format_duration(uptime),
None => string!("none"),
};
let pid = match pid {
Some(pid) => string!(pid),
None => string!("n/a"),
};
let data = vec![Info {
pid: pid,
cpu_percent,
memory_usage,
uptime: uptime,
path: global!("pmc.base"),
external: global!("pmc.daemon.kind"),
process_count: runner.count(),
pid_file: format!("{} ", global!("pmc.pid")),
status: ColoredString(ternary!(pid::exists(), "online".green().bold(), "stopped".red().bold())),
}];
let table = Table::new(data.clone())
.with(Rotate::Left)
.with(Style::rounded().remove_horizontals())
.with(Colorization::exact([Color::FG_CYAN], Columns::first()))
.with(BorderColor::filled(Color::FG_BRIGHT_BLACK))
.to_string();
if let Ok(json) = serde_json::to_string(&data[0]) {
match format.as_str() {
"raw" => println!("{:?}", data[0]),
"json" => println!("{json}"),
"default" => {
println!("{}\n{table}\n", format!("PMC daemon information").on_bright_white().black());
println!(" {}", format!("Use `pmc daemon restart` to restart the daemon").white());
println!(" {}", format!("Use `pmc daemon reset` to clean process id values").white());
}
_ => {}
};
};
}
pub fn stop() {
if pid::exists() {
println!("{} Stopping PMC daemon", *helpers::SUCCESS);
match pid::read() {
Ok(pid) => {
pmc::service::stop(pid.get());
pid::remove();
log!("[daemon] stopped", "pid" => pid);
println!("{} PMC daemon stopped", *helpers::SUCCESS);
}
Err(err) => crashln!("{} Failed to read PID file: {}", *helpers::FAIL, err),
}
} else {
crashln!("{} The daemon is not running", *helpers::FAIL)
}
}
pub fn start(verbose: bool) {
let external = match global!("pmc.daemon.kind").as_str() {
"external" => true,
"default" => false,
"rust" => false,
"cc" => true,
_ => false,
};
println!("{} Spawning PMC daemon (pmc_base={})", *helpers::SUCCESS, global!("pmc.base"));
if ENABLE_API.load(Ordering::Acquire) {
println!(
"{} API server started (address={}, webui={})",
*helpers::SUCCESS,
config::read().fmt_address(),
ENABLE_WEBUI.load(Ordering::Acquire)
);
}
if pid::exists() {
match pid::read() {
Ok(pid) => then!(!pid::running(pid.get()), pid::remove()),
Err(_) => crashln!("{} The daemon is already running", *helpers::FAIL),
}
}
#[inline]
#[tokio::main]
async extern "C" fn init() {
pid::name("PMC Restart Handler Daemon");
let config = config::read().daemon;
let api_enabled = ENABLE_API.load(Ordering::Acquire);
let ui_enabled = ENABLE_WEBUI.load(Ordering::Acquire);
unsafe { libc::signal(libc::SIGTERM, handle_termination_signal as usize) };
DAEMON_START_TIME.set(Utc::now().timestamp_millis() as f64);
pid::write(process::id());
log!("[daemon] new fork", "pid" => process::id());
if api_enabled {
log!("[api] server queued", "address" => config::read().fmt_address());
tokio::spawn(async move { api::start(ui_enabled).await });
}
loop {
if api_enabled {
if let Ok(process) = Process::new(process::id()) {
DAEMON_CPU_PERCENTAGE.observe(pmc::service::get_process_cpu_usage_percentage(process.pid() as i64));
DAEMON_MEM_USAGE.observe(process.memory_info().ok().unwrap().rss() as f64);
}
}
then!(!Runner::new().is_empty(), restart_process());
sleep(Duration::from_millis(config.interval));
}
}
println!("{} PMC Successfully daemonized (type={})", *helpers::SUCCESS, global!("pmc.daemon.kind"));
if external {
let callback = pmc::Callback(init);
pmc::service::try_fork(false, verbose, callback);
} else {
match daemon(false, verbose) {
Ok(Fork::Parent(_)) => {}
Ok(Fork::Child) => init(),
Err(err) => crashln!("{} Daemon creation failed with code {err}", *helpers::FAIL),
}
}
}
pub fn restart(api: &bool, webui: &bool, verbose: bool) {
if pid::exists() {
stop();
}
let config = config::read().daemon;
if config.web.ui || *webui {
ENABLE_API.store(true, Ordering::Release);
ENABLE_WEBUI.store(true, Ordering::Release);
} else if config.web.api {
ENABLE_API.store(true, Ordering::Release);
} else {
ENABLE_API.store(*api, Ordering::Release);
}
start(verbose);
}
pub fn reset() {
let mut runner = Runner::new();
let largest = runner.size();
match largest {
Some(id) => runner.set_id(Id::from(str!(id.to_string()))),
None => runner.set_id(Id::new(0)),
}
println!("{} Successfully reset (index={})", *helpers::SUCCESS, runner.id);
}
pub mod pid;
diff --git a/src/webui/src/components/react/status.tsx b/src/webui/src/components/react/status.tsx
index 58f4c1e..0deaaab 100644
--- a/src/webui/src/components/react/status.tsx
+++ b/src/webui/src/components/react/status.tsx
@@ -1,211 +1,211 @@
import { Line } from 'react-chartjs-2';
import { SSE, api, headers } from '@/api';
import Loader from '@/components/react/loader';
import { useEffect, useState, useRef, Fragment } from 'react';
import { classNames, isRunning, formatMemory, startDuration, useArray } from '@/helpers';
import { Chart, CategoryScale, LinearScale, PointElement, LineElement, Filler } from 'chart.js';
Chart.register(CategoryScale, LinearScale, PointElement, LineElement, Filler);
const bytesToSize = (bytes: number, precision: number) => {
if (isNaN(bytes) || bytes === 0) return '0b';
const sizes = ['b', 'kb', 'mb', 'gb', 'tb'];
const kilobyte = 1024;
const index = Math.floor(Math.log(bytes) / Math.log(kilobyte));
const size = (bytes / Math.pow(kilobyte, index)).toFixed(precision);
return size + sizes[index];
};
const Status = (props: { name: string; base: string }) => {
const bufferLength = 21;
const memoryUsage = useArray([], bufferLength);
const cpuPercentage = useArray([], bufferLength);
const [item, setItem] = useState<any>();
const [loaded, setLoaded] = useState(false);
const [live, setLive] = useState<SSE | null>(null);
const options = {
responsive: true,
maintainAspectRatio: false,
animation: { duration: 0 },
layout: {
padding: {
left: 0,
right: 0,
bottom: 0,
top: 0
}
},
plugins: {
tooltips: { enabled: false },
title: { display: false }
},
elements: {
point: { radius: 0 },
line: { tension: 0.5, borderWidth: 1 }
},
scales: {
x: { display: false },
y: { display: false, suggestedMin: 0 }
},
data: {
labels: Array(20).fill(''),
datasets: [{ fill: true, data: Array(20).fill(0) }]
}
};
const chartContainerStyle = {
borderRadius: '0 0 0.5rem 0.5rem',
marginBottom: '0.5px',
zIndex: 1
};
const cpuChart = {
labels: Array(20).fill(''),
datasets: [
{
fill: true,
data: cpuPercentage.value,
borderColor: '#0284c7',
backgroundColor: (ctx: any) => {
const chart = ctx.chart;
const { ctx: context, chartArea } = chart;
if (!chartArea) {
return null;
}
const gradient = context.createLinearGradient(0, chartArea.bottom, 0, chartArea.top);
gradient.addColorStop(0, 'rgba(14, 165, 233, 0.1)');
gradient.addColorStop(1, 'rgba(14, 165, 233, 0.5)');
return gradient;
}
}
]
};
const memoryChart = {
labels: Array(20).fill(''),
datasets: [
{
fill: true,
data: memoryUsage.value,
borderColor: '#0284c7',
backgroundColor: (ctx: any) => {
const chart = ctx.chart;
const { ctx: context, chartArea } = chart;
if (!chartArea) {
return null;
}
const gradient = context.createLinearGradient(0, chartArea.bottom, 0, chartArea.top);
gradient.addColorStop(0, 'rgba(14, 165, 233, 0.1)');
gradient.addColorStop(1, 'rgba(14, 165, 233, 0.5)');
return gradient;
}
}
]
};
const openConnection = () => {
let retryTimeout;
let hasRun = false;
- const source = new SSE(`${props.base}/live/daemon/${props.server}/metrics`, { headers });
+ const source = new SSE(`${props.base}/live/daemon/${props.name}/metrics`, { headers });
setLive(source);
source.onmessage = (event) => {
const data = JSON.parse(event.data);
setItem(data);
- memoryUsage.pushMax(data.raw.memory_usage.rss);
+ memoryUsage.pushMax(data.raw.memory_usage);
cpuPercentage.pushMax(data.raw.cpu_percent);
if (!hasRun) {
setLoaded(true);
hasRun = true;
}
};
source.onerror = (error) => {
source.close();
retryTimeout = setTimeout(() => {
openConnection();
}, 5000);
};
return retryTimeout;
};
useEffect(() => {
const retryTimeout = openConnection();
return () => {
live && live.close();
clearTimeout(retryTimeout);
};
}, []);
if (!loaded) {
return <Loader />;
} else {
const stats = [
{ name: 'Uptime', stat: startDuration(item.daemon.uptime, false) },
{ name: 'Count', stat: item.daemon.process_count },
{ name: 'Version', stat: item.version.pkg },
{ name: 'Process Id', stat: item.daemon.pid },
{ name: 'Build date', stat: item.version.build_date },
{ name: 'Hash', stat: item.version.hash.slice(0, 18) },
{ name: 'Platform', stat: `${item.os.name} ${item.os.version} (${item.os.arch})` },
{ name: 'Daemon', stat: item.daemon.daemon_type }
];
return (
<Fragment>
<div className="absolute top-2 right-3 z-[200]">
<span className="inline-flex items-center gap-x-1.5 rounded-md px-2 py-1 text-xs font-medium text-white ring-1 ring-inset ring-zinc-800">
<svg viewBox="0 0 6 6" aria-hidden="true" className="h-1.5 w-1.5 fill-green-400">
<circle r={3} cx={3} cy={3} />
</svg>
{props.name != 'local' ? props.name : 'Internal'}
</span>
</div>
<dl className="mt-8 grid grid-cols-1 gap-5 sm:grid-cols-2 px-5">
<div className="overflow-hidden rounded-lg bg-zinc-900/20 border border-zinc-800 shadow">
<dt className="truncate text-sm font-bold text-zinc-400 pt-4 px-4">CPU Usage</dt>
<dt className="truncate text-xl font-bold text-zinc-100 p-1 px-4">
{cpuPercentage.value.slice(-1)[0].toFixed(2)}
<span className="text-base text-zinc-400">%</span>
</dt>
<dd className="mt-2 text-3xl font-semibold tracking-tight text-zinc-100 h-96" style={chartContainerStyle}>
<Line data={cpuChart} options={options} />
</dd>
</div>
<div className="overflow-hidden rounded-lg bg-zinc-900/20 border border-zinc-800 shadow">
<dt className="truncate text-sm font-bold text-zinc-400 pt-4 px-4">Memory Usage</dt>
<dt className="truncate text-xl font-bold text-zinc-100 p-1 px-4">{bytesToSize(memoryUsage.value.slice(-1)[0], 2)}</dt>
<dd className="mt-2 text-3xl font-semibold tracking-tight text-zinc-100 h-96" style={chartContainerStyle}>
<Line data={memoryChart} options={options} />
</dd>
</div>
</dl>
<dl className="mt-5 pb-5 grid grid-cols-2 gap-5 lg:grid-cols-4 px-5 h-3/10">
{stats.map((item: any) => (
<div key={item.name} className="overflow-hidden rounded-lg bg-zinc-900/20 border border-zinc-800 px-4 py-5 shadow sm:p-6">
<dt className="truncate text-sm font-medium text-zinc-400">{item.name}</dt>
<dd className="mt-1 text-2xl font-semibold tracking-tight text-zinc-100">{item.stat}</dd>
</div>
))}
</dl>
</Fragment>
);
}
};
export default Status;

File Metadata

Mime Type
text/x-diff
Expires
Sun, Feb 1, 11:37 PM (1 d, 15 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
494957
Default Alt Text
(100 KB)

Event Timeline