diff --git a/src/response/redis/connection/connection.rs b/src/response/redis/connection/connection.rs deleted file mode 100644 index 08e6f20..0000000 --- a/src/response/redis/connection/connection.rs +++ /dev/null @@ -1,150 +0,0 @@ -use super::super::Error as ManagerErr; -use super::super::RedisCmd; -use super::RedisConnErr; -use crate::config::Redis; -use crate::request::Timeline; - -use futures::{Async, Poll}; -use lru::LruCache; -use std::io::{self, Read, Write}; -use std::net::TcpStream; -use std::time::Duration; - -type Result = std::result::Result; - -#[derive(Debug)] -pub(in super::super) struct RedisConn { - primary: TcpStream, - secondary: TcpStream, - pub(in super::super) namespace: Option, - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. - pub(in super::super) tag_name_cache: LruCache, -} - -impl RedisConn { - pub(in super::super) fn new(redis_cfg: &Redis) -> Result { - let addr = [&*redis_cfg.host, ":", &*redis_cfg.port.to_string()].concat(); - - let conn = Self::new_connection(&addr, redis_cfg.password.as_ref())?; - conn.set_nonblocking(true) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - Ok(Self { - primary: conn, - secondary: Self::new_connection(&addr, redis_cfg.password.as_ref())?, - tag_name_cache: LruCache::new(1000), - namespace: redis_cfg.namespace.clone().0, - }) - } - - pub(in super::super) fn poll_redis( - &mut self, - start: usize, - buffer: &mut Vec, - ) -> Poll { - const BLOCK: usize = 4096 * 2; - // if self.input.len() < start + BLOCK { - // self.input.resize(self.input.len() * 2, 0); - // log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); - // // log::info!("Current buffer: {}", String::from_utf8_lossy(&self.input)); - // } - - use Async::*; - match self.primary.read(&mut buffer[start..start + BLOCK]) { - Ok(n) => Ok(Ready(n)), - Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady), - Err(e) => { - Ready(log::error!("{}", e)); - Ok(Ready(0)) - } - } - } - - pub(crate) fn send_cmd(&mut self, cmd: RedisCmd, timelines: &[Timeline]) -> Result<()> { - let namespace = self.namespace.take(); - let timelines: Result> = timelines - .iter() - .map(|tl| { - let hashtag = tl.tag().and_then(|id| self.tag_name_cache.get(&id)); - match &namespace { - Some(ns) => Ok(format!("{}:{}", ns, tl.to_redis_raw_timeline(hashtag)?)), - None => Ok(tl.to_redis_raw_timeline(hashtag)?), - } - }) - .collect(); - - let (primary_cmd, secondary_cmd) = cmd.into_sendable(&timelines?[..]); - self.primary.write_all(&primary_cmd)?; - - // We also need to set a key to tell the Puma server that we've subscribed or - // unsubscribed to the channel because it stops publishing updates when it thinks - // no one is subscribed. - // (Documented in [PR #3278](https://github.com/tootsuite/mastodon/pull/3278)) - // Question: why can't the Puma server just use NUMSUB for this? - self.secondary.write_all(&secondary_cmd)?; - Ok(()) - } - - fn new_connection(addr: &str, pass: Option<&String>) -> Result { - let mut conn = TcpStream::connect(&addr)?; - if let Some(password) = pass { - Self::auth_connection(&mut conn, &addr, password)?; - } - - Self::validate_connection(&mut conn, &addr)?; - conn.set_read_timeout(Some(Duration::from_millis(10))) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - Self::set_connection_name(&mut conn, &addr)?; - Ok(conn) - } - - fn auth_connection(conn: &mut TcpStream, addr: &str, pass: &str) -> Result<()> { - conn.write_all( - &[ - b"*2\r\n$4\r\nauth\r\n$", - pass.len().to_string().as_bytes(), - b"\r\n", - pass.as_bytes(), - b"\r\n", - ] - .concat(), - ) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 5]; - conn.read_exact(&mut buffer) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - if String::from_utf8_lossy(&buffer) != "+OK\r\n" { - Err(RedisConnErr::IncorrectPassword(pass.to_string()))? - } - Ok(()) - } - - fn validate_connection(conn: &mut TcpStream, addr: &str) -> Result<()> { - conn.write_all(b"PING\r\n") - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 100]; - conn.read(&mut buffer) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let reply = String::from_utf8_lossy(&buffer); - match &*reply { - r if r.starts_with("+PONG\r\n") => Ok(()), - r if r.starts_with("-NOAUTH") => Err(RedisConnErr::MissingPassword), - r if r.starts_with("HTTP/1.") => Err(RedisConnErr::NotRedis(addr.to_string())), - _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), - } - } - - fn set_connection_name(conn: &mut TcpStream, addr: &str) -> Result<()> { - conn.write_all(b"*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$8\r\nflodgatt\r\n") - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let mut buffer = vec![0_u8; 100]; - conn.read(&mut buffer) - .map_err(|e| RedisConnErr::with_addr(&addr, e))?; - let reply = String::from_utf8_lossy(&buffer); - match &*reply { - r if r.starts_with("+OK\r\n") => Ok(()), - _ => Err(RedisConnErr::InvalidRedisReply(reply.to_string())), - } - } -} diff --git a/src/response/redis/connection/mock.rs b/src/response/redis/connection/mock.rs deleted file mode 100644 index 3043671..0000000 --- a/src/response/redis/connection/mock.rs +++ /dev/null @@ -1,71 +0,0 @@ -pub(crate) use super::RedisConnErr; - -use super::super::Error as ManagerErr; -use super::super::RedisCmd; -use crate::config::Redis; -use crate::request::Timeline; - -use futures::{Async, Poll}; -use lru::LruCache; - -type Result = std::result::Result; - -#[derive(Debug)] -pub(in super::super) struct RedisConn { - pub(in super::super) namespace: Option, - // TODO: eventually, it might make sense to have Mastodon publish to timelines with - // the tag number instead of the tag name. This would save us from dealing - // with a cache here and would be consistent with how lists/users are handled. - pub(in super::super) tag_name_cache: LruCache, - pub(in super::super) input: Vec, -} - -impl RedisConn { - pub(in super::super) fn new(redis_cfg: &Redis) -> Result { - Ok(Self { - tag_name_cache: LruCache::new(1000), - namespace: redis_cfg.namespace.clone().0, - input: vec![0; 4096 * 4], - }) - } - pub(in super::super) fn poll_redis(&mut self, start: usize) -> Poll { - const BLOCK: usize = 4096 * 2; - if self.input.len() < start + BLOCK { - self.input.resize(self.input.len() * 2, 0); - log::info!("Resizing input buffer to {} KiB.", self.input.len() / 1024); - // log::info!("Current buffer: {}", String::from_utf8_lossy(&self.input)); - } - - use Async::*; - // self.input[start..start + BLOCK] = &"foo".as_bytes(); - let mut n = 0; - for i in 0..BLOCK { - if let Some(byte) = TEST_INPUT.get(start + i) { - self.input[start + 1] = *byte; - n += 1; - } - } - - Ok(Ready(n)) - // match self.primary.read(&mut self.input[start..start + BLOCK]) { - // Ok(n) => Ok(Ready(n)), - // Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => Ok(NotReady), - // Err(e) => { - // Ready(log::error!("{}", e)); - // Ok(Ready(0)) - // } - // } - } - - pub(crate) fn send_cmd(&mut self, _cmd: RedisCmd, _timelines: &[Timeline]) -> Result<()> { - Ok(()) - } -} - -const TEST_INPUT: &[u8] = r##"*3 -$7 -message -$15 -timeline:public -$3790 -{"event":"update","payload":{"id":"102775370117886890","created_at":"2019-09-11T18:42:19.000Z","in_reply_to_id":null,"in_reply_to_account_id":null,"sensitive":false,"spoiler_text":"","visibility":"unlisted","language":"en","uri":"https://mastodon.host/users/federationbot/statuses/102775346916917099","url":"https://mastodon.host/@federationbot/102775346916917099","replies_count":0,"reblogs_count":0,"favourites_count":0,"favourited":false,"reblogged":false,"muted":false,"content":"

Trending tags:
#neverforget
#4styles
#newpipe
#uber
#mercredifiction

","reblog":null,"account":{"id":"78","username":"federationbot","acct":"federationbot@mastodon.host","display_name":"Federation Bot","locked":false,"bot":false,"created_at":"2019-09-10T15:04:25.559Z","note":"

Hello, I am mastodon.host official semi bot.

Follow me if you want to have some updates on the view of the fediverse from here ( I only post unlisted ).

I also randomly boost one of my followers toot every hour !

If you don't feel confortable with me following you, tell me: unfollow and I'll do it :)

If you want me to follow you, just tell me follow !

If you want automatic follow for new users on your instance and you are an instance admin, contact me !

Other commands are private :)

","url":"https://mastodon.host/@federationbot","avatar":"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863","avatar_static":"https://instance.codesections.com/system/accounts/avatars/000/000/078/original/d9e2be5398629cf8.jpeg?1568127863","header":"https://instance.codesections.com/headers/original/missing.png","header_static":"https://instance.codesections.com/headers/original/missing.png","followers_count":16636,"following_count":179532,"statuses_count":50554,"emojis":[],"fields":[{"name":"More stats","value":"https://mastodon.host/stats.html","verified_at":null},{"name":"More infos","value":"https://mastodon.host/about/more","verified_at":null},{"name":"Owner/Friend","value":"@gled","verified_at":null}]},"media_attachments":[],"mentions":[],"tags":[{"name":"4styles","url":"https://instance.codesections.com/tags/4styles"},{"name":"neverforget","url":"https://instance.codesections.com/tags/neverforget"},{"name":"mercredifiction","url":"https://instance.codesections.com/tags/mercredifiction"},{"name":"uber","url":"https://instance.codesections.com/tags/uber"},{"name":"newpipe","url":"https://instance.codesections.com/tags/newpipe"}],"emojis":[],"card":null,"poll":null},"queued_at":1568227693541}"##.as_bytes();