Commit 2d52ba98 authored by Charles Ferguson's avatar Charles Ferguson
Browse files

Updates to handle errno properly. Improved failure case errors in tests.

When the error number was used on Linux and BSD, the values for the
Resource Unavailable (EWOULDBLOCK) were different. For no good reason,
this was using a hard-coded number, when it should have used a symbol.
This fixes a few of the linux tests.

The failure cases when the threaded tests were not working were not great.
They would result in hanging processes, which didn't help us very much.
The processes are now tracked and we try to destroy as part of the tear
down code.

The linux and darwin systems have different implementations of the
'netstat' tool we are using for checking that the threaded version does
stream properly. So these differences are very roughly abstracted. This
should be done better in the future, when we add Windows support.
parent 27ae5d02
......@@ -38,6 +38,7 @@ Example usage
print "Time: %s, Output: %s" % (chunk[0], chunk[1])
import errno
import os
import select
import shlex
......@@ -278,7 +279,7 @@ class StreamedInput(object):
data =
except IOError as ex:
if ex.errno == 35: # Resource unavailable (no data present)
if ex.errno == errno.EWOULDBLOCK: # Resource unavailable (no data present)
if not data:
......@@ -425,6 +425,14 @@ class Test80Repr(BaseStreamedInput):
class BaseThreadedInput(BaseStreamedInput):
Base class which replaces the implementation of StreamedInputClass as called,
with the Threaded version. This allows us to check that the classes work without
duplicating code.
Special magic is needed to ensure that we only replace the calls within the
test code with those to the threaded version.
def setUp(self):
super(BaseThreadedInput, self).setUp()
......@@ -503,35 +511,51 @@ class TestThreaded80Repr(BaseThreadedInput, Test80Repr):
class TestThreaded90ThreadControl(BaseThreadedInput):
def __init__(self, *args, **kwargs):
super(TestThreaded90ThreadControl, self).__init__(*args, **kwargs)
if sys.platform == 'darwin':
self.netstat = ['netstat', '-i', '-w1']
elif sys.platform.startswith('linux'):
self.netstat = ['netstat', '-i', '-c']
raise NotImplementedError = None
def tearDown(self):
super(TestThreaded90ThreadControl, self).tearDown()
def test_001_start_and_stop_long_process(self):
si = streamedinput.StreamedInput(['netstat', '-i', '-w1'])
si.start() = streamedinput.StreamedInput(self.netstat)
initial_datalen = len(si)
second_datalen = len(si)
initial_datalen = len(
second_datalen = len(
self.assertGreater(second_datalen, initial_datalen, "We should have got more data by now")
# Read all the data
chunks = 0
chunks += 1
self.assertLess(chunks, 20, "We should not have read this much data")
self.assertFalse(si.eof(), "We shouldn't be at the end of the input yet")
self.assertFalse(, "We shouldn't be at the end of the input yet")
chunks = 0
chunks += 1
self.assertLess(chunks, 5, "We should not have read this much data")
self.assertTrue(si.eof(), "We should now be at the end of the list, and the process should be gone")
self.assertTrue(, "We should now be at the end of the list, and the process should be gone")
def test_002_context_handler_stops(self):
with streamedinput.ThreadedStreamedInput(['netstat', '-i', '-w1']) as si:
test_process = psutil.Process(
self.assertTrue(si.is_running(), "Process should be running")
with streamedinput.ThreadedStreamedInput(self.netstat) as
test_process = psutil.Process(
self.assertTrue(, "Process should be running")
# Wait a moment for the thread to terminate
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment