From 15cfe48e660dd6b06834652515ca93d973b83fda Mon Sep 17 00:00:00 2001 From: Drew Fradette Date: Fri, 16 Feb 2018 14:36:24 -0500 Subject: [PATCH 1/2] Resignal workers that become stuck. There is a rare case where after an upgrade, a worker will become stuck after being issued a USR2. If the signal_timeout timer doesn't fire for some reason, the child will remain running forever. This changes the signal-timeout timer to process all signaled children and use the `last_signaled_at` to determine if the child process should be killed. This should reduce/eliminate old stuck workers in the long run since each future signal timeout timer will attempt to kill anything that has been signaled and is still running. --- lib/einhorn/command.rb | 29 +++++++------ .../fixtures/signal_timeout/sleepy_server.rb | 22 ++++++++++ .../_lib/helpers/einhorn_helpers.rb | 5 +++ test/integration/upgrading.rb | 43 +++++++++++++++++++ 4 files changed, 86 insertions(+), 13 deletions(-) create mode 100755 test/integration/_lib/fixtures/signal_timeout/sleepy_server.rb diff --git a/lib/einhorn/command.rb b/lib/einhorn/command.rb index 97cb588..6335992 100644 --- a/lib/einhorn/command.rb +++ b/lib/einhorn/command.rb @@ -101,8 +101,8 @@ def self.register_ack(pid) def self.signal_all(signal, children=nil, record=true) children ||= Einhorn::WorkerPool.workers - signaled = {} + Einhorn.log_info("Sending #{signal} to #{children.inspect}", :upgrade) children.each do |child| @@ -116,11 +116,13 @@ def self.signal_all(signal, children=nil, record=true) Einhorn.log_error("Re-sending #{signal} to already-signaled child #{child.inspect}. It may be slow to spin down, or it may be swallowing #{signal}s.", :upgrade) end spec[:signaled].add(signal) + spec[:last_signaled_at] = Time.now end begin Process.kill(signal, child) rescue Errno::ESRCH + Einhorn.log_debug("Attempted to #{signal} child #{child.inspect} but the process does not exist", :upgrade) else signaled[child] = spec end @@ -128,22 +130,22 @@ def self.signal_all(signal, children=nil, record=true) if Einhorn::State.signal_timeout && record Einhorn::Event::Timer.open(Einhorn::State.signal_timeout) do - children.each do |child| - spec = Einhorn::State.children[child] - next unless spec # Process is already dead and removed by mourn - signaled_spec = signaled[child] - next unless signaled_spec # We got ESRCH when trying to signal - if spec[:spinup_time] != signaled_spec[:spinup_time] - Einhorn.log_info("Different spinup time recorded for #{child} after #{Einhorn::State.signal_timeout}s. This probably indicates a PID rollover.", :upgrade) - next - end + Einhorn::State.children.select{|_, c| c[:signaled].length > 0}.each do |pid, child| + next unless child[:last_signaled_at] - Einhorn.log_info("Child #{child.inspect} is still active after #{Einhorn::State.signal_timeout}s. Sending SIGKILL.") + now = Time.now + expires_at = child[:last_signaled_at] + Einhorn::State.signal_timeout + next unless now >= expires_at + + Einhorn.log_info("Child #{pid.inspect} was signaled #{child[:last_signaled_at] - now}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade) begin - Process.kill('KILL', child) + Process.kill('KILL', pid) rescue Errno::ESRCH + Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.") end - spec[:signaled].add('KILL') + + child[:signaled].add('KILL') + child[:last_signaled_at] = Time.now end end end @@ -314,6 +316,7 @@ def self.spinup(cmd=nil) :version => Einhorn::State.version, :acked => false, :signaled => Set.new, + :last_signaled_at => nil, :index => index, :spinup_time => Einhorn::State.last_spinup, } diff --git a/test/integration/_lib/fixtures/signal_timeout/sleepy_server.rb b/test/integration/_lib/fixtures/signal_timeout/sleepy_server.rb new file mode 100755 index 0000000..7a06ec2 --- /dev/null +++ b/test/integration/_lib/fixtures/signal_timeout/sleepy_server.rb @@ -0,0 +1,22 @@ +require 'bundler/setup' +require 'socket' +require 'einhorn/worker' + +def einhorn_main + serv = Socket.for_fd(Einhorn::Worker.socket!) + Einhorn::Worker.ack! + + Signal.trap('USR2') do + sleep ENV.fetch("TRAP_SLEEP").to_i + exit + end + + while true + s, _ = serv.accept + s.write($$) + s.flush + s.close + end +end + +einhorn_main if $0 == __FILE__ diff --git a/test/integration/_lib/helpers/einhorn_helpers.rb b/test/integration/_lib/helpers/einhorn_helpers.rb index b95d24f..af5c416 100644 --- a/test/integration/_lib/helpers/einhorn_helpers.rb +++ b/test/integration/_lib/helpers/einhorn_helpers.rb @@ -106,6 +106,11 @@ def find_free_port(host='127.0.0.1') open_port.close end + def get_state(client) + client.send_command('command' => 'state') + YAML.load(client.receive_message['message'])[:state] + end + def wait_for_open_port max_retries = 50 begin diff --git a/test/integration/upgrading.rb b/test/integration/upgrading.rb index 6355d71..05c964a 100644 --- a/test/integration/upgrading.rb +++ b/test/integration/upgrading.rb @@ -154,4 +154,47 @@ class UpgradeTests < EinhornIntegrationTestCase end end end + + describe "with --signal-timeout" do + before do + @dir = prepare_fixture_directory('signal_timeout') + @port = find_free_port + @server_program = File.join(@dir, "sleepy_server.rb") + @socket_path = File.join(@dir, "einhorn.sock") + end + + after { cleanup_fixtured_directories } + + it 'issues a SIGKILL to outdated children when signal-timeout has passed' do + signal_timeout = 3 + sleep_for = 5 + cmd = %W{ + einhorn + -b 127.0.0.1:#{@port} + -d #{@socket_path} + --signal-timeout #{signal_timeout} + -- ruby #{@server_program} + } + + with_running_einhorn(cmd, env: ENV.to_h.merge({'TRAP_SLEEP' => sleep_for.to_s})) do |process| + wait_for_open_port + client = Einhorn::Client.for_path(@socket_path) + einhornsh(%W{-d #{@socket_path} -e upgrade}) + + state = get_state(client) + assert_equal(2, state[:children].count) + signaled_children = state[:children].select{|_,c| c[:signaled].length > 0} + assert_equal(1, signaled_children.length) + + sleep(signal_timeout + 1) + + state = get_state(client) + assert_equal(1, state[:children].count) + signaled_children = state[:children].select{|_,c| c[:signaled].length > 0} + assert_equal(0, signaled_children.length) + + process.terminate + end + end + end end From a061913161914b57f40b18e56c17ba93aa839e73 Mon Sep 17 00:00:00 2001 From: Drew Fradette Date: Wed, 18 Apr 2018 12:08:22 -0400 Subject: [PATCH 2/2] Refactored KILLs to also occur during culling --- lib/einhorn.rb | 8 +++++ lib/einhorn/command.rb | 67 ++++++++++++++++++++++++++--------- lib/einhorn/event.rb | 11 +++++- lib/einhorn/version.rb | 2 +- test/integration/upgrading.rb | 6 ++-- test/unit/einhorn/command.rb | 32 +++++++++++++++++ 6 files changed, 105 insertions(+), 21 deletions(-) diff --git a/lib/einhorn.rb b/lib/einhorn.rb index f21f05a..c5766d5 100644 --- a/lib/einhorn.rb +++ b/lib/einhorn.rb @@ -411,6 +411,14 @@ def self.run Einhorn::State.reloading_for_upgrade = false end + # If setting a signal-timeout, timeout the event loop + # in the same timeframe, ensuring processes are culled + # on a regular basis. + if Einhorn::State.signal_timeout + Einhorn::Event.default_timeout = Einhorn::Event.default_timeout.nil? ? + Einhorn::State.signal_timeout : [Einhorn::State.signal_timeout, Einhorn::Event.default_timeout].min + end + while Einhorn::State.respawn || Einhorn::State.children.size > 0 log_debug("Entering event loop") diff --git a/lib/einhorn/command.rb b/lib/einhorn/command.rb index 6335992..2716b39 100644 --- a/lib/einhorn/command.rb +++ b/lib/einhorn/command.rb @@ -11,7 +11,6 @@ def self.reap begin while true Einhorn.log_debug('Going to reap a child process') - pid = Process.wait(-1, Process::WNOHANG) return unless pid mourn(pid) @@ -122,7 +121,7 @@ def self.signal_all(signal, children=nil, record=true) begin Process.kill(signal, child) rescue Errno::ESRCH - Einhorn.log_debug("Attempted to #{signal} child #{child.inspect} but the process does not exist", :upgrade) + Einhorn.log_debug("Attempted to #{signal} child #{child.inspect} but the process does not exist", :upgrade) else signaled[child] = spec end @@ -130,29 +129,30 @@ def self.signal_all(signal, children=nil, record=true) if Einhorn::State.signal_timeout && record Einhorn::Event::Timer.open(Einhorn::State.signal_timeout) do - Einhorn::State.children.select{|_, c| c[:signaled].length > 0}.each do |pid, child| - next unless child[:last_signaled_at] - - now = Time.now - expires_at = child[:last_signaled_at] + Einhorn::State.signal_timeout - next unless now >= expires_at + children.each do |child| + spec = Einhorn::State.children[child] + next unless spec # Process is already dead and removed by mourn + signaled_spec = signaled[child] + next unless signaled_spec # We got ESRCH when trying to signal + if spec[:spinup_time] != signaled_spec[:spinup_time] + Einhorn.log_info("Different spinup time recorded for #{child} after #{Einhorn::State.signal_timeout}s. This probably indicates a PID rollover.", :upgrade) + next + end - Einhorn.log_info("Child #{pid.inspect} was signaled #{child[:last_signaled_at] - now}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade) + Einhorn.log_info("Child #{child.inspect} is still active after #{Einhorn::State.signal_timeout}s. Sending SIGKILL.") begin - Process.kill('KILL', pid) + Process.kill('KILL', child) rescue Errno::ESRCH - Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.") end - - child[:signaled].add('KILL') - child[:last_signaled_at] = Time.now + spec[:signaled].add('KILL') end end - end - "Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}" + Einhorn.log_info("Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}") + end end + def self.increment Einhorn::Event.break_loop old = Einhorn::State.config[:number] @@ -491,6 +491,41 @@ def self.cull Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.") signal_all("USR2", excess) end + + # Ensure all signaled workers that have outlived signal_timeout get killed. + kill_expired_signaled_workers if Einhorn::State.signal_timeout + end + + def self.kill_expired_signaled_workers + now = Time.now + children = Einhorn::State.children.select do |_,c| + # Only interested in USR2 signaled workers + next unless c[:signaled] && c[:signaled].length > 0 + next unless c[:signaled].include?('USR2') + + # Ignore processes that have received KILL since it can't be trapped. + next if c[:signaled].include?('KILL') + + # Filter out those children that have not reached signal_timeout yet. + next unless c[:last_signaled_at] + expires_at = c[:last_signaled_at] + Einhorn::State.signal_timeout + next unless now >= expires_at + + true + end + + Einhorn.log_info("#{children.size} expired signaled workers found.") if children.size > 0 + children.each do |pid, child| + Einhorn.log_info("Child #{pid.inspect} was signaled #{(child[:last_signaled_at] - now).abs.to_i}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade) + begin + Process.kill('KILL', pid) + rescue Errno::ESRCH + Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.") + end + + child[:signaled].add('KILL') + child[:last_signaled_at] = Time.now + end end def self.stop_respawning diff --git a/lib/einhorn/event.rb b/lib/einhorn/event.rb index 8305ce1..675cefa 100644 --- a/lib/einhorn/event.rb +++ b/lib/einhorn/event.rb @@ -4,6 +4,7 @@ module Einhorn module Event @@loopbreak_reader = nil @@loopbreak_writer = nil + @@default_timeout = nil @@signal_actions = [] @@readable = {} @@writeable = {} @@ -120,7 +121,7 @@ def self.timeout if expires_at = @@timers.keys.sort[0] expires_at - Time.now else - nil + @@default_timeout end end @@ -165,6 +166,14 @@ def self.break_loop Einhorn.log_error("Loop break pipe is full -- probably means that we are quite backlogged") end end + + def self.default_timeout=(val) + @@default_timeout = val.to_i == 0 ? nil : val.to_i + end + + def self.default_timeout + @@default_timeout + end end end diff --git a/lib/einhorn/version.rb b/lib/einhorn/version.rb index 34b7ffc..bac92d0 100644 --- a/lib/einhorn/version.rb +++ b/lib/einhorn/version.rb @@ -1,3 +1,3 @@ module Einhorn - VERSION = '0.7.5' + VERSION = '0.7.6' end diff --git a/test/integration/upgrading.rb b/test/integration/upgrading.rb index 05c964a..ba89175 100644 --- a/test/integration/upgrading.rb +++ b/test/integration/upgrading.rb @@ -166,8 +166,8 @@ class UpgradeTests < EinhornIntegrationTestCase after { cleanup_fixtured_directories } it 'issues a SIGKILL to outdated children when signal-timeout has passed' do - signal_timeout = 3 - sleep_for = 5 + signal_timeout = 2 + sleep_for = 10 cmd = %W{ einhorn -b 127.0.0.1:#{@port} @@ -186,7 +186,7 @@ class UpgradeTests < EinhornIntegrationTestCase signaled_children = state[:children].select{|_,c| c[:signaled].length > 0} assert_equal(1, signaled_children.length) - sleep(signal_timeout + 1) + sleep(signal_timeout * 2) state = get_state(client) assert_equal(1, state[:children].count) diff --git a/test/unit/einhorn/command.rb b/test/unit/einhorn/command.rb index 9d2b9be..beaf810 100644 --- a/test/unit/einhorn/command.rb +++ b/test/unit/einhorn/command.rb @@ -18,4 +18,36 @@ class CommandTest < EinhornTestCase Command.quieter end end + + describe "resignal_timeout" do + it "does not kill any children" do + Einhorn::State.stubs(signal_timeout: 5 * 60) + Einhorn::State.stubs(children: { + 12345 => {last_signaled_at: nil}, + 12346 => {signaled: Set.new(["USR1"]), last_signaled_at: Time.now - (2 * 60)}, + }) + + Process.expects(:kill).never + Einhorn::Command.kill_expired_signaled_workers + + refute(Einhorn::State.children[12346][:signaled].include?("KILL"), "Process was KILLed when it shouldn't have been") + end + + it "KILLs stuck child processes" do + Time.stub :now, Time.at(0) do + Process.stub(:kill, true) do + Einhorn::State.stubs(signal_timeout: 60) + Einhorn::State.stubs(children: { + 12346 => {signaled: Set.new(["USR2"]), last_signaled_at: Time.now - (2 * 60)}, + }) + + Einhorn::Command.kill_expired_signaled_workers + + child = Einhorn::State.children[12346] + assert(child[:signaled].include?("KILL"), "Process was not KILLed as expected") + assert(child[:last_signaled_at] == Time.now, "The last_processed_at was not updated as expected") + end + end + end + end end