Skip to content

Commit

Permalink
Merge pull request #2 from datatrails/steve/hash-signing
Browse files Browse the repository at this point in the history
Hash Signing Updates
  • Loading branch information
SteveLasker committed Jul 19, 2024
2 parents e765d9c + 9bc85c6 commit c41e32d
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 105 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Secure your Software Supply Chain and your Content Authenticity with immutable data trails. This GitHub Action uses DataTrails implementation of the IETF Supply Chain, Integrity and Trust ([SCITT](https://scitt.io)) APIs.

**NOTE:**:
This SCITT GitHub Action is in Preview, pending adoption of the [SCITT Reference APIs (SCRAPI)](https://datatracker.ietf.org/doc/draft-ietf-scitt-scrapi/).
To use a production supported implementation, please contact [DataTrails](https://www.datatrails.ai/contactus/) for more info.

## Getting Started

To create immutable data trails, an account with a `Client_ID` and `Secret` are required.
Expand Down
43 changes: 26 additions & 17 deletions action.yml
Original file line number Diff line number Diff line change
@@ -1,34 +1,41 @@
name: 'DataTrails SCITT API'
description: 'Register, Get Receipts and Query Feeds from the DataTrails SCITT API'
inputs:
content-type:
description: 'The payload content type (iana mediaType) to be registered on the SCITT Service (eg: application/spdx+json, application/vnd.cyclonedx+json, Scan Result, Attestation)'
required: true
datatrails-client_id:
description: 'The CLIENT_ID used to access the DataTrails SCITT APIs'
required: true
datatrails-secret:
description: 'The SECRET used to access the DataTrails SCITT APIs'
required: true
subject:
description: 'Unique ID for the collection of statements about an artifact'
issuer:
description: 'The name of the issuer, set to CTW_Claims:iss'
required: true
payload:
payload-file:
description: 'The payload file to be registered on the SCITT Service (eg: SBOM, Scan Result, Attestation)'
required: true
content-type:
description: 'The payload content type (iana mediaType) to be registered on the SCITT Service (eg: application/spdx+json, application/vnd.cyclonedx+json, Scan Result, Attestation)'
required: true
signed-statement-file:
description: 'File representing the signed SCITT Statement that will be registered on SCITT.'
payload-location:
description: 'Optional location the content of the payload may be stored.'
required: false
default: 'signed-statement.cbor'
receipt-file:
description: 'The file to save the cbor receipt'
description: 'The filename to save the cbor receipt'
required: false
default: 'receipt.cbor'
signed-statement-file:
description: 'File representing the signed SCITT Statement that will be registered on SCITT.'
required: false
default: 'signed-statement.cbor'
signing-key-file:
description: 'The .pem file used to sign the statement'
required: true
issuer:
description: 'The name of the issuer, set to CTW_Claims:iss'
skip-receipt:
description: 'To skip receipt retrieval, set to 1'
required: false
default: '0'
subject:
description: 'Unique ID for the collection of statements about an artifact'
required: true
outputs:
token: # id of output
Expand All @@ -37,12 +44,14 @@ runs:
using: 'docker'
image: 'Dockerfile'
args:
- ${{ inputs.content-type }}
- ${{ inputs.datatrails-client_id }}
- ${{ inputs.datatrails-secret }}
- ${{ inputs.subject }}
- ${{ inputs.payload }}
- ${{ inputs.content-type }}
- ${{ inputs.signed-statement-file }}
- ${{ inputs.issuer }}
- ${{ inputs.payload-file }}
- ${{ inputs.payload-location}}
- ${{ inputs.receipt-file }}
- ${{ inputs.signed-statement-file }}
- ${{ inputs.signing-key-file }}
- ${{ inputs.issuer }}
- ${{ inputs.skip-receipt }}
- ${{ inputs.subject }}
67 changes: 40 additions & 27 deletions scitt-scripts/check_operation_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import os
import argparse
import logging
import sys

from time import sleep as time_sleep

Expand All @@ -10,7 +12,7 @@

# all timeouts and durations are in seconds
REQUEST_TIMEOUT = 30
POLL_TIMEOUT = 360
POLL_TIMEOUT = 60
POLL_INTERVAL = 10


Expand Down Expand Up @@ -39,10 +41,7 @@ def get_operation_status(operation_id: str, headers: dict) -> dict:

while True:
response = requests.get(url, timeout=30, headers=headers)
# print("***response:", flush=True)
# print(response, flush=True)
# print(response.json, flush=True)
# print("***response:", flush=True)

if response.status_code == 200:
break
elif response.status_code == 400:
Expand All @@ -53,27 +52,35 @@ def get_operation_status(operation_id: str, headers: dict) -> dict:
return response.json()


def poll_operation_status(operation_id: str, headers: dict) -> str:
def poll_operation_status(
operation_id: str, headers: dict, logger: logging.Logger
) -> str:
"""
polls for the operation status to be 'succeeded'.
"""

poll_attempts: int = int(POLL_TIMEOUT / POLL_INTERVAL)

for _ in range(poll_attempts):
operation_status = get_operation_status(operation_id, headers)
# print("***operation_status:", flush=True)
# print(operation_status, flush=True)
# print("***operation_status:", flush=True)

# pylint: disable=fixme
# TODO: ensure get_operation_status handles error cases from the rest request
if "status" in operation_status and operation_status["status"] == "succeeded":
return operation_status["entryID"]
logger.info("starting to poll for operation status 'succeeded'")

for _ in range(poll_attempts):
try:
operation_status = get_operation_status(operation_id, headers)

# pylint: disable=fixme
# TODO: ensure get_operation_status handles error cases from the rest request
if (
"status" in operation_status
and operation_status["status"] == "succeeded"
):
return operation_status["entryID"]

except requests.HTTPError as e:
logger.debug("failed getting operation status, error: %s", e)

time_sleep(POLL_INTERVAL)

raise TimeoutError("signed statement not registered within polling duration.")
raise TimeoutError("signed statement not registered within polling duration")


def main():
Expand Down Expand Up @@ -108,21 +115,27 @@ def main():
default=default_token_file_name,
)

# log level
parser.add_argument(
"--log-level",
type=str,
help="log level. for any individual poll errors use DEBUG, defaults to WARNING",
default="WARNING",
)

args = parser.parse_args()

# print("args.token_file_name:", flush=True)
# print(args.token_file_name, flush=True)
logger = logging.getLogger("check operation status")
logging.basicConfig(level=logging.getLevelName(args.log_level))

headers = get_token_from_file(args.token_file_name)
# print("headers:", flush=True)
# print(headers, flush=True)

# print("operation_id:", flush=True)
# print(args.operation_id, flush=True)

entry_id = poll_operation_status(args.operation_id, headers)
print(entry_id, flush=True)

try:
entry_id = poll_operation_status(args.operation_id, headers, logger)
print(entry_id)
except TimeoutError as e:
print(e, file=sys.stderr)
sys.exit(1)

if __name__ == "__main__":
main()
85 changes: 46 additions & 39 deletions scitt-scripts/create_hashed_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@

# Signed Hash envelope header labels from:
# https://github.com/OR13/draft-steele-cose-hash-envelope/blob/main/draft-steele-cose-hash-envelope.md
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM = 998
HEADER_LABEL_LOCATION = 999
# pre-adoption/private use parameters
# https://www.iana.org/assignments/cose/cose.xhtml#header-parameters
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM = -6800
HEADER_LABEL_LOCATION = -6801

# CBOR Object Signing and Encryption (COSE) "typ" (type) Header Parameter
# https://datatracker.ietf.org/doc/rfc9596/
HEADER_LABEL_TYPE = 16
COSE_TYPE="application/hashed+cose"

def open_signing_key(key_file: str) -> SigningKey:
"""
Expand All @@ -61,12 +67,12 @@ def open_payload(payload_file: str) -> str:


def create_hashed_signed_statement(
signing_key: SigningKey,
content_type: str,
issuer: str,
payload: str,
payload_location: str,
signing_key: SigningKey,
subject: str,
issuer: str,
content_type: str,
location: str,
) -> bytes:
"""
creates a hashed signed statement, given the signing_key, payload, subject and issuer
Expand All @@ -87,6 +93,7 @@ def create_hashed_signed_statement(
# create a protected header where
# the verification key is attached to the cwt claims
protected_header = {
HEADER_LABEL_TYPE: COSE_TYPE,
Algorithm: Es256,
KID: b"testkey",
ContentType: content_type,
Expand All @@ -103,7 +110,7 @@ def create_hashed_signed_statement(
},
},
HEADER_LABEL_PAYLOAD_HASH_ALGORITHM: -16, # for sha256
HEADER_LABEL_LOCATION: location,
HEADER_LABEL_LOCATION: payload_location,
}

# now create a sha256 hash of the payload
Expand Down Expand Up @@ -139,71 +146,71 @@ def main():

parser = argparse.ArgumentParser(description="Create a signed statement.")

# signing key file
# content-type
parser.add_argument(
"--signing-key-file",
"--content-type",
type=str,
help="filepath to the stored ecdsa P-256 signing key, in pem format.",
default="scitt-signing-key.pem",
help="The iana.org media type for the payload",
default="application/json",
)

# payload-file (a reference to the file that will become the payload of the SCITT Statement)
# issuer
parser.add_argument(
"--payload-file",
"--issuer",
type=str,
help="filepath to the content that will be hashed into the payload of the SCITT Statement.",
default="scitt-payload.json",
help="issuer who owns the signing key.",
)

# content-type
# output file
parser.add_argument(
"--content-type",
"--output-file",
type=str,
help="The iana.org media type for the payload",
default="application/json",
help="name of the output file to store the signed statement.",
default="signed-statement.cbor",
)

# subject
# payload-file (a reference to the file that will become the payload of the SCITT Statement)
parser.add_argument(
"--subject",
"--payload-file",
type=str,
help="subject to correlate statements made about an artifact.",
help="filepath to the content that will be hashed into the payload of the SCITT Statement.",
default="scitt-payload.json",
)

# issuer
# payload-location
parser.add_argument(
"--issuer",
"--payload-location",
type=str,
help="issuer who owns the signing key.",
help="location hint for the original statement that was hashed.",
)

# location hint
# signing key file
parser.add_argument(
"--location-hint",
"--signing-key-file",
type=str,
help="location hint for the original statement that was hashed.",
help="filepath to the stored ecdsa P-256 signing key, in pem format.",
default="scitt-signing-key.pem",
)

# output file
# subject
parser.add_argument(
"--output-file",
"--subject",
type=str,
help="name of the output file to store the signed statement.",
default="signed-statement.cbor",
help="subject to correlate statements made about an artifact.",
)

args = parser.parse_args()

signing_key = open_signing_key(args.signing_key_file)
payload = open_payload(args.payload_file)
payload_contents = open_payload(args.payload_file)

signed_statement = create_hashed_signed_statement(
signing_key,
payload,
args.subject,
args.issuer,
args.content_type,
args.location_hint,
content_type=args.content_type,
issuer=args.issuer,
payload=payload_contents,
payload_location=args.payload_location,
signing_key=signing_key,
subject=args.subject
)

with open(args.output_file, "wb") as output_file:
Expand Down
Loading

0 comments on commit c41e32d

Please sign in to comment.