Page MenuHomePhorge

No OneTemporary

Size
76 KB
Referenced Files
None
Subscribers
None
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 9f8f917..d042e3a 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,49 +1,49 @@
stages: [build, release]
-image: 'themackabu/rust:zigbuild-1.75.0'
+image: 'themackabu/rust:zigbuild-1.77.0'
before_script:
- mkdir binary
- apt-get update -yqq
- apt-get install -yqq zip clang llvm
- export CC="/usr/bin/clang"
- export CXX="/usr/bin/clang++"
build_linux_amd64:
stage: build
- tags: [fedora]
+ tags: [rust]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r --color always
- zip binary/pmc_${CI_COMMIT_TAG}_linux_amd64.zip target/release/pmc -j
artifacts:
paths: [binary/]
build_linux_aarch64:
stage: build
- tags: [fedora]
+ tags: [rust]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r --target aarch64-unknown-linux-gnu --color always
- zip binary/pmc_${CI_COMMIT_TAG}_linux_aarch64.zip target/aarch64-unknown-linux-gnu/release/pmc -j
artifacts:
paths: [binary/]
build_darwin_amd64:
stage: build
- tags: [fedora]
+ tags: [rust]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r --target x86_64-apple-darwin --color always
- zip binary/pmc_${CI_COMMIT_TAG}_darwin_amd64.zip target/x86_64-apple-darwin/release/pmc -j
artifacts:
paths: [binary/]
build_darwin_aarch64:
stage: build
- tags: [fedora]
+ tags: [rust]
only: [/\d+\.\d+\.\d+.*$/]
script:
- cargo zigbuild -r --target aarch64-apple-darwin --color always
- zip binary/pmc_${CI_COMMIT_TAG}_darwin_arm.zip target/aarch64-apple-darwin/release/pmc -j
artifacts:
paths: [binary/]
diff --git a/lib/bridge.cc b/lib/bridge.cc
index ad49271..9611b91 100644
--- a/lib/bridge.cc
+++ b/lib/bridge.cc
@@ -1,126 +1,128 @@
#include <bridge.h>
#include <process.h>
#include <iostream>
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <vector>
#ifdef __linux__
#include <cstring>
#include <string>
#include <cstdlib>
#include <dirent.h>
#include <sys/prctl.h>
#elif __APPLE__
#include <libproc.h>
#include <sys/proc_info.h>
#include <libproc.h>
#include <sys/proc_info.h>
#include <iostream>
#include <crt_externs.h>
#endif
using namespace std;
// Applies `name` as the program name through the platform facility:
// prctl(PR_SET_NAME) on Linux, setprogname() on macOS. On any other
// platform the function compiles to an empty body.
void set_program_name(String name) {
#if defined(__linux__)
    const char *label = name.c_str();
    prctl(PR_SET_NAME, label);
#elif defined(__APPLE__)
    const char *label = name.c_str();
    setprogname(label);
#endif
}
// Looks up a direct child of `parentPID`.
// Returns the child's PID, or -1 when no child is found, the process table
// cannot be read, or the platform is neither Linux nor macOS.
// Linux scans /proc/<pid>/status for a matching "PPid:" line and keeps the
// last match seen; macOS returns the first match in the proc_listpids result.
int64_t get_child_pid(int64_t parentPID) {
#ifdef __linux__
DIR *dir = opendir("/proc");
if (!dir) {
std::cerr << "[PMC] (cc) Error opening /proc directory.\n";
+ perror("get_child_pid");
return -1;
}
int targetPID = -1;
dirent *entry;
while ((entry = readdir(dir)) != nullptr) {
if (entry->d_type == DT_DIR && isdigit(entry->d_name[0])) {
int pid = atoi(entry->d_name);
char statusPath[256];
snprintf(statusPath, sizeof(statusPath), "/proc/%d/status", pid);
FILE *statusFile = fopen(statusPath, "r");
if (statusFile) {
char buffer[256];
while (fgets(buffer, sizeof(buffer), statusFile) != nullptr) {
if (strncmp(buffer, "PPid:", 5) == 0) {
int parentID;
if (sscanf(buffer + 5, "%d", &parentID) == 1 && parentID == parentPID) {
targetPID = pid; break;
} break;
}
}
fclose(statusFile);
}
}
}
closedir(dir);
return targetPID;
#elif __APPLE__
pid_t pidList[1024];
// proc_listpids reports how many BYTES it wrote into the buffer, not how many
// entries, so the value is converted before it is used as a loop bound.
int bytes = proc_listpids(PROC_ALL_PIDS, 0, pidList, sizeof(pidList));
if (bytes <= 0) {
std::cerr << "Error retrieving process list." << std::endl;
+ perror("get_child_pid");
return -1;
}
int count = bytes / static_cast<int>(sizeof(pid_t));
for (int i = 0; i < count; ++i) {
struct proc_bsdinfo procInfo;
if (proc_pidinfo(pidList[i], PROC_PIDTBSDINFO, 0, &procInfo, sizeof(procInfo)) > 0) {
if (procInfo.pbi_ppid == parentPID) {
return static_cast<int>(pidList[i]);
}
}
}
return -1;
#else
return -1;
#endif
}
// Walks the chain pid -> child -> grandchild ..., asking get_child_pid for one
// descendant per generation, and returns the PIDs in the order they were found.
// The walk ends as soon as get_child_pid answers -1.
rust::Vec<rust::i64> find_chidren(int64_t pid) {
    rust::Vec<rust::i64> chain;
    for (int64_t next = get_child_pid(pid); next != -1; next = get_child_pid(next)) {
        chain.push_back(next);
    }
    return chain;
}
// Sends SIGTERM to the descendant chain of `pid` (one process per generation,
// as reported by get_child_pid) and then to `pid` itself.
// Returns the result of kill() for `pid` (0 on success, -1 on failure), or -1
// without signalling anything when `pid` is not a positive process id.
int64_t stop(int64_t pid) {
    // kill() interprets 0 and negative ids as "process group" / "every process",
    // which would take down far more than the requested process.
    if (pid <= 0) {
        return -1;
    }

    // Walk with a separate cursor so `pid` still names the requested process
    // when the final kill() below is issued.
    std::vector<pid_t> descendants;
    for (int64_t next = get_child_pid(pid); next != -1; next = get_child_pid(next)) {
        descendants.push_back(static_cast<pid_t>(next));
    }

    for (pid_t descendant : descendants) {
        kill(descendant, SIGTERM);
    }
    return kill(static_cast<pid_t>(pid), SIGTERM);
}
// Starts the process described by `metadata` through a process::Runner:
// `name` and `log_path` go to Runner::New, the remaining fields to Runner::Run.
// Returns whatever Runner::Run returns.
// The patch below additionally forwards `metadata.env` to Runner::Run.
int64_t run(ProcessMetadata metadata) {
process::Runner runner;
runner.New(std::string(metadata.name), std::string(metadata.log_path));
- return runner.Run(std::string(metadata.command), std::string(metadata.shell), metadata.args);
+ return runner.Run(std::string(metadata.command), std::string(metadata.shell), metadata.args, metadata.env);
}
\ No newline at end of file
diff --git a/lib/fork.cc b/lib/fork.cc
index a2db2e6..91a0ca4 100644
--- a/lib/fork.cc
+++ b/lib/fork.cc
@@ -1,84 +1,93 @@
#include <fork.h>
#include <stdexcept>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include <cstring>
#ifdef _WIN32
#include <windows.h>
#else
#include <pwd.h>
#include <unistd.h>
#endif
using namespace std;
// Returns the current user's home directory, or an empty string when it
// cannot be determined.
// Windows reads %USERPROFILE%; every other platform uses the passwd entry
// of the calling process's uid.
std::string home() {
#ifdef _WIN32
    if (const char* profile = std::getenv("USERPROFILE")) {
        return std::string(profile);
    }
    return "";
#else
    const struct passwd* entry = getpwuid(getuid());
    const bool usable = entry != nullptr && entry->pw_dir != nullptr;
    return usable ? std::string(entry->pw_dir) : std::string();
#endif
}
// Forks the calling process and reports which side of the fork is running:
// Fork::Child in the new process, Fork::Parent in the original one.
// Throws std::runtime_error when ::fork() fails; the patched version prints
// the errno text with perror() first.
Fork fork_process() {
pid_t res = ::fork();
if (res == -1) {
+ perror("fork_process");
throw std::runtime_error("fork() failed");
} else if (res == 0) {
return Fork::Child;
} else {
return Fork::Parent;
}
}
// Calls ::setsid() and returns its result.
// Throws std::runtime_error when ::setsid() returns -1; the patched version
// prints the errno text with perror() first.
pid_t set_sid() {
pid_t res = ::setsid();
if (res == -1) {
+ perror("set_sid");
throw std::runtime_error("setsid() failed");
}
return res;
}
// Closes file descriptors 0, 1 and 2 and throws std::runtime_error when a
// close() call fails.
// Old version: short-circuits, so it stops at the first failing close().
// New version: always attempts all three, then reports once via perror().
// NOTE(review): in the new version errno at the perror() call belongs to the
// most recent close(), which is not necessarily the one that failed.
void close_fd() {
- if (::close(0) == -1 || ::close(1) == -1 || ::close(2) == -1) {
- throw std::runtime_error("close() failed");
+ bool res = false;
+ for (int i = 0; i <= 2; ++i) {
+ res |= (::close(i) == -1);
+ }
+
+ if (res) {
+ perror("close_fd");
+ throw std::runtime_error("close_fd() failed");
}
}
// Detaches the caller with a double fork.
//   1. First fork: the parent side calls exit(0).
//   2. Child: set_sid(), optional chdir into "<home>/.pmc" (skipped when
//      `nochdir`), optional close of fds 0-2 (skipped when `noclose`),
//      then a second fork.
// Returns the Fork value of the last fork cast to int32_t. If any step throws,
// the error is logged, `callback` is invoked and -1 is returned.
int32_t try_fork(bool nochdir, bool noclose, Callback callback) {
try {
Fork forkResult = fork_process();
if (forkResult == Fork::Parent) {
exit(0);
} else if (forkResult == Fork::Child) {
set_sid();
if (!nochdir) {
// home() hands back the directory as stored (no trailing separator), so the
// "/" has to be added here; without it the path becomes e.g. "/root.pmc".
std::string home_dir = home() + "/.pmc";
// Still best-effort: a failed chdir is reported but does not abort the fork.
if (chdir(home_dir.c_str()) == -1) {
perror("try_fork: chdir");
}
}
if (!noclose) {
close_fd();
}
forkResult = fork_process();
}
return static_cast<int32_t>(forkResult);
} catch (const std::exception& e) {
std::cerr << "[PMC] (cc) Error setting up daemon handler\n";
+ perror("try_fork");
}
callback();
return -1;
}
\ No newline at end of file
diff --git a/lib/include/bridge.h b/lib/include/bridge.h
index 4c76fef..57fefb5 100644
--- a/lib/include/bridge.h
+++ b/lib/include/bridge.h
@@ -1,24 +1,25 @@
#ifndef BRIDGE_H
#define BRIDGE_H
#include <rust.h>
using namespace rust;
#ifndef CXXBRIDGE1_STRUCT_ProcessMetadata
#define CXXBRIDGE1_STRUCT_ProcessMetadata
// Description of a process to launch, passed by value to run().
// NOTE(review): the CXXBRIDGE1_STRUCT_ guard suggests this mirrors a struct
// generated by the cxx bridge — field order presumably has to match the Rust side; confirm.
struct ProcessMetadata final {
// Process name; run() hands it to Runner::New.
String name;
// Program that Runner::Run executes (argv[0]).
String shell;
// Command string appended as the last argv entry.
String command;
// Directory in which Runner::New creates the log files.
String log_path;
// Arguments placed between `shell` and `command` in argv.
Vec<String> args;
// Added by this patch: entries forwarded to Runner::Run as the environment.
+ Vec<String> env;
using IsRelocatable = std::true_type;
};
#endif
extern "C++" int64_t stop(int64_t pid);
extern "C++" int64_t run(ProcessMetadata metadata);
extern "C++" void set_program_name(String name);
extern "C++" int64_t get_child_pid(int64_t parentPID);
extern "C++" Vec<i64> find_chidren(int64_t pid);
#endif
diff --git a/lib/include/process.h b/lib/include/process.h
index 9a3ef5c..fe721c7 100644
--- a/lib/include/process.h
+++ b/lib/include/process.h
@@ -1,20 +1,20 @@
#ifndef PROCESS_H
#define PROCESS_H
#include <rust.h>
using namespace rust;
namespace process {
// Launches one child process with its stdout/stderr redirected to log files.
// Usage seen in bridge.cc: construct, call New(), then Run().
class Runner {
public:
// Opens the two log files for `name` under `logPath` and installs the SIGCHLD handler.
void New(const std::string &name, const std::string &logPath);
// Forks and executes `shell` with `args` followed by `command`; returns the
// child's pid, or -1 on failure. The patched overload also takes the
// environment entries for the child.
- int64_t Run(const std::string &command, const std::string &shell, Vec<String> args);
+ int64_t Run(const std::string &command, const std::string &shell, Vec<String> args, Vec<String> env);
// Closes any log descriptor that is not -1.
~Runner();
private:
// Log file descriptors set by New().
// NOTE(review): no constructor initialises these, so they are indeterminate
// if the destructor runs before New() — confirm New() is always called.
int stdout_fd;
int stderr_fd;
};
}
#endif
diff --git a/lib/process.cc b/lib/process.cc
index 2d9e036..38543ec 100644
--- a/lib/process.cc
+++ b/lib/process.cc
@@ -1,101 +1,109 @@
#include <process.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/wait.h>
#include <signal.h>
#include <iostream>
#include <algorithm>
using namespace std;
namespace process {
volatile sig_atomic_t childExitStatus = 0;
// Returns a copy of `text` in which every space character has been replaced
// by an underscore.
std::string format(std::string text) {
    for (char &ch : text) {
        if (ch == ' ') {
            ch = '_';
        }
    }
    return text;
}
// Cuts `str` at index length / 2 and returns {first part, remainder}.
// For an odd length the remainder is the longer piece.
std::pair<std::string, std::string> split(const std::string& str) {
    const std::size_t cut = str.size() / 2;
    return {str.substr(0, cut), str.substr(cut)};
}
// SIGCHLD handler: reaps every child that has already exited, without
// blocking, and stores the most recently collected wait status in
// childExitStatus.
void sigchld_handler(int signo) {
    (void)signo;
    int status;
    for (;;) {
        if (waitpid(-1, &status, WNOHANG) <= 0) {
            break;
        }
        childExitStatus = status;
    }
}
// Prepares the runner for `name`:
//   - opens "<logPath>/<name>-out.log" and "<logPath>/<name>-error.log"
//     (write-only, created if missing, append mode, mode 0644), with spaces
//     in `name` replaced by underscores via format();
//   - installs sigchld_handler for SIGCHLD with SA_RESTART | SA_NOCLDSTOP.
// A sigaction() failure is only logged.
// NOTE(review): the open() results are not checked here, so a failed open
// leaves -1 in stdout_fd / stderr_fd.
void Runner::New(const std::string &name, const std::string &logPath) {
std::string formattedName = format(name);
std::string stdoutFileName = logPath + "/" + formattedName + "-out.log";
std::string stderrFileName = logPath + "/" + formattedName + "-error.log";
stdout_fd = open(stdoutFileName.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
stderr_fd = open(stderrFileName.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
struct sigaction sa;
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART | SA_NOCLDSTOP;
if (sigaction(SIGCHLD, &sa, NULL) == -1) {
std::cerr << "[PMC] (cc) Error setting up SIGCHLD handler\n";
+ perror("Runner::New");
}
}
// Closes the stdout log descriptor and then the stderr log descriptor,
// skipping either one when it holds -1.
Runner::~Runner() {
    const int handles[] = {stdout_fd, stderr_fd};
    for (int fd : handles) {
        if (fd == -1) {
            continue;
        }
        close(fd);
    }
}
// Forks and executes the target program.
//   Parent: closes its copies of the log descriptors and returns the child's pid.
//   Child:  starts a new session, closes fds 0-2, points stdout/stderr at the
//           log descriptors, builds argv = { shell, args..., command, nullptr }
//           and replaces the process image; on exec failure it logs and exits
//           with EXIT_FAILURE.
// Returns -1 when fork() fails.
// The patch adds the `env` parameter, builds a nullptr-terminated envp from it
// and switches the exec call from execvp to execve.
// NOTE(review): execve does not search PATH the way execvp does — confirm that
// `shell` is always passed as a full path.
// NOTE(review): the added perror() call is still labelled "execvp".
// NOTE(review): the parent closes stdout_fd/stderr_fd here without resetting
// them, and ~Runner closes any descriptor that is not -1 — check for a double close.
// The trailing "}}" on the last line also closes `namespace process`.
-int64_t Runner::Run(const std::string &command, const std::string &shell, Vec<String> args) {
+int64_t Runner::Run(const std::string &command, const std::string &shell, Vec<String> args, Vec<String> env) {
pid_t pid = fork();
if (pid == -1) {
std::cerr << "[PMC] (cc) Unable to fork\n";
+ perror("Runner::Run");
return -1;
} else if (pid == 0) {
setsid();
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
dup2(stdout_fd, STDOUT_FILENO);
dup2(stderr_fd, STDERR_FILENO);
std::vector<const char*> argsArray;
+ std::vector<const char*> envArray;
argsArray.push_back(shell.c_str());
transform(args.begin(), args.end(), std::back_inserter(argsArray),
[](rust::String& arg) { return arg.c_str(); });
+
+ transform(env.begin(), env.end(), std::back_inserter(envArray),
+ [](rust::String& env) { return env.c_str(); });
argsArray.push_back(command.c_str());
- argsArray.push_back((char *)nullptr);
+ argsArray.push_back(nullptr);
+ envArray.push_back(nullptr);
- if (execvp(shell.c_str(), const_cast<char* const*>(argsArray.data())) == -1) {
+ if (execve(shell.c_str(), const_cast<char* const*>(argsArray.data()), const_cast<char* const*>(envArray.data())) == -1) {
std::cerr << "[PMC] (cc) Unable to execute the command\n";
+ perror("execvp");
exit(EXIT_FAILURE);
}
} else {
close(stdout_fd);
close(stderr_fd);
return pid;
}
return -1;
}}
diff --git a/pmc.service b/pmc.service
index 163f4c6..d2237a4 100644
--- a/pmc.service
+++ b/pmc.service
@@ -1,23 +1,25 @@
+# example systemd file. edit as needed
+
[Unit]
Description=PMC Daemon
After=network.target
[Service]
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
User=root
Type=forking
WorkingDirectory=/root/.pmc
PIDFile=/root/.pmc/daemon.pid
Restart=on-failure
StartLimitInterval=180
StartLimitBurst=30
RestartSec=5s
-ExecStart=/root/.cargo/bin/pmc daemon start
-ExecStop=/root/.cargo/bin/pmc daemon stop
+ExecStart=/root/.cargo/bin/pmc daemon start -vvv
+ExecStop=/root/.cargo/bin/pmc daemon stop -vvv
[Install]
WantedBy=multi-user.target
diff --git a/src/daemon/api/routes.rs b/src/daemon/api/routes.rs
index dc37a49..cd71588 100644
--- a/src/daemon/api/routes.rs
+++ b/src/daemon/api/routes.rs
@@ -1,834 +1,834 @@
#![allow(non_snake_case)]
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{fmtstr, string, ternary, then};
use prometheus::{Encoder, TextEncoder};
use psutil::process::{MemoryInfo, Process};
use reqwest::header::HeaderValue;
use serde::Deserialize;
use tera::{Context, Tera};
use utoipa::ToSchema;
use rocket::{
get,
http::{ContentType, Status},
post,
serde::{json::Json, Serialize},
State,
};
use super::{
helpers::{generic_error, not_found, GenericError, NotFound},
structs::ErrorMessage,
EnableWebUI, TeraState,
};
use pmc::{
config, file, helpers,
process::{dump, http::client, ItemSingle, ProcessItem, Runner},
};
use crate::daemon::{
api::{HTTP_COUNTER, HTTP_REQ_HISTOGRAM},
pid,
};
use std::{
- collections::HashMap,
+ collections::BTreeMap,
env,
fs::{self, File},
io::{self, BufRead, BufReader},
path::PathBuf,
};
// Marker type taken as the `_t: Token` parameter by the API handlers in this file.
// NOTE(review): presumably a Rocket request guard enforcing the api_key check —
// confirm where its FromRequest impl lives.
pub(crate) struct Token;
-type EnvList = Json<HashMap<String, String>>;
+type EnvList = Json<BTreeMap<String, String>>;
// Schema-only stand-in published in the OpenAPI document under the name
// `MemoryInfo` (see `schema(as = ...)`). Fields other than rss/vms are
// compiled in per target OS.
// NOTE(review): presumably mirrors psutil's MemoryInfo field-for-field — confirm when upgrading psutil.
#[allow(dead_code)]
#[derive(ToSchema)]
#[schema(as = MemoryInfo)]
pub(crate) struct DocMemoryInfo {
rss: u64,
vms: u64,
#[cfg(target_os = "linux")]
shared: u64,
#[cfg(target_os = "linux")]
text: u64,
#[cfg(target_os = "linux")]
data: u64,
#[cfg(target_os = "macos")]
page_faults: u64,
#[cfg(target_os = "macos")]
pageins: u64,
}
// JSON request body naming the action to run on a process.
// remote_action forwards it unchanged to the remote daemon's /process/{id}/action route.
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct ActionBody {
// Name of the action, e.g. "restart".
#[schema(example = "restart")]
method: String,
}
// Response body of GET /daemon/config: the `runner` section of the daemon
// configuration as returned by config::read().
#[derive(Serialize, ToSchema)]
pub(crate) struct ConfigBody {
// Shell used to launch processes.
#[schema(example = "bash")]
shell: String,
// Arguments passed to the shell.
#[schema(min_items = 1, example = json!(["-c"]))]
args: Vec<String>,
// Directory where process logs are written.
#[schema(example = "/home/user/.pmc/logs")]
log_path: String,
}
// JSON request body of POST /process/create.
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct CreateBody {
// Optional process name; create_handler falls back to the first
// whitespace-separated token of `script` when it is absent.
#[schema(example = "app")]
name: Option<String>,
// Command line handed to Runner::start.
#[schema(example = "node index.js")]
script: String,
// Path handed to Runner::start alongside the script.
#[schema(value_type = String, example = "/projects/app")]
path: PathBuf,
// Optional watch setting forwarded to Runner::start.
#[schema(example = "src")]
watch: Option<String>,
}
// Result of a mutating request (create, rename, action); built by `attempt`
// locally and also decoded from remote daemons' replies.
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct ActionResponse {
// Whether the requested action was carried out.
#[schema(example = true)]
done: bool,
// Name of the action; `attempt` stores "DOES_NOT_EXIST" here when `done` is false.
#[schema(example = "name")]
action: String,
}
// Body returned by the JSON log endpoints.
#[derive(Serialize, Deserialize, ToSchema)]
pub(crate) struct LogResponse {
// One entry per line of the log file; empty when the file could not be opened.
logs: Vec<String>,
}
// Top-level serializable metrics payload: build information plus daemon state.
// The handler that produces it is not in this part of the file.
#[derive(Serialize, ToSchema)]
pub struct MetricsRoot {
pub version: Version,
pub daemon: Daemon,
}
// Build/version information section of MetricsRoot.
#[derive(Serialize, ToSchema)]
pub struct Version {
// Package version string (schema example: "v1.0.0").
#[schema(example = "v1.0.0")]
pub pkg: String,
// NOTE(review): presumably the source-control commit hash of the build — confirm at the fill site.
pub hash: &'static str,
// Build date (schema example: "2000-01-01").
#[schema(example = "2000-01-01")]
pub build_date: &'static str,
// Build target/profile label (schema example: "release").
#[schema(example = "release")]
pub target: &'static str,
}
// Daemon state section of MetricsRoot.
#[derive(Serialize, ToSchema)]
pub struct Daemon {
// Daemon process id, when one is known.
pub pid: Option<i32>,
// Whether the daemon is currently running.
#[schema(example = true)]
pub running: bool,
// Uptime, already formatted as text.
pub uptime: String,
// Number of processes.
pub process_count: usize,
// Daemon type label (schema example: "default").
#[schema(example = "default")]
pub daemon_type: String,
// Resource usage figures.
pub stats: Stats,
}
// Resource usage of the daemon; both values are carried as pre-formatted strings.
#[derive(Serialize, ToSchema)]
pub struct Stats {
pub memory_usage: String,
pub cpu_percent: String,
}
// Builds the ActionResponse for a finished request.
//
// `done`   - whether the action was carried out.
// `method` - name of the action; reported back only when `done` is true,
//            otherwise the fixed marker "DOES_NOT_EXIST" is used.
fn attempt(done: bool, method: &str) -> ActionResponse {
    // Both branches are plain string slices, so the chosen one can be copied
    // into the response directly; nothing needs a 'static lifetime here.
    let action = if done { method } else { "DOES_NOT_EXIST" };

    ActionResponse {
        done,
        action: action.to_string(),
    }
}
// Renders the template `name` from `tmpl` with `ctx`.
// Any render failure is reported as the "Page was not found" NotFound response;
// the response is only constructed when rendering actually fails.
fn render(name: &str, tmpl: &Tera, ctx: &Context) -> Result<String, NotFound> {
    tmpl.render(name, ctx).map_err(|_| not_found("Page was not found"))
}
// Serves the web UI dashboard: renders the "dashboard" template with
// `base_path` taken from the shared Tera state and returns it as HTML.
#[get("/")]
pub async fn dashboard(state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> {
    let mut context = Context::new();
    context.insert("base_path", &state.path);

    render("dashboard", &state.tera, &context).map(|html| (ContentType::HTML, html))
}
// Serves the web UI login page: renders the "login" template with
// `base_path` taken from the shared Tera state and returns it as HTML.
#[get("/login")]
pub async fn login(state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> {
    let mut context = Context::new();
    context.insert("base_path", &state.path);

    render("login", &state.tera, &context).map(|html| (ContentType::HTML, html))
}
// Serves the web UI page for a single process: renders the "view" template
// with `base_path` and the requested `process_id`, returned as HTML.
#[get("/view/<id>")]
pub async fn view_process(id: usize, state: &State<TeraState>, _webui: EnableWebUI) -> Result<(ContentType, String), NotFound> {
    let mut context = Context::new();
    context.insert("base_path", &state.path);
    context.insert("process_id", &id);

    render("view", &state.tera, &context).map(|html| (ContentType::HTML, html))
}
// Returns every metric registered with the default Prometheus registry in the
// text exposition format.
// Panics if encoding fails or the encoded output is not valid UTF-8.
#[get("/daemon/prometheus")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/prometheus", security((), ("api_key" = [])),
    responses(
        (
            description = "Get prometheus metrics", body = String, status = 200,
            example = json!("# HELP daemon_cpu_percentage The cpu usage graph of the daemon.\n# TYPE daemon_cpu_percentage histogram\ndaemon_cpu_percentage_bucket{le=\"0.005\"} 0"),
        ),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn prometheus_handler(_t: Token) -> String {
    let metric_families = prometheus::gather();

    let mut buffer = Vec::<u8>::new();
    TextEncoder::new().encode(&metric_families, &mut buffer).unwrap();

    // The buffer is not needed afterwards, so it is moved into the String
    // instead of being copied first.
    String::from_utf8(buffer).unwrap()
}
// Lists the names of the remote servers present in the servers configuration.
// Answers 400 "No servers have been added" when the configuration has no
// server table.
#[get("/daemon/servers")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/servers", security((), ("api_key" = [])),
    responses(
        (status = 200, description = "Get daemon servers successfully", body = [String]),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn servers_handler(_t: Token) -> Result<Json<Vec<String>>, GenericError> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["servers"]).start_timer();

    match config::servers().servers {
        Some(servers) => {
            HTTP_COUNTER.inc();
            timer.observe_duration();

            let names: Vec<String> = servers.into_keys().collect();
            Ok(Json(names))
        }
        None => Err(generic_error(Status::BadRequest, string!("No servers have been added"))),
    }
}
#[get("/remote/<name>/list")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/list", security((), ("api_key" = [])),
params(("name" = String, Path, description = "Name of remote daemon", example = "example"),),
responses(
(status = 200, description = "Get list from remote daemon successfully", body = [ProcessItem]),
(status = NOT_FOUND, description = "Remote daemon does not exist", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_list(name: String, _t: Token) -> Result<Json<Vec<ProcessItem>>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/list")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<Vec<ProcessItem>>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/remote/<name>/info/<id>")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/info/{id}", security((), ("api_key" = [])),
params(
("name" = String, Path, description = "Name of remote daemon", example = "example"),
("id" = usize, Path, description = "Process id to get information for", example = 0)
),
responses(
(status = 200, description = "Get process info from remote daemon successfully", body = [ProcessItem]),
(status = NOT_FOUND, description = "Remote daemon does not exist", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_info(name: String, id: usize, _t: Token) -> Result<Json<ItemSingle>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/process/{id}/info")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<ItemSingle>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[get("/remote/<name>/logs/<id>/<kind>")]
#[utoipa::path(get, tag = "Remote", path = "/remote/{name}/logs/{id}/{kind}", security((), ("api_key" = [])),
params(
("name" = String, Path, description = "Name of remote daemon", example = "example"),
("id" = usize, Path, description = "Process id to get information for", example = 0),
("kind" = String, Path, description = "Log output type", example = "out")
),
responses(
(status = 200, description = "Remote process logs of {type} fetched", body = LogResponse),
(status = NOT_FOUND, description = "Remote daemon does not exist", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_logs(name: String, id: usize, kind: String, _t: Token) -> Result<Json<LogResponse>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.get(fmtstr!("{address}/process/{id}/logs/{kind}")).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<LogResponse>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[post("/remote/<name>/rename/<id>", format = "text", data = "<body>")]
#[utoipa::path(post, tag = "Remote", path = "/remote/{name}/rename/{id}",
security((), ("api_key" = [])),
request_body(content = String, example = json!("example_name")),
params(
("id" = usize, Path, description = "Process id to rename", example = 0),
("name" = String, Path, description = "Name of remote daemon", example = "example"),
),
responses(
(
description = "Remote rename process successful", body = ActionResponse,
example = json!({"action": "rename", "done": true }), status = 200,
),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_rename(name: String, id: usize, body: String, _t: Token) -> Result<Json<ActionResponse>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, mut headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
headers.insert("content-type", HeaderValue::from_static("text/plain"));
match client.post(fmtstr!("{address}/process/{id}/rename")).body(body).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<ActionResponse>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
#[post("/remote/<name>/action/<id>", format = "json", data = "<body>")]
#[utoipa::path(post, tag = "Remote", path = "/remote/{name}/action/{id}", request_body = ActionBody,
security((), ("api_key" = [])),
params(
("id" = usize, Path, description = "Process id to run action on", example = 0),
("name" = String, Path, description = "Name of remote daemon", example = "example")
),
responses(
(status = 200, description = "Run action on remote process successful", body = ActionResponse),
(status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn remote_action(name: String, id: usize, body: Json<ActionBody>, _t: Token) -> Result<Json<ActionResponse>, GenericError> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
if let Some(servers) = config::servers().servers {
let (address, (client, headers)) = match servers.get(&name) {
Some(server) => (&server.address, client(&server.token).await),
None => return Err(generic_error(Status::NotFound, string!("Server was not found"))),
};
HTTP_COUNTER.inc();
timer.observe_duration();
match client.post(fmtstr!("{address}/process/{id}/action")).json(&body.0).headers(headers).send().await {
Ok(data) => {
if data.status() != 200 {
let err = data.json::<ErrorMessage>().await.unwrap();
Err(generic_error(err.code, err.message))
} else {
Ok(Json(data.json::<ActionResponse>().await.unwrap()))
}
}
Err(err) => Err(generic_error(Status::InternalServerError, err.to_string())),
}
} else {
Err(generic_error(Status::BadRequest, string!("No servers have been added")))
}
}
// Returns the raw process dump produced by dump::raw().
#[get("/daemon/dump")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/dump", security((), ("api_key" = [])),
    responses(
        (status = 200, description = "Dump processes successfully", body = [u8]),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn dump_handler(_t: Token) -> Vec<u8> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["dump"]).start_timer();
    HTTP_COUNTER.inc();

    // Produce the dump before stopping the timer so the histogram includes
    // the time spent building it.
    let raw = dump::raw();
    timer.observe_duration();

    raw
}
// Returns the `runner` section (shell, args, log_path) of the daemon configuration.
#[get("/daemon/config")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/config", security((), ("api_key" = [])),
    responses(
        (status = 200, description = "Get daemon config successfully", body = ConfigBody),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn config_handler(_t: Token) -> Json<ConfigBody> {
    // Recorded under its own "config" label so these requests are not mixed
    // into the "dump" series used by dump_handler.
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["config"]).start_timer();
    let config = config::read().runner;

    HTTP_COUNTER.inc();
    timer.observe_duration();

    Json(ConfigBody {
        shell: config.shell,
        args: config.args,
        log_path: config.log_path,
    })
}
// Returns every process item reported by Runner::fetch().
#[get("/list")]
#[utoipa::path(get, path = "/list", tag = "Process", security((), ("api_key" = [])),
    responses(
        (status = 200, description = "List processes successfully", body = [ProcessItem]),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn list_handler(_t: Token) -> Json<Vec<ProcessItem>> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer();

    let processes = Runner::new().fetch();

    HTTP_COUNTER.inc();
    timer.observe_duration();

    Json(processes)
}
// Returns the log of process `id` as a list of lines.
//
// `kind` selects the file: "error" / "stderr" pick the error log, anything
// else (including "out" / "stdout") picks the output log.
//
// A log file that cannot be opened yields an empty list. Bytes that are not
// valid UTF-8 are replaced with U+FFFD, and a read error ends the listing at
// the last line that was read successfully.
// Answers 404 when the process does not exist.
#[get("/process/<id>/logs/<kind>")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}",
    security((), ("api_key" = [])),
    params(
        ("id" = usize, Path, description = "Process id to get logs for", example = 0),
        ("kind" = String, Path, description = "Log output type", example = "out")
    ),
    responses(
        (status = 200, description = "Process logs of {type} fetched", body = LogResponse),
        (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn logs_handler(id: usize, kind: String, _t: Token) -> Result<Json<LogResponse>, NotFound> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
    HTTP_COUNTER.inc();

    let response = match Runner::new().info(id) {
        Some(item) => {
            let log_file = match kind.as_str() {
                "error" | "stderr" => item.logs().error,
                _ => item.logs().out,
            };

            let logs: Vec<String> = match File::open(log_file) {
                Ok(file) => BufReader::new(file)
                    // Split on raw bytes: process output is not guaranteed to be UTF-8.
                    .split(b'\n')
                    .map_while(Result::ok)
                    .map(|mut line| {
                        // Drop the '\r' of a CRLF line ending.
                        if line.last() == Some(&b'\r') {
                            line.pop();
                        }
                        String::from_utf8_lossy(&line).into_owned()
                    })
                    .collect(),
                Err(_) => vec![],
            };

            Ok(Json(LogResponse { logs }))
        }
        None => Err(not_found("Process was not found")),
    };

    timer.observe_duration();
    response
}
// Returns the log of process `id` as plain text, prefixed with a
// "# PATH <file>" line.
//
// `kind` selects the file: "error" / "stderr" pick the error log, anything
// else picks the output log. When the file cannot be read, the text of the
// read error is returned as the body instead.
// Answers 404 when the process does not exist.
#[get("/process/<id>/logs/<kind>/raw")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}/raw",
    security((), ("api_key" = [])),
    params(
        ("id" = usize, Path, description = "Process id to get logs for", example = 0),
        ("kind" = String, Path, description = "Log output type", example = "out")
    ),
    responses(
        (
            description = "Process logs of {type} fetched raw", body = String, status = 200,
            example = json!("# PATH path/of/file.log\nserver started on port 3000")
        ),
        (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn logs_raw_handler(id: usize, kind: String, _t: Token) -> Result<String, NotFound> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer();
    HTTP_COUNTER.inc();

    let body = match Runner::new().info(id) {
        Some(item) => {
            let wants_error_log = matches!(kind.as_str(), "error" | "stderr");
            let log_file = if wants_error_log { item.logs().error } else { item.logs().out };

            Some(match fs::read_to_string(&log_file) {
                Ok(data) => format!("# PATH {log_file}\n{data}"),
                Err(err) => err.to_string(),
            })
        }
        None => None,
    };

    timer.observe_duration();
    body.ok_or_else(|| not_found("Process was not found"))
}
// Returns the current information for process `id`, or 404 when the runner
// does not know that id.
#[get("/process/<id>/info")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/info", security((), ("api_key" = [])),
    params(("id" = usize, Path, description = "Process id to get information for", example = 0)),
    responses(
        (status = 200, description = "Current process info retrieved", body = ItemSingle),
        (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn info_handler(id: usize, _t: Token) -> Result<Json<ItemSingle>, NotFound> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer();
    let runner = Runner::new();

    // Unknown id: nothing to fetch.
    if !runner.exists(id) {
        return Err(not_found("Process was not found"));
    }

    let item = runner.get(id);
    HTTP_COUNTER.inc();
    timer.observe_duration();

    Ok(Json(item.fetch()))
}
// Starts a new process from the request body and saves the runner state.
// When the body carries no name, the first whitespace-separated token of
// `script` is used (an empty string if the script is blank).
// Always answers with a successful "create" ActionResponse.
#[post("/process/create", format = "json", data = "<body>")]
#[utoipa::path(post, tag = "Process", path = "/process/create", request_body(content = CreateBody),
    security((), ("api_key" = [])),
    responses(
        (
            description = "Create process successful", body = ActionResponse,
            example = json!({"action": "create", "done": true }), status = 200,
        ),
        (status = INTERNAL_SERVER_ERROR, description = "Failed to create process", body = ErrorMessage),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn create_handler(body: Json<CreateBody>, _t: Token) -> Result<Json<ActionResponse>, ()> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["create"]).start_timer();
    let mut runner = Runner::new();
    HTTP_COUNTER.inc();

    let name = if let Some(given) = &body.name {
        string!(given)
    } else {
        string!(body.script.split_whitespace().next().unwrap_or_default())
    };

    let started = runner.start(&name, &body.script, body.path.clone(), &body.watch);
    started.save();

    timer.observe_duration();
    Ok(Json(attempt(true, "create")))
}
// Renames process `id` to the text sent in the request body and restarts it if it was running.
#[post("/process/<id>/rename", format = "text", data = "<body>")]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/rename",
security((), ("api_key" = [])),
request_body(content = String, example = json!("example_name")),
params(("id" = usize, Path, description = "Process id to rename", example = 0)),
responses(
(
description = "Rename process successful", body = ActionResponse,
example = json!({"action": "rename", "done": true }), status = 200,
),
(status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
(
status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
example = json!({"code": 401, "message": "Unauthorized"})
)
)
)]
pub async fn rename_handler(id: usize, body: String, _t: Token) -> Result<Json<ActionResponse>, NotFound> {
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer();
let runner = Runner::new();
// Look the process up on a clone so that `runner` itself can still be moved into `get` below.
match runner.clone().info(id) {
Some(process) => {
HTTP_COUNTER.inc();
let mut item = runner.get(id);
// Strip surrounding whitespace and any embedded newlines from the requested name.
item.rename(body.trim().replace("\n", ""));
// Only a process that was running before the rename is restarted.
then!(process.running, item.restart());
timer.observe_duration();
Ok(Json(attempt(true, "rename")))
}
None => {
timer.observe_duration();
Err(not_found("Process was not found"))
}
}
}
// Returns the environment map stored for process `id`.
// The route takes a `Token` guard and documents a 401 response, so the OpenAPI entry declares
// the same `api_key` security requirement as the sibling process routes.
#[get("/process/<id>/env")]
#[utoipa::path(get, tag = "Process", path = "/process/{id}/env",
    security((), ("api_key" = [])),
    params(("id" = usize, Path, description = "Process id to fetch env from", example = 0)),
    responses(
        (
            description = "Current process env", body = HashMap<String, String>,
            example = json!({"ENV_TEST_VALUE": "example_value"}), status = 200
        ),
        (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn env_handler(id: usize, _t: Token) -> Result<EnvList, NotFound> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["env"]).start_timer();
    HTTP_COUNTER.inc();

    match Runner::new().info(id) {
        Some(item) => {
            timer.observe_duration();
            // Clone only the env map rather than the whole process record.
            Ok(Json(item.env.clone()))
        }
        None => {
            timer.observe_duration();
            Err(not_found("Process was not found"))
        }
    }
}
// Runs a lifecycle action (`start`/`restart`, `stop`/`kill`, `remove`/`delete`) on process `id`.
// Responds with `NotFound` when either the process or the requested action is unknown.
#[post("/process/<id>/action", format = "json", data = "<body>")]
#[utoipa::path(post, tag = "Process", path = "/process/{id}/action", request_body = ActionBody,
    security((), ("api_key" = [])),
    params(("id" = usize, Path, description = "Process id to run action on", example = 0)),
    responses(
        (status = 200, description = "Run action on process successful", body = ActionResponse),
        (status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn action_handler(id: usize, body: Json<ActionBody>, _t: Token) -> Result<Json<ActionResponse>, NotFound> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer();
    let mut runner = Runner::new();
    let method = body.method.as_str();

    if !runner.exists(id) {
        timer.observe_duration();
        return Err(not_found("Process was not found"));
    }

    HTTP_COUNTER.inc();
    let outcome = match method {
        "start" | "restart" => {
            // `Runner::restart` only mutates in-memory state; persist it so the new pid is not
            // lost, matching the stop and remove paths which both write the dump.
            runner.restart(id, false).save();
            Ok(Json(attempt(true, method)))
        }
        "stop" | "kill" => {
            runner.get(id).stop();
            Ok(Json(attempt(true, method)))
        }
        "remove" | "delete" => {
            runner.remove(id);
            Ok(Json(attempt(true, method)))
        }
        // The process exists at this point, so the failure is the unrecognised action name.
        _ => Err(not_found("Action was not found")),
    };

    timer.observe_duration();
    outcome
}
// Reports build/version details plus daemon pid, uptime, process count and resource usage.
#[get("/daemon/metrics")]
#[utoipa::path(get, tag = "Daemon", path = "/daemon/metrics", security((), ("api_key" = [])),
    responses(
        (status = 200, description = "Get daemon metrics", body = MetricsRoot),
        (
            status = UNAUTHORIZED, description = "Authentication failed or not provided", body = ErrorMessage,
            example = json!({"code": 401, "message": "Unauthorized"})
        )
    )
)]
pub async fn metrics_handler(_t: Token) -> Json<MetricsRoot> {
    let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["metrics"]).start_timer();

    let mut pid: Option<i32> = None;
    let mut cpu_percent: Option<f32> = None;
    let mut uptime: Option<DateTime<Utc>> = None;
    let mut memory_usage: Option<MemoryInfo> = None;
    let mut runner: Runner = file::read_object(global!("pmc.dump"));

    HTTP_COUNTER.inc();

    // Samples are only taken when a pid file exists, can be read, and names an openable process.
    if pid::exists() {
        if let Ok(process_id) = pid::read() {
            if let Ok(mut process) = Process::new(process_id as u32) {
                pid = Some(process_id);
                // NOTE(review): this unwrap panics if `pid::uptime()` fails - confirm that is acceptable.
                uptime = Some(pid::uptime().unwrap());
                memory_usage = process.memory_info().ok();
                cpu_percent = process.cpu_percent().ok();
            }
        }
    }

    // Missing samples fall back to fixed placeholder strings.
    let memory_usage = memory_usage.map_or_else(|| string!("0b"), |usage| helpers::format_memory(usage.rss()));
    let cpu_percent = cpu_percent.map_or_else(|| string!("0%"), |percent| format!("{:.2}%", percent));
    let uptime = uptime.map_or_else(|| string!("none"), |started| helpers::format_duration(started));

    timer.observe_duration();

    Json(MetricsRoot {
        version: Version {
            pkg: format!("v{}", env!("CARGO_PKG_VERSION")),
            hash: env!("GIT_HASH_FULL"),
            build_date: env!("BUILD_DATE"),
            target: env!("PROFILE"),
        },
        daemon: Daemon {
            pid,
            uptime,
            running: pid::exists(),
            process_count: runner.count(),
            daemon_type: global!("pmc.daemon.kind"),
            stats: Stats { memory_usage, cpu_percent },
        },
    })
}
diff --git a/src/daemon/fork.rs b/src/daemon/fork.rs
index 73c0504..6f8941b 100644
--- a/src/daemon/fork.rs
+++ b/src/daemon/fork.rs
@@ -1,62 +1,61 @@
use global_placeholders::global;
use std::{ffi::CString, process::exit};
/// Tells the caller of `fork()` which side of the fork it is running on.
pub enum Fork {
/// Running in the parent; carries the non-zero value returned by `libc::fork`.
Parent(libc::pid_t),
/// Running in the child (`libc::fork` returned 0).
Child,
}
/// Changes the working directory to the path held in the `pmc.base` global.
///
/// Returns `Err(-1)` when `libc::chdir` reports failure, otherwise the raw return code.
pub fn chdir() -> Result<libc::c_int, i32> {
    let base = CString::new(global!("pmc.base")).expect("CString::new failed");
    let status = unsafe { libc::chdir(base.as_ptr()) };

    if status == -1 {
        Err(-1)
    } else {
        Ok(status)
    }
}
/// Thin wrapper over `libc::fork` that reports which side of the fork is executing.
///
/// Returns `Err(-1)` on failure, `Fork::Child` when the call returned 0, and
/// `Fork::Parent` carrying the returned value otherwise.
pub fn fork() -> Result<Fork, i32> {
    let outcome = unsafe { libc::fork() };

    if outcome == -1 {
        return Err(-1);
    }
    if outcome == 0 {
        return Ok(Fork::Child);
    }
    Ok(Fork::Parent(outcome))
}
/// Thin wrapper over `libc::setsid`.
///
/// Returns `Err(-1)` when the call fails, otherwise the value it returned.
pub fn setsid() -> Result<libc::pid_t, i32> {
    let session = unsafe { libc::setsid() };

    if session == -1 {
        Err(-1)
    } else {
        Ok(session)
    }
}
-pub fn close_fd() -> Result<(), i32> {
- match unsafe { libc::close(0) } {
- -1 => Err(-1),
- _ => match unsafe { libc::close(1) } {
- -1 => Err(-1),
- _ => match unsafe { libc::close(2) } {
- -1 => Err(-1),
- _ => Ok(()),
- },
- },
+pub fn close_fd() -> Result<i32, i32> {
+ let mut res = false;
+ for i in 0..=2 {
+ res |= unsafe { libc::close(i) } == -1;
}
+
+ return match res {
+ true => Err(-1),
+ false => Ok(1),
+ };
}
/// Detaches the process using the fork / `setsid` / fork sequence.
///
/// The first parent exits with status 0. The child starts a new session, optionally changes
/// directory (`nochdir == false`) and closes fds 0-2 (`noclose == false`), then forks again;
/// the result of that second fork is returned. Any failing step yields its error code.
pub fn daemon(nochdir: bool, noclose: bool) -> Result<Fork, i32> {
    // The original parent has nothing left to do once the child exists.
    if let Fork::Parent(_) = fork()? {
        exit(0);
    }

    setsid()?;

    if !nochdir {
        chdir()?;
    }
    if !noclose {
        close_fd()?;
    }

    fork()
}
diff --git a/src/lib.rs b/src/lib.rs
index 93b56ca..f7b8000 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,45 +1,45 @@
pub mod config;
pub mod file;
pub mod helpers;
pub mod log;
pub mod process;
/// Transparent newtype around an `extern "C" fn()` pointer, passed to `try_fork` in the bridge below.
#[repr(transparent)]
pub struct Callback(pub extern "C" fn());
// Registers `Callback` with cxx under the C++ type name "Callback", declared as a `Trivial` kind.
unsafe impl cxx::ExternType for Callback {
type Id = cxx::type_id!("Callback");
type Kind = cxx::kind::Trivial;
}
#[cxx::bridge]
pub mod service {
-
#[repr(u8)]
enum Fork {
Parent,
Child,
}
pub struct ProcessMetadata {
pub name: String,
pub shell: String,
pub command: String,
pub log_path: String,
pub args: Vec<String>,
+ pub env: Vec<String>,
}
unsafe extern "C++" {
include!("pmc/lib/include/process.h");
include!("pmc/lib/include/bridge.h");
include!("pmc/lib/include/fork.h");
type Callback = crate::Callback;
pub fn stop(pid: i64) -> i64;
pub fn set_program_name(name: String);
pub fn get_child_pid(parentPID: i64) -> i64;
pub fn run(metadata: ProcessMetadata) -> i64;
pub fn find_chidren(parentPID: i64) -> Vec<i64>;
pub fn try_fork(nochdir: bool, noclose: bool, callback: Callback) -> i32;
}
}
diff --git a/src/process/mod.rs b/src/process/mod.rs
index d1365ab..ca9bc9c 100644
--- a/src/process/mod.rs
+++ b/src/process/mod.rs
@@ -1,561 +1,566 @@
+mod unix;
+
use crate::{
config,
config::structs::Server,
file, helpers,
service::{run, stop, ProcessMetadata},
};
use std::{
env,
path::PathBuf,
sync::{Arc, Mutex},
};
use nix::{
sys::signal::{kill, Signal},
unistd::Pid,
};
use chrono::serde::ts_milliseconds;
use chrono::{DateTime, Utc};
use global_placeholders::global;
use macros_rs::{crashln, string, ternary, then};
use psutil::process;
use serde::{Deserialize, Serialize};
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
use utoipa::ToSchema;
// Detailed view of a single process, as assembled by `ProcessWrapper::fetch`.
// (Plain `//` comments are used so the generated OpenAPI schema text is unchanged.)
#[derive(Serialize, Deserialize, ToSchema)]
pub struct ItemSingle {
pub info: Info,
pub stats: Stats,
pub watch: Watch,
pub log: Log,
pub raw: Raw,
}
// Identity and status section of `ItemSingle`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Info {
pub id: usize,
pub pid: i64,
pub name: String,
// One of "online", "crashed" or "stopped" (see `ProcessWrapper::fetch`).
pub status: String,
#[schema(value_type = String, example = "/path")]
pub path: PathBuf,
// Produced by `helpers::format_duration` from the process start time.
pub uptime: String,
// Rendered as "<shell> <args> '<script>'".
pub command: String,
pub children: Vec<i64>,
}
// Resource usage section of `ItemSingle`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Stats {
pub restarts: u64,
// Start time in milliseconds (`DateTime::timestamp_millis`).
pub start_time: i64,
// `None` when no sample could be taken for the process.
pub cpu_percent: Option<f32>,
pub memory_usage: Option<MemoryInfo>,
}
// Memory figures copied from psutil's `rss()` and `vms()` readings.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct MemoryInfo {
pub rss: u64,
pub vms: u64,
}
// Log file locations reported in `ItemSingle`; filled from `Process::logs`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Log {
pub out: String,
pub error: String,
}
// Unformatted state flags reported in `ItemSingle`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct Raw {
pub running: bool,
pub crashed: bool,
// Copied from `Process::crash.value`.
pub crashes: u64,
}
/// Pair of log file paths returned by `Process::logs`.
#[derive(Clone)]
pub struct LogInfo {
pub out: String,
pub error: String,
}
// One row of the process listing produced by `Runner::fetch`.
#[derive(Serialize, Deserialize, ToSchema)]
pub struct ProcessItem {
pid: i64,
id: usize,
// CPU usage pre-formatted as a percentage string, e.g. "0.00%".
cpu: String,
// Memory usage pre-formatted by `helpers::format_memory`.
mem: String,
name: String,
restarts: u64,
// One of "online", "crashed" or "stopped".
status: String,
uptime: String,
#[schema(example = "/path")]
watch_path: String,
#[schema(value_type = String, example = "2000-01-01T01:00:00.000Z")]
start_time: DateTime<Utc>,
}
/// Handle to one process id inside a shared, mutex-guarded `Runner` (created by `Runner::get`).
#[derive(Clone)]
pub struct ProcessWrapper {
pub id: usize,
pub runner: Arc<Mutex<Runner>>,
}
-type Env = HashMap<String, String>;
+type Env = BTreeMap<String, String>;
/// Persisted record of one managed process, stored in `Runner::list`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Process {
pub id: usize,
pub pid: i64,
/// Snapshot of the environment taken via `env::vars()` when the process was started.
pub env: Env,
pub name: String,
pub path: PathBuf,
/// Command string handed to the configured shell.
pub script: String,
pub restarts: u64,
pub running: bool,
pub crash: Crash,
pub watch: Watch,
pub children: Vec<i64>,
/// Start time; (de)serialized as a millisecond timestamp.
#[serde(with = "ts_milliseconds")]
pub started: DateTime<Utc>,
}
/// Crash bookkeeping for a process.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Crash {
/// Set by `Runner::set_crashed`; cleared on stop and restart.
pub crashed: bool,
/// Crash counter; incremented by `Runner::new_crash` and reset to 0 on stop.
pub value: u64,
}
// File-watch settings of a process.
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct Watch {
pub enabled: bool,
#[schema(example = "/path")]
pub path: String,
// Value returned by `hash::create` for the watched path; empty when watching is disabled.
pub hash: String,
}
/// Collection of managed processes plus the id generator; this is the structure read from and written to the dump.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Runner {
pub id: id::Id,
/// Connection details when this runner mirrors a remote server; never serialized.
#[serde(skip)]
pub remote: Option<Remote>,
pub list: BTreeMap<usize, Process>,
}
/// Address, optional token and fetched configuration of a remote server (built in `Runner::connect`).
#[derive(Clone, Debug)]
pub struct Remote {
address: String,
token: Option<String>,
pub config: RemoteConfig,
}
/// Runner configuration obtained from a remote via `config::from`.
#[derive(Clone, Debug, Deserialize)]
pub struct RemoteConfig {
pub shell: String,
pub args: Vec<String>,
pub log_path: String,
}
/// Running state that can be assigned to a process through `Runner::set_status`.
pub enum Status {
    Offline,
    Running,
}

impl Status {
    /// Maps the status onto the `running` flag: `true` only for `Status::Running`.
    pub fn to_bool(&self) -> bool {
        matches!(self, Status::Running)
    }
}
// Locks the given mutex and evaluates to its guard; a failed lock is reported through `crashln!`.
macro_rules! lock {
($runner:expr) => {{
match $runner.lock() {
Ok(runner) => runner,
Err(err) => crashln!("Unable to lock mutex: {err}"),
}
}};
}
fn kill_children(children: Vec<i64>) {
for pid in children {
if let Err(err) = kill(Pid::from_raw(pid as i32), Signal::SIGTERM) {
log::error!("Failed to stop pid {pid}: {err:?}");
};
}
}
impl Runner {
/// Builds a runner from whatever `dump::read()` returns.
pub fn new() -> Self { dump::read() }
/// Connects to a remote server and returns a runner populated from that server's dump.
///
/// Yields `None` when the remote config cannot be fetched (the error is logged) or when the
/// remote dump cannot be fetched. With `verbose` set, a success line is printed.
pub fn connect(name: String, Server { address, token }: Server, verbose: bool) -> Option<Self> {
    let remote_config = match config::from(&address, token.as_deref()) {
        Ok(config) => config,
        Err(err) => {
            log::error!("{err}");
            return None;
        }
    };

    match dump::from(&address, token.as_deref()) {
        Ok(dump) => {
            then!(verbose, println!("{} Fetched remote (name={name}, address={address})", *helpers::SUCCESS));

            let remote = Remote {
                token,
                address: string!(address),
                config: remote_config,
            };

            // Everything except `remote` is taken from the fetched dump.
            Some(Runner { remote: Some(remote), ..dump })
        }
        Err(_) => None,
    }
}
pub fn start(&mut self, name: &String, command: &String, path: PathBuf, watch: &Option<String>) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::create(remote, name, command, path, watch) {
crashln!("{} Failed to start create {name}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let id = self.id.next();
let config = config::read().runner;
let crash = Crash { crashed: false, value: 0 };
let watch = match watch {
Some(watch) => Watch {
enabled: true,
path: string!(watch),
hash: hash::create(file::cwd().join(watch)),
},
None => Watch {
enabled: false,
path: string!(""),
hash: string!(""),
},
};
let pid = run(ProcessMetadata {
args: config.args,
name: name.clone(),
shell: config.shell,
command: command.clone(),
log_path: config.log_path,
+ env: unix::env(),
});
self.list.insert(
id,
Process {
id,
pid,
path,
watch,
crash,
restarts: 0,
running: true,
children: vec![],
name: name.clone(),
started: Utc::now(),
script: command.clone(),
env: env::vars().collect(),
},
);
}
return self;
}
pub fn restart(&mut self, id: usize, dead: bool) -> &mut Self {
if let Some(remote) = &self.remote {
if let Err(err) = http::restart(remote, id) {
crashln!("{} Failed to start process {id}\nError: {:#?}", *helpers::FAIL, err);
};
} else {
let process = self.process(id);
let config = config::read().runner;
let Process { path, script, name, .. } = process.clone();
kill_children(process.children.clone());
stop(process.pid);
if let Err(err) = std::env::set_current_dir(&path) {
crashln!("{} Failed to set working directory {:?}\nError: {:#?}", *helpers::FAIL, path, err);
};
process.pid = run(ProcessMetadata {
args: config.args,
name: name.clone(),
shell: config.shell,
log_path: config.log_path,
command: script.to_string(),
+ env: unix::env(),
});
process.running = true;
process.children = vec![];
process.started = Utc::now();
process.crash.crashed = false;
+ process.env = env::vars().collect();
then!(dead, process.restarts += 1);
then!(dead, process.crash.value += 1);
then!(!dead, process.crash.value = 0);
}
return self;
}
/// Removes process `id`.
///
/// On a remote runner the request is forwarded over HTTP; locally the process is stopped,
/// dropped from the list, and the dump is rewritten.
pub fn remove(&mut self, id: usize) {
    if let Some(remote) = &self.remote {
        if let Err(err) = http::remove(remote, id) {
            crashln!("{} Failed to stop remove {id}\nError: {:#?}", *helpers::FAIL, err);
        };
        return;
    }

    self.stop(id);
    self.list.remove(&id);
    dump::write(&self);
}
/// Replaces the id generator, advances it once, and writes the dump.
pub fn set_id(&mut self, id: id::Id) {
self.id = id;
self.id.next();
dump::write(&self);
}
/// Sets the `running` flag of process `id` from `status` and writes the dump.
pub fn set_status(&mut self, id: usize, status: Status) {
self.process(id).running = status.to_bool();
dump::write(&self);
}
/// Returns a clone of the whole process map.
pub fn items(&self) -> BTreeMap<usize, Process> { self.list.clone() }
/// Returns the process map for in-place mutation.
pub fn items_mut(&mut self) -> &mut BTreeMap<usize, Process> { &mut self.list }
/// Writes the dump, but only when this runner is not attached to a remote.
pub fn save(&self) { then!(self.remote.is_none(), dump::write(&self)) }
/// Number of tracked processes.
pub fn count(&mut self) -> usize { self.list().count() }
/// True when no process is tracked.
pub fn is_empty(&self) -> bool { self.list.is_empty() }
/// True when a process with this id is tracked.
pub fn exists(&self, id: usize) -> bool { self.list.contains_key(&id) }
/// Read-only lookup; `None` when the id is unknown.
pub fn info(&self, id: usize) -> Option<&Process> { self.list.get(&id) }
/// Iterates over `(id, process)` pairs with mutable access to each process.
pub fn list<'l>(&'l mut self) -> impl Iterator<Item = (&'l usize, &'l mut Process)> { self.list.iter_mut().map(|(k, v)| (k, v)) }
/// Mutable lookup; an unknown id is reported through `crashln!`.
pub fn process(&mut self, id: usize) -> &mut Process { self.list.get_mut(&id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL)) }
/// Pid of process `id`; an unknown id is reported through `crashln!`.
pub fn pid(&self, id: usize) -> i64 { self.list.get(&id).unwrap_or_else(|| crashln!("{} Process ({id}) not found", *helpers::FAIL)).pid }
/// Consumes the runner and wraps it, behind `Arc<Mutex<_>>`, in a handle bound to `id`.
/// No existence check is made here.
pub fn get(self, id: usize) -> ProcessWrapper {
ProcessWrapper {
id,
runner: Arc::new(Mutex::new(self)),
}
}
/// Marks process `id` as crashed (in memory only; nothing is written to the dump here).
pub fn set_crashed(&mut self, id: usize) -> &mut Self {
self.process(id).crash.crashed = true;
return self;
}
/// Replaces the recorded child pids of process `id` (in memory only).
pub fn set_children(&mut self, id: usize, children: Vec<i64>) -> &mut Self {
self.process(id).children = children;
return self;
}
/// Increments the crash counter of process `id` (in memory only).
pub fn new_crash(&mut self, id: usize) -> &mut Self {
self.process(id).crash.value += 1;
return self;
}
/// Stops process `id`.
///
/// On a remote runner the request is forwarded over HTTP. Locally the recorded children get
/// `SIGTERM`, the process itself is stopped through the bridge, and its running/crash state
/// is reset. Nothing is written to the dump here.
pub fn stop(&mut self, id: usize) -> &mut Self {
    if let Some(remote) = &self.remote {
        if let Err(err) = http::stop(remote, id) {
            crashln!("{} Failed to stop process {id}\nError: {:#?}", *helpers::FAIL, err);
        };
        return self;
    }

    let target = self.process(id);
    kill_children(target.children.clone());
    stop(target.pid);

    target.running = false;
    target.crash.crashed = false;
    target.crash.value = 0;
    target.children = vec![];

    return self;
}
/// Renames process `id`: forwarded over HTTP on a remote runner, otherwise applied in memory.
pub fn rename(&mut self, id: usize, name: String) -> &mut Self {
    if let Some(remote) = &self.remote {
        if let Err(err) = http::rename(remote, id, name) {
            crashln!("{} Failed to rename process {id}\nError: {:#?}", *helpers::FAIL, err);
        };
        return self;
    }

    self.process(id).name = name;
    return self;
}
/// Replaces the watch settings of process `id`.
///
/// When `enabled`, the hash is computed for `path` joined onto the process path; otherwise
/// the hash is left empty.
pub fn watch(&mut self, id: usize, path: &str, enabled: bool) -> &mut Self {
    let process = self.process(id);
    let digest = ternary!(enabled, hash::create(process.path.join(path)), string!(""));

    process.watch = Watch {
        enabled,
        path: string!(path),
        hash: digest,
    };

    return self;
}
/// Builds one summary row per tracked process.
///
/// CPU and memory are sampled through psutil. When the process cannot be opened, or a sample
/// fails, the placeholders `"0.00%"` and `"0b"` are reported instead of panicking.
pub fn fetch(&self) -> Vec<ProcessItem> {
    let mut processes: Vec<ProcessItem> = Vec::with_capacity(self.list.len());

    // Borrow the map instead of cloning it; only a few fields are copied out below.
    for (&id, item) in &self.list {
        let mut memory_usage: Option<MemoryInfo> = None;
        let mut cpu_percent: Option<f32> = None;

        if let Ok(mut process) = process::Process::new(item.pid as u32) {
            let mem_info_psutil = process.memory_info().ok();
            cpu_percent = process.cpu_percent().ok();
            // A failed memory read leaves `memory_usage` as `None` rather than unwrapping it.
            memory_usage = mem_info_psutil.map(|mem| MemoryInfo {
                rss: mem.rss(),
                vms: mem.vms(),
            });
        }

        let cpu_percent = match cpu_percent {
            Some(percent) => format!("{:.2}%", percent),
            None => string!("0.00%"),
        };

        let memory_usage = match memory_usage {
            Some(usage) => helpers::format_memory(usage.rss),
            None => string!("0b"),
        };

        let status = if item.running {
            string!("online")
        } else {
            match item.crash.crashed {
                true => string!("crashed"),
                false => string!("stopped"),
            }
        };

        processes.push(ProcessItem {
            id,
            status,
            pid: item.pid,
            cpu: cpu_percent,
            mem: memory_usage,
            restarts: item.restarts,
            name: item.name.clone(),
            start_time: item.started,
            watch_path: item.watch.path.clone(),
            uptime: helpers::format_duration(item.started),
        });
    }

    return processes;
}
}
impl Process {
/// Get a log paths of the process item
pub fn logs(&self) -> LogInfo {
let name = self.name.replace(" ", "_");
LogInfo {
out: global!("pmc.logs.out", name.as_str()),
error: global!("pmc.logs.error", name.as_str()),
}
}
}
impl ProcessWrapper {
    /// Stop the process item
    pub fn stop(&mut self) { lock!(self.runner).stop(self.id).save(); }

    /// Restart the process item
    pub fn restart(&mut self) { lock!(self.runner).restart(self.id, false).save(); }

    /// Rename the process item
    pub fn rename(&mut self, name: String) { lock!(self.runner).rename(self.id, name).save(); }

    /// Enable watching a path on the process item
    pub fn watch(&mut self, path: &str) { lock!(self.runner).watch(self.id, path, true).save(); }

    /// Disable watching on the process item
    pub fn disable_watch(&mut self) { lock!(self.runner).watch(self.id, "", false).save(); }

    /// Set the process item as crashed
    pub fn crashed(&mut self) { lock!(self.runner).restart(self.id, true).save(); }

    /// Get a json dump of the process item
    ///
    /// CPU and memory are sampled through psutil; either value is `None` when the process
    /// cannot be opened or the individual sample fails.
    pub fn fetch(&self) -> ItemSingle {
        let mut runner = lock!(self.runner);
        let item = runner.process(self.id);
        let config = config::read().runner;

        let mut memory_usage: Option<MemoryInfo> = None;
        let mut cpu_percent: Option<f32> = None;

        if let Ok(mut process) = process::Process::new(item.pid as u32) {
            let mem_info_psutil = process.memory_info().ok();
            cpu_percent = process.cpu_percent().ok();
            // A failed memory read leaves `memory_usage` as `None` rather than unwrapping it.
            memory_usage = mem_info_psutil.map(|mem| MemoryInfo {
                rss: mem.rss(),
                vms: mem.vms(),
            });
        }

        let status = if item.running {
            string!("online")
        } else {
            match item.crash.crashed {
                true => string!("crashed"),
                false => string!("stopped"),
            }
        };

        ItemSingle {
            info: Info {
                status,
                id: item.id,
                pid: item.pid,
                name: item.name.clone(),
                path: item.path.clone(),
                children: item.children.clone(),
                uptime: helpers::format_duration(item.started),
                command: format!("{} {} '{}'", config.shell, config.args.join(" "), item.script.clone()),
            },
            stats: Stats {
                cpu_percent,
                memory_usage,
                restarts: item.restarts,
                start_time: item.started.timestamp_millis(),
            },
            watch: Watch {
                enabled: item.watch.enabled,
                hash: item.watch.hash.clone(),
                path: item.watch.path.clone(),
            },
            log: Log {
                out: item.logs().out,
                error: item.logs().error,
            },
            raw: Raw {
                running: item.running,
                crashed: item.crash.crashed,
                crashes: item.crash.value,
            },
        }
    }
}
pub mod dump;
pub mod hash;
pub mod http;
pub mod id;
diff --git a/src/process/unix.rs b/src/process/unix.rs
new file mode 100644
index 0000000..26594b0
--- /dev/null
+++ b/src/process/unix.rs
@@ -0,0 +1,47 @@
+use std::ffi::{CStr, OsString};
+use std::os::unix::prelude::OsStringExt;
+
+pub struct Vars {
+ inner: std::vec::IntoIter<OsString>,
+}
+/// Yields each stored environment entry as a `String`.
+impl Iterator for Vars {
+    type Item = String;
+    // Entries that are not valid UTF-8 are converted lossily instead of panicking on `unwrap`.
+    fn next(&mut self) -> Option<String> { self.inner.next().map(|var| var.into_string().unwrap_or_else(|raw| raw.to_string_lossy().into_owned())) }
+    fn size_hint(&self) -> (usize, Option<usize>) { self.inner.size_hint() }
+}
+
+#[cfg(target_os = "macos")]
+unsafe fn environ() -> *mut *const *const libc::c_char { libc::_NSGetEnviron() as *mut *const *const libc::c_char }
+
+/// Returns the address of the C `environ` variable on non-macOS targets.
+#[cfg(not(target_os = "macos"))]
+unsafe fn environ() -> *mut *const *const libc::c_char {
+    extern "C" {
+        static mut environ: *const *const libc::c_char;
+    }
+    // Fully qualified: this file does not import `std::ptr`, so a bare `ptr::` path would not resolve.
+    std::ptr::addr_of_mut!(environ)
+}
+
+pub fn env() -> Vec<String> {
+ unsafe {
+ let mut environ = *environ();
+ let mut result = Vec::new();
+
+ if !environ.is_null() {
+ while !(*environ).is_null() {
+ if let Some(key_value) = parse(CStr::from_ptr(*environ).to_bytes()) {
+ result.push(key_value);
+ }
+ environ = environ.add(1);
+ }
+ }
+
+ return Vars { inner: result.into_iter() }.collect();
+ }
+
+ fn parse(input: &[u8]) -> Option<OsString> {
+ if input.is_empty() {
+ return None;
+ }
+ Some(OsString::from_vec(input.to_vec()))
+ }
+}
diff --git a/src/webui/src/components/react/view.tsx b/src/webui/src/components/react/view.tsx
index 8492230..fcbc56d 100644
--- a/src/webui/src/components/react/view.tsx
+++ b/src/webui/src/components/react/view.tsx
@@ -1,317 +1,318 @@
import { api } from '@/api';
import { matchSorter } from 'match-sorter';
import Rename from '@/components/react/rename';
import { Menu, Transition } from '@headlessui/react';
import { useEffect, useState, useRef, Fragment } from 'react';
import { EllipsisVerticalIcon } from '@heroicons/react/20/solid';
// Joins every truthy argument with a single space; falsy entries are dropped.
const classNames = (...classes: Array<any>) => {
  const kept: Array<any> = [];
  for (const entry of classes) {
    if (entry) kept.push(entry);
  }
  return kept.join(' ');
};
const formatMemory = (bytes: number): [number, string] => {
const units = ['b', 'kb', 'mb', 'gb'];
let size = bytes;
let unitIndex = 0;
+
while (size > 1024 && unitIndex < units.length - 1) {
size /= 1024;
unitIndex++;
}
return [+size.toFixed(1), units[unitIndex]];
};
// Extracts the first "<number><unit>" pair (unit is one of d, h, m, s) from a duration string.
// Returns [0, ''] when nothing matches, so callers can always destructure the result;
// returning null contradicted the declared tuple type and crashed destructuring callers.
const startDuration = (input: string): [number, string] => {
  const matches = (input ?? '').match(/(\d+)([dhms])/);

  if (!matches) {
    return [0, ''];
  }

  return [parseInt(matches[1], 10), matches[2]];
};
// Renders one log line, highlighting every case-insensitive occurrence of `match`.
// The search text is escaped before it is compiled into a RegExp: it comes straight from the
// filter input, so characters such as "(" or "[" would otherwise throw or act as pattern syntax.
const LogRow = ({ match, children }: any) => {
  const _match = match.toLowerCase();
  const escaped = match.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  // Splitting on a capturing group keeps the matched pieces in the resulting array.
  const chunks = match.length ? children.split(new RegExp('(' + escaped + ')', 'ig')) : [children];

  return (
    <div>
      {chunks.map((chunk: any, index: number) =>
        chunk.toLowerCase() === _match ? (
          <span key={index} className="bg-yellow-400 text-black">
            {chunk}
          </span>
        ) : (
          <span key={index} className=" text-zinc-200">
            {chunk}
          </span>
        )
      )}
    </div>
  );
};
// Scrollable log pane for one process, with a Ctrl/Cmd+F filter box.
// Logs are loaded once on mount, from the local or the remote endpoint depending on `props.server`.
const LogViewer = (props: { server: string | null; base: string; id: number }) => {
const [logs, setLogs] = useState<string[]>([]);
const [loaded, setLoaded] = useState(false);
const lastRow = useRef<HTMLDivElement | null>(null);
const [searchQuery, setSearchQuery] = useState('');
const [searchOpen, setSearchOpen] = useState(false);
const [componentHeight, setComponentHeight] = useState(0);
// With an empty query show every line; otherwise show matchSorter's result for the query.
const filtered = (!searchQuery && logs) || matchSorter(logs, searchQuery);
// Keep the pane at 4/6 of the window height, recomputed on every resize.
useEffect(() => {
const updateComponentHeight = () => {
const windowHeight = window.innerHeight;
const newHeight = (windowHeight * 4) / 6;
setComponentHeight(newHeight);
};
updateComponentHeight();
window.addEventListener('resize', updateComponentHeight);
return () => {
window.removeEventListener('resize', updateComponentHeight);
};
}, []);
const componentStyle = {
height: componentHeight + 'px',
};
// Global shortcuts: Ctrl/Cmd+F opens the filter; Escape or any click clears and closes it.
useEffect(() => {
const handleKeydown = (event: any) => {
if ((event.ctrlKey || event.metaKey) && event.key === 'f') {
setSearchOpen(true);
event.preventDefault();
}
};
const handleKeyup = (event: any) => {
if (event.key === 'Escape') {
setSearchQuery('');
setSearchOpen(false);
}
};
const handleClick = () => {
setSearchQuery('');
setSearchOpen(false);
};
window.addEventListener('click', handleClick);
window.addEventListener('keydown', handleKeydown);
window.addEventListener('keyup', handleKeyup);
return () => {
window.removeEventListener('click', handleClick);
window.removeEventListener('keydown', handleKeydown);
window.removeEventListener('keyup', handleKeyup);
};
}, [searchOpen]);
// Fetch the stdout log of a local process; `loaded` is set whether or not the request succeeds.
const loadLogs = () => {
api
.get(`${props.base}/process/${props.id}/logs/out`)
.json()
.then((data) => setLogs(data.logs))
.finally(() => setLoaded(true));
};
// Same as loadLogs, but through the remote-server route.
const loadLogsRemote = () => {
api
.get(`${props.base}/remote/${props.server}/logs/${props.id}/out`)
.json()
.then((data) => setLogs(data.logs))
.finally(() => setLoaded(true));
};
useEffect(() => (props.server != null ? loadLogsRemote() : loadLogs()), []);
// Scroll to the sentinel element after the last row once loading finishes.
useEffect(() => lastRow.current?.scrollIntoView(), [loaded]);
if (!loaded) {
return <div className="text-lg text-white font-bold">loading...</div>;
} else {
return (
<div>
{searchOpen && (
<div className="z-50 fixed top-[16.5rem] right-5 w-96 flex bg-zinc-800/50 backdrop-blur-md px-3 py-1 rounded-lg border border-zinc-700 shadow">
<input
className="grow bg-transparent p-2 border-0 text-white focus:ring-0 sm:text-sm"
autoFocus
placeholder="Filter logs..."
value={searchQuery}
onChange={(e) => setSearchQuery(e.target.value)}
/>
<span className="grow-0 text-zinc-400 font-medium mt-1.5">{searchQuery && filtered.length + ' matches'}</span>
</div>
)}
<div className="p-5 pb-0 break-words overflow-y-scroll font-mono" style={componentStyle}>
{filtered.map((log, index) => (
<LogRow key={index} match={searchQuery}>
{log}
</LogRow>
))}
<div ref={lastRow} />
</div>
</div>
);
}
};
// Detail page for one process: header with status badge and actions, a stats grid, and the log viewer.
// The process is loaded once on mount, from the remote route when a `server` query parameter is present.
const View = (props: { id: string; base: string }) => {
  const [item, setItem] = useState<any>();
  const [loaded, setLoaded] = useState(false);
  const server = new URLSearchParams(window.location.search).get('server');

  // Badge colour classes keyed by the status string reported by the API.
  const badge = {
    online: 'bg-emerald-400/10 text-emerald-400',
    stopped: 'bg-red-500/10 text-red-500',
    crashed: 'bg-amber-400/10 text-amber-400',
  };

  const fetch = () => {
    api
      .get(`${props.base}/process/${props.id}/info`)
      .json()
      .then((res) => setItem(res))
      .finally(() => setLoaded(true));
  };

  const fetchRemote = () => {
    api
      .get(`${props.base}/remote/${server}/info/${props.id}`)
      .json()
      .then((res) => setItem(res))
      .finally(() => setLoaded(true));
  };

  // `boolean` is the TypeScript primitive; `bool` is not a type.
  const isRunning = (status: string): boolean => (status == 'stopped' ? false : status == 'crashed' ? false : true);
  // Accepts the id as string or number because callers pass `props.id`, which is a string.
  const action = (id: string | number, name: string) => api.post(`${props.base}/process/${id}/action`, { json: { method: name } }).then(() => fetch());

  useEffect(() => (server != null ? fetchRemote() : fetch()), []);

  if (!loaded) {
    return <div className="text-lg text-white font-bold">loading...</div>;
  } else {
    const online = isRunning(item.info.status);
    // Fall back to an empty pair when the uptime string cannot be parsed.
    const [uptime, upunit] = startDuration(item.info.uptime) ?? [0, ''];
    // `memory_usage` may be null even for an online process, so read it defensively.
    const [memory, memunit] = formatMemory(online ? item.stats.memory_usage?.rss ?? 0 : 0);

    const stats = [
      { name: 'Status', value: item.info.status },
      { name: 'Uptime', value: online ? uptime : 'none', unit: online ? upunit : '' },
      { name: 'Memory', value: online ? memory : 'offline', unit: online ? memunit : '' },
      { name: 'CPU', value: online ? item.stats.cpu_percent : 'offline', unit: online ? '%' : '' },
    ];

    return (
      <Fragment>
        <div className="flex flex-col items-start justify-between gap-x-8 gap-y-4 bg-zinc-700/10 px-4 py-4 sm:flex-row sm:items-center sm:px-6 lg:px-8">
          <div>
            <div className="flex items-center gap-x-3">
              <h1 className="flex gap-x-1 text-base leading-7">
                <span className="font-semibold text-white cursor-default">{server != null ? `${server}/${item.info.name}` : item.info.name}</span>
              </h1>
              <div className={`flex-none rounded-full p-1 ${badge[item.info.status]}`}>
                <div className="h-2 w-2 rounded-full bg-current" />
              </div>
              {online && (
                <div className="order-first flex-none rounded-full bg-sky-400/10 px-2 py-0.5 text-xs font-medium text-sky-400 ring-1 ring-inset ring-sky-400/30 sm:order-none">
                  {item.info.pid}
                </div>
              )}
            </div>
            <p className="text-xs leading-6 text-zinc-400">{item.info.command}</p>
          </div>
          <div className="mt-5 flex lg:ml-4 lg:mt-0">
            <span>
              <button
                type="button"
                onClick={() => action(props.id, 'restart')}
                className="disabled:opacity-50 transition inline-flex items-center justify-center space-x-1.5 border focus:outline-none focus:ring-0 focus:ring-offset-0 focus:z-10 shrink-0 saturate-[110%] border-zinc-700 hover:border-zinc-600 bg-zinc-800 text-zinc-50 hover:bg-zinc-700 px-4 py-2 text-sm font-semibold rounded-lg">
                {online ? 'Restart' : 'Start'}
              </button>
            </span>
            <span className="ml-3">
              <Menu as="div" className="relative inline-block text-left">
                <div>
                  <Menu.Button className="transition inline-flex items-center justify-center space-x-1.5 border focus:outline-none focus:ring-0 focus:ring-offset-0 focus:z-10 shrink-0 border-zinc-700 bg-transparent hover:bg-zinc-800 p-2 text-sm font-semibold rounded-lg">
                    <EllipsisVerticalIcon className="h-5 w-5 text-zinc-50" aria-hidden="true" />
                  </Menu.Button>
                </div>
                <Transition
                  as={Fragment}
                  enter="transition ease-out duration-100"
                  enterFrom="transform opacity-0 scale-95"
                  enterTo="transform opacity-100 scale-100"
                  leave="transition ease-in duration-75"
                  leaveFrom="transform opacity-100 scale-100"
                  leaveTo="transform opacity-0 scale-95">
                  <Menu.Items className="absolute right-0 z-10 mt-2 w-48 origin-top-right rounded-lg bg-zinc-900/80 backdrop-blur-md border border-zinc-800 shadow-lg ring-1 ring-black ring-opacity-5 focus:outline-none text-base divide-y divide-zinc-800/50">
                    <div className="p-1.5">
                      <Menu.Item>
                        {({ active }) => (
                          <a
                            onClick={() => action(props.id, 'stop')}
                            className={classNames(
                              active ? 'bg-yellow-400/10 text-amber-500' : 'text-zinc-200',
                              'rounded-md block p-2 w-full text-left cursor-pointer'
                            )}>
                            Terminate
                          </a>
                        )}
                      </Menu.Item>
                      <Menu.Item>
                        {({ active }) => <Rename base={props.base} process={props.id} active={active} callback={fetch} old={item.info.name} />}
                      </Menu.Item>
                    </div>
                    <div className="p-1.5">
                      <Menu.Item>
                        {({ active }) => (
                          <a
                            onClick={() => action(props.id, 'delete')}
                            className={classNames(
                              active ? 'bg-red-700/10 text-red-500' : 'text-red-400',
                              'rounded-md block p-2 w-full text-left cursor-pointer'
                            )}>
                            Delete
                          </a>
                        )}
                      </Menu.Item>
                    </div>
                  </Menu.Items>
                </Transition>
              </Menu>
            </span>
          </div>
        </div>
        <div className="grid grid-cols-1 bg-zinc-700/10 sm:grid-cols-2 lg:grid-cols-4">
          {stats.map((stat: any, index: number) => (
            <div
              key={stat.name}
              className={classNames(
                index % 2 === 1 ? 'sm:border-l' : index === 2 ? 'lg:border-l' : '',
                'border-t border-white/5 py-6 px-4 sm:px-6 lg:px-8'
              )}>
              <p className="text-sm font-medium leading-6 text-zinc-400">{stat.name}</p>
              <p className="mt-2 flex items-baseline gap-x-2">
                <span className="text-4xl font-semibold tracking-tight text-white">{stat.value}</span>
                {stat.unit ? <span className="text-sm text-zinc-400">{stat.unit}</span> : null}
              </p>
            </div>
          ))}
        </div>
        <LogViewer server={server} id={parseInt(props.id)} base={props.base} />
      </Fragment>
    );
  }
};
export default View;
diff --git a/src/webui/src/store.ts b/src/webui/src/store.ts
index 9b5b7d8..a389be5 100644
--- a/src/webui/src/store.ts
+++ b/src/webui/src/store.ts
@@ -1,8 +1,7 @@
import { persistentMap } from '@nanostores/persistent';
export interface SettingsStore {
token?: string;
- servers?: string;
}
export const $settings = persistentMap<SettingsStore>('settings:', {});

File Metadata

Mime Type
text/x-diff
Expires
Sun, Feb 1, 11:03 AM (6 h, 26 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
494665
Default Alt Text
(76 KB)

Event Timeline