From 21f3d35eab0a4139c881a3e74fbd2fcf48d7555f Mon Sep 17 00:00:00 2001 From: Sander Date: Thu, 15 May 2025 13:36:48 +0200 Subject: [PATCH 1/4] cachix: increase event loop size --- cachix/src/Cachix/Daemon/EventLoop.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) Review note (diffstat area, ignored by git am / patch): send' treats a failed tryWriteTBMQueue on a full queue as fatal (exitLoopWithFailure EventLoopFull), so this bound decides how many pending events are tolerated before the event loop exits with EventLoopFull. The 100_000 literal relies on NumericUnderscores (part of GHC2021) - confirm the package enables it. diff --git a/cachix/src/Cachix/Daemon/EventLoop.hs b/cachix/src/Cachix/Daemon/EventLoop.hs index cea65021..74d163ab 100644 --- a/cachix/src/Cachix/Daemon/EventLoop.hs +++ b/cachix/src/Cachix/Daemon/EventLoop.hs @@ -22,7 +22,7 @@ import Protolude new :: (MonadIO m) => m (EventLoop event a) new = do exitLatch <- liftIO newEmptyMVar - queue <- liftIO $ newTBMQueueIO 100 + queue <- liftIO $ newTBMQueueIO 100_000 return $ EventLoop {queue, exitLatch} -- | Send an event to the event loop with logging. From 8f59ab21e602517b0031109f13d04df25356ef79 Mon Sep 17 00:00:00 2001 From: Sander Date: Thu, 15 May 2025 14:27:39 +0200 Subject: [PATCH 2/4] daemon: harden event loop to respond to shutdown latch immediately Use STM to short-circuit reading from the event queue if the latch is triggered.
--- cachix/src/Cachix/Daemon/EventLoop.hs | 88 ++++++++++++++------- cachix/src/Cachix/Daemon/ShutdownLatch.hs | 66 +++++++++++++--- cachix/src/Cachix/Daemon/Types/EventLoop.hs | 9 +-- 3 files changed, 118 insertions(+), 45 deletions(-) Review note (diffstat area, ignored by git am / patch): ShutdownLatch.initiateShutdownWithResultSTM keeps the first recorded result instead of overwriting it. A later exitLoopWith / exitLoopWithFailure therefore cannot replace an exit value that is already set. This matches the first-writer-wins behaviour of the tryPutMVar calls this patch removes. diff --git a/cachix/src/Cachix/Daemon/EventLoop.hs b/cachix/src/Cachix/Daemon/EventLoop.hs index 74d163ab..bfa96388 100644 --- a/cachix/src/Cachix/Daemon/EventLoop.hs +++ b/cachix/src/Cachix/Daemon/EventLoop.hs @@ -4,11 +4,15 @@ module Cachix.Daemon.EventLoop sendIO, run, exitLoopWith, + exitLoopWithFailure, EventLoop, ) where +import Cachix.Daemon.ShutdownLatch (ShutdownLatch) +import Cachix.Daemon.ShutdownLatch qualified as ShutdownLatch import Cachix.Daemon.Types.EventLoop (EventLoop (..), EventLoopError (..)) +import Control.Concurrent.STM import Control.Concurrent.STM.TBMQueue ( isFullTBMQueue, newTBMQueueIO, @@ -21,9 +25,9 @@ import Protolude new :: (MonadIO m) => m (EventLoop event a) new = do - exitLatch <- liftIO newEmptyMVar + shutdownLatch <- ShutdownLatch.newShutdownLatch queue <- liftIO $ newTBMQueueIO 100_000 - return $ EventLoop {queue, exitLatch} + return $ EventLoop {queue, shutdownLatch} -- | Send an event to the event loop with logging. send :: (Katip.KatipContext m) => EventLoop event a -> event -> m () @@ -38,41 +42,67 @@ sendIO = send' logger logger _ _ = return () send' :: (MonadIO m) => (Katip.Severity -> Katip.LogStr -> m ()) -> EventLoop event a -> event -> m () -send' logger eventloop@(EventLoop {queue}) event = do - res <- liftIO $ atomically $ tryWriteTBMQueue queue event - case res of - -- The queue is closed.
- Nothing -> - logger Katip.DebugS "Ignored an event because the event loop is closed" - -- Successfully wrote to the queue - Just True -> return () - -- Failed to write to the queue - Just False -> do - isFull <- liftIO $ atomically $ isFullTBMQueue queue - let message = - if isFull - then "Event loop is full" - else "Unknown error" - logger Katip.ErrorS $ "Failed to write to event loop: " <> message - exitLoopWithFailure EventLoopFull eventloop +send' logger eventloop@(EventLoop {queue, shutdownLatch}) event = do + -- First check if shutdown has been requested + isExiting <- ShutdownLatch.isShuttingDown shutdownLatch + if isExiting + then logger Katip.DebugS "Ignored an event because the event loop is shutting down" + else do + res <- liftIO $ atomically $ tryWriteTBMQueue queue event + case res of + -- The queue is closed. + Nothing -> + logger Katip.DebugS "Ignored an event because the event loop is closed" + -- Successfully wrote to the queue + Just True -> return () + -- Failed to write to the queue + Just False -> do + isFull <- liftIO $ atomically $ isFullTBMQueue queue + let message = + if isFull + then "Event loop is full" + else "Unknown error" + logger Katip.ErrorS $ "Failed to write to event loop: " <> message + exitLoopWithFailure EventLoopFull eventloop -- | Run the event loop until it exits with 'exitLoopWith'.
run :: (MonadIO m) => EventLoop event a -> (event -> m ()) -> m (Either EventLoopError a) -run eventloop f = do +run (EventLoop {queue, shutdownLatch}) f = fix $ \loop -> do - mevent <- liftIO $ atomically $ readTBMQueue (queue eventloop) - case mevent of - Just event -> f event - Nothing -> exitLoopWithFailure EventLoopClosed eventloop + -- Wait for either a shutdown signal or a message from the queue + eitherResult <- + liftIO $ + atomically $ + fmap Left (ShutdownLatch.waitForShutdownSTM shutdownLatch) + `orElse` + -- Try to read from queue + ( do + mevent <- readTBMQueue queue + case mevent of + -- Got an event, return it + Just event -> return $ Right event + -- Queue is closed, signal shutdown + Nothing -> do + ShutdownLatch.initiateShutdownWithResultSTM (Left EventLoopClosed) shutdownLatch + result <- ShutdownLatch.waitForShutdownSTM shutdownLatch + return $ Left result + ) - liftIO (tryReadMVar (exitLatch eventloop)) >>= \case - Just exitValue -> return exitValue - Nothing -> loop + -- Process the result + case eitherResult of + -- Shutdown requested, return the result + Left result -> return result + -- Got an event, process it and continue looping + Right event -> do + f event + loop -- | Short-circuit the event loop and exit with a given return value. exitLoopWith :: (MonadIO m) => a -> EventLoop event a -> m () -exitLoopWith exitValue (EventLoop {exitLatch}) = void $ liftIO $ tryPutMVar exitLatch (Right exitValue) +exitLoopWith exitValue (EventLoop {shutdownLatch}) = + ShutdownLatch.initiateShutdown exitValue shutdownLatch -- | Short-circuit the event loop in case of an internal error.
exitLoopWithFailure :: (MonadIO m) => EventLoopError -> EventLoop event a -> m () -exitLoopWithFailure err (EventLoop {exitLatch}) = void $ liftIO $ tryPutMVar exitLatch (Left err) +exitLoopWithFailure err (EventLoop {shutdownLatch}) = + ShutdownLatch.initiateShutdownWithResult (Left err) shutdownLatch diff --git a/cachix/src/Cachix/Daemon/ShutdownLatch.hs b/cachix/src/Cachix/Daemon/ShutdownLatch.hs index 5d456b5b..d00f0d8d 100644 --- a/cachix/src/Cachix/Daemon/ShutdownLatch.hs +++ b/cachix/src/Cachix/Daemon/ShutdownLatch.hs @@ -3,24 +3,70 @@ module Cachix.Daemon.ShutdownLatch newShutdownLatch, waitForShutdown, initiateShutdown, + initiateShutdownWithResult, + getResult, isShuttingDown, + -- STM operations + isShuttingDownSTM, + initiateShutdownSTM, + initiateShutdownWithResultSTM, + getResultSTM, + waitForShutdownSTM, ) where -import Control.Concurrent.MVar +import Control.Concurrent.STM import Protolude -- | A latch to keep track of the shutdown process. -newtype ShutdownLatch = ShutdownLatch {unShutdownLatch :: MVar ()} +-- A shutdown latch holds a result value (Either e a) when shutdown is initiated. +-- Nothing means that shutdown has not been requested yet; once set, the result is never replaced. +newtype ShutdownLatch e a = ShutdownLatch {unShutdownLatch :: TVar (Maybe (Either e a))} -newShutdownLatch :: (MonadIO m) => m ShutdownLatch -newShutdownLatch = ShutdownLatch <$> liftIO newEmptyMVar +-- | Create a new shutdown latch +newShutdownLatch :: (MonadIO m) => m (ShutdownLatch e a) +newShutdownLatch = ShutdownLatch <$> liftIO (newTVarIO Nothing) -waitForShutdown :: (MonadIO m) => ShutdownLatch -> m () -waitForShutdown = liftIO . readMVar . unShutdownLatch +-- | Block until shutdown is requested and return the result +waitForShutdown :: (MonadIO m) => ShutdownLatch e a -> m (Either e a) +waitForShutdown latch = liftIO $ atomically $ waitForShutdownSTM latch -initiateShutdown :: (MonadIO m) => ShutdownLatch -> m () -initiateShutdown = void . liftIO . flip tryPutMVar () .
unShutdownLatch +-- | Signal shutdown with a "success" result (no effect if a result is already recorded) +initiateShutdown :: (MonadIO m) => a -> ShutdownLatch e a -> m () +initiateShutdown val latch = liftIO $ atomically $ initiateShutdownSTM val latch -isShuttingDown :: (MonadIO m) => ShutdownLatch -> m Bool -isShuttingDown = liftIO . fmap not . isEmptyMVar . unShutdownLatch +-- | Signal shutdown with a specific result (no effect if a result is already recorded) +initiateShutdownWithResult :: (MonadIO m) => Either e a -> ShutdownLatch e a -> m () +initiateShutdownWithResult result latch = liftIO $ atomically $ initiateShutdownWithResultSTM result latch + +-- | Get the shutdown result if available +getResult :: (MonadIO m) => ShutdownLatch e a -> m (Maybe (Either e a)) +getResult latch = liftIO $ atomically $ getResultSTM latch + +-- | Check if shutdown has been requested +isShuttingDown :: (MonadIO m) => ShutdownLatch e a -> m Bool +isShuttingDown latch = liftIO $ atomically $ isShuttingDownSTM latch + +-- STM Operations for use in atomic transactions + +-- | Check if shutdown is requested +isShuttingDownSTM :: ShutdownLatch e a -> STM Bool +isShuttingDownSTM latch = isJust <$> readTVar (unShutdownLatch latch) + +-- | Signal shutdown with a "success" result (no effect if a result is already recorded) +initiateShutdownSTM :: a -> ShutdownLatch e a -> STM () +initiateShutdownSTM val latch = initiateShutdownWithResultSTM (Right val) latch + +-- | Signal shutdown with a specific result; the first recorded result wins and later calls leave it unchanged +initiateShutdownWithResultSTM :: Either e a -> ShutdownLatch e a -> STM () +initiateShutdownWithResultSTM result latch = modifyTVar' (unShutdownLatch latch) (<|> Just result) + +-- | Get the shutdown result (if available) +getResultSTM :: ShutdownLatch e a -> STM (Maybe (Either e a)) +getResultSTM = readTVar .
unShutdownLatch + +-- | Block until shutdown is requested, then return the result +waitForShutdownSTM :: ShutdownLatch e a -> STM (Either e a) +waitForShutdownSTM latch = do + mresult <- readTVar (unShutdownLatch latch) + maybe retry return mresult diff --git a/cachix/src/Cachix/Daemon/Types/EventLoop.hs b/cachix/src/Cachix/Daemon/Types/EventLoop.hs index ef5a0726..5ebf58bd 100644 --- a/cachix/src/Cachix/Daemon/Types/EventLoop.hs +++ b/cachix/src/Cachix/Daemon/Types/EventLoop.hs @@ -1,23 +1,20 @@ module Cachix.Daemon.Types.EventLoop ( EventLoop (..), EventLoopError (..), - ExitLatch, ) where +import Cachix.Daemon.ShutdownLatch (ShutdownLatch) import Control.Concurrent.STM.TBMQueue (TBMQueue) import Protolude -- | An event loop that processes a queue of events. data EventLoop event output = EventLoop { queue :: TBMQueue event, - exitLatch :: ExitLatch output + -- | Latch for signaling shutdown and returning results + shutdownLatch :: ShutdownLatch EventLoopError output } --- | An exit latch is a semaphore that signals the event loop to exit. --- The exit code should be returned by the 'EventLoop'. -type ExitLatch a = MVar (Either EventLoopError a) - data EventLoopError = EventLoopClosed | EventLoopFull From 4d60969c40bcdcb92c12296934ec92a7bb60426a Mon Sep 17 00:00:00 2001 From: Sander Date: Thu, 15 May 2025 15:09:01 +0200 Subject: [PATCH 3/4] daemon: force shutdown on sigterm Re-use the event loop for shutdown, since that's the main run loop of the daemon anyway. Remove the unused daemon shutdown latch. We now also gracefully handle sigINT.
--- cachix/src/Cachix/Daemon.hs | 62 ++++++++++++++++++------ cachix/src/Cachix/Daemon/Types.hs | 4 ++ cachix/src/Cachix/Daemon/Types/Daemon.hs | 2 - 3 files changed, 52 insertions(+), 16 deletions(-) Review notes (diffstat area, ignored by git am / patch): installSignalHandlers now installs Signal.Catch (not CatchOnce) handlers for sigTERM and sigINT via withRunInIO. SIGTERM, and a second SIGINT, call EventLoop.exitLoopWithFailure EventLoopClosed. They also fork a 15 s watchdog (startExitTimer) that throws ExitSuccess to mainThreadId, the thread that ran installSignalHandlers. The first SIGINT sends ShutdownGracefully, but calls CNix.Util.triggerInterrupt first. NOTE(review): a watchdog-forced exit reports ExitSuccess, while the SIGTERM path itself ends the loop with Left EventLoopClosed (presumably surfaced via run / toExitCode) - confirm the intended exit code for a timed-out shutdown. Also confirm that interrupting Nix on the first, graceful Ctrl+C is intended. diff --git a/cachix/src/Cachix/Daemon.hs b/cachix/src/Cachix/Daemon.hs index a9e6399a..4d11728a 100644 --- a/cachix/src/Cachix/Daemon.hs +++ b/cachix/src/Cachix/Daemon.hs @@ -23,7 +23,6 @@ import Cachix.Daemon.Log qualified as Log import Cachix.Daemon.Protocol as Protocol import Cachix.Daemon.Push as Push import Cachix.Daemon.PushManager qualified as PushManager -import Cachix.Daemon.ShutdownLatch import Cachix.Daemon.SocketStore qualified as SocketStore import Cachix.Daemon.Subscription as Subscription import Cachix.Daemon.Types as Types @@ -33,6 +32,7 @@ import Cachix.Types.BinaryCache (BinaryCacheName) import Cachix.Types.BinaryCache qualified as BinaryCache import Control.Concurrent.STM.TMChan import Control.Exception.Safe (catchAny) +import Data.IORef (IORef, atomicModifyIORef', newIORef) import Data.Text qualified as T import Hercules.CNix.Store (Store, withStore) import Hercules.CNix.Util qualified as CNix.Util @@ -43,7 +43,7 @@ import Protolude hiding (bracket) import System.IO.Error (isResourceVanishedError) import System.Posix.Process (getProcessID) import System.Posix.Signals qualified as Signal -import UnliftIO (MonadUnliftIO) +import UnliftIO (MonadUnliftIO, withRunInIO) import UnliftIO.Async qualified as Async import UnliftIO.Exception (bracket) @@ -71,7 +71,6 @@ new daemonEnv nixStore daemonOptions daemonLogHandle daemonPushOptions daemonCac daemonLogger <- Log.new "cachix.daemon" daemonLogHandle daemonLogLevel daemonEventLoop <- EventLoop.new - daemonShutdownLatch <- newShutdownLatch daemonPid <- getProcessID daemonSocketPath <- maybe getSocketPath pure (Options.daemonSocketPath daemonOptions) @@ -99,7 +98,7 @@ start :: Env -> DaemonOptions -> PushOptions -> BinaryCacheName -> IO () start daemonEnv daemonOptions daemonPushOptions daemonCacheName =
withStore $ \store -> do daemon <- new daemonEnv store daemonOptions Nothing daemonPushOptions daemonCacheName - installSignalHandlers daemon + void $ runDaemon daemon installSignalHandlers result <- run daemon exitWith (toExitCode result) @@ -156,14 +155,52 @@ stopIO :: DaemonEnv -> IO () stopIO DaemonEnv {daemonEventLoop} = EventLoop.sendIO daemonEventLoop ShutdownGracefully -installSignalHandlers :: DaemonEnv -> IO () -installSignalHandlers daemon = do - for_ [Signal.sigTERM, Signal.sigINT] $ \signal -> - Signal.installHandler signal (Signal.CatchOnce handler) Nothing +installSignalHandlers :: Daemon () +installSignalHandlers = do + withRunInIO $ \runInIO -> do + mainThreadId <- myThreadId + -- Track Ctrl+C attempts + interruptRef <- newIORef False + + -- Install signal handlers using runInIO to properly run Daemon actions from IO + _ <- Signal.installHandler Signal.sigTERM (Signal.Catch (runInIO (termHandler mainThreadId))) Nothing + _ <- Signal.installHandler Signal.sigINT (Signal.Catch (runInIO (intHandler mainThreadId interruptRef))) Nothing + + return () where - handler = do - CNix.Util.triggerInterrupt - stopIO daemon + -- SIGTERM: Trigger immediate shutdown + termHandler :: ThreadId -> Daemon () + termHandler mainThreadId = do + Katip.logFM Katip.InfoS "sigTERM received. Exiting immediately..." + startExitTimer mainThreadId + liftIO CNix.Util.triggerInterrupt + eventLoop <- asks daemonEventLoop + -- Signal directly to the event loop to ensure exit even if queue is full + EventLoop.exitLoopWithFailure EventLoopClosed eventLoop + + -- SIGINT: First try to shutdown gracefully, on second press force exit + intHandler :: ThreadId -> IORef Bool -> Daemon () + intHandler mainThreadId interruptRef = do + liftIO CNix.Util.triggerInterrupt + isSecondInterrupt <- liftIO $ atomicModifyIORef' interruptRef (True,) + eventLoop <- asks daemonEventLoop + + if isSecondInterrupt + then do + Katip.logFM Katip.InfoS "Exiting immediately..."
+ startExitTimer mainThreadId + -- Force shutdown at the event loop level to ensure exit even if queue is full + EventLoop.exitLoopWithFailure EventLoopClosed eventLoop + else do + Katip.logFM Katip.InfoS "Shutting down gracefully (Ctrl+C again to force exit)..." + EventLoop.send eventLoop ShutdownGracefully + + -- Start a timer to ensure we exit even if the event loop hangs + startExitTimer :: ThreadId -> Daemon () + startExitTimer mainThreadId = do + void $ liftIO $ forkIO $ do + threadDelay (15 * 1000 * 1000) -- 15 seconds + throwTo mainThreadId ExitSuccess queueJob :: Protocol.PushRequest -> Daemon () queueJob pushRequest = do @@ -208,9 +245,6 @@ shutdownGracefully :: Daemon (Either DaemonError ()) shutdownGracefully = do DaemonEnv {..} <- ask - -- Indicate that the daemon is shutting down - initiateShutdown daemonShutdownLatch - -- Stop the push manager and wait for any remaining paths to be uploaded shutdownPushManager daemonPushManager diff --git a/cachix/src/Cachix/Daemon/Types.hs b/cachix/src/Cachix/Daemon/Types.hs index 74ae03c2..76e783fa 100644 --- a/cachix/src/Cachix/Daemon/Types.hs +++ b/cachix/src/Cachix/Daemon/Types.hs @@ -9,6 +9,9 @@ module Cachix.Daemon.Types DaemonError (..), HasExitCode (..), + -- * EventLoop errors + EventLoop.EventLoopError (..), + -- * Log LogLevel (..), @@ -27,6 +30,7 @@ where import Cachix.Daemon.Types.Daemon import Cachix.Daemon.Types.Error +import Cachix.Daemon.Types.EventLoop as EventLoop import Cachix.Daemon.Types.Log import Cachix.Daemon.Types.PushEvent as PushEvent import Cachix.Daemon.Types.PushManager as PushManager diff --git a/cachix/src/Cachix/Daemon/Types/Daemon.hs b/cachix/src/Cachix/Daemon/Types/Daemon.hs index 5f8d58cf..3af6485a 100644 --- a/cachix/src/Cachix/Daemon/Types/Daemon.hs +++ b/cachix/src/Cachix/Daemon/Types/Daemon.hs @@ -73,8 +73,6 @@ data DaemonEnv = DaemonEnv daemonSubscriptionManagerThread :: MVar (Async ()), -- | Logging env daemonLogger :: Logger, - -- | Shutdown latch - daemonShutdownLatch ::
ShutdownLatch, -- | The PID of the daemon process daemonPid :: ProcessID } From 7a846cc7c620b51a09bd9716650a605288da7243 Mon Sep 17 00:00:00 2001 From: Sander Date: Thu, 15 May 2025 16:08:40 +0200 Subject: [PATCH 4/4] lint --- cachix/src/Cachix/Deploy/Agent.hs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) Review note (diffstat area, ignored by git am / patch): atomicModifyIORef' interruptRef (True,) sets the flag and returns its previous value. As a result only the second and later SIGINTs reach throwTo mainThreadId ExitSuccess. The handler body runs in plain IO (Signals.Catch), so the liftIO wrapper around it is redundant. diff --git a/cachix/src/Cachix/Deploy/Agent.hs b/cachix/src/Cachix/Deploy/Agent.hs index ba58d3ff..79c1987c 100644 --- a/cachix/src/Cachix/Deploy/Agent.hs +++ b/cachix/src/Cachix/Deploy/Agent.hs @@ -188,7 +188,8 @@ withAgentLock agent action = installSignalHandlers :: IO () -> IO () installSignalHandlers shutdown = do mainThreadId <- myThreadId - sigintAttemptedRef <- newIORef False + -- Track Ctrl+C attempts + interruptRef <- newIORef False let safeShutdown = do -- Timeout after 30 seconds to ensure we don't hang @@ -209,12 +210,10 @@ installSignalHandlers shutdown = do Signals.installHandler Signals.sigINT ( Signals.Catch $ do - sigintAttempted <- readIORef sigintAttemptedRef - if sigintAttempted + isSecondInterrupt <- liftIO $ atomicModifyIORef' interruptRef (True,) + if isSecondInterrupt then throwTo mainThreadId ExitSuccess - else do - atomicWriteIORef sigintAttemptedRef True - safeShutdown + else safeShutdown ) Nothing