Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
406 changes: 406 additions & 0 deletions awscli/customizations/s3/bucketlister.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions awscli/customizations/s3/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@
AUTO_RESOLVE_TRANSFER_CLIENT = 'auto'
CLASSIC_TRANSFER_CLIENT = 'classic'
CRT_TRANSFER_CLIENT = 'crt'

# Constants for bucket_lister configuration
STANDARD_BUCKET_LISTER = 'standard'
THREADED_BUCKET_LISTER = 'threaded'
33 changes: 32 additions & 1 deletion awscli/customizations/s3/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import awscrt.s3
from botocore.client import Config
from botocore.httpsession import DEFAULT_CA_BUNDLE
from botocore.parsers import ResponseParserFactory
from s3transfer.crt import (
BotocoreCRTCredentialsWrapper,
BotocoreCRTRequestSerializer,
Expand All @@ -33,11 +34,41 @@
LOGGER = logging.getLogger(__name__)


def _identity(value):
return value


class ClientFactory:
_RESPONSE_PARSER_FACTORY_COMPONENT = 'response_parser_factory'

def __init__(self, session):
self._session = session

def create_client(self, params, is_source_client=False):
create_client_kwargs = self._get_client_kwargs(
params, is_source_client=is_source_client
)
return self._session.create_client('s3', **create_client_kwargs)

def create_listing_client(self, params, is_source_client=False):
original_factory = self._session.get_component(
self._RESPONSE_PARSER_FACTORY_COMPONENT
)
listing_factory = ResponseParserFactory()
listing_factory.set_parser_defaults(timestamp_parser=_identity)
self._session.register_component(
self._RESPONSE_PARSER_FACTORY_COMPONENT, listing_factory
)
try:
return self.create_client(
params, is_source_client=is_source_client
)
finally:
self._session.register_component(
self._RESPONSE_PARSER_FACTORY_COMPONENT, original_factory
)

def _get_client_kwargs(self, params, is_source_client=False):
create_client_kwargs = {'verify': params['verify_ssl']}
if params.get('sse') == 'aws:kms':
create_client_kwargs['config'] = Config(signature_version='s3v4')
Expand All @@ -50,7 +81,7 @@ def create_client(self, params, is_source_client=False):

create_client_kwargs['region_name'] = region
create_client_kwargs['endpoint_url'] = endpoint_url
return self._session.create_client('s3', **create_client_kwargs)
return create_client_kwargs


class TransferManagerFactory:
Expand Down
13 changes: 11 additions & 2 deletions awscli/customizations/s3/filegenerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
from dateutil.tz import tzlocal

from awscli.compat import queue
from awscli.customizations.s3.bucketlister import ThreadedBucketLister
from awscli.customizations.s3.utils import (
EPOCH_TIME,
BucketLister,
create_warning,
find_bucket_key,
find_dest_path_comp_key,
Expand Down Expand Up @@ -134,6 +134,7 @@ class FileGenerator:
under the same common prefix. The generator yields corresponding
``FileInfo`` objects to send to a ``Comparator`` or ``S3Handler``.
"""
_DEFAULT_BUCKET_LISTER_CLS = ThreadedBucketLister
Comment thread
jamesls marked this conversation as resolved.
Outdated

def __init__(
self,
Expand All @@ -143,8 +144,11 @@ def __init__(
page_size=None,
result_queue=None,
request_parameters=None,
listing_client=None,
bucket_lister_cls=None,
):
self._client = client
self._listing_client = listing_client
self.operation_name = operation_name
self.follow_symlinks = follow_symlinks
self.page_size = page_size
Expand All @@ -154,6 +158,9 @@ def __init__(
self.request_parameters = {}
if request_parameters is not None:
self.request_parameters = request_parameters
if bucket_lister_cls is None:
bucket_lister_cls = self._DEFAULT_BUCKET_LISTER_CLS
self._bucket_lister_cls = bucket_lister_cls

def call(self, files):
"""
Expand Down Expand Up @@ -355,7 +362,9 @@ def list_objects(self, s3_path, dir_op):
if not dir_op and prefix:
yield self._list_single_object(s3_path)
else:
lister = BucketLister(self._client)
lister = self._bucket_lister_cls(
self._listing_client or self._client
)
extra_args = self.request_parameters.get('ListObjectsV2', {})
for key in lister.list_objects(
bucket=bucket,
Expand Down
59 changes: 50 additions & 9 deletions awscli/customizations/s3/subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import os
import sys

from botocore.client import Config
from botocore.useragent import register_feature_id
from botocore.utils import ensure_boolean, is_s3express_bucket
from dateutil.parser import parse
Expand All @@ -23,7 +22,11 @@
from awscli.compat import queue
from awscli.customizations.commands import BasicCommand
from awscli.customizations.exceptions import ParamValidationError
from awscli.customizations.s3 import transferconfig
from awscli.customizations.s3 import constants, transferconfig
from awscli.customizations.s3.bucketlister import (
BucketLister,
ThreadedBucketLister,
)
from awscli.customizations.s3.comparator import Comparator
from awscli.customizations.s3.factory import (
ClientFactory,
Expand Down Expand Up @@ -58,6 +61,12 @@
LOGGER = logging.getLogger(__name__)


_BUCKET_LISTERS = {
constants.STANDARD_BUCKET_LISTER: BucketLister,
constants.THREADED_BUCKET_LISTER: ThreadedBucketLister,
}


RECURSIVE = {
'name': 'recursive',
'action': 'store_true',
Expand Down Expand Up @@ -1063,11 +1072,18 @@ def _run_main(self, parsed_args, parsed_globals):
register_feature_id('S3_TRANSFER')
self._convert_path_args(parsed_args)
params = self._get_params(parsed_args, parsed_globals, self._session)
source_client, transfer_client = self._get_source_and_transfer_clients(
params=params
)
(
source_client,
transfer_client,
source_listing_client,
destination_listing_client,
) = self._get_source_and_transfer_clients(params=params)
runtime_config = self._get_runtime_config()
bucket_lister_cls = _BUCKET_LISTERS[runtime_config['bucket_lister']]
transfer_manager = self._get_transfer_manager(
params=params, botocore_transfer_client=transfer_client
params=params,
botocore_transfer_client=transfer_client,
runtime_config=runtime_config,
)
cmd = CommandArchitecture(
self._session,
Expand All @@ -1076,6 +1092,9 @@ def _run_main(self, parsed_args, parsed_globals):
transfer_manager,
source_client,
transfer_client,
source_listing_client,
destination_listing_client,
bucket_lister_cls,
)
cmd.create_instructions()
return cmd.run()
Expand Down Expand Up @@ -1109,10 +1128,22 @@ def _get_source_and_transfer_clients(self, params):
params, is_source_client=True
)
transfer_client = client_factory.create_client(params)
return source_client, transfer_client
source_listing_client = client_factory.create_listing_client(
params, is_source_client=True
)
destination_listing_client = client_factory.create_listing_client(
params
)
return (
source_client,
transfer_client,
source_listing_client,
destination_listing_client,
)

def _get_transfer_manager(self, params, botocore_transfer_client):
runtime_config = self._get_runtime_config()
def _get_transfer_manager(
self, params, botocore_transfer_client, runtime_config
):
return TransferManagerFactory(self._session).create_transfer_manager(
params=params,
runtime_config=runtime_config,
Expand Down Expand Up @@ -1367,6 +1398,9 @@ def __init__(
transfer_manager,
source_client,
transfer_client,
source_listing_client,
destination_listing_client,
bucket_lister_cls,
):
self.session = session
self.cmd = cmd
Expand All @@ -1375,6 +1409,9 @@ def __init__(
self._transfer_manager = transfer_manager
self._source_client = source_client
self._client = transfer_client
self._source_listing_client = source_listing_client
self._destination_listing_client = destination_listing_client
self._bucket_lister_cls = bucket_lister_cls

def create_instructions(self):
"""
Expand Down Expand Up @@ -1472,17 +1509,21 @@ def run(self):

fgen_kwargs = {
'client': self._source_client,
'listing_client': self._source_listing_client,
'operation_name': operation_name,
'follow_symlinks': self.parameters['follow_symlinks'],
'page_size': self.parameters['page_size'],
'result_queue': result_queue,
'bucket_lister_cls': self._bucket_lister_cls,
}
rgen_kwargs = {
'client': self._client,
'listing_client': self._destination_listing_client,
'operation_name': '',
'follow_symlinks': self.parameters['follow_symlinks'],
'page_size': self.parameters['page_size'],
'result_queue': result_queue,
'bucket_lister_cls': self._bucket_lister_cls,
}

fgen_request_parameters = (
Expand Down
7 changes: 6 additions & 1 deletion awscli/customizations/s3/transferconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
'should_stream': None,
'disk_throughput': None,
'direct_io': None,
'bucket_lister': constants.STANDARD_BUCKET_LISTER,
}


Expand Down Expand Up @@ -68,7 +69,11 @@ class RuntimeConfig:
constants.AUTO_RESOLVE_TRANSFER_CLIENT,
constants.CLASSIC_TRANSFER_CLIENT,
constants.CRT_TRANSFER_CLIENT,
]
],
'bucket_lister': [
constants.THREADED_BUCKET_LISTER,
constants.STANDARD_BUCKET_LISTER,
],
}
CHOICE_ALIASES = {
'preferred_transfer_client': {
Expand Down
36 changes: 0 additions & 36 deletions awscli/customizations/s3/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from collections import deque, namedtuple
from datetime import datetime

from dateutil.parser import parse
from dateutil.tz import tzlocal, tzutc

from awscli.compat import bytes_print, queue
Expand Down Expand Up @@ -401,41 +400,6 @@ class SetFileUtimeError(Exception):
pass


def _date_parser(date_string):
return parse(date_string).astimezone(tzlocal())


class BucketLister:
"""List keys in a bucket."""

def __init__(self, client, date_parser=_date_parser):
self._client = client
self._date_parser = date_parser

def list_objects(
self, bucket, prefix=None, page_size=None, extra_args=None
):
kwargs = {
'Bucket': bucket,
'PaginationConfig': {'PageSize': page_size},
}
if prefix is not None:
kwargs['Prefix'] = prefix
if extra_args is not None:
kwargs.update(extra_args)

paginator = self._client.get_paginator('list_objects_v2')
pages = paginator.paginate(**kwargs)
for page in pages:
contents = page.get('Contents', [])
for content in contents:
source_path = bucket + '/' + content['Key']
content['LastModified'] = self._date_parser(
content['LastModified']
)
yield source_path, content


class PrintTask(
namedtuple('PrintTask', ['message', 'error', 'total_parts', 'warning'])
):
Expand Down
18 changes: 18 additions & 0 deletions awscli/topics/s3-config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ command set:
and downloading data to and from Amazon S3.
* ``io_chunksize`` - The maximum size of read parts that can be queued in-memory
to be written for a download.
* ``bucket_lister`` - The bucket listing implementation to use when discovering
S3 objects for transfer commands.

For experimental ``s3`` configuration values, see the the
`Experimental Configuration Values <#experimental-configuration-values>`__
Expand Down Expand Up @@ -77,6 +79,7 @@ configuration::
multipart_threshold = 64MB
multipart_chunksize = 16MB
max_bandwidth = 50MB/s
bucket_lister = threaded
use_accelerate_endpoint = true
addressing_style = path

Expand All @@ -93,6 +96,7 @@ could instead run these commands::
$ aws configure set default.s3.multipart_threshold 64MB
$ aws configure set default.s3.multipart_chunksize 16MB
$ aws configure set default.s3.max_bandwidth 50MB/s
$ aws configure set default.s3.bucket_lister threaded
$ aws configure set default.s3.use_accelerate_endpoint true
$ aws configure set default.s3.addressing_style path

Expand Down Expand Up @@ -239,6 +243,20 @@ In cases where network IO is the bottleneck, it is recommended to configure
``max_concurrent_requests`` instead.


bucket_lister
-------------

**Default** - ``standard``

Determines the bucket listing implementation to use when the AWS CLI discovers
S3 objects for transfer commands. Valid choices are:
Comment thread
hssyoo marked this conversation as resolved.

* ``standard`` - Use the non-threaded bucket lister.
This is the default behavior.

* ``threaded`` - Use a background producer thread to fetch object listing pages.


use_accelerate_endpoint
-----------------------

Expand Down
Loading
Loading