Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
openSUSE:13.1:Update
openstack-glance-doc
glance-dont-test-qpid.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File glance-dont-test-qpid.patch of Package openstack-glance-doc
diff -ruN a/glance/tests/unit/test_notifier.py b/glance/tests/unit/test_notifier.py --- a/glance/tests/unit/test_notifier.py 2013-10-01 17:32:17.000000000 +0200 +++ b/glance/tests/unit/test_notifier.py 2013-10-02 10:44:51.371564341 +0200 @@ -18,8 +18,6 @@ import datetime import kombu.entity import mox -import qpid -import qpid.messaging import stubout import webob @@ -314,114 +312,6 @@ self.assertEquals(info['conn_called'], 2) -class TestQpidNotifier(utils.BaseTestCase): - """Test Qpid notifier.""" - - def setUp(self): - super(TestQpidNotifier, self).setUp() - - self.mocker = mox.Mox() - - self.mock_connection = None - self.mock_session = None - self.mock_sender = None - self.mock_receiver = None - - self.orig_connection = qpid.messaging.Connection - self.orig_session = qpid.messaging.Session - self.orig_sender = qpid.messaging.Sender - self.orig_receiver = qpid.messaging.Receiver - qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection - qpid.messaging.Session = lambda *_x, **_y: self.mock_session - qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender - qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver - - self.notify_qpid = importutils.import_module("glance.notifier." - "notify_qpid") - self.addCleanup(self.reset_qpid) - self.addCleanup(self.mocker.ResetAll) - - def reset_qpid(self): - - qpid.messaging.Connection = self.orig_connection - qpid.messaging.Session = self.orig_session - qpid.messaging.Sender = self.orig_sender - qpid.messaging.Receiver = self.orig_receiver - - def _test_notify(self, priority, exception=False, exception_send=False): - test_msg = {'a': 'b'} - - self.mock_connection = self.mocker.CreateMock(self.orig_connection) - self.mock_session = self.mocker.CreateMock(self.orig_session) - self.mock_sender = self.mocker.CreateMock(self.orig_sender) - - self.mock_connection.username = "" - if exception: - self.mock_connection.open().AndRaise( - Exception('Test open Exception')) - else: - self.mock_connection.open() - self.mock_connection.session().AndReturn(self.mock_session) - expected_address = ('glance/notifications.%s ; ' - '{"node": {"x-declare": {"auto-delete": true, ' - '"durable": false}, "type": "topic"}, ' - '"create": "always"}' % priority) - self.mock_session.sender(expected_address).AndReturn( - self.mock_sender) - if exception_send: - self.mock_sender.send(mox.IgnoreArg()).AndRaise( - Exception('Test send Exception')) - # NOTE(afazekas): the opened and close call is expected - # in this case, but not expected if the open fails - else: - self.mock_sender.send(mox.IgnoreArg()) - self.mock_connection.opened().AndReturn(True) - self.mock_connection.close() - - self.mocker.ReplayAll() - - self.config(notifier_strategy="qpid") - notifier = self.notify_qpid.QpidStrategy() - if priority == 'info': - if exception or exception_send: - self.assertRaises(Exception, notifier.info, test_msg) - else: - notifier.info(test_msg) - elif priority == 'warn': - if exception or exception_send: - self.assertRaises(Exception, notifier.warn, test_msg) - else: - notifier.warn(test_msg) - elif priority == 'error': - if exception or exception_send: - self.assertRaises(Exception, notifier.error, test_msg) - else: - notifier.error(test_msg) - - self.mocker.VerifyAll() - - def test_info(self): - self._test_notify('info') - - def test_warn(self): - self._test_notify('warn') - - def test_error(self): - self._test_notify('error') - - def test_exception_open_successful(self): - self._test_notify('info', exception=True) - - def test_info_fail(self): - self._test_notify('info', exception_send=True) - - def test_warn_fail(self): - self._test_notify('warn', exception_send=True) - - def test_error_fail(self): - self._test_notify('error', exception_send=True) - - class TestRabbitContentType(utils.BaseTestCase): """Test AMQP/Rabbit notifier works."""
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor