"""DocString: Scan for buckets policies, versioning and encryption."""
import csv
import logging
import sys

import boto3
import coloredlogs

from encryption import get_bucket_encryption, set_bucket_encryption
from policies import get_bucket_policy, set_bucket_policy
from versioning import get_bucket_versioning, set_bucket_versioning
def set_logger():
    """Create and return the ``s3scanner-screen`` logger with coloured output.

    The logger level is INFO; ``coloredlogs`` is installed on it at DEBUG with
    a timestamp-plus-message format and per-level colours (info cyan,
    warning yellow, error red, timestamp white).

    Returns:
        The configured ``logging.Logger``.
    """
    log = logging.getLogger('s3scanner-screen')
    log.setLevel(logging.INFO)
    colours = {
        'level_styles': {
            'info': {'color': 'cyan'},
            'warning': {'color': 'yellow'},
            'error': {'color': 'red'},
        },
        'field_styles': {'asctime': {'color': 'white'}},
    }
    coloredlogs.install(
        level='DEBUG',
        logger=log,
        fmt='%(asctime)s %(message)s',
        **colours,
    )
    return log
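def check_input(argv=None):
    """Parse the command-line flags.

    Recognised flags:
      ``-f``        fix mode: the scan changes bucket settings (versioning,
                    encryption, policy) instead of only reporting them.
      ``-b <name>`` restrict the scan to the named bucket.

    Args:
        argv: argument list to parse; defaults to ``sys.argv``.

    Returns:
        A ``(bucket, fix)`` tuple. ``bucket`` is the value following ``-b``,
        or ``False`` when the flag is absent (kept for compatibility with the
        original callers); ``fix`` is ``True`` when ``-f`` is present.

    Raises:
        SystemExit: when ``-b`` is the last argument and has no value
            (the original code raised IndexError here).
    """
    args = sys.argv if argv is None else argv
    fix = '-f' in args
    buck = False
    if '-b' in args:
        name_index = args.index('-b') + 1
        if name_index >= len(args):
            sys.exit("Flag -b requires a bucket name")
        buck = args[name_index]
    return buck, fix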
def list_all_buckets(s3client):
    """Return the names of every bucket reported by *s3client*.

    Only the ``Name`` field of each entry under ``Buckets`` in the
    ``list_buckets`` response is kept.

    Args:
        s3client: boto3 S3 client (anything with a ``list_buckets`` method).

    Returns:
        List of bucket name strings, in response order.
    """
    response = s3client.list_buckets()
    return [entry["Name"] for entry in response["Buckets"]]
def versioning(bucket, s3resource, logger, fix):
    """Report a bucket's versioning state and, in fix mode, enable it.

    Args:
        bucket: bucket name.
        s3resource: boto3 S3 resource passed through to the versioning helpers.
        logger: logger used for the status lines.
        fix: when true, a bucket whose state is not ``"Enabled"`` is changed
            via ``set_bucket_versioning``.

    Returns:
        The versioning state string after any fix was applied.
    """
    state = get_bucket_versioning(bucket, s3resource, logger)
    logger.info(f"{bucket} versioning is {state}")
    needs_fix = fix and state != "Enabled"
    if needs_fix:
        state = set_bucket_versioning(bucket, s3resource, logger)
        logger.info(f"{bucket} versioning is now {state}")
    return state
def encryption(bucket, s3client, logger, fix):
    """Report a bucket's encryption state and, in fix mode, enable it.

    Args:
        bucket: bucket name.
        s3client: boto3 S3 client passed through to the encryption helpers.
        logger: logger used for the status lines.
        fix: when true, a bucket whose state is not ``"Encrypted"`` is
            changed via ``set_bucket_encryption``.

    Returns:
        The encryption state string after any fix was applied.
    """
    state = get_bucket_encryption(bucket, s3client, logger)
    logger.info(f"{bucket} encryption is {state}")
    needs_fix = fix and state != "Encrypted"
    if needs_fix:
        state = set_bucket_encryption(bucket, s3client, logger)
        logger.info(f"{bucket} encryption is now {state}")
    return state
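def policies(bucket, s3client, logger, fix):
    """Report a bucket's policy status and, in fix mode, apply the helper's policy.

    ``get_bucket_policy`` / ``set_bucket_policy`` return a sequence whose
    third element (index 2) is the status string that is logged here and
    compared against ``"SSL secured"``; the other elements are not used.
    Unlike the versioning/encryption helpers these take no logger argument.

    Args:
        bucket: bucket name.
        s3client: boto3 S3 client passed through to the policy helpers.
        logger: logger used for the status lines.
        fix: when true, a bucket whose status is not ``"SSL secured"`` has
            ``set_bucket_policy`` applied.

    Returns:
        The policy status string (``pol[2]``) after any fix was applied.
    """
    pol = get_bucket_policy(bucket, s3client)
    logger.info(bucket + " policy is " + pol[2])
    if fix and pol[2] != "SSL secured":
        pol = set_bucket_policy(bucket, s3client)
        logger.info(bucket + " policy is now " + pol[2])
    return pol[2]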
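def _scan_bucket(bucket, s3client, s3resource, logger, fix):
    """Run the three checks on one bucket; return (versioning, encryption, policy) statuses."""
    ver = versioning(bucket, s3resource, logger, fix)
    enc = encryption(bucket, s3client, logger, fix)
    pol = policies(bucket, s3client, logger, fix)
    return ver, enc, pol


def main():
    """Scan the account's buckets and write the results to ``s3results.csv``.

    Flags come from :func:`check_input`: ``-b <name>`` limits the scan to one
    bucket; ``-f`` switches from report-only to fix mode, in which bucket
    versioning, encryption and policy are changed when they fail a check.
    No explicit credentials are passed to boto3, so its default
    configuration applies.

    Side effects: overwrites ``s3results.csv`` in the working directory (one
    header row plus one row per scanned bucket) and, with ``-f``, mutates
    bucket configuration in the account.
    """
    s3client = boto3.client('s3')
    s3resource = boto3.resource('s3')
    bucket, fix = check_input()
    # ``with`` closes the file even if a scan raises (the original leaked the
    # handle on error); newline="" is what the csv module expects and the
    # explicit lineterminator keeps the original "\n" row endings.
    with open("s3results.csv", "w", encoding="utf-8", newline="") as results_file:
        writer = csv.writer(results_file, lineterminator="\n")
        # Rows have four columns; the original header named only three.
        writer.writerow(["Bucket Name", "Versioning", "Encryption", "Policy"])
        logger = set_logger()
        bucket_list = list_all_buckets(s3client)
        if bucket:
            if bucket in bucket_list:
                statuses = _scan_bucket(bucket, s3client, s3resource, logger, fix)
                writer.writerow([bucket, *statuses])
            else:
                logger.error("Bucket Not Found: %s", bucket)
        else:
            num_buckets = len(bucket_list)
            logger.info("Found: %d Buckets", num_buckets)
            for counter, name in enumerate(bucket_list, start=1):
                statuses = _scan_bucket(name, s3client, s3resource, logger, fix)
                logger.info("%d of %d", counter, num_buckets)
                writer.writerow([name, *statuses])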
def exit_gracefully():
    """Print the closing reminder about the results file.

    Invoked from the ``finally`` of the entry point, so it runs after a
    normal finish, after Ctrl+C and after any other exception. Output is a
    blank gap, the reminder line, then another gap.
    """
    for line in ("\n", "Things should be logged in s3results.csv file", "\n"):
        print(line)
if __name__ == "__main__":
    # Ctrl+C is swallowed so the run ends without a traceback; any other
    # exception still propagates after the finally block has run.
    try:
        main()
    except KeyboardInterrupt:
        pass
    finally:
        # Runs on every exit path (normal, interrupted, or failed).
        exit_gracefully()