Provides examples that show you how to connect to an S3 storage service and perform bucket and object operations in Python.
import boto3
import urllib3
from botocore.client import Config
from operations import demo
if __name__ == '__main__':
host = "http://localhost:9000"
username = "minioadmin"
password = "minioadmin"
bucketName = "test"
file = "file"
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
s3 = boto3.client('s3',
endpoint_url=host,
aws_access_key_id=username,
aws_secret_access_key=password,
config=Config(signature_version='s3v4'),
region_name='us-east-1',
verify=False)
demo(s3, bucketName, file)
from datetime import datetime
from botocore.exceptions import ClientError
def demo(s3, bucket_name, file):
print("\n")
list_buckets(s3)
print("\n")
create_bucket_if_not_exists(s3, bucket_name)
list_buckets(s3)
print("\n")
upload_file_if_not_exists(s3, bucket_name, file)
list_folder(s3, bucket_name)
print("\n")
read_file(s3, bucket_name, file)
print("\n")
delete_all(s3, bucket_name)
list_folder(s3, bucket_name)
print("\n")
delete_bucket_if_exists(s3, bucket_name)
list_buckets(s3)
def list_buckets(s3):
print("Buckets:")
for bucket in s3.list_buckets()['Buckets']:
print(bucket['Name'])
def create_bucket_if_not_exists(s3, bucket_name):
if check_if_bucket_exists(s3, bucket_name):
print("Bucket exists")
return
print("Bucket does not exist, creating...")
s3.create_bucket(Bucket=bucket_name)
def check_if_bucket_exists(s3, bucket_name):
print("Checking if bucket '" + bucket_name + "' exists")
try:
s3.head_bucket(Bucket=bucket_name)
except ClientError as e:
return int(e.response['Error']['Code']) != 404
return True
def delete_bucket_if_exists(s3, bucket_name):
print("Checking if bucket '" + bucket_name + "' exists")
try:
s3.head_bucket(Bucket=bucket_name)
print("Bucket exists, deleting...")
s3.delete_bucket(Bucket=bucket_name)
except ClientError:
print("Bucket does not exist")
def list_folder(s3, bucket_name):
print("Listing bucket " + bucket_name + ":")
objects = s3.list_objects(Bucket=bucket_name)
if 'Contents' in objects:
for key in objects['Contents']:
print(key['Key'])
else:
print("Is empty")
def delete_all(s3, bucket_name):
print("Cleaning all form bucket: " + bucket_name)
objects = s3.list_objects(Bucket=bucket_name)
if 'Contents' in objects:
for key in objects['Contents']:
s3.delete_object(Bucket=bucket_name, Key=key['Key'])
def upload_file_if_not_exists(s3, bucket_name, file_name):
if check_if_file_exist(s3, bucket_name, file_name):
print("File '" + file_name + "' already exists in bucket '" + bucket_name + "'")
return
date = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
data = "Hello world on " + date + "!"
print("Uploading '" + file_name + "' to '" + bucket_name + "' with data:\n" + data)
s3.put_object(Body=data.encode('ascii'), Bucket=bucket_name, Key=file_name)
def read_file(s3, bucket_name, file_name):
print("Reading file '" + file_name + "' from bucket '" + bucket_name + "'")
obj = s3.get_object(Bucket=bucket_name, Key=file_name)
print("Data:")
print(obj['Body'].read().decode('utf-8'))
def check_if_file_exist(s3, bucket_name, file):
print("Checking if file '" + file + "' exists in bucket '" + bucket_name + "'")
try:
s3.head_object(Bucket=bucket_name, Key=file)
except ClientError as e:
return int(e.response['Error']['Code']) != 404
return True