Ensure stuck workers are killed by dfradette-stripe · Pull Request #72 · contribsys/einhorn · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Ensure stuck workers are killed #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/einhorn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,14 @@ def self.run
Einhorn::State.reloading_for_upgrade = false
end

# If setting a signal-timeout, timeout the event loop
# in the same timeframe, ensuring processes are culled
# on a regular basis.
if Einhorn::State.signal_timeout
Einhorn::Event.default_timeout = Einhorn::Event.default_timeout.nil? ?
Einhorn::State.signal_timeout : [Einhorn::State.signal_timeout, Einhorn::Event.default_timeout].min
end

while Einhorn::State.respawn || Einhorn::State.children.size > 0
log_debug("Entering event loop")

Expand Down
46 changes: 42 additions & 4 deletions lib/einhorn/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def self.reap
begin
while true
Einhorn.log_debug('Going to reap a child process')

pid = Process.wait(-1, Process::WNOHANG)
return unless pid
mourn(pid)
Expand Down Expand Up @@ -101,8 +100,8 @@ def self.register_ack(pid)

def self.signal_all(signal, children=nil, record=true)
children ||= Einhorn::WorkerPool.workers

signaled = {}

Einhorn.log_info("Sending #{signal} to #{children.inspect}", :upgrade)

children.each do |child|
Expand All @@ -116,11 +115,13 @@ def self.signal_all(signal, children=nil, record=true)
Einhorn.log_error("Re-sending #{signal} to already-signaled child #{child.inspect}. It may be slow to spin down, or it may be swallowing #{signal}s.", :upgrade)
end
spec[:signaled].add(signal)
spec[:last_signaled_at] = Time.now
end

begin
Process.kill(signal, child)
rescue Errno::ESRCH
Einhorn.log_debug("Attempted to #{signal} child #{child.inspect} but the process does not exist", :upgrade)
else
signaled[child] = spec
end
Expand All @@ -146,11 +147,12 @@ def self.signal_all(signal, children=nil, record=true)
spec[:signaled].add('KILL')
end
end
end

"Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}"
Einhorn.log_info("Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}")
end
end


def self.increment
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
Expand Down Expand Up @@ -314,6 +316,7 @@ def self.spinup(cmd=nil)
:version => Einhorn::State.version,
:acked => false,
:signaled => Set.new,
:last_signaled_at => nil,
:index => index,
:spinup_time => Einhorn::State.last_spinup,
}
Expand Down Expand Up @@ -488,6 +491,41 @@ def self.cull
Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.")
signal_all("USR2", excess)
end

# Ensure all signaled workers that have outlived signal_timeout get killed.
kill_expired_signaled_workers if Einhorn::State.signal_timeout
end

# Sends SIGKILL to every child that was sent USR2, has not already been sent
# KILL, and whose most recent signal is at least
# Einhorn::State.signal_timeout seconds old.
#
# Reads Einhorn::State.children and Einhorn::State.signal_timeout. Every
# selected child gets 'KILL' added to its :signaled set and its
# :last_signaled_at reset, even when the process turned out not to exist.
#
# NOTE(review): signal_timeout is added to a Time below, so this presumably
# must only be called when signal_timeout is set -- confirm all callers guard it.
def self.kill_expired_signaled_workers
# Take the clock once so every child is compared against the same instant.
now = Time.now
children = Einhorn::State.children.select do |_,c|
# Only interested in USR2 signaled workers
next unless c[:signaled] && c[:signaled].length > 0
next unless c[:signaled].include?('USR2')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should also filter out the processes that have received KILL already - since the signal can't be trapped, the child process should (barring kernel/machine trouble) always exit.

If einhorn should get a bug where it doesn't remove the child from the list of its children, this seems like a very easy way to set up a trap for later processes that get the same PID later on.


# Ignore processes that have received KILL since it can't be trapped.
next if c[:signaled].include?('KILL')

# Filter out those children that have not reached signal_timeout yet.
# A nil :last_signaled_at means no signal time was recorded for this child.
next unless c[:last_signaled_at]
expires_at = c[:last_signaled_at] + Einhorn::State.signal_timeout
next unless now >= expires_at

# Reaching this line means the child passed every filter above.
true
end

Einhorn.log_info("#{children.size} expired signaled workers found.") if children.size > 0
children.each do |pid, child|
# :last_signaled_at is in the past, so the difference is negative; abs turns it into an elapsed-seconds figure.
Einhorn.log_info("Child #{pid.inspect} was signaled #{(child[:last_signaled_at] - now).abs.to_i}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade)
begin
Process.kill('KILL', pid)
rescue Errno::ESRCH
# The process is already gone; log it and fall through to the bookkeeping below.
Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.")
end

# Record the KILL so the filter above skips this child on later passes.
child[:signaled].add('KILL')
child[:last_signaled_at] = Time.now
end
end

def self.stop_respawning
Expand Down
11 changes: 10 additions & 1 deletion lib/einhorn/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Einhorn
module Event
@@loopbreak_reader = nil
@@loopbreak_writer = nil
@@default_timeout = nil
@@signal_actions = []
@@readable = {}
@@writeable = {}
Expand Down Expand Up @@ -120,7 +121,7 @@ def self.timeout
if expires_at = @@timers.keys.sort[0]
expires_at - Time.now
else
nil
@@default_timeout
end
end

Expand Down Expand Up @@ -165,6 +166,14 @@ def self.break_loop
Einhorn.log_error("Loop break pipe is full -- probably means that we are quite backlogged")
end
end

# Stores the module-wide default timeout.
#
# val - anything responding to to_i. A value whose integer form is 0
#       (including nil) clears the default by storing nil; any other
#       value is stored as its integer form.
def self.default_timeout=(val)
  seconds = val.to_i
  @@default_timeout = seconds.zero? ? nil : seconds
end

# Returns the currently stored default timeout: nil initially, otherwise
# whatever default_timeout= last stored.
def self.default_timeout
@@default_timeout
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/einhorn/version.rb
6D4E
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Einhorn
# Gem version string.
# NOTE(review): this is a rendered diff with one deletion and one addition, so
# '0.7.5' is presumably the removed line and '0.7.6' the added one -- confirm
# that only a single VERSION assignment exists in the real file.
VERSION = '0.7.5'
VERSION = '0.7.6'
end
22 changes: 22 additions & 0 deletions test/integration/_lib/fixtures/signal_timeout/sleepy_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
require 'bundler/setup'
require 'socket'
require 'einhorn/worker'

# Worker entry point for the signal-timeout fixture.
#
# Wraps the socket obtained from Einhorn::Worker.socket!, acks, and installs
# a USR2 handler that sleeps for TRAP_SLEEP seconds (read from the
# environment) before exiting. It then accepts connections forever, answering
# each one with this process's PID before closing it.
def einhorn_main
  listener = Socket.for_fd(Einhorn::Worker.socket!)
  Einhorn::Worker.ack!

  # Delay the exit so the worker stays alive after being sent USR2.
  Signal.trap('USR2') do
    sleep ENV.fetch("TRAP_SLEEP").to_i
    exit
  end

  loop do
    connection, _peer = listener.accept
    connection.write($$)
    connection.flush
    connection.close
  end
end

einhorn_main if $0 == __FILE__
5 changes: 5 additions & 0 deletions test/integration/_lib/helpers/einhorn_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ def find_free_port(host='127.0.0.1')
open_port.close
end

# Asks Einhorn for its state through the given client.
#
# client - object responding to send_command(hash) and receive_message; the
#          reply must be a Hash whose 'message' entry is a YAML document.
#
# Returns the value stored under :state in the decoded YAML document.
def get_state(client)
  client.send_command('command' => 'state')
  payload = client.receive_message['message']

  # Callers read Set-like :signaled values out of the state. Psych 4
  # (Ruby 3.1+) made YAML.load safe by default, which rejects such objects,
  # so use unsafe_load where it exists and fall back to load on older Psych.
  # NOTE(review): this assumes the payload comes from a trusted einhorn
  # process started by the test itself -- do not reuse on untrusted input.
  decoded = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(payload) : YAML.load(payload)
  decoded[:state]
end

def wait_for_open_port
max_retries = 50
begin
Expand Down
43 changes: 43 additions & 0 deletions test/integration/upgrading.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,47 @@ class UpgradeTests < EinhornIntegrationTestCase
end
end
end

# Integration coverage for --signal-timeout: a worker that sleeps through
# USR2 for longer than the timeout must be gone from Einhorn's state once
# the timeout has passed.
describe "with --signal-timeout" do
  before do
    @dir = prepare_fixture_directory('signal_timeout')
    @port = find_free_port
    @server_program = File.join(@dir, "sleepy_server.rb")
    @socket_path = File.join(@dir, "einhorn.sock")
  end

  after { cleanup_fixtured_directories }

  it 'issues a SIGKILL to outdated children when signal-timeout has passed' do
    signal_timeout = 2
    # The worker's USR2 handler sleeps far longer than signal_timeout, so it
    # cannot exit on its own within the window this test observes.
    sleep_for = 10
    cmd = %W{
      einhorn
      -b 127.0.0.1:#{@port}
      -d #{@socket_path}
      --signal-timeout #{signal_timeout}
      -- ruby #{@server_program}
    }

    with_running_einhorn(cmd, env: ENV.to_h.merge({'TRAP_SLEEP' => sleep_for.to_s})) do |process|
      wait_for_open_port
      client = Einhorn::Client.for_path(@socket_path)
      einhornsh(%W{-d #{@socket_path} -e upgrade})

      # Right after the upgrade: two children, exactly one of them signaled.
      state = get_state(client)
      assert_equal(2, state[:children].count)
      signaled_children = state[:children].select{|_,c| c[:signaled].length > 0}
      assert_equal(1, signaled_children.length)

      # Wait twice the timeout before checking that the signaled child is gone.
      sleep(signal_timeout * 2)

      state = get_state(client)
      assert_equal(1, state[:children].count)
      signaled_children = state[:children].select{|_,c| c[:signaled].length > 0}
      assert_equal(0, signaled_children.length)

      process.terminate
    end
  end
end
end
32 changes: 32 additions & 0 deletions test/unit/einhorn/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,36 @@ class CommandTest < EinhornTestCase
Command.quieter
end
end

# Unit coverage for Einhorn::Command.kill_expired_signaled_workers.
describe "resignal_timeout" do
  # Neither child qualifies: one has no :signaled entry at all, the other was
  # only sent USR1 and is still inside the five-minute timeout.
  it "does not kill any children" do
    Einhorn::State.stubs(signal_timeout: 5 * 60)
    Einhorn::State.stubs(children: {
      12345 => {last_signaled_at: nil},
      12346 => {signaled: Set.new(["USR1"]), last_signaled_at: Time.now - (2 * 60)},
    })

    Process.expects(:kill).never
    Einhorn::Command.kill_expired_signaled_workers

    refute(Einhorn::State.children[12346][:signaled].include?("KILL"), "Process was KILLed when it shouldn't have been")
  end

  # The child was sent USR2 two minutes ago with a one-minute timeout, so it
  # must be marked KILLed and have its :last_signaled_at refreshed.
  it "KILLs stuck child processes" do
    # Freeze the clock so the refreshed timestamp can be compared exactly.
    Time.stub :now, Time.at(0) do
      Process.stub(:kill, true) do
        Einhorn::State.stubs(signal_timeout: 60)
        Einhorn::State.stubs(children: {
          12346 => {signaled: Set.new(["USR2"]), last_signaled_at: Time.now - (2 * 60)},
        })

        Einhorn::Command.kill_expired_signaled_workers

        child = Einhorn::State.children[12346]
        assert(child[:signaled].include?("KILL"), "Process was not KILLed as expected")
        assert_equal(Time.now, child[:last_signaled_at], "The last_signaled_at was not updated as expected")
      end
    end
  end
end
end
0