
Commit 49c98e4

SFR-2249: Refactor and clean up s3 file process (#399)
1 parent fc1ada9 commit 49c98e4


3 files changed: +228 -235 lines changed


processes/s3Files.py: +79 -80
@@ -18,112 +18,111 @@ def __init__(self, *args):
         super(S3Process, self).__init__(*args[:4])

     def runProcess(self):
-        self.receiveAndProcessMessages()
+        try:
+            number_of_processes = 4
+            file_processes = []
+
+            for _ in range(number_of_processes):
+                file_process = Process(target=S3Process.process_files)
+                file_process.start()

-    def receiveAndProcessMessages(self):
-        processes = 4
-        epubProcesses = []
-        for _ in range(processes):
-            proc = Process(target=S3Process.storeFilesInS3)
-            proc.start()
-            epubProcesses.append(proc)
+                file_processes.append(file_process)

-        for proc in epubProcesses:
-            proc.join()
+            for file_process in file_processes:
+                file_process.join()
+        except Exception:
+            logger.exception('Failed to run S3 Process')

     @staticmethod
-    def storeFilesInS3():
-        storageManager = S3Manager()
-        storageManager.createS3Client()
-
-        fileQueue = os.environ['FILE_QUEUE']
-        fileRoute = os.environ['FILE_ROUTING_KEY']
-        epubConverterURL = os.environ['WEBPUB_CONVERSION_URL']
-
-        rabbitManager = RabbitMQManager()
-        rabbitManager.createRabbitConnection()
-        rabbitManager.createOrConnectQueue(fileQueue, fileRoute)
-
-        bucket = os.environ['FILE_BUCKET']
-
-        attempts = 1
-        while True:
-            msgProps, _, msgBody = rabbitManager.getMessageFromQueue(fileQueue)
-            if msgProps is None:
-                if attempts <= 3:
-                    sleep(30 * attempts)
-                    attempts += 1
-                    continue
+    def process_files():
+        storage_manager = S3Manager()
+        storage_manager.createS3Client()
+
+        file_queue = os.environ['FILE_QUEUE']
+        file_route = os.environ['FILE_ROUTING_KEY']
+
+        rabbit_mq_manager = RabbitMQManager()
+        rabbit_mq_manager.createRabbitConnection()
+        rabbit_mq_manager.createOrConnectQueue(file_queue, file_route)
+
+        s3_file_bucket = os.environ['FILE_BUCKET']
+
+        attempts_to_poll = 1
+        max_poll_attempts = 3
+
+        while attempts_to_poll <= max_poll_attempts:
+            message_props, _, message_body = rabbit_mq_manager.getMessageFromQueue(file_queue)
+
+            if not message_props:
+                if attempts_to_poll <= max_poll_attempts:
+                    wait_time = attempts_to_poll * 30
+
+                    logger.info(f'Waiting {wait_time}s for S3 file messages')
+                    sleep(wait_time)
+
+                    attempts_to_poll += 1
                 else:
+                    logger.info('Exiting S3 process - no more messages.')
                     break

-            attempts = 1
+                continue
+
+            attempts_to_poll = 1

-            fileMeta = json.loads(msgBody)['fileData']
-            fileURL = fileMeta['fileURL']
-            filePath = fileMeta['bucketPath']
+            file_data = json.loads(message_body)['fileData']
+            file_url = file_data['fileURL']
+            file_path = file_data['bucketPath']

             try:
-                logger.info('Storing {}'.format(fileURL))
-                epubB = S3Process.getFileContents(fileURL)
-
-                storageManager.putObjectInBucket(epubB, filePath, bucket)
+                file_contents = S3Process.get_file_contents(file_url)

-                if '.epub' in filePath:
-                    fileRoot = '.'.join(filePath.split('.')[:-1])
+                storage_manager.putObjectInBucket(file_contents, file_path, s3_file_bucket)
+
+                del file_contents

-                    webpubManifest = S3Process.generateWebpub(
-                        epubConverterURL, fileRoot, bucket
-                    )
+                if '.epub' in file_path:
+                    file_root = '.'.join(file_path.split('.')[:-1])

-                    storageManager.putObjectInBucket(
-                        webpubManifest,
-                        '{}/manifest.json'.format(fileRoot),
-                        bucket
-                    )
+                    web_pub_manifest = S3Process.generate_webpub(file_root, s3_file_bucket)

-                rabbitManager.acknowledgeMessageProcessed(msgProps.delivery_tag)
+                    storage_manager.putObjectInBucket(web_pub_manifest, f'{file_root}/manifest.json', s3_file_bucket)

-                logger.info('Sending Tag {} for {}'.format(fileURL, msgProps.delivery_tag))
+                rabbit_mq_manager.acknowledgeMessageProcessed(message_props.delivery_tag)

-                del epubB
-            except Exception as e:
-                logger.error('Unable to store file in S3')
-                logger.debug(e)
+                logger.info(f'Stored file in S3 for {file_url}')
+            except Exception:
+                logger.exception(f'Failed to store file for file url: {file_url}')

     @staticmethod
-    def getFileContents(epubURL):
-        timeout = 15
-        epubResp = requests.get(
-            epubURL,
+    def get_file_contents(file_url: str):
+        file_url_response = requests.get(
+            file_url,
             stream=True,
-            timeout=timeout,
-            headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5)'}
+            timeout=15,
+            headers={ 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5)' }
         )

-        if epubResp.status_code == 200:
-            content = bytes()
-            for byteChunk in epubResp.iter_content(1024 * 250):
-                content += byteChunk
+        if file_url_response.status_code == 200:
+            file_contents = bytes()

-            return content
+            for byte_chunk in file_url_response.iter_content(1024 * 250):
+                file_contents += byte_chunk

-        raise Exception('Unable to fetch ePub file')
+            return file_contents

-    @staticmethod
-    def generateWebpub(converterRoot, fileRoot, bucket):
-        s3Path = 'https://{}.s3.amazonaws.com/{}/META-INF/container.xml'.format(
-            bucket, fileRoot
-        )
+        raise Exception(f'Unable to fetch file from url: {file_url}')

-        converterURL = '{}/api/{}'.format(converterRoot, quote_plus(s3Path))
+    @staticmethod
+    def generate_webpub(file_root, bucket):
+        webpub_conversion_url = os.environ['WEBPUB_CONVERSION_URL']
+        s3_file_path = f'https://{bucket}.s3.amazonaws.com/{file_root}/META-INF/container.xml'
+        webpub_conversion_url = f'{webpub_conversion_url}/api/{quote_plus(s3_file_path)}'

         try:
-            webpubResp = requests.get(converterURL, timeout=15)
+            webpub_response = requests.get(webpub_conversion_url, timeout=15)

-            webpubResp.raise_for_status()
+            webpub_response.raise_for_status()

-            return webpubResp.content
-        except Exception as e:
-            logger.warning('Unable to generate webpub')
-            logger.debug(e)
+            return webpub_response.content
+        except Exception:
+            logger.exception(f'Failed to generate webpub for {file_root}')
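The refactored runProcess/process_files pair follows a worker-pool pattern: spawn a fixed number of processes, let each one drain the RabbitMQ file queue, and back off linearly (30s, 60s, 90s) before exiting once the queue stays empty. A minimal sketch of that polling pattern, with illustrative names (poll_queue, get_message, handle_message) that are not part of this repository:

from time import sleep


def poll_queue(get_message, handle_message, max_poll_attempts=3):
    # Simplified stand-in for S3Process.process_files: keep reading until the
    # queue has been empty for max_poll_attempts consecutive waits, then exit.
    attempts = 1

    while attempts <= max_poll_attempts:
        message = get_message()

        if message is None:
            sleep(attempts * 30)  # linear backoff: 30s, 60s, 90s
            attempts += 1
            continue

        attempts = 1  # a successful read resets the backoff
        handle_message(message)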

tests/unit/test_s3_files_process.py: +149
@@ -0,0 +1,149 @@
+import pytest
+import requests
+
+from tests.helper import TestHelpers
+from processes import S3Process
+
+
+class TestS3Process:
+    @classmethod
+    def setup_class(cls):
+        TestHelpers.setEnvVars()
+
+    @classmethod
+    def teardown_class(cls):
+        TestHelpers.clearEnvVars()
+
+    @pytest.fixture
+    def test_instance(self, mocker):
+        class TestS3Process(S3Process):
+            def __init__(self, process, customFile, ingestPeriod):
+                self.bucket = 'testBucket'
+
+        return TestS3Process('TestProcess', 'testFile', 'testDate')
+
+    @pytest.fixture
+    def test_file_message(self):
+        return """
+        {
+            "fileData": {
+                "fileURL": "testSourceURL",
+                "bucketPath": "testBucketPath.epub"
+            }
+        }
+        """
+
+    def test_run_process(self, test_instance, mocker):
+        mock_process_files = mocker.patch.object(S3Process, 'process_files')
+        mock_save_records = mocker.patch.object(S3Process, 'saveRecords')
+        mock_commit_changes = mocker.patch.object(S3Process, 'commitChanges')
+        mock_file_process = mocker.MagicMock()
+        mock_process = mocker.patch('processes.s3Files.Process')
+        mock_process.return_value = mock_file_process
+
+        test_instance.runProcess()
+
+        mock_process_files.assert_called_once
+        mock_save_records.assert_called_once
+        mock_commit_changes.assert_called_once
+        assert mock_process.call_count == 4
+        assert mock_file_process.start.call_count == 4
+        assert mock_file_process.join.call_count == 4
+
+    def test_process_files(self, test_file_message, mocker):
+        mock_sleep = mocker.patch('processes.s3Files.sleep')
+
+        mock_s3 = mocker.MagicMock()
+        mock_s3_manager = mocker.patch('processes.s3Files.S3Manager')
+        mock_s3_manager.return_value = mock_s3
+
+        mock_rabbit_mq = mocker.MagicMock()
+        mock_rabbit_mq_manager = mocker.patch('processes.s3Files.RabbitMQManager')
+        mock_rabbit_mq_manager.return_value = mock_rabbit_mq
+        mock_message_propse = mocker.MagicMock()
+        mock_message_propse.delivery_tag = 'rabbitMQTag'
+        mock_rabbit_mq.getMessageFromQueue.side_effect = [
+            (mock_message_propse, {}, test_file_message),
+            (None, None, None),
+            (None, None, None),
+            (None, None, None),
+            (None, None, None)
+        ]
+
+        mock_get_file_contents = mocker.patch.object(S3Process, 'get_file_contents')
+        mock_get_file_contents.return_value = 'testFileBytes'
+
+        mock_generate_webpub = mocker.patch.object(S3Process, 'generate_webpub')
+        mock_generate_webpub.return_value = 'testWebpubJson'
+
+        S3Process.process_files()
+
+        assert mock_rabbit_mq.getMessageFromQueue.call_count == 4
+        mock_rabbit_mq.getMessageFromQueue.assert_called_with('test_file_queue')
+
+        mock_sleep.assert_has_calls([
+            mocker.call(30), mocker.call(60), mocker.call(90)
+        ])
+
+        mock_generate_webpub.assert_called_once_with('testBucketPath', 'test_aws_bucket')
+
+        mock_s3.putObjectInBucket.assert_has_calls([
+            mocker.call('testFileBytes', 'testBucketPath.epub', 'test_aws_bucket'),
+            mocker.call('testWebpubJson', 'testBucketPath/manifest.json', 'test_aws_bucket')
+        ])
+        mock_rabbit_mq.acknowledgeMessageProcessed.assert_called_once_with('rabbitMQTag')
+
+    def test_get_file_contents_success(self, test_instance, mocker):
+        mock_get_request = mocker.patch.object(requests, 'get')
+        mock_response = mocker.MagicMock()
+        mock_response.status_code = 200
+        mock_response.iter_content.return_value = [b'e', b'p', b'u', b'b']
+        mock_get_request.return_value = mock_response
+
+        test_file = test_instance.get_file_contents('testURL')
+
+        assert test_file == b'epub'
+        mock_get_request.assert_called_once_with(
+            'testURL',
+            stream=True,
+            timeout=15,
+            headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5)'}
+        )
+
+    def test_get_file_contents_error(self, test_instance, mocker):
+        mock_get_request = mocker.patch.object(requests, 'get')
+        mock_response = mocker.MagicMock()
+        mock_response.status_code = 500
+        mock_get_request.return_value = mock_response
+
+        with pytest.raises(Exception):
+            test_instance.get_file_contents('testURL')
+
+    def test_generate_webpub_success(self, mocker):
+        mock_get_request = mocker.patch.object(requests, 'get')
+        mock_response = mocker.MagicMock(content='testWebpub')
+        mock_get_request.return_value = mock_response
+
+        test_webpub = S3Process.generate_webpub('testRoot', 'testBucket')
+
+        assert test_webpub == 'testWebpub'
+
+        mock_get_request.assert_called_once_with(
+            'test_conversion_url/api/https%3A%2F%2FtestBucket.s3.amazonaws.com%2FtestRoot%2FMETA-INF%2Fcontainer.xml',
+            timeout=15
+        )
+
+    def test_generate_webpub_error(self, mocker):
+        mock_get_request = mocker.patch.object(requests, 'get')
+        mock_response = mocker.MagicMock(content='testWebpub')
+        mock_response.raise_for_status.side_effect = Exception
+        mock_get_request.return_value = mock_response
+
+        test_webpub = S3Process.generate_webpub('testRoot', 'testBucket')
+
+        assert test_webpub == None
+
+        mock_get_request.assert_called_once_with(
+            'test_conversion_url/api/https%3A%2F%2FtestBucket.s3.amazonaws.com%2FtestRoot%2FMETA-INF%2Fcontainer.xml',
+            timeout=15
+        )
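The loop-exit behavior in test_process_files is driven by the mock's side_effect list: each successive call to the patched getMessageFromQueue returns the next tuple, so the worker stores one real message and then sees three consecutive empty reads, stepping through the 30/60/90-second backoff (with sleep patched out) before the loop exits, hence the assertion that getMessageFromQueue is called four times. A generic illustration of that technique, using unittest.mock directly rather than this repository's fixtures:

from unittest.mock import MagicMock

# Each call to the mock pops the next value from side_effect.
queue = MagicMock()
queue.get.side_effect = ['first message', None, None, None]

results = [queue.get() for _ in range(4)]
assert results == ['first message', None, None, None]
assert queue.get.call_count == 4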

0 commit comments
