Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Add Request ID and UUID headers to all output plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
sugr4 committed Sep 21, 2016
1 parent 0e6bc2a commit 1cd9e4b
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 17 deletions.
15 changes: 12 additions & 3 deletions source/code/plugins/oms_common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Common
require 'zlib'
require 'digest'
require 'date'
require 'securerandom'

require_relative 'omslog'
require_relative 'oms_configuration'
Expand Down Expand Up @@ -679,13 +680,20 @@ def create_ods_request(path, record, compress, extra_headers=nil, serializer=met

azure_resource_id = OMS::Configuration.azure_resource_id
if !azure_resource_id.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = OMS::Configuration.azure_resource_id
headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = azure_resource_id
end

omscloud_id = OMS::Configuration.omscloud_id
if !omscloud_id.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = OMS::Configuration.omscloud_id
headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = omscloud_id
end

uuid = OMS::Configuration.get_vm_uuid
if !uuid.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-UUID")] = uuid
end

headers[OMS::CaseSensitiveString.new("X-Request-ID")] = SecureRandom.uuid

headers["Content-Type"] = "application/json"
if compress == true
Expand Down Expand Up @@ -808,7 +816,8 @@ def start_request(req, secure_http, ignore404 = false)

if res.code != "200"
# Retry all failure error codes...
res_summary = "(class=#{res.class.name}; code=#{res.code}; message=#{res.message}; body=#{res.body};)"
res_summary = "(request-id=#{req["X-Request-ID"]}; class=#{res.class.name}; code=#{res.code}; message=#{res.message}; body=#{res.body};)"
Log.error_once("HTTP Error: #{res_summary}")
raise RetryRequestException, "HTTP error: #{res_summary}"
end

Expand Down
8 changes: 8 additions & 0 deletions source/code/plugins/oms_configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Configuration
@@NotifyBlobODSEndpoint = nil
@@OmsCloudId = nil
@@AzureResourceId = nil
@@UUID = nil

class << self

Expand Down Expand Up @@ -115,6 +116,9 @@ def load_configuration(conf_path, cert_path, key_path)
if line =~ /OMSCLOUD_ID/
@@OmsCloudId = line.sub("OMSCLOUD_ID=","")
end
if line =~ /UUID/
@@UUID = line.sub("UUID=","")
end
end

begin
Expand Down Expand Up @@ -162,6 +166,10 @@ def azure_resource_id
def omscloud_id
@@OmsCloudId
end

def get_vm_uuid
@@UUID
end
end # Class methods

end # class Common
Expand Down
26 changes: 18 additions & 8 deletions source/code/plugins/out_oms_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def initialize
require 'time'
require 'uri'
require 'zlib'
require 'securerandom'
require_relative 'omslog'
require_relative 'oms_configuration'
require_relative 'oms_common'
Expand Down Expand Up @@ -53,28 +54,35 @@ def shutdown
# log_type: string. log type
# time_generated_field_name: string. name of the time generated field
# records: hash[]. an array of data
def post_data(log_type, time_generated_field_name, records)
def post_data(log_type, time_generated_field_name, records, request_id)
headers = {}
headers[OMS::CaseSensitiveString.new("Log-Type")] = log_type
headers[OMS::CaseSensitiveString.new("x-ms-date")] = Time.now.utc.httpdate()

azure_resource_id = OMS::Configuration.azure_resource_id
azure_resource_id = OMS::Configuration.azure_resource_id if defined?(OMS::Configuration.azure_resource_id)
if !azure_resource_id.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = OMS::Configuration.azure_resource_id
headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = azure_resource_id
end

omscloud_id = OMS::Configuration.omscloud_id
omscloud_id = OMS::Configuration.omscloud_id if defined?(OMS::Configuration.omscloud_id)
if !omscloud_id.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = OMS::Configuration.omscloud_id
headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = omscloud_id
end

uuid = OMS::Configuration.get_vm_uuid if defined?(OMS::Configuration.get_vm_uuid)
if !uuid.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-UUID")] = uuid
end

headers[OMS::CaseSensitiveString.new("X-Request-ID")] = request_id

if time_generated_field_name != ''
headers[OMS::CaseSensitiveString.new("time-generated-field")] = time_generated_field_name
end

api_endpoint = OMS::Configuration.ods_endpoint.clone
api_endpoint.query = "api-version=#{@api_version}"

req = OMS::Common.create_ods_request(api_endpoint.request_uri, records, @compress, headers, lambda { |data| OMS::Common.safe_dump_simple_hash_array(data) })

unless req.nil?
Expand Down Expand Up @@ -114,7 +122,8 @@ def handle_records(tag, records)

if @logtype_regex =~ log_type
start = Time.now
dataSize = post_data(log_type, time_generated_field_name, records)
request_id = SecureRandom.uuid
dataSize = post_data(log_type, time_generated_field_name, records, request_id)
time = Time.now - start
@log.trace "Success sending #{dataSize} bytes of data through API #{time.round(3)}s"
else
Expand All @@ -125,9 +134,10 @@ def handle_records(tag, records)
end
rescue OMS::RetryRequestException => e
@log.info "Encountered retryable exception. Will retry sending data later."
Log.error_once("Error for Request-ID: #{request_id} Error: #{e}")
@log.debug "Error:'#{e}'"
# Re-raise the exception to inform the fluentd engine we want to retry sending this chunk of data later.
raise e.message
raise e.message, "Request-ID: #{request_id}"
rescue => e
# We encountered something unexpected. We drop the data because
# if bad data caused the exception, the engine will continuously
Expand Down
24 changes: 18 additions & 6 deletions source/code/plugins/out_oms_blob.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def create_blob_get_request(uri)
# file_path: string. file path
# returns:
# HTTPRequest. blob PUT request
def create_blob_put_request(uri, msg, file_path = nil)
def create_blob_put_request(uri, msg, request_id, file_path = nil)
headers = {}

headers[OMS::CaseSensitiveString.new("x-ms-meta-TimeZoneid")] = OMS::Common.get_current_timezone
Expand All @@ -79,24 +79,34 @@ def create_blob_put_request(uri, msg, file_path = nil)

azure_resource_id = OMS::Configuration.azure_resource_id
if !azure_resource_id.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = OMS::Configuration.azure_resource_id
headers[OMS::CaseSensitiveString.new("x-ms-AzureResourceId")] = azure_resource_id
end

omscloud_id = OMS::Configuration.omscloud_id
if !omscloud_id.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = OMS::Configuration.omscloud_id
headers[OMS::CaseSensitiveString.new("x-ms-OMSCloudId")] = omscloud_id
end

uuid = OMS::Configuration.get_vm_uuid
if !uuid.to_s.empty?
headers[OMS::CaseSensitiveString.new("x-ms-UUID")] = uuid
end

headers[OMS::CaseSensitiveString.new("X-Request-ID")] = request_id

headers["Content-Type"] = "application/octet-stream"
headers["Content-Length"] = msg.bytesize.to_s

req = Net::HTTP::Put.new(uri.request_uri, headers)
req.body = msg
return req
rescue OMS::RetryRequestException => e
Log.error_once("HTTP error for Request-ID: #{request_id} Error: #{e}")
raise e.message, "Request-ID: #{request_id}"
end # create_blob_put_request

# get the blob SAS URI from ODS
# parameters:
# parameters
# container_type: string. ContainerType of the data
# data_type: string. DataTypeId of the data
# custom_data_type: string. CustomDataType of the CustomLog
Expand Down Expand Up @@ -192,9 +202,10 @@ def get_committed_blocks(uri)
# string. block id
def upload_block(uri, msg)
base64_blockid = Base64.encode64(SecureRandom.uuid)
request_id = SecureRandom.uuid
append_uri = URI.parse("#{uri.to_s}&comp=block&blockid=#{base64_blockid}")

put_block_req = create_blob_put_request(append_uri, msg, nil)
put_block_req = create_blob_put_request(append_uri, msg, request_id, nil)
http = OMS::Common.create_secure_http(append_uri, @proxy_config)
OMS::Common.start_request(put_block_req, http)

Expand All @@ -216,7 +227,8 @@ def commit_blocks(uri, blocks_committed, blocks_uncommitted, file_path)
commit_msg = doc.to_s

blocklist_uri = URI.parse("#{uri.to_s}&comp=blocklist")
put_blocklist_req = create_blob_put_request(blocklist_uri, commit_msg, file_path)
request_id = SecureRandom.uuid
put_blocklist_req = create_blob_put_request(blocklist_uri, commit_msg, request_id, file_path)
http = OMS::Common.create_secure_http(blocklist_uri, @proxy_config)
OMS::Common.start_request(put_blocklist_req, http)
end # commit_blocks
Expand Down

0 comments on commit 1cd9e4b

Please sign in to comment.