Commit ab96c98d authored by Charles Ferguson's avatar Charles Ferguson
Browse files

Updates to handle errno properly. Improved failure case errors in tests.

When the error number was used on Linux and BSD, the values for the
Resource Unavailable (EWOULDBLOCK) were different. For no good reason,
this was using a hard-coded number, when it should have used a symbol.
This fixes a few of the linux tests.

The failure cases when the threaded tests were not working were not great.
They would result in hanging processes, which didn't help us very much.
The processes are now tracked and we try to destroy as part of the tear
down code.

The linux and darwin systems have different implementations of the
'netstat' tool we are using for checking that the threaded version does
stream properly. So these differences are very roughly abstracted. This
should be done better in the future, when we add Windows support.
parent 6538fd3c
...@@ -38,6 +38,7 @@ Example usage ...@@ -38,6 +38,7 @@ Example usage
print "Time: %s, Output: %s" % (chunk[0], chunk[1]) print "Time: %s, Output: %s" % (chunk[0], chunk[1])
""" """
import errno
import os import os
import select import select
import subprocess import subprocess
...@@ -272,7 +273,7 @@ class StreamedInput(object): ...@@ -272,7 +273,7 @@ class StreamedInput(object):
try: try:
data = self.proc.stdout.read(self.buffer_size) data = self.proc.stdout.read(self.buffer_size)
except IOError as ex: except IOError as ex:
if ex.errno == 35: # Resource unavailable (no data present) if ex.errno == errno.EWOULDBLOCK: # Resource unavailable (no data present)
break break
raise raise
if not data: if not data:
......
...@@ -420,6 +420,14 @@ class Test80Repr(BaseStreamedInput): ...@@ -420,6 +420,14 @@ class Test80Repr(BaseStreamedInput):
class BaseThreadedInput(BaseStreamedInput): class BaseThreadedInput(BaseStreamedInput):
"""
Base class which replaces the implementation of StreamedInputClass as called,
with the Threaded version. This allows us to check that the classes work without
duplicating code.
Special magic is needed to ensure that we only replace the calls within the
test code with those to the threaded version.
"""
def setUp(self): def setUp(self):
super(BaseThreadedInput, self).setUp() super(BaseThreadedInput, self).setUp()
...@@ -498,35 +506,51 @@ class TestThreaded80Repr(BaseThreadedInput, Test80Repr): ...@@ -498,35 +506,51 @@ class TestThreaded80Repr(BaseThreadedInput, Test80Repr):
class TestThreaded90ThreadControl(BaseThreadedInput): class TestThreaded90ThreadControl(BaseThreadedInput):
def __init__(self, *args, **kwargs):
super(TestThreaded90ThreadControl, self).__init__(*args, **kwargs)
if sys.platform == 'darwin':
self.netstat = ['netstat', '-i', '-w1']
elif sys.platform.startswith('linux'):
self.netstat = ['netstat', '-i', '-c']
else:
raise NotImplementedError
self.si = None
def tearDown(self):
self.si.stop()
super(TestThreaded90ThreadControl, self).tearDown()
def test_001_start_and_stop_long_process(self): def test_001_start_and_stop_long_process(self):
si = streamedinput.StreamedInput(['netstat', '-i', '-w1']) self.si = streamedinput.StreamedInput(self.netstat)
si.start() self.si.start()
time.sleep(0.1) time.sleep(0.1)
initial_datalen = len(si) initial_datalen = len(self.si)
time.sleep(1) time.sleep(1.2)
second_datalen = len(si) second_datalen = len(self.si)
self.assertGreater(second_datalen, initial_datalen, "We should have got more data by now") self.assertGreater(second_datalen, initial_datalen, "We should have got more data by now")
# Read all the data # Read all the data
chunks = 0 chunks = 0
while si.read(): while self.si.read():
chunks += 1 chunks += 1
self.assertLess(chunks, 20, "We should not have read this much data") self.assertLess(chunks, 20, "We should not have read this much data")
self.assertFalse(si.eof(), "We shouldn't be at the end of the input yet") self.assertFalse(self.si.eof(), "We shouldn't be at the end of the input yet")
si.stop() self.si.stop()
time.sleep(0.1) time.sleep(0.1)
chunks = 0 chunks = 0
while si.read(): while self.si.read():
chunks += 1 chunks += 1
self.assertLess(chunks, 5, "We should not have read this much data") self.assertLess(chunks, 5, "We should not have read this much data")
self.assertTrue(si.eof(), "We should now be at the end of the list, and the process should be gone") self.assertTrue(self.si.eof(), "We should now be at the end of the list, and the process should be gone")
def test_002_context_handler_stops(self): def test_002_context_handler_stops(self):
with streamedinput.ThreadedStreamedInput(['netstat', '-i', '-w1']) as si: with streamedinput.ThreadedStreamedInput(self.netstat) as self.si:
test_process = psutil.Process(si.proc.pid) test_process = psutil.Process(self.si.proc.pid)
self.assertTrue(si.is_running(), "Process should be running") self.assertTrue(self.si.is_running(), "Process should be running")
# Wait a moment for the thread to terminate # Wait a moment for the thread to terminate
time.sleep(0.1) time.sleep(0.1)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment