8000 Introduce a non blocking file appender by thekeys93 · Pull Request #673 · tokio-rs/tracing · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Introduce a non blocking file appender #673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
945f4a8
Introduce a non blocking file appender
Apr 7, 2020
9ccaa5d
Use public interface for Rotation enum
thekeys93 Apr 8, 2020
2a34863
simply if statement
thekeys93 Apr 8, 2020
39f2709
Make WriterFactory pub(crate)
thekeys93 Apr 8, 2020
f251bbe
Make BufWriterFactory pub(crate)
thekeys93 Apr 8, 2020
de77617
Make InnerAppender pub(crate)
thekeys93 Apr 8, 2020
7c9ce25
Make InnerAppender constructor pub(crate)
thekeys93 Apr 8, 2020
e028dfa
make refresh_writer pub(crate)
thekeys93 Apr 8, 2020
55bdffe
Make should_rollover pub(crate)
thekeys93 Apr 8, 2020
1a70543
Make WorkerState pub(crate)
thekeys93 Apr 8, 2020
5b4eacf
Make Worker pub(crate)
thekeys93 Apr 8, 2020
18d0340
make constructor pub(crate)
thekeys93 Apr 8, 2020
c4a8447
Apply suggested readability improvements and using pub(crate) where a…
thekeys93 Apr 8, 2020
1c33af9
Use new Rotation const in match expressions. derive Copy trait for Ro…
Apr 8, 2020
6e8cb23
Remove copy trait from Rotation.
Apr 8, 2020
75ee728
Use Path for log_directory and filename prefix
thekeys93 Apr 8, 2020
e5944c0
Properly handle use of Path in non public facing apis
Apr 8, 2020
cc4fde7
Use Arc for error_counter, modify fileappender constructor
Apr 9, 2020
d9968b2
Fix getters for FileAppender, clone writer for getter
Apr 9, 2020
a1f0d01
WIP: Decoupling file appender and nonblocking
Apr 10, 2020
9316656
Apply suggestions from code review
thekeys93 Apr 10, 2020
ac14ca3
Update Rotation he 8000 lpers and add constructor
Apr 10, 2020
b3aadef
Remove generic from InnerAppender, use BufWriter always
Apr 10, 2020
f77a6a8
Move creation of log file to Inner. Get rid of BufWriterFactory
Apr 10, 2020
4417342
Remove multiple impl of InnerAppender
Apr 10, 2020
64b356f
Impl MakeWriter on NonBlocking and remove NonBlockingWriter
Apr 10, 2020
98db6f7
Override write_all
thekeys93 Apr 10, 2020
ec049e0
Use T:Write, get rid of RollingFileWriter
Apr 10, 2020
be7aeb4
cargo fmt
Apr 10, 2020
e87b631
Add lossy option for non_blocking
Apr 10, 2020
5d8475c
Add WorkerGuard and ensure closure of worker thread on drop
Apr 10, 2020
2983370
Apply suggestions from code review
thekeys93 Apr 13, 2020
8e6a395
address comments
Apr 13, 2020
80bf8e2
Add comment about worker thread yielding
Apr 13, 2020
8f9bdc1
Fix clippy warnings
Apr 13, 2020
e467bdd
Fix 1 more clippy warning
Apr 13, 2020
4b73a1c
Add 2 examples
Apr 14, 2020
2b6cb51
Take suggestions for example doc improvements
Apr 14, 2020
9e7620d
fix clippy warning about needless main
Apr 14, 2020
b415265
Revert "fix clippy warning about needless main"
Apr 14, 2020
4ed792c
Add tracing dev dependency, fix clippy warning
Apr 14, 2020
93b26af
Apply suggestions from code review
thekeys93 Apr 14, 2020
f494d53
Supress clippy warning using attribute
Apr 14, 2020
5216e6c
Fix fmt
Apr 14, 2020
97e5e15
Update tracing-appender/src/rolling.rs
thekeys93 Apr 14, 2020
714e5bd
fix doc
Apr 14, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ members = [
"tracing-macros",
"tracing-subscriber",
"tracing-serde",
"tracing-appender",
"examples"
]
18 changes: 18 additions & 0 deletions tracing-appender/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "tracing-appender"
version = "0.1.0"
authors = ["Zeki Sherif <zekshi@amazon.com>"]
edition = "2018"
repository = "https://github.com/tokio-rs/tracing"
homepage = "https://tokio.rs"
description = """
Provides an `appender` that writes to a file
"""

[dependencies]
tracing-subscriber = {path = "../tracing-subscriber", version = "0.2.4"}
crossbeam-channel = "0.4.2"
chrono = "0.4.11"

[dev-dependencies]
tracing = { path = "../tracing", version = "0.1" }
96 changes: 96 additions & 0 deletions tracing-appender/src/inner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::io::{BufWriter, Write};
use std::{fs, io};

use crate::rolling::Rotation;
use chrono::prelude::*;
use std::fmt::Debug;
use std::fs::{File, OpenOptions};
use std::path::Path;

#[derive(Debug)]
pub(crate) struct InnerAppender {
log_directory: String,
log_filename_prefix: String,
writer: BufWriter<File>,
next_date: DateTime<Utc>,
rotation: Rotation,
}

impl io::Write for InnerAppender {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let now = Utc::now();
self.write_timestamped(buf, now)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}

impl InnerAppender {
pub(crate) fn new(
log_directory: &Path,
log_filename_prefix: &Path,
rotation: Rotation,
now: DateTime<Utc>,
) -> io::Result<Self> {
let log_directory = log_directory.to_str().unwrap();
let log_filename_prefix = log_filename_prefix.to_str().unwrap();

let filename = rotation.join_date(log_filename_prefix, &now);
let next_date = rotation.next_date(&now);

Ok(InnerAppender {
log_directory: log_directory.to_string(),
log_filename_prefix: log_filename_prefix.to_string(),
writer: create_writer(log_directory, &filename)?,
next_date,
rotation,
})
}

fn write_timestamped(&mut self, buf: &[u8], date: DateTime<Utc>) -> io::Result<usize> {
// Even if refresh_writer fails, we still have the original writer. Ignore errors
// and proceed with the write.
let buf_len = buf.len();
self.refresh_writer(date);
self.writer.write_all(buf).map(|_| buf_len)
}

fn refresh_writer(&mut self, now: DateTime<Utc>) {
if self.should_rollover(now) {
let filename = self.rotation.join_date(&self.log_filename_prefix, &now);

self.next_date = self.rotation.next_date(&now);

match create_writer(&self.log_directory, &filename) {
Ok(writer) => self.writer = writer,
Err(err) => eprintln!("Couldn't create writer for logs: {}", err),
}
}
}

fn should_rollover(&self, date: DateTime<Utc>) -> bool {
date >= self.next_date
}
}

fn create_writer(directory: &str, filename: &str) -> io::Result<BufWriter<File>> {
let file_path = Path::new(directory).join(filename);
Ok(BufWriter::new(open_file_create_parent_dirs(&file_path)?))
}

fn open_file_create_parent_dirs(path: &Path) -> io::Result<File> {
let mut open_options = OpenOptions::new();
open_options.append(true).create(true);

let new_file = open_options.open(path);
if new_file.is_err() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
return open_options.open(path);
}
}

new_file
}
49 changes: 49 additions & 0 deletions tracing-appender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::non_blocking::{NonBlocking, WorkerGuard};

use std::io::Write;

mod inner;
pub mod non_blocking;
pub mod rolling;
mod worker;

/// Creates a non-blocking, off-thread writer.
///
/// This spawns a dedicated worker thread which is responsible for writing log
/// lines to the provided writer. When a line is written using the returned
/// `NonBlocking` struct's `make_writer` method, it will be enqueued to be
/// written by the worker thread.
///
/// The queue has a fixed capacity, and if it becomes full, any logs written
/// to it will be dropped until capacity is once again available. This may
/// occur if logs are consistently produced faster than the worker thread can
/// output them. The queue capacity and behavior when full (i.e., whether to
/// drop logs or to exert backpressure to slow down senders) can be configured
/// using [`NonBlockingBuilder::default()`]. This function simply returns the default
/// configuration &mdash; it is equivalent to
///
/// ```rust
/// # use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
/// # fn doc() -> (NonBlocking, WorkerGuard) {
/// tracing_appender::non_blocking::NonBlocking::new(std::io::stdout())
/// # }
/// ```
/// [`NonBlocking::builder()`]: /non_blocking/struct.NonBlocking.html#method.builder
///
/// Note that the `WorkerGuard` returned by `non_blocking` _must_ be assigned to a binding that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should probably also explain what WorkerGuard is , as well as noting that it shouldn't be dropped accidentally :)

and, it's fine for users to drop it if they don't care about ensuring that logs are flushed on panics (but they probably do care about this)

/// is not `_`, as `_` will result in the `WorkerGuard` being dropped immediately.
///
/// # Examples
/// ``` rust
///
/// # fn docs() {
/// let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
/// let subscriber = tracing_subscriber::fmt().with_writer(non_blocking);
/// tracing::subscriber::with_default(subscriber.finish(), || {
/// tracing::event!(tracing::Level::INFO, "Hello");
/// });
/// # }
/// ```
pub fn non_blocking<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
NonBlocking::new(writer)
}
146 changes: 146 additions & 0 deletions tracing-appender/src/non_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use crate::worker::Worker;
use crossbeam_channel::{bounded, Sender};
use std::io;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;
use std::thread::JoinHandle;
use tracing_subscriber::fmt::MakeWriter;

pub const DEFAULT_BUFFERED_LINES_LIMIT: usize = 128_000;

#[derive(Debug)]
pub struct WorkerGuard {
guard: Option<JoinHandle<()>>,
shutdown_signal: Arc<AtomicBool>,
}
#[derive(Clone, Debug)]
pub struct NonBlocking {
error_counter: Arc<AtomicU64>,
channel: Sender<Vec<u8>>,
is_lossy: bool,
}

impl NonBlocking {
pub fn new<T: Write + Send + Sync + 'static>(writer: T) -> (NonBlocking, WorkerGuard) {
NonBlockingBuilder::default().finish(writer)
}

fn create<T: Write + Send + Sync + 'static>(
writer: T,
buffered_lines_limit: usize,
is_lossy: bool,
) -> (NonBlocking, WorkerGuard) {
let (sender, receiver) = bounded(buffered_lines_limit);
let shutdown_signal = Arc::new(AtomicBool::new(false));

let worker = Worker::new(receiver, writer, shutdown_signal.clone());
let worker_guard = WorkerGuard::new(worker.worker_thread(), shutdown_signal);

(
Self {
channel: sender,
error_counter: Arc::new(AtomicU64::new(0)),
is_lossy,
},
worker_guard,
)
}

pub fn error_counter(&self) -> Arc<AtomicU64> {
self.error_counter.clone()
}
}

#[derive(Debug)]
pub struct NonBlockingBuilder {
buffered_lines_limit: usize,
is_lossy: bool,
}

impl NonBlockingBuilder {
pub fn buffered_lines_limit(mut self, buffered_lines_limit: usize) -> NonBlockingBuilder {
self.buffered_lines_limit = buffered_lines_limit;
self
}

pub fn lossy(mut self, is_lossy: bool) -> NonBlockingBuilder {
self.is_lossy = is_lossy;
self
}

pub fn finish<T: Write + Send + Sync + 'static>(self, writer: T) -> (NonBlocking, WorkerGuard) {
NonBlocking::create(writer, self.buffered_lines_limit, self.is_lossy)
}
}

impl Default for NonBlockingBuilder {
fn default() -> Self {
NonBlockingBuilder {
buffered_lines_limit: DEFAULT_BUFFERED_LINES_LIMIT,
is_lossy: true,
}
}
}

impl std::io::Write for NonBlocking {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let buf_size = buf.len();
if self.is_lossy {
if self.channel.try_send(buf.to_vec()).is_err() {
self.error_counter.fetch_add(1, Ordering::Relaxed);
}
} else {
return match self.channel.send(buf.to_vec()) {
Ok(_) => Ok(buf_size),
Err(_) => Err(io::Error::from(io::ErrorKind::Other)),
};
}
Ok(buf_size)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}

#[inline]
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.write(buf).map(|_| ())
}
}

impl MakeWriter for NonBlocking {
type Writer = NonBlocking;

fn make_writer(&self) -> Self::Writer {
self.clone()
}
}

impl WorkerGuard {
fn new(handle: JoinHandle<()>, shutdown_signal: Arc<AtomicBool>) -> Self {
WorkerGuard {
guard: Some(handle),
shutdown_signal,
}
}

fn stop(&mut self) -> std::thread::Result<()> {
match self.guard.take() {
Some(handle) => handle.join(),
None => Ok(()),
}
}
}

impl Drop for WorkerGuard {
fn drop(&mut self) {
self.shutdown_signal.store(true, Ordering::Relaxed);
match self.stop() {
Ok(_) => (),
Err(e) => println!("Failed to join worker thread. Error: {:?}", e),
}
}
}
Loading
0