diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 56532c4..f3849b8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -115,7 +115,10 @@ jobs: - name: Setup php-fpm for Linux if: matrix.os == 'ubuntu-24.04' run: | - sudo apt-get update + sudo apt-get update + sudo apt-get install -y software-properties-common + sudo add-apt-repository -y ppa:ondrej/php + sudo apt-get update sudo apt-get install -y php${{ matrix.flag.php_version }}-fpm sudo ln -sf /usr/sbin/php-fpm${{ matrix.flag.php_version }} /usr/sbin/php-fpm diff --git a/Cargo.toml b/Cargo.toml index 87cf48d..b5a6c0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ members = [ ] [workspace.package] -version = "1.1.0" +version = "1.2.0" authors = ["Apache Software Foundation", "jmjoy ", "Yanlong He "] edition = "2024" rust-version = "1.85" diff --git a/README.md b/README.md index a256adb..60c8415 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,8 @@ Sky Walking logo -**SkyWalking PHP** The PHP Agent for Apache SkyWalking, which provides the native tracing abilities for PHP project. +**SkyWalking PHP** The PHP Agent for Apache SkyWalking, which provides native tracing and PHP Health Metrics (PHM) +runtime reporting for PHP projects. **SkyWalking** an APM(application performance monitor) system, especially designed for microservices, cloud native and container-based (Docker, Kubernetes, Mesos) architectures. diff --git a/docs/en/configuration/ini-settings.md b/docs/en/configuration/ini-settings.md index 214677e..0277de1 100644 --- a/docs/en/configuration/ini-settings.md +++ b/docs/en/configuration/ini-settings.md @@ -27,3 +27,5 @@ This is the configuration list supported in `php.ini`. | skywalking_agent.instance_name | Instance name. You can set `${HOSTNAME}`, refer to [Example #1](https://www.php.net/manual/en/install.fpm.configuration.php) | | | skywalking_agent.standalone_socket_path | Unix domain socket file path of standalone skywalking php worker. 
Only available when `reporter_type` is `standalone`. | | | skywalking_agent.psr_logging_level | The log level reported to SkyWalking, based on PSR-3, one of `Off`, `Debug`, `Info`, Notice`, Warning`, Error`, Critical`, Alert`, Emergency`. | Off | +| skywalking_agent.metrics_enable | Enable PHP Health Metrics (PHM) meter reporting via native MeterReportService. **Linux only** (requires `/proc`). Default **On** on Linux when the agent is active; default **Off** on macOS/Windows. Set to `Off` to disable on Linux. Reports six process meters: CPU utilization, memory used/peak, virtual memory, thread count, and open FD count. See [PHP agent README](../setup/service-agent/php-agent/README.md#php-health-metrics-phm). | On (Linux); Off (other) | +| skywalking_agent.metrics_report_period | PHM meter collection interval in seconds. Process meters are collected periodically by the reporter worker via `/proc`. **Linux only.** | 30 | diff --git a/docs/en/setup/service-agent/php-agent/README.md b/docs/en/setup/service-agent/php-agent/README.md index 5464177..73ee0bb 100644 --- a/docs/en/setup/service-agent/php-agent/README.md +++ b/docs/en/setup/service-agent/php-agent/README.md @@ -113,6 +113,44 @@ Refer to the Configuration section for more configuration items. > Enabling it by default will cause extra meaningless consumption when skywalking agent is not > needed (such as simply executing a php script). +### PHP Health Metrics (PHM) + +> **Platform:** PHM process meters are **Linux only**. The reporter worker reads the parent +> PHP-FPM process via `/proc` (`/proc/{pid}/status`, `stat`, and `fd`). They are not available +> on macOS or Windows. Trace and other agent features are unchanged. + +PHM reports PHP runtime meters through the native Meter protocol (same transport as trace reporting). 
+Process CPU, memory, virtual memory, thread count, and open file descriptors are collected +periodically by the reporter worker via `/proc`, without requiring HTTP traffic, +similar to Python PVM and Ruby runtime meters. +**PHM is enabled by default on Linux** when the agent is active (`skywalking_agent.enable = On`). +To disable it or tune the interval, use `php.ini`: + +```ini +; Disable PHM if not needed (default is On on Linux). +; skywalking_agent.metrics_enable = Off + +; Report interval in seconds (default 30). +skywalking_agent.metrics_report_period = 30 +``` + +PHM reports six process meters (aligned with OAP `php-runtime.yaml` and Horizon UI widgets): + +| Agent meter name | OAP / UI expression | Source | +| --- | --- | --- | +| `instance_php_process_cpu_utilization` | `meter_instance_php_process_cpu_utilization` | `/proc/{pid}/stat` utime+stime delta | +| `instance_php_memory_used_mb` | `meter_instance_php_memory_used_mb` | `/proc/{pid}/status` VmRSS | +| `instance_php_memory_peak_mb` | `meter_instance_php_memory_peak_mb` | `/proc/{pid}/status` VmHWM | +| `instance_php_virtual_memory_mb` | `meter_instance_php_virtual_memory_mb` | `/proc/{pid}/status` VmSize | +| `instance_php_thread_count` | `meter_instance_php_thread_count` | `/proc/{pid}/status` Threads | +| `instance_php_open_fd_count` | `meter_instance_php_open_fd_count` | `/proc/{pid}/fd` count | + +On the OAP side, activate the `php-runtime` entry in +`agent-analyzer.default.meterAnalyzerActiveFiles`. Horizon UI shows the widgets on the **General +Service → Instance** dashboard when data is available. + +See [INI Settings](../../../configuration/ini-settings.md) for all PHM options. + ## Run Start `php-fpm` server: diff --git a/src/lib.rs b/src/lib.rs index 1c6736f..c813268 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,6 +118,14 @@ const SKYWALKING_AGENT_STANDALONE_SOCKET_PATH: &str = "skywalking_agent.standalo /// `Info`, Notice`, Warning`, Error`, Critical`, Alert`, Emergency`. 
const SKYWALKING_AGENT_PSR_LOGGING_LEVEL: &str = "skywalking_agent.psr_logging_level"; +/// Whether to report PHP Health Metrics (PHM) via native meter protocol. +/// Default is **On on Linux** when the agent extension is active (`/proc` +/// sampling only); **Off** on other platforms. +const SKYWALKING_AGENT_METRICS_ENABLE: &str = "skywalking_agent.metrics_enable"; + +/// PHM report period in seconds. Meters are sampled at most once per period. +const SKYWALKING_AGENT_METRICS_REPORT_PERIOD: &str = "skywalking_agent.metrics_report_period"; + #[php_get_module] pub fn get_module() -> Module { let mut module = Module::new( @@ -214,6 +222,15 @@ pub fn get_module() -> Module { "".to_string(), Policy::System, ); + #[cfg(target_os = "linux")] + module.add_ini(SKYWALKING_AGENT_METRICS_ENABLE, true, Policy::System); + #[cfg(not(target_os = "linux"))] + module.add_ini(SKYWALKING_AGENT_METRICS_ENABLE, false, Policy::System); + module.add_ini( + SKYWALKING_AGENT_METRICS_REPORT_PERIOD, + 30i64, + Policy::System, + ); // Hooks. 
module.on_module_init(module::init); diff --git a/src/module.rs b/src/module.rs index c9eea05..b81bc61 100644 --- a/src/module.rs +++ b/src/module.rs @@ -167,6 +167,12 @@ pub static PSR_LOGGING_LEVEL: Lazy = Lazy::new(|| { .into() }); +pub static METRICS_ENABLE: Lazy = + Lazy::new(|| ini_get::(SKYWALKING_AGENT_METRICS_ENABLE)); + +pub static METRICS_REPORT_PERIOD: Lazy = + Lazy::new(|| ini_get::(SKYWALKING_AGENT_METRICS_REPORT_PERIOD)); + pub fn init() { if !is_enable() { return; @@ -193,6 +199,8 @@ pub fn init() { Lazy::force(&KAFKA_PRODUCER_CONFIG); Lazy::force(&INJECT_CONTEXT); Lazy::force(&PSR_LOGGING_LEVEL); + Lazy::force(&METRICS_ENABLE); + Lazy::force(&METRICS_REPORT_PERIOD); if let Err(err) = try_init_logger() { eprintln!("skywalking_agent: initialize logger failed: {}", err); @@ -239,7 +247,11 @@ pub fn init() { reporter.clone(), )); - logger::set_global_logger(Logger::new(&*SERVICE_NAME, &*SERVICE_INSTANCE, reporter)); + logger::set_global_logger(Logger::new( + &*SERVICE_NAME, + &*SERVICE_INSTANCE, + reporter.clone(), + )); // Hook functions. register_execute_functions(); diff --git a/src/worker.rs b/src/worker.rs index c01f7c5..2ef7516 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -14,9 +14,10 @@ // limitations under the License. 
use crate::module::{ - AUTHENTICATION, ENABLE_TLS, HEARTBEAT_PERIOD, PROPERTIES_REPORT_PERIOD_FACTOR, REPORTER_TYPE, - SERVER_ADDR, SERVICE_INSTANCE, SERVICE_NAME, SOCKET_FILE_PATH, SSL_CERT_CHAIN_PATH, - SSL_KEY_PATH, SSL_TRUSTED_CA_PATH, WORKER_THREADS, is_standalone_reporter_type, + AUTHENTICATION, ENABLE_TLS, HEARTBEAT_PERIOD, METRICS_ENABLE, METRICS_REPORT_PERIOD, + PROPERTIES_REPORT_PERIOD_FACTOR, REPORTER_TYPE, SERVER_ADDR, SERVICE_INSTANCE, SERVICE_NAME, + SOCKET_FILE_PATH, SSL_CERT_CHAIN_PATH, SSL_KEY_PATH, SSL_TRUSTED_CA_PATH, WORKER_THREADS, + is_standalone_reporter_type, }; #[cfg(feature = "kafka-reporter")] use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG}; @@ -24,6 +25,7 @@ use crate::module::{KAFKA_BOOTSTRAP_SERVERS, KAFKA_PRODUCER_CONFIG}; use skywalking_php_worker::reporter::KafkaReporterConfiguration; use skywalking_php_worker::{ HeartBeatConfiguration, WorkerConfiguration, new_tokio_runtime, + phm::PhmConfiguration, reporter::{GrpcReporterConfiguration, ReporterConfiguration}, start_worker, }; @@ -79,6 +81,7 @@ pub fn init_worker() { properties_report_period_factor: *PROPERTIES_REPORT_PERIOD_FACTOR, }), reporter_config, + phm: phm_configuration(), }; // Run the worker in subprocess. 
@@ -106,3 +109,23 @@ fn worker_threads() -> usize { worker_threads as usize } } + +#[cfg(target_os = "linux")] +fn phm_configuration() -> Option { + if !*METRICS_ENABLE { + return None; + } + Some(PhmConfiguration { + service_name: SERVICE_NAME.clone(), + service_instance: SERVICE_INSTANCE.clone(), + report_period_secs: *METRICS_REPORT_PERIOD, + // Evaluated in the PHP process before the worker is forked, so the + // process to sample is this one (`getpid`), not its parent (`getppid`). + php_process_pid: unsafe { libc::getpid() as i32 }, + }) +} + +#[cfg(not(target_os = "linux"))] +fn phm_configuration() -> Option { + None +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 8083027..c0f659d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -319,8 +319,17 @@ fn setup_php_fpm(index: usize, fpm_addr: &str) -> Child { "-d", "skywalking_agent.psr_logging_level=Warning", ]; + let mut args: Vec = args.iter().map(|s| (*s).to_string()).collect(); + if index == 1 { + args.extend([ + "-d".to_owned(), + "skywalking_agent.metrics_enable=On".to_owned(), + "-d".to_owned(), + "skywalking_agent.metrics_report_period=5".to_owned(), + ]); + } info!(cmd = args.join(" "), "start command"); - let child = Command::new(args[0]) + let child = Command::new(&args[0]) .args(&args[1..]) .stdin(Stdio::null()) .stdout(File::create("/tmp/fpm-skywalking-stdout.log").unwrap()) diff --git a/tests/data/expected_context.yaml b/tests/data/expected_context.yaml index d67e37b..40fbc20 100644 --- a/tests/data/expected_context.yaml +++ b/tests/data/expected_context.yaml @@ -2059,3 +2059,31 @@ logItems: - {key: bar, value: 'false'} - {key: baz, value: test} layer: '' +meterItems: + - serviceName: skywalking-agent-test-1 + meterSize: ge 6 + meters: + - meterId: + name: instance_php_process_cpu_utilization + tags: [] + singleValue: ge 0 + - meterId: + name: instance_php_memory_used_mb + tags: [] + singleValue: ge 0 + - meterId: + name: instance_php_memory_peak_mb + tags: [] + singleValue: ge 0 + - meterId: + name: instance_php_virtual_memory_mb + tags: [] + singleValue: ge 0 + - meterId: + name: instance_php_thread_count + 
tags: [] + singleValue: ge 1 + - meterId: + name: instance_php_open_fd_count + tags: [] + singleValue: ge 1 diff --git a/tests/e2e.rs b/tests/e2e.rs index 97ba724..a76ce6c 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -74,7 +74,8 @@ async fn run_e2e() { request_swoole_2_predis().await; request_swoole_2_mongodb().await; request_swoole_2_memcache().await; - sleep(Duration::from_secs(3)).await; + // Wait for PHM meter report (metrics_report_period=5s on FPM test-1). + sleep(Duration::from_secs(8)).await; request_collector_validate().await; } diff --git a/worker/src/lib.rs b/worker/src/lib.rs index 45df23f..9f04a08 100644 --- a/worker/src/lib.rs +++ b/worker/src/lib.rs @@ -14,6 +14,7 @@ // limitations under the License. pub mod channel; +pub mod phm; pub mod reporter; use crate::{ @@ -45,6 +46,7 @@ pub struct WorkerConfiguration { pub socket_file_path: PathBuf, pub heart_beat: Option, pub reporter_config: ReporterConfiguration, + pub phm: Option, } pub struct HeartBeatConfiguration { @@ -126,7 +128,11 @@ pub async fn start_worker(config: WorkerConfiguration) -> anyhow::Result<()> { }); if let Some(heart_beat_config) = config.heart_beat { - report_properties_and_keep_alive(heart_beat_config, TxReporter(tx_)); + report_properties_and_keep_alive(heart_beat_config, TxReporter(tx_.clone())); + } + + if let Some(phm_config) = config.phm { + phm::run_phm_collector(phm_config, TxReporter(tx_)); } // Run reporter with blocking. diff --git a/worker/src/phm.rs b/worker/src/phm.rs new file mode 100644 index 0000000..319aaf6 --- /dev/null +++ b/worker/src/phm.rs @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Periodic PHM meter collection in the reporter worker, similar to Java +//! `JVMService` / Go runtime meter collector. Samples the parent PHP-FPM worker +//! process via `/proc` so meters are reported without waiting for HTTP traffic. + +use crate::channel::TxReporter; +use skywalking::{ + proto::v3::{MeterData, MeterSingleValue, meter_data::Metric}, + reporter::{CollectItem, Report}, +}; +use std::{ + fs, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tracing::{debug, trace, warn}; + +const METRIC_PROCESS_CPU: &str = "instance_php_process_cpu_utilization"; +const DEFAULT_CLK_TCK: f64 = 100.0; +const METRIC_MEMORY_USED_MB: &str = "instance_php_memory_used_mb"; +const METRIC_MEMORY_PEAK_MB: &str = "instance_php_memory_peak_mb"; +const METRIC_THREAD_COUNT: &str = "instance_php_thread_count"; +const METRIC_VIRTUAL_MEMORY_MB: &str = "instance_php_virtual_memory_mb"; +const METRIC_OPEN_FD_COUNT: &str = "instance_php_open_fd_count"; + +pub struct PhmConfiguration { + pub service_name: String, + pub service_instance: String, + pub report_period_secs: i64, + /// Parent PHP-FPM worker PID (the process that forked this worker). 
+ pub php_process_pid: i32, +} + +struct CpuStatSample { + utime: u64, + stime: u64, + wall_ms: u128, +} + +pub fn run_phm_collector(config: PhmConfiguration, reporter: TxReporter) { + tokio::spawn(async move { + let period = Duration::from_secs(config.report_period_secs.max(1) as u64); + let mut cpu_sample: Option = None; + loop { + let pid = resolve_php_process_pid(config.php_process_pid); + if !process_alive(pid) { + warn!(pid, "PHM target PHP process is gone, stop collector"); + break; + } + + if let Some(mb) = read_status_kib(pid, "VmRSS") { + report_meter( + &reporter, + &config.service_name, + &config.service_instance, + METRIC_MEMORY_USED_MB, + mb, + ); + } + if let Some(mb) = read_status_kib(pid, "VmHWM") { + report_meter( + &reporter, + &config.service_name, + &config.service_instance, + METRIC_MEMORY_PEAK_MB, + mb, + ); + } + if let Some(mb) = read_status_kib(pid, "VmSize") { + report_meter( + &reporter, + &config.service_name, + &config.service_instance, + METRIC_VIRTUAL_MEMORY_MB, + mb, + ); + } + if let Some(count) = read_status_count(pid, "Threads") { + report_meter( + &reporter, + &config.service_name, + &config.service_instance, + METRIC_THREAD_COUNT, + count as f64, + ); + } + if let Some(count) = read_open_fd_count(pid) { + report_meter( + &reporter, + &config.service_name, + &config.service_instance, + METRIC_OPEN_FD_COUNT, + count, + ); + } + if let Some((utime, stime)) = read_proc_stat_cpu(pid) { + let now_ms = current_time_millis(); + let cpu = match &mut cpu_sample { + None => { + cpu_sample = Some(CpuStatSample { + utime, + stime, + wall_ms: now_ms, + }); + None + } + Some(sample) => { + let delta_jiffies = + utime.saturating_sub(sample.utime) + stime.saturating_sub(sample.stime); + let delta_wall_ms = now_ms.saturating_sub(sample.wall_ms); + sample.utime = utime; + sample.stime = stime; + sample.wall_ms = now_ms; + Some(cpu_percent(delta_jiffies, delta_wall_ms)) + } + }; + if let Some(cpu) = cpu { + trace!(pid, cpu, "report PHM process CPU 
from worker collector"); + report_meter( + &reporter, + &config.service_name, + &config.service_instance, + METRIC_PROCESS_CPU, + cpu, + ); + } + } else { + warn!(pid, "failed to read /proc stat for PHM CPU sampling"); + } + debug!(pid, "PHM proc meters reported from worker collector"); + tokio::time::sleep(period).await; + } + }); +} + +fn resolve_php_process_pid(fallback: i32) -> i32 { + let ppid = unsafe { libc::getppid() as i32 }; + if ppid > 1 { ppid } else { fallback } +} + +fn process_alive(pid: i32) -> bool { + fs::metadata(format!("/proc/{pid}")).is_ok() +} + +fn read_status_count(pid: i32, key: &str) -> Option { + let content = fs::read_to_string(format!("/proc/{pid}/status")).ok()?; + let prefix = format!("{key}:"); + for line in content.lines() { + if line.starts_with(&prefix) { + return line.split_whitespace().nth(1)?.parse().ok(); + } + } + None +} + +fn read_open_fd_count(pid: i32) -> Option { + let count = fs::read_dir(format!("/proc/{pid}/fd")) + .ok()? + .filter_map(|entry| entry.ok()) + .count(); + Some(count as f64) +} + +fn read_status_kib(pid: i32, key: &str) -> Option { + let content = fs::read_to_string(format!("/proc/{pid}/status")).ok()?; + let prefix = format!("{key}:"); + for line in content.lines() { + if line.starts_with(&prefix) { + let kb: f64 = line.split_whitespace().nth(1)?.parse().ok()?; + return Some(kb / 1024.0); + } + } + None +} + +fn read_proc_stat_cpu(pid: i32) -> Option<(u64, u64)> { + let content = fs::read_to_string(format!("/proc/{pid}/stat")).ok()?; + let rparen = content.rfind(')')?; + let fields: Vec<&str> = content.get(rparen + 2..)?.split_whitespace().collect(); + let utime: u64 = fields.get(11)?.parse().ok()?; + let stime: u64 = fields.get(12)?.parse().ok()?; + Some((utime, stime)) +} + +fn cpu_percent(delta_jiffies: u64, delta_wall_ms: u128) -> f64 { + if delta_wall_ms == 0 { + return 0.0; + } + let clk_tck = unsafe { libc::sysconf(libc::_SC_CLK_TCK) }; + let clk_tck = if clk_tck > 0 { + clk_tck as f64 + } else { + 
warn!( + clk_tck, + "sysconf(_SC_CLK_TCK) unavailable, using default {DEFAULT_CLK_TCK}" + ); + DEFAULT_CLK_TCK + }; + let cpu_sec = delta_jiffies as f64 / clk_tck; + let wall_sec = delta_wall_ms as f64 / 1000.0; + cpu_sec / wall_sec * 100.0 +} + +fn report_meter(reporter: &TxReporter, service: &str, instance: &str, name: &str, value: f64) { + reporter.report(CollectItem::Meter(Box::new(MeterData { + service: service.to_owned(), + service_instance: instance.to_owned(), + timestamp: current_time_millis() as i64, + metric: Some(Metric::SingleValue(MeterSingleValue { + name: name.to_owned(), + labels: vec![], + value, + })), + }))); +} + +fn current_time_millis() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis()) + .unwrap_or_default() +} diff --git a/worker/src/reporter/meter_batch.rs b/worker/src/reporter/meter_batch.rs new file mode 100644 index 0000000..6a68204 --- /dev/null +++ b/worker/src/reporter/meter_batch.rs @@ -0,0 +1,163 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Batched meter reporter using `MeterReportServiceClient::collect_batch`. +//! +//! SkyWalking OAP applies meter aggregation when a gRPC stream completes. The +//! 
streaming `collect` RPC keeps the stream open while the agent sends points, +//! so OAP does not finalize those meters until disconnect. PHP emits metrics on +//! a periodic tick rather than a long-lived stream, which would delay or lose +//! data if we used `collect` alone. `collectBatch` accepts a short stream of +//! `MeterDataCollection` messages and processes each batch when that stream +//! ends, which matches our flush-every-interval reporting model. +//! +//! On `collectBatch` failure the batch is kept and retried every 5 seconds +//! (same backoff as Go). Retries stop after [`MAX_BATCH_RETRY_ATTEMPTS`] +//! failures or [`MAX_BATCH_RETENTION`], whichever comes first; the batch is +//! then dropped. + +use skywalking::proto::v3::{ + MeterData, MeterDataCollection, meter_report_service_client::MeterReportServiceClient, +}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + select, + sync::mpsc, + time::{self, MissedTickBehavior}, +}; +use tokio_stream::{self as stream}; +use tonic::{ + Status, + metadata::{Ascii, MetadataValue}, + service::{Interceptor, interceptor::InterceptedService}, + transport::Channel, +}; +use tracing::warn; + +const FLUSH_INTERVAL: Duration = Duration::from_secs(5); +/// Same backoff as Go agent meter stream reopen (`time.Sleep(5 * +/// time.Second)`). +const BATCH_RETRY_INTERVAL: Duration = Duration::from_secs(5); +/// Stop retrying a stuck batch after this many consecutive send failures. +const MAX_BATCH_RETRY_ATTEMPTS: u32 = 12; +/// Wall-clock cap on holding a failed batch (5 minutes). 
+const MAX_BATCH_RETENTION: Duration = Duration::from_secs(300); + +#[derive(Clone, Default)] +struct AuthInterceptor { + authentication: Option>, +} + +impl Interceptor for AuthInterceptor { + fn call(&mut self, mut request: tonic::Request<()>) -> Result, Status> { + if let Some(authentication) = &self.authentication { + if let Ok(authentication) = authentication.parse::>() { + request + .metadata_mut() + .insert("authentication", authentication); + } + } + Ok(request) + } +} + +type MeterClient = MeterReportServiceClient>; + +pub async fn run_meter_batch_reporter( + channel: Channel, authentication: String, mut meter_rx: mpsc::Receiver, +) -> anyhow::Result<()> { + let authentication = if authentication.is_empty() { + None + } else { + Some(Arc::new(authentication)) + }; + + let mut client = + MeterReportServiceClient::with_interceptor(channel, AuthInterceptor { authentication }); + + let mut buffer = Vec::new(); + let mut interval = time::interval(FLUSH_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + select! 
{ + item = meter_rx.recv() => { + match item { + Some(meter) => buffer.push(meter), + None => { + flush_batch(&mut client, &mut buffer).await; + break; + } + } + } + _ = interval.tick() => { + flush_batch(&mut client, &mut buffer).await; + } + } + } + + Ok(()) +} + +async fn send_meter_batch( + client: &mut MeterClient, meter_data: Vec, +) -> Result<(), Status> { + let collection = MeterDataCollection { meter_data }; + client.collect_batch(stream::iter([collection])).await?; + Ok(()) +} + +async fn flush_batch(client: &mut MeterClient, buffer: &mut Vec) { + if buffer.is_empty() { + return; + } + + let pending = std::mem::take(buffer); + let count = pending.len(); + let retention_deadline = time::Instant::now() + MAX_BATCH_RETENTION; + let mut failures = 0u32; + + loop { + match send_meter_batch(client, pending.clone()).await { + Ok(()) => return, + Err(status) => { + failures += 1; + if failures >= MAX_BATCH_RETRY_ATTEMPTS + || time::Instant::now() >= retention_deadline + { + warn!( + ?status, + count, + failures, + max_failures = MAX_BATCH_RETRY_ATTEMPTS, + max_retention_secs = MAX_BATCH_RETENTION.as_secs(), + "Dropping meter batch after collectBatch retry limits exceeded" + ); + return; + } + warn!( + ?status, + count, + failures, + max_failures = MAX_BATCH_RETRY_ATTEMPTS, + max_retention_secs = MAX_BATCH_RETENTION.as_secs(), + "Collect meter data by collectBatch failed, retry after {:?}", + BATCH_RETRY_INTERVAL + ); + time::sleep(BATCH_RETRY_INTERVAL).await; + } + } + } +} diff --git a/worker/src/reporter/meter_filter.rs b/worker/src/reporter/meter_filter.rs new file mode 100644 index 0000000..8ee9cff --- /dev/null +++ b/worker/src/reporter/meter_filter.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use skywalking::{ + proto::v3::MeterData, + reporter::{CollectItem, CollectItemConsume, ConsumeResult}, +}; +use tokio::sync::mpsc; +use tonic::async_trait; +use tracing::warn; + +pub struct MeterFilteringConsumer { + inner: C, + meter_tx: mpsc::Sender, +} + +impl MeterFilteringConsumer { + pub fn new(inner: C, meter_tx: mpsc::Sender) -> Self { + Self { inner, meter_tx } + } + + fn forward_meter(&self, meter: MeterData) -> ConsumeResult { + if let Err(err) = self.meter_tx.try_send(meter) { + warn!(?err, "Failed to enqueue meter data for batch reporter"); + } + Ok(None) + } +} + +#[async_trait] +impl CollectItemConsume for MeterFilteringConsumer { + async fn consume(&mut self) -> ConsumeResult { + loop { + match self.inner.consume().await? { + None => return Ok(None), + Some(CollectItem::Meter(meter)) => { + self.forward_meter(*meter)?; + } + Some(item) => return Ok(Some(item)), + } + } + } + + async fn try_consume(&mut self) -> ConsumeResult { + loop { + match self.inner.try_consume().await? 
{ + None => return Ok(None), + Some(CollectItem::Meter(meter)) => { + self.forward_meter(*meter)?; + } + Some(item) => return Ok(Some(item)), + } + } + } +} diff --git a/worker/src/reporter/mod.rs b/worker/src/reporter/mod.rs index 049c9fe..c2d75d4 100644 --- a/worker/src/reporter/mod.rs +++ b/worker/src/reporter/mod.rs @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod meter_batch; +mod meter_filter; mod reporter_grpc; mod reporter_kafka; diff --git a/worker/src/reporter/reporter_grpc.rs b/worker/src/reporter/reporter_grpc.rs index a32f150..cdbf38a 100644 --- a/worker/src/reporter/reporter_grpc.rs +++ b/worker/src/reporter/reporter_grpc.rs @@ -13,10 +13,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use super::{meter_batch, meter_filter}; use anyhow::anyhow; +use meter_filter::MeterFilteringConsumer; use skywalking::reporter::{CollectItemConsume, CollectItemProduce, grpc::GrpcReporter}; use std::time::Duration; -use tokio::time::sleep; +use tokio::{sync::mpsc, time::sleep}; use tonic::transport::{Certificate, Channel, ClientTlsConfig, Endpoint, Identity}; use tracing::{debug, info, warn}; @@ -36,6 +38,20 @@ pub async fn run_reporter( let endpoint = create_endpoint(&config).await?; let channel = connect(endpoint).await; + let (meter_tx, meter_rx) = mpsc::channel(128); + let meter_channel = channel.clone(); + let meter_authentication = config.authentication.clone(); + tokio::spawn(async move { + if let Err(err) = + meter_batch::run_meter_batch_reporter(meter_channel, meter_authentication, meter_rx) + .await + { + warn!(?err, "Meter batch reporter failed"); + } + }); + + let consumer = MeterFilteringConsumer::new(consumer, meter_tx); + let mut reporter = GrpcReporter::new_with_pc(channel, producer, consumer); if !config.authentication.is_empty() {