mirror of https://github.com/mastodon/flodgatt
Add temporary perf metrics
This commit is contained in:
parent
37b652ad79
commit
160e31a20f
18
src/main.rs
18
src/main.rs
|
@ -78,14 +78,23 @@ fn main() -> Result<(), FatalErr> {
|
||||||
.allow_methods(cfg.cors.allowed_methods)
|
.allow_methods(cfg.cors.allowed_methods)
|
||||||
.allow_headers(cfg.cors.allowed_headers);
|
.allow_headers(cfg.cors.allowed_headers);
|
||||||
|
|
||||||
|
// use futures::future::Future;
|
||||||
let streaming_server = move || {
|
let streaming_server = move || {
|
||||||
let manager = shared_manager.clone();
|
let manager = shared_manager.clone();
|
||||||
let stream = Interval::new(Instant::now(), poll_freq)
|
let stream = Interval::new(Instant::now(), poll_freq)
|
||||||
|
// .take(1200)
|
||||||
.map_err(|e| log::error!("{}", e))
|
.map_err(|e| log::error!("{}", e))
|
||||||
.for_each(move |_| {
|
.for_each(
|
||||||
let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover);
|
move |_| {
|
||||||
manager.poll_broadcast().map_err(FatalErr::log)
|
let mut manager = manager.lock().unwrap_or_else(redis::Manager::recover);
|
||||||
});
|
manager.poll_broadcast().map_err(FatalErr::log)
|
||||||
|
}, // ).and_then(|_| {
|
||||||
|
// log::info!("shutting down!");
|
||||||
|
// std::process::exit(0);
|
||||||
|
// futures::future::ok(())
|
||||||
|
// }
|
||||||
|
);
|
||||||
|
|
||||||
warp::spawn(lazy(move || stream));
|
warp::spawn(lazy(move || stream));
|
||||||
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
|
warp::serve(ws.or(sse).with(cors).or(status).recover(Handler::err))
|
||||||
};
|
};
|
||||||
|
@ -95,7 +104,6 @@ fn main() -> Result<(), FatalErr> {
|
||||||
fs::remove_file(socket).unwrap_or_default();
|
fs::remove_file(socket).unwrap_or_default();
|
||||||
let incoming = UnixListener::bind(socket)?.incoming();
|
let incoming = UnixListener::bind(socket)?.incoming();
|
||||||
fs::set_permissions(socket, PermissionsExt::from_mode(0o666))?;
|
fs::set_permissions(socket, PermissionsExt::from_mode(0o666))?;
|
||||||
|
|
||||||
tokio::run(lazy(|| streaming_server().serve_incoming(incoming)));
|
tokio::run(lazy(|| streaming_server().serve_incoming(incoming)));
|
||||||
} else {
|
} else {
|
||||||
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
|
let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
|
||||||
|
|
|
@ -58,7 +58,7 @@ impl RedisConn {
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
};
|
};
|
||||||
if first_read {
|
if first_read {
|
||||||
size = 2000;
|
size = 5000;
|
||||||
buffer = vec![0_u8; size];
|
buffer = vec![0_u8; size];
|
||||||
first_read = false;
|
first_read = false;
|
||||||
}
|
}
|
||||||
|
@ -115,6 +115,14 @@ impl RedisConn {
|
||||||
Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id),
|
Timeline(Stream::Hashtag(id), _, _) => self.tag_name_cache.get(id),
|
||||||
_non_hashtag_timeline => None,
|
_non_hashtag_timeline => None,
|
||||||
};
|
};
|
||||||
|
log::info!(
|
||||||
|
"RedisConn.redis_input size: {}\n\
|
||||||
|
RedisConn.redis_input capacity: {}\n\
|
||||||
|
RedisConn.redis_input length: {}",
|
||||||
|
std::mem::size_of_val(&self.redis_input),
|
||||||
|
self.redis_input.capacity(),
|
||||||
|
self.redis_input.len()
|
||||||
|
);
|
||||||
|
|
||||||
let tl = timeline.to_redis_raw_timeline(hashtag)?;
|
let tl = timeline.to_redis_raw_timeline(hashtag)?;
|
||||||
let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl);
|
let (primary_cmd, secondary_cmd) = cmd.into_sendable(&tl);
|
||||||
|
|
|
@ -36,8 +36,6 @@ pub enum RedisParseOutput<'a> {
|
||||||
NonMsg(&'a str),
|
NonMsg(&'a str),
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO -- should this impl Iterator?
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub struct RedisMsg<'a> {
|
pub struct RedisMsg<'a> {
|
||||||
pub timeline_txt: &'a str,
|
pub timeline_txt: &'a str,
|
||||||
|
|
Loading…
Reference in New Issue