Added ability to read emails from uncompressed *.txt archives (in addition to *.txt.gz)
Change-Id: I34095519497f1a0c85b6d640d5457af7ea4df9f9
This commit is contained in:
		| @@ -204,6 +204,7 @@ def _process_repo(repo, runtime_storage_inst, record_processor_inst): | |||||||
|  |  | ||||||
|  |  | ||||||
| def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): | def _process_mail_list(uri, runtime_storage_inst, record_processor_inst): | ||||||
|  |     LOG.info("Processing mail list %s" % uri) | ||||||
|     mail_iterator = mls.log(uri, runtime_storage_inst) |     mail_iterator = mls.log(uri, runtime_storage_inst) | ||||||
|     mail_iterator_typed = _record_typer(mail_iterator, 'email') |     mail_iterator_typed = _record_typer(mail_iterator, 'email') | ||||||
|     processed_mail_iterator = record_processor_inst.process( |     processed_mail_iterator = record_processor_inst.process( | ||||||
|   | |||||||
| @@ -58,8 +58,8 @@ def _get_mail_archive_links(uri): | |||||||
|         LOG.warning('Mail archive list is not found at %s', uri) |         LOG.warning('Mail archive list is not found at %s', uri) | ||||||
|         return [] |         return [] | ||||||
|  |  | ||||||
|     links = set(re.findall(r'\shref\s*=\s*[\'"]([^\'"]*\.txt\.gz)', content, |     links = set(re.findall(r'\shref\s*=\s*[\'"]([^\'"]*\.txt(?:\.gz)?)', | ||||||
|                            flags=re.IGNORECASE)) |                            content, flags=re.IGNORECASE)) | ||||||
|     return [parse.urljoin(uri, link) for link in links] |     return [parse.urljoin(uri, link) for link in links] | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -91,7 +91,12 @@ def _optimize_body(email_body): | |||||||
|  |  | ||||||
| def _retrieve_mails(uri): | def _retrieve_mails(uri): | ||||||
|     LOG.debug('Retrieving mail archive from: %s', uri) |     LOG.debug('Retrieving mail archive from: %s', uri) | ||||||
|  |  | ||||||
|  |     if uri.endswith('.gz'): | ||||||
|         content = utils.read_gzip_from_uri(uri) |         content = utils.read_gzip_from_uri(uri) | ||||||
|  |     else: | ||||||
|  |         content = utils.read_txt_from_uri(uri) | ||||||
|  |  | ||||||
|     if not content: |     if not content: | ||||||
|         LOG.error('Error reading mail archive from: %s', uri) |         LOG.error('Error reading mail archive from: %s', uri) | ||||||
|         return |         return | ||||||
| @@ -129,6 +134,7 @@ def log(uri, runtime_storage_inst): | |||||||
|  |  | ||||||
|     links = _get_mail_archive_links(uri) |     links = _get_mail_archive_links(uri) | ||||||
|     for link in links: |     for link in links: | ||||||
|  |         LOG.info("Processing emails from %s" % link) | ||||||
|         if _uri_content_changed(link, runtime_storage_inst): |         if _uri_content_changed(link, runtime_storage_inst): | ||||||
|             for mail in _retrieve_mails(link): |             for mail in _retrieve_mails(link): | ||||||
|                 LOG.debug('New mail: %s', mail['message_id']) |                 LOG.debug('New mail: %s', mail['message_id']) | ||||||
|   | |||||||
| @@ -350,7 +350,8 @@ class RecordProcessor(object): | |||||||
|         modules, alias_module_map = self._get_modules() |         modules, alias_module_map = self._get_modules() | ||||||
|         for module in modules: |         for module in modules: | ||||||
|             find = subject.find(module) |             find = subject.find(module) | ||||||
|             if (find >= 0) and (find < pos): |             if (find >= 0) and (find < pos) \ | ||||||
|  |                     and (len(module) > len(best_guess_module or '')): | ||||||
|                 pos = find |                 pos = find | ||||||
|                 best_guess_module = module |                 best_guess_module = module | ||||||
|  |  | ||||||
|   | |||||||
| @@ -167,6 +167,14 @@ def _gzip_decompress(content): | |||||||
|         return gzip_fd.read() |         return gzip_fd.read() | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def read_txt_from_uri(uri): | ||||||
|  |     try: | ||||||
|  |         return do_request(uri).content.decode('utf8') | ||||||
|  |     except Exception as e: | ||||||
|  |         LOG.warning('Error "%(error)s" retrieving uri %(uri)s', | ||||||
|  |                     {'error': e, 'uri': uri}) | ||||||
|  |  | ||||||
|  |  | ||||||
| def read_gzip_from_uri(uri): | def read_gzip_from_uri(uri): | ||||||
|     try: |     try: | ||||||
|         return _gzip_decompress(do_request(uri).content) |         return _gzip_decompress(do_request(uri).content) | ||||||
|   | |||||||
| @@ -82,7 +82,7 @@ From: sorlando at nicira.com (Salvatore Orlando) | |||||||
|     def test_log(self, mock_uri_content_changed, mock_get_mail_archive_links, |     def test_log(self, mock_uri_content_changed, mock_get_mail_archive_links, | ||||||
|                  mock_read_gzip_from_uri): |                  mock_read_gzip_from_uri): | ||||||
|         mock_uri_content_changed.return_value = True |         mock_uri_content_changed.return_value = True | ||||||
|         mock_get_mail_archive_links.return_value = ['link'] |         mock_get_mail_archive_links.return_value = ['link.txt.gz'] | ||||||
|         mock_read_gzip_from_uri.return_value = EMAIL_CONTENT |         mock_read_gzip_from_uri.return_value = EMAIL_CONTENT | ||||||
|         mock_rsi = mock.Mock() |         mock_rsi = mock.Mock() | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Sergey Nikitin
					Sergey Nikitin