daemon: handle event queueing and shutdown issues by sandydoo · Pull Request #692 · cachix/cachix · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

daemon: handle event queueing and shutdown issues #692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 48 additions & 14 deletions cachix/src/Cachix/Daemon.hs
10000
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import Cachix.Daemon.Log qualified as Log
import Cachix.Daemon.Protocol as Protocol
import Cachix.Daemon.Push as Push
import Cachix.Daemon.PushManager qualified as PushManager
import Cachix.Daemon.ShutdownLatch
import Cachix.Daemon.SocketStore qualified as SocketStore
import Cachix.Daemon.Subscription as Subscription
import Cachix.Daemon.Types as Types
Expand All @@ -33,6 +32,7 @@ import Cachix.Types.BinaryCache (BinaryCacheName)
import Cachix.Types.BinaryCache qualified as BinaryCache
import Control.Concurrent.STM.TMChan
import Control.Exception.Safe (catchAny)
import Data.IORef (IORef, atomicModifyIORef', newIORef)
import Data.Text qualified as T
import Hercules.CNix.Store (Store, withStore)
import Hercules.CNix.Util qualified as CNix.Util
Expand All @@ -43,7 +43,7 @@ import Protolude hiding (bracket)
import System.IO.Error (isResourceVanishedError)
import System.Posix.Process (getProcessID)
import System.Posix.Signals qualified as Signal
import UnliftIO (MonadUnliftIO)
import UnliftIO (MonadUnliftIO, withRunInIO)
import UnliftIO.Async qualified as Async
import UnliftIO.Exception (bracket)

Expand Down Expand Up @@ -71,7 +71,6 @@ new daemonEnv nixStore daemonOptions daemonLogHandle daemonPushOptions daemonCac
daemonLogger <- Log.new "cachix.daemon" daemonLogHandle daemonLogLevel

daemonEventLoop <- EventLoop.new
daemonShutdownLatch <- newShutdownLatch
daemonPid <- getProcessID

daemonSocketPath <- maybe getSocketPath pure (Options.daemonSocketPath daemonOptions)
Expand Down Expand Up @@ -99,7 +98,7 @@ start :: Env -> DaemonOptions -> PushOptions -> BinaryCacheName -> IO ()
start daemonEnv daemonOptions daemonPushOptions daemonCacheName =
withStore $ \store -> do
daemon <- new daemonEnv store daemonOptions Nothing daemonPushOptions daemonCacheName
installSignalHandlers daemon
void $ runDaemon daemon installSignalHandlers
result <- run daemon
exitWith (toExitCode result)

Expand Down Expand Up @@ -156,14 +155,52 @@ stopIO :: DaemonEnv -> IO ()
stopIO DaemonEnv {daemonEventLoop} =
EventLoop.sendIO daemonEventLoop ShutdownGracefully

installSignalHandlers :: DaemonEnv -> IO ()
installSignalHandlers daemon = do
for_ [Signal.sigTERM, Signal.sigINT] $ \signal ->
Signal.installHandler signal (Signal.CatchOnce handler) Nothing
installSignalHandlers :: Daemon ()
installSignalHandlers = do
withRunInIO $ \runInIO -> do
mainThreadId <- myThreadId
-- Track Ctrl+C attempts
interruptRef <- newIORef False

-- Install signal handlers using runInIO to properly run Daemon actions from IO
_ <- Signal.installHandler Signal.sigTERM (Signal.Catch (runInIO (termHandler mainThreadId))) Nothing
_ <- Signal.installHandler Signal.sigINT (Signal.Catch (runInIO (intHandler mainThreadId interruptRef))) Nothing

return ()
where
handler = do
CNix.Util.triggerInterrupt
stopIO daemon
-- SIGTERM: skip the graceful path entirely.
-- Arms the fallback exit timer, interrupts Nix via 'CNix.Util.triggerInterrupt',
-- and then fails the event loop directly instead of enqueueing an event, so
-- the exit still happens when the event queue is full.
termHandler :: ThreadId -> Daemon ()
termHandler mainTid = do
  Katip.logFM Katip.InfoS "sigTERM received. Exiting immediately..."
  startExitTimer mainTid
  liftIO CNix.Util.triggerInterrupt
  asks daemonEventLoop >>= EventLoop.exitLoopWithFailure EventLoopClosed

-- SIGINT: the first Ctrl+C asks the event loop for a graceful shutdown; any
-- later Ctrl+C forces the loop to exit and arms the fallback exit timer.
-- 'interruptRef' remembers whether a Ctrl+C has already been seen.
intHandler :: ThreadId -> IORef Bool -> Daemon ()
intHandler mainTid seenRef = do
  liftIO CNix.Util.triggerInterrupt
  -- Set the flag and read its previous value in one atomic step, so that
  -- overlapping handler invocations cannot both take the "first press" path.
  alreadySeen <- liftIO $ atomicModifyIORef' seenRef (\wasSet -> (True, wasSet))
  loop <- asks daemonEventLoop
  if not alreadySeen
    then do
      Katip.logFM Katip.InfoS "Shutting down gracefully (Ctrl+C again to force exit)..."
      EventLoop.send loop ShutdownGracefully
    else do
      Katip.logFM Katip.InfoS "Exiting immediately..."
      startExitTimer mainTid
      -- Bypass the queue: fail the loop directly so a full queue cannot block exit.
      EventLoop.exitLoopWithFailure EventLoopClosed loop

-- Start a timer to ensure we exit even if the event loop hangs
startExitTimer :: ThreadId -> Daemon ()
startExitTimer mainThreadId = do
void $ liftIO $ forkIO $ do
threadDelay (15 * 1000 * 1000) -- 15 seconds
Copy link
Preview
Copilot AI May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting the magic number for the exit timer delay into a named constant to improve maintainability.

Suggested change
threadDelay (15 * 1000 * 1000) -- 15 seconds
threadDelay exitTimerDelayMicroseconds

Copilot uses AI. Check for mistakes.

throwTo mainThreadId ExitSuccess

queueJob :: Protocol.PushRequest -> Daemon ()
queueJob pushRequest = do
Expand Down Expand Up @@ -208,9 +245,6 @@ shutdownGracefully :: Daemon (Either DaemonError ())
shutdownGracefully = do
DaemonEnv {..} <- ask

-- Indicate that the daemon is shutting down
initiateShutdown daemonShutdownLatch

-- Stop the push manager and wait for any remaining paths to be uploaded
shutdownPushManager daemonPushManager

Expand Down
90 changes: 60 additions & 30 deletions cachix/src/Cachix/Daemon/EventLoop.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ module Cachix.Daemon.EventLoop
sendIO,
run,
exitLoopWith,
exitLoopWithFailure,
EventLoop,
)
where

import Cachix.Daemon.ShutdownLatch (ShutdownLatch)
import Cachix.Daemon.ShutdownLatch qualified as ShutdownLatch
import Cachix.Daemon.Types.EventLoop (EventLoop (..), EventLoopError (..))
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
( isFullTBMQueue,
newTBMQueueIO,
Expand All @@ -21,9 +25,9 @@ import Protolude

new :: (MonadIO m) => m (EventLoop event a)
new = do
exitLatch <- liftIO newEmptyMVar
queue <- liftIO $ newTBMQueueIO 100
return $ EventLoop {queue, exitLatch}
shutdownLatch <- ShutdownLatch.newShutdownLatch
queue <- liftIO $ newTBMQueueIO 100_000
Copy link
Preview
Copilot AI May 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The increased queue capacity to 100_000 may have performance implications under high load; consider reviewing resource usage and potential memory constraints.

Suggested change
queue <- liftIO $ newTBMQueueIO 100_000
let defaultQueueCapacity = 10_000 -- Default capacity for the event loop queue
queue <- liftIO $ newTBMQueueIO defaultQueueCapacity

Copilot uses AI. Check for mistakes.

return $ EventLoop {queue, shutdownLatch}

-- | Send an event to the event loop with logging.
send :: (Katip.KatipContext m) => EventLoop event a -> event -> m ()
Expand All @@ -38,41 +42,67 @@ sendIO = send' logger
logger _ _ = return ()

send' :: (MonadIO m) => (Katip.Severity -> Katip.LogStr -> m ()) -> EventLoop event a -> event -> m ()
send' logger eventloop@(EventLoop {queue}) event = do
res <- liftIO $ atomically $ tryWriteTBMQueue queue event
case res of
-- The queue is closed.
Nothing ->
logger Katip.DebugS "Ignored an event because the event loop is closed"
-- Successfully wrote to the queue
Just True -> return ()
-- Failed to write to the queue
Just False -> do
isFull <- liftIO $ atomically $ isFullTBMQueue queue
let message =
if isFull
then "Event loop is full"
else "Unknown error"
logger Katip.ErrorS $ "Failed to write to event loop: " <> message
exitLoopWithFailure EventLoopFull eventloop
send' logger eventloop@(EventLoop {queue, shutdownLatch}) event = do
  -- Drop the event outright once shutdown has been requested.
  shuttingDown <- ShutdownLatch.isShuttingDown shutdownLatch
  if shuttingDown
    then logger Katip.DebugS "Ignored an event because the event loop is shutting down"
    else liftIO (atomically (tryWriteTBMQueue queue event)) >>= handleWrite
  where
    -- Interpret the 'tryWriteTBMQueue' outcome:
    -- Nothing = queue closed, Just True = written, Just False = not written.
    handleWrite Nothing =
      logger Katip.DebugS "Ignored an event because the event loop is closed"
    handleWrite (Just True) = pure ()
    handleWrite (Just False) = do
      full <- liftIO $ atomically $ isFullTBMQueue queue
      logger Katip.ErrorS $
        "Failed to write to event loop: "
          <> (if full then "Event loop is full" else "Unknown error")
      -- A failed write is treated as fatal: fail the loop with 'EventLoopFull'.
      exitLoopWithFailure EventLoopFull eventloop

-- | Run the event loop until it exits with 'exitLoopWith'.
run :: (MonadIO m) => EventLoop event a -> (event -> m ()) -> m (Either EventLoopError a)
run eventloop f = do
run eventloop@(EventLoop {queue, shutdownLatch}) f =
fix $ \loop -> do
mevent <- liftIO $ atomically $ readTBMQueue (queue eventloop)
case mevent of
Just event -> f event
Nothing -> exitLoopWithFailure EventLoopClosed eventloop
-- Wait for either a shutdown signal or a message from the queue
eitherResult <-
liftIO $
atomically $
fmap Left (ShutdownLatch.waitForShutdownSTM shutdownLatch)
`orElse`
-- Try to read from queue
( do
mevent <- readTBMQueue queue
case mevent of
-- Got an event, return it
Just event -> return $ Right event
-- Queue is closed, signal shutdown
Nothing -> do
ShutdownLatch.initiateShutdownWithResultSTM (Left EventLoopClosed) shutdownLatch
result <- ShutdownLatch.waitForShutdownSTM shutdownLatch
return $ Left result
)

liftIO (tryReadMVar (exitLatch eventloop)) >>= \case
Just exitValue -> return exitValue
Nothing -> loop
-- Process the result
case eitherResult of
-- Shutdown requested, return the result
Left result -> return result
-- Got an event, process it and continue looping
Right event -> do
f event
loop

-- | Short-circuit the event loop and exit with a given return value.
exitLoopWith :: (MonadIO m) => a -> EventLoop event a -> m ()
exitLoopWith exitValue (EventLoop {exitLatch}) = void $ liftIO $ tryPutMVar exitLatch (Right exitValue)
-- Store a successful exit value on the loop's shutdown latch; 'run' observes
-- the latch and returns the stored value.
exitLoopWith exitValue (EventLoop {shutdownLatch = latch}) =
  ShutdownLatch.initiateShutdown exitValue latch

-- | Short-circuit the event loop in case of an internal error.
exitLoopWithFailure :: (MonadIO m) => EventLoopError -> EventLoop event a -> m ()
exitLoopWithFailure err (EventLoop {exitLatch}) = void $ liftIO $ tryPutMVar exitLatch (Left err)
-- Store the error on the loop's shutdown latch as a 'Left'; 'run' observes
-- the latch and returns it.
exitLoopWithFailure err (EventLoop {shutdownLatch = latch}) =
  ShutdownLatch.initiateShutdownWithResult (Left err) latch
66 changes: 56 additions & 10 deletions cachix/src/Cachix/Daemon/ShutdownLatch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,70 @@ module Cachix.Daemon.ShutdownLatch
newShutdownLatch,
waitForShutdown,
initiateShutdown,
initiateShutdownWithResult,
getResult,
isShuttingDown,
-- STM operations
isShuttingDownSTM,
initiateShutdownSTM,
initiateShutdownWithResultSTM,
getResultSTM,
waitForShutdownSTM,
)
where

import Control.Concurrent.MVar
import Control.Concurrent.STM
import Protolude

-- | A latch to keep track of the shutdown process.
newtype ShutdownLatch = ShutdownLatch {unShutdownLatch :: MVar ()}
-- A shutdown latch holds a result value (Either e a) when shutdown is initiated.
-- Nothing means that shutdown has not been requested yet.
newtype ShutdownLatch e a = ShutdownLatch {unShutdownLatch :: TVar (Maybe (Either e a))}

newShutdownLatch :: (MonadIO m) => m ShutdownLatch
newShutdownLatch = ShutdownLatch <$> liftIO newEmptyMVar
-- | Create a new shutdown latch
newShutdownLatch :: (MonadIO m) => m (ShutdownLatch e a)
newShutdownLatch = ShutdownLatch <$> liftIO (newTVarIO Nothing)

waitForShutdown :: (MonadIO m) => ShutdownLatch -> m ()
waitForShutdown = liftIO . readMVar . unShutdownLatch
-- | Block until shutdown is requested and return the result
waitForShutdown :: (MonadIO m) => ShutdownLatch e a -> m (Either e a)
waitForShutdown latch = liftIO $ atomically $ waitForShutdownSTM latch

initiateShutdown :: (MonadIO m) => ShutdownLatch -> m ()
initiateShutdown = void . liftIO . flip tryPutMVar () . unShutdownLatch
-- | Signal shutdown with a "success" result
initiateShutdown :: (MonadIO m) => a -> ShutdownLatch e a -> m ()
initiateShutdown val latch = liftIO $ atomically $ initiateShutdownSTM val latch

isShuttingDown :: (MonadIO m) => ShutdownLatch -> m Bool
isShuttingDown = liftIO . fmap not . isEmptyMVar . unShutdownLatch
-- | Signal shutdown with an explicit result (success or failure).
--
-- 'IO'-lifted wrapper that runs 'initiateShutdownWithResultSTM' in its own
-- transaction.
initiateShutdownWithResult :: (MonadIO m) => Either e a -> ShutdownLatch e a -> m ()
initiateShutdownWithResult result =
  liftIO . atomically . initiateShutdownWithResultSTM result

-- | Read the shutdown result without waiting: 'Nothing' until shutdown has
-- been initiated.
getResult :: (MonadIO m) => ShutdownLatch e a -> m (Maybe (Either e a))
getResult = liftIO . atomically . getResultSTM

-- | 'True' once shutdown has been requested (a result has been stored).
isShuttingDown :: (MonadIO m) => ShutdownLatch e a -> m Bool
isShuttingDown = liftIO . atomically . isShuttingDownSTM

-- STM Operations for use in atomic transactions

-- | Transactional check: 'True' when the latch holds a result.
isShuttingDownSTM :: ShutdownLatch e a -> STM Bool
isShuttingDownSTM = fmap isJust . readTVar . unShutdownLatch

-- | Signal shutdown with a "success" result.
--
-- The latch is write-once: if a result has already been recorded, it is kept
-- and this call does nothing (the same first-writer-wins behaviour as
-- 'tryPutMVar'). Without this guard a later call would silently overwrite the
-- first result, and readers ('waitForShutdownSTM', 'getResultSTM') could
-- observe different outcomes for the same shutdown.
initiateShutdownSTM :: a -> ShutdownLatch e a -> STM ()
initiateShutdownSTM val latch = do
  let ref = unShutdownLatch latch
  existing <- readTVar ref
  case existing of
    Nothing -> writeTVar ref (Just (Right val))
    Just _ -> pure ()

-- | Signal shutdown with a specific result ('Right' for success, 'Left' for
-- an error).
--
-- The latch is write-once: the first recorded result wins and later calls are
-- no-ops (matching 'tryPutMVar' semantics). This keeps the outcome stable for
-- every reader of the latch, instead of letting a late caller overwrite a
-- result that another thread may already have observed.
initiateShutdownWithResultSTM :: Either e a -> ShutdownLatch e a -> STM ()
initiateShutdownWithResultSTM result latch = do
  let ref = unShutdownLatch latch
  existing <- readTVar ref
  case existing of
    Nothing -> writeTVar ref (Just result)
    Just _ -> pure ()

-- | Transactional read of the latch: 'Nothing' while no shutdown has been
-- requested, otherwise the stored result.
getResultSTM :: ShutdownLatch e a -> STM (Maybe (Either e a))
getResultSTM (ShutdownLatch var) = readTVar var

-- | Block until a shutdown result is stored, then return it.
--
-- Uses 'retry', so the enclosing transaction is re-run only after the
-- latch's 'TVar' changes.
waitForShutdownSTM :: ShutdownLatch e a -> STM (Either e a)
waitForShutdownSTM (ShutdownLatch var) = do
  stored <- readTVar var
  case stored of
    Nothing -> retry
    Just result -> pure result
4 changes: 4 additions & 0 deletions cachix/src/Cachix/Daemon/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ module Cachix.Daemon.Types
DaemonError (..),
HasExitCode (..),

-- * EventLoop errors
EventLoop.EventLoopError (..),

-- * Log
LogLevel (..),

Expand All @@ -27,6 +30,7 @@ where

import Cachix.Daemon.Types.Daemon
import Cachix.Daemon.Types.Error
import Cachix.Daemon.Types.EventLoop as EventLoop
import Cachix.Daemon.Types.Log
import Cachix.Daemon.Types.PushEvent as PushEvent
import Cachix.Daemon.Types.PushManager as PushManager
2 changes: 0 additions & 2 deletions cachix/src/Cachix/Daemon/Types/Daemon.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ data DaemonEnv = DaemonEnv
daemonSubscriptionManagerThread :: MVar (Async ()),
-- | Logging env
daemonLogger :: Logger,
-- | Shutdown latch
daemonShutdownLatch :: ShutdownLatch,
-- | The PID of the daemon process
daemonPid :: ProcessID
}
Expand Down
9 changes: 3 additions & 6 deletions cachix/src/Cachix/Daemon/Types/EventLoop.hs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
module Cachix.Daemon.Types.EventLoop
( EventLoop (..),
EventLoopError (..),
ExitLatch,
)
where

import Cachix.Daemon.ShutdownLatch (ShutdownLatch)
import Control.Concurrent.STM.TBMQueue (TBMQueue)
import Protolude

-- | An event loop that processes a queue of events.
data EventLoop event output = EventLoop
{ queue :: TBMQueue event,
exitLatch :: ExitLatch output
-- | Latch for signaling shutdown and returning results
shutdownLatch :: ShutdownLatch EventLoopError output
}

-- | An exit latch is a semaphore that signals the event loop to exit.
-- The exit code should be returned by the 'EventLoop'.
type ExitLatch a = MVar (Either EventLoopError a)

data EventLoopError
= EventLoopClosed
| EventLoopFull
Expand Down
11 changes: 5 additions & 6 deletions cachix/src/Cachix/Deploy/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ withAgentLock agent action =
installSignalHandlers :: IO () -> IO ()
installSignalHandlers shutdown = do
mainThreadId <- myThreadId
sigintAttemptedRef <- newIORef False
-- Track Ctrl+C attempts
interruptRef <- newIORef False

let safeShutdown = do
-- Timeout after 30 seconds to ensure we don't hang
Expand All @@ -209,12 +210,10 @@ installSignalHandlers shutdown = do
Signals.installHandler
Signals.sigINT
( Signals.Catch $ do
sigintAttempted <- readIORef sigintAttemptedRef
if sigintAttempted
isSecondInterrupt <- liftIO $ atomicModifyIORef' interruptRef (True,)
if isSecondInterrupt
then throwTo mainThreadId ExitSuccess
else do
atomicWriteIORef sigintAttemptedRef True
safeShutdown
else safeShutdown
)
Nothing

Expand Down
0