
Cache hit / miss rate metrics when using cached procedural macro #209

Open · kembofly opened this issue May 30, 2024 · 3 comments

kembofly commented May 30, 2024

Questions:

  • Is there a way to get cache hit and miss metrics when using the cached procedural macro?
  • If not, why am I seeing performance regressions when I implement the same logic myself that expanding the procedural macro produces? FYI, I have checked that emitting the metrics causes no performance regression on its own, so I don't think that contributes to the increased latency.

The hit/miss metrics are working and producing correct results as far as I can see. I have done some more benchmarking, and it turns out that managing our own global cache (which was required to get the hit/miss metrics), as opposed to using the procedural macro, introduces performance regressions. As far as I can tell, this is most likely due to lock contention caused by the Mutex on the global cache. I suspect the procedural macro applies performance optimizations in how it manages the cache that I have not been able to replicate in my code.

Side note:
I was thinking I could use RwLock instead of Mutex to reduce lock contention by separating read and write locks. However, since the cache_get method takes a mutable reference to self, it would require a write lock instead of a read lock, so an RwLock would end up behaving just the same as a Mutex.
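
For reference, the root cause is visible in the Cached trait itself: even a lookup is declared with &mut self, because it updates recency order and the store's own hit/miss counters. A paraphrased sketch of the relevant signatures (exact generic bounds omitted; see the cached docs):

// Paraphrased from the cached crate's `Cached` trait. Because cache_get
// takes `&mut self`, `rwlock.read().await.cache_get(&key)` cannot compile;
// every access needs the exclusive (write) lock.
trait CachedSketch<K, V> {
    fn cache_get(&mut self, k: &K) -> Option<&V>;
    fn cache_set(&mut self, k: K, v: V) -> Option<V>;
}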

Using the cached procedural macro

Using the procedural macro, I am achieving about 40ms average latency for my method read_kvstore.

Latency metric:
(screenshot: latency graph)

Code:

#[inline(always)]
#[tracing::instrument]
#[cached(
    time = 1800,        // entries expire after 1800 seconds
    size = 40000,       // at most 40k entries are kept
    sync_writes = true, // concurrent calls for the same key wait for the first to finish
    key = "String",
    convert = r#"{ format!("{:?}:{}:{}:{}:{}", k, table, namespace, cluster, psm) }"#
)]
pub async fn read_kvstore(
    _ctx: Context,
    rpc_option: RpcOption,
    k: Vec<u8>,
    table: String,
    namespace: String,
    cluster: String,
    psm: String,
) -> Result<Vec<u8>, KVError> {
 ...

}

Using the global cache

Latency metric:

Using the global cache, I am achieving about 60ms average latency for my method read_kvstore, which is about a 20ms increase over the procedural macro.

(screenshot: latency graph)

Code:

// Assumed imports for this snippet: once_cell::sync::Lazy,
// tokio::sync::Mutex (the lock is awaited below), and cached::TimedSizedCache.
static KV_CACHE: Lazy<Mutex<TimedSizedCache<String, Vec<u8>>>> = Lazy::new(|| {
    Mutex::new(TimedSizedCache::with_size_and_lifespan_and_refresh(
        40000, 1800, false, // max 40k entries, 1800s lifespan, no TTL refresh on reads
    ))
});

lazy_static! {
    pub static ref KV_CACHE_HIT_METRIC: metrics::Metric = METRICS_CLIENT.metric(
        "kv_reader.cache.hit",
        metrics::MeasurerType::RateCounter
    );
    pub static ref KV_CACHE_MISS_METRIC: metrics::Metric = METRICS_CLIENT.metric(
        "kv_reader.cache.miss",
        metrics::MeasurerType::RateCounter
    );
}

#[inline(always)]
#[tracing::instrument]
pub async fn read_kvstore(
    _ctx: Context,
    rpc_option: RpcOption,
    k: Vec<u8>,
    table: String,
    namespace: String,
    cluster: String,
    psm: String,
) -> Result<Vec<u8>, KVError> {
    // let cache_key = make_cache_key(&k, &table, &namespace, &cluster, &psm);
    let cache_key = format!("{:?}:{}:{}:{}:{}", k, table, namespace, cluster, psm);
    let cache_tags = tags_vec! {
        "table" => table.clone(),
        "ns" => namespace.clone(),
        "cluster" => cluster.clone(),
    };

    let cached_value = {
        let mut cache = KV_CACHE.lock().await;
        cache.cache_get(&cache_key).cloned()
    };

    if let Some(value) = cached_value {
        KV_CACHE_HIT_METRIC.emit(&cache_tags, 1);
        return Ok(value);
    }

    let read_start = Instant::now();
    debug!(
        "Start to read kv, rpc_option={:?}, table={}, ns={}, cluster={}, psm={}",
        rpc_option, table, namespace, cluster, psm
    );
    let pool = KV_CLIENT_POOL.get();
    let kv_client = pool.get_client(&namespace, &cluster, &psm).await;
    let mut key_not_found = false;
    let read_res = match kv_client {
        None => {
            warn!("Fail to get kv client, ns: ${namespace}, cluster:${cluster}, psm: ${psm}");
            Err(KVError {
                msg: format!(
                    "fail to get kv client, ns: ${namespace}, cluster:${cluster}, psm: ${psm}"
                ),
            })
        }
        Some(kv_client) => {
            debug!(
                "Succeed to get kv client, ns: ${namespace}, cluster:${cluster}, psm: ${psm}"
            );
            match tokio::time::timeout(
                Duration::from_millis(rpc_option.timeout_ms),
                kv_client.get(
                    table.as_str(),
                    k,
                    Timeout::MilliSeconds(rpc_option.timeout_ms),
                ),
            )
                .await
            {
                Err(tokio_err) => {
                    warn!("Fail to query kv, tokio timed out, table={table}, e={tokio_err}");
                    Err(KVError {
                        msg: format!(
                            "fail to query kv, tokio timed out, table={table}, e={tokio_err}"
                        ),
                    })
                }
                Ok(resp) => match resp {
                    Err(_e) => {
                        warn!("Fail to query kv, table={table}, e={_e}");
                        Err(KVError {
                            msg: format!("fail to query kv, e={_e}"),
                        })
                    }
                    Ok(value) => match value.0 {
                        Some(v) => {
                            debug!("Succeed to get reply from kv, resp_len={}", v.len());
                            Ok(v)
                        }
                        None => {
                            key_not_found = true;
                            Err(key_not_found_error())
                        }
                    },
                },
            }
        }
    };

    KV_CACHE_MISS_METRIC.emit(&cache_tags, 1);

    // Write the computed value into the cache before returning it
    if let Ok(ref value) = read_res {
        let mut cache_guard = KV_CACHE.lock().await;
        cache_guard.cache_set(cache_key, value.clone());
    }

    read_res
}
jaemk (Owner) commented May 30, 2024

Hi @kembofly, the global cache produced by the procedural macro is an all-caps static named after the function, e.g.:

let cache = FIB.lock().unwrap();

The cache hit metrics will be available there.
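
For the read_kvstore example above, that static would be READ_KVSTORE, and since the function is async the generated lock is an async one. A minimal sketch of reading the counters out (assuming the Cached trait's cache_hits/cache_misses accessors, which return Option<u64> on stores that track them):

use cached::Cached;

// Hypothetical helper: READ_KVSTORE is the static the macro generates
// for the `read_kvstore` function shown earlier in this thread.
async fn emit_cache_counters() {
    let cache = READ_KVSTORE.lock().await;
    let hits = cache.cache_hits().unwrap_or(0);
    let misses = cache.cache_misses().unwrap_or(0);
    // hand `hits` / `misses` to your metrics client here
}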

Have you tried commenting out the metrics in your implementation to confirm it really isn't them causing an increase?

Another option, like you touched on, is to use an RwLock instead of a Mutex. Unfortunately, this isn't supported by the macros yet and will require a new trait. Fortunately, though, I recently added a cache store that supports size- and time-based eviction and doesn't require &mut access for reads: https://docs.rs/cached/latest/cached/stores/struct.ExpiringSizedCache.html. A consequence of non-mutating reads is that it cannot collect its own metrics, but luckily you've already manually implemented the global static and metrics, so you could swap out the Mutex<TimedSizedCache> for an RwLock<ExpiringSizedCache> and see if that improves things.

kembofly (Author) commented Jun 3, 2024

@jaemk thank you for clarifying! :) I have tried using the procedural macro with the global cache variable like you showed, and I have also tried commenting out the metrics in my implementation, but the issue still remains. As you can see below, when the new service is deployed with the cache hit/miss metrics included (see code change below), the average latency per request for my RPC service increases by about 60ms.

(screenshot: latency graph)

Also, CPU usage shot up from about 20% to about 87% during the same period (note: the time axis is skewed by 1h in the metric below).

(screenshot: CPU usage graph)

I also tried the ExpiringSizedCache struct like you suggested, but I didn't see any notable difference in latency. However, I will give it another go! Any suggestions on how to continue troubleshooting this? I have included the latest version of my code below.

Code:

lazy_static! {
    pub static ref LATENCY_METRIC: metrics::Metric = METRICS_CLIENT.metric(
        "kv_reader.latency.us",
        metrics::MeasurerType::Timer
    );
    pub static ref THROUGHPUT_METRIC: metrics::Metric = METRICS_CLIENT.metric(
        "kv_reader.throughput",
        metrics::MeasurerType::Counter
    );
    pub static ref KV_CACHE_HIT_METRIC: metrics::Metric = METRICS_CLIENT.metric(
        "reader.cache.hit",
        metrics::MeasurerType::RateCounter
    );
    pub static ref KV_CACHE_MISS_METRIC: metrics::Metric = METRICS_CLIENT.metric(
        "kv_reader.cache.miss",
        metrics::MeasurerType::RateCounter
    );
}

#[inline(always)]
#[tracing::instrument]
pub async fn read_kv(
    _ctx: Context,
    rpc_option: RpcOption,
    k: Vec<u8>,
    table: String,
    namespace: String,
    cluster: String,
    psm: String,
) -> Result<Vec<u8>, KVError> {
    use cached::Cached;
    let cache_key = format!(
        "{:?}:{}:{}:{}:{}",
        k,
        table,
        namespace,
        cluster,
        psm,
    );

    let cache_tags = tags_vec! {
        "table" => table.clone(),
        "ns" => namespace.clone(),
        "cluster" => cluster.clone(),
    };

    // Check whether the key exists in the cache; if so, emit a hit metric.
    if READ_KV_WITH_CACHING.lock().await.cache_get(&cache_key).is_some() {
        // NOTE! This extra cache_get call itself bumps the store's internal
        // hit counter, so reading the `cache_hits` method directly would give
        // an inflated picture of how often the cache is actually hit.
        KV_CACHE_HIT_METRIC.emit(&cache_tags, 1);
    }

    read_kv_with_caching(_ctx, rpc_option, k, table, namespace, cluster, psm).await
}

#[cached(
    time = 1800,
    size = 40000,
    sync_writes = true,
    key = "String",
    convert = r#"{ format!("{:?}:{}:{}:{}:{}", k, table, namespace, cluster, psm) }"#
)]
pub async fn read_kv_with_caching(
    _ctx: Context,
    rpc_option: RpcOption,
    k: Vec<u8>,
    table: String,
    namespace: String,
    cluster: String,
    psm: String,
) -> Result<Vec<u8>, KVError> {
    let cache_tags = tags_vec! {
        "table" => table.clone(),
        "ns" => namespace.clone(),
        "cluster" => cluster.clone(),
    };

    let read_start = Instant::now();
    debug!(
        "Start to read kv, rpc_option={:?}, table={}, ns={}, cluster={}, psm={}",
        rpc_option, table, namespace, cluster, psm
    );
    let pool = KV_CLIENT_POOL.get();
    let kv_client = pool.get_client(&namespace, &cluster, &psm).await;
    let mut key_not_found = false;
    let read_res = match kv_client {
        None => {
            warn!("Fail to get kv client, ns: ${namespace}, cluster:${cluster}, psm: ${psm}");
            Err(KVError {
                msg: format!(
                    "fail to get kv client, ns: ${namespace}, cluster:${cluster}, psm: ${psm}"
                ),
            })
        }
        Some(kv_client) => {
            debug!(
                "Succeed to get kv client, ns: ${namespace}, cluster:${cluster}, psm: ${psm}"
            );
            match tokio::time::timeout(
                Duration::from_millis(rpc_option.timeout_ms),
                kv_client.get(
                    table.as_str(),
                    k,
                    Timeout::MilliSeconds(rpc_option.timeout_ms),
                ),
            )
                .await
            {
                Err(tokio_err) => {
                    warn!("Fail to query kv, tokio timed out, table={table}, e={tokio_err}");
                    Err(KVError {
                        msg: format!(
                            "fail to query kv, tokio timed out, table={table}, e={tokio_err}"
                        ),
                    })
                }
                Ok(resp) => match resp {
                    Err(_e) => {
                        warn!("Fail to query kv, table={table}, e={_e}");
                        Err(KVError {
                            msg: format!("fail to query kv, e={_e}"),
                        })
                    }
                    Ok(value) => match value.0 {
                        Some(v) => {
                            debug!("Succeed to get reply from kv, resp_len={}", v.len());
                            Ok(v)
                        }
                        None => {
                            key_not_found = true;
                            Err(key_not_found_error())
                        }
                    },
                },
            }
        }
    };

    let cluster_ns = format!("{}.{}", cluster, namespace);
    LATENCY_METRIC.emit(
        tags!["cluster_ns" => cluster_ns.clone()],
        read_start.elapsed().as_micros() as i64,
    );
    let is_error = if read_res.is_err() {
        if key_not_found {
            "1"
        } else {
            "10"
        }
    } else {
        "0"
    };

    THROUGHPUT_METRIC.emit(
        tags!["_is_error" => is_error, "cluster_ns" => cluster_ns.clone()],
        1,
    );

    KV_CACHE_MISS_METRIC.emit(&cache_tags, 1);

    read_res
}

jaemk (Owner) commented Jul 20, 2024

Hi @kembofly, sorry for the late reply. I'm curious what the test setup and results for ExpiringSizedCache were. Did you make sure to use its ability to read through a .read lock, and remove all other unnecessary .lock/.write calls? Note that ExpiringSizedCache does not implement the Cached trait and doesn't collect hit/miss metrics on itself, since doing so requires exclusive access for reading (which it is designed not to need). Your example would need to be refactored a bit:

  • change read_kv_with_caching so it is no longer annotated with #[cached], perhaps renaming it to read_kv_inner
  • define a global static RwLock<ExpiringSizedCache<_, _>>
  • update read_kv to check the global cache (with .read), increment your metrics counts, record time, call the _inner method, and write to the cache (with .write) (see the sketch below)
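
A minimal sketch of that refactor, under some assumptions: the ExpiringSizedCache constructor and method names used here (new with a TTL in milliseconds, size_limit, get, insert) follow my reading of the docs linked above and may differ by version, and read_kv_inner stands in for the un-annotated body of read_kv_with_caching:

use cached::stores::ExpiringSizedCache;
use once_cell::sync::Lazy;
use tokio::sync::RwLock;

static KV_CACHE: Lazy<RwLock<ExpiringSizedCache<String, Vec<u8>>>> = Lazy::new(|| {
    // TTL is given in milliseconds here, unlike the macro's `time` (seconds).
    let mut cache = ExpiringSizedCache::new(1800 * 1000);
    cache.size_limit(40_000);
    RwLock::new(cache)
});

pub async fn read_kv(k: Vec<u8>, table: String /* , remaining args as before */) -> Result<Vec<u8>, KVError> {
    let cache_key = format!("{:?}:{}", k, table);

    // Fast path: `get` takes &self, so a shared read lock suffices and
    // concurrent readers no longer serialize behind a Mutex.
    if let Some(v) = KV_CACHE.read().await.get(&cache_key).cloned() {
        // KV_CACHE_HIT_METRIC.emit(&cache_tags, 1);
        return Ok(v);
    }
    // KV_CACHE_MISS_METRIC.emit(&cache_tags, 1);

    let res = read_kv_inner(k, table).await; // the formerly #[cached] body
    if let Ok(ref v) = res {
        // Writes still take the exclusive lock.
        let _ = KV_CACHE.write().await.insert(cache_key, v.clone());
    }
    res
}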
