Skip to content

Commit

Permalink
Strip unneeded parts from MQ client
Browse files Browse the repository at this point in the history
Add tracing for mqmetric functions
Add a mutex around callback controller (#148)

Verify qdepth for CHSTATUS operations
  • Loading branch information
ibmmqmet committed Sep 9, 2020
1 parent a7f5d93 commit 0d58d13
Show file tree
Hide file tree
Showing 14 changed files with 404 additions and 39 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog
Newest updates are at the top of this file.

## Sep 10 2020 - v5.1.2
* mqmetric - Add loglevel=TRACE and trace-points for all key functions in the package
* mqmetric - Add channel status bytes and buffer counts
* mqmetric - Check queue depth appropriate for all CHSTATUS operations
* ibmmq - Fix for potential race condition (#148)

## Aug 07 2020 - v5.1.1
* ibmmq - Fix STS structure (#146)
* Add flag for Windows build that seems no longer to be automatically set by cgo
Expand Down
10 changes: 8 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,17 @@ ENV RDURL="https://public.dhe.ibm.com/ibmdl/export/pub/software/websphere/messag
VRMF=9.2.0.0

# Install the MQ client from the Redistributable package. This also contains the
# header files we need to compile against.
# header files we need to compile against. Setup the subset of the package
# we are going to keep - the genmqpkg.sh script removes unneeded parts
ENV genmqpkg_incnls=1 \
genmqpkg_incsdk=1 \
genmqpkg_inctls=1

RUN cd /opt/mqm \
&& curl -LO "$RDURL/$VRMF-$RDTAR" \
&& tar -zxf ./*.tar.gz \
&& rm -f ./*.tar.gz
&& rm -f ./*.tar.gz \
&& bin/genmqpkg.sh -b /opt/mqm

# Insert the script that will do the build
COPY buildInDocker.sh $GOPATH
Expand Down
29 changes: 29 additions & 0 deletions ibmmq/mqicb.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import "C"
import (
"fmt"
"strings"
"sync"
"unsafe"
)

Expand All @@ -54,6 +55,9 @@ type cbInfo struct {
// This map is indexed by a combination of the hConn and hObj values
var cbMap = make(map[string]*cbInfo)

// Add a mutex to control access to it as there may be several threads going for different qmgrs
var mutex sync.Mutex

/*
MQCALLBACK_Go is a wrapper callback function that will invoke the user-supplied callback
after converting the C structures into the corresponding Go format.
Expand Down Expand Up @@ -90,7 +94,9 @@ func MQCALLBACK_Go(hConn C.MQHCONN, mqmd *C.MQMD, mqgmo *C.MQGMO, mqBuffer C.PMQ
}

key := makeKey(hConn, mqcbc.Hobj)
mapLock()
info, ok := cbMap[key]
mapUnlock()

// The MQ Client libraries seem to sometimes call us with an EVENT
// even if it's not been registered. And therefore the cbMap does not
Expand All @@ -105,6 +111,7 @@ func MQCALLBACK_Go(hConn C.MQHCONN, mqmd *C.MQMD, mqgmo *C.MQGMO, mqBuffer C.PMQ
if !ok {
if gocbc.CallType == MQCBCT_EVENT_CALL && mqcbc.Hobj == C.MQHO_NONE {
key = makePartialKey(hConn)
mapLock()
for k, i := range cbMap {
if strings.HasPrefix(k, key) {
ok = true
Expand All @@ -114,6 +121,7 @@ func MQCALLBACK_Go(hConn C.MQHCONN, mqmd *C.MQMD, mqgmo *C.MQGMO, mqBuffer C.PMQ
break
}
}
mapUnlock()
}
} else {
cbHObj = info.hObj
Expand Down Expand Up @@ -185,14 +193,18 @@ func (object *MQObject) CB(goOperation int32, gocbd *MQCBD, gomd *MQMD, gogmo *M
// Add or remove the control information in the map used by the callback routines
switch mqOperation {
case C.MQOP_DEREGISTER:
mapLock()
delete(cbMap, key)
mapUnlock()
case C.MQOP_REGISTER:
// Stash the hObj and real function to be called
info := &cbInfo{hObj: object,
callbackFunction: gocbd.CallbackFunction,
connectionArea: nil,
callbackArea: gocbd.CallbackArea}
mapLock()
cbMap[key] = info
mapUnlock()
default: // Other values leave the map alone
}

Expand Down Expand Up @@ -230,14 +242,18 @@ func (object *MQQueueManager) CB(goOperation int32, gocbd *MQCBD) error {
// Add or remove the control information in the map used by the callback routines
switch mqOperation {
case C.MQOP_DEREGISTER:
mapLock()
delete(cbMap, key)
mapUnlock()
case C.MQOP_REGISTER:
// Stash an hObj and real function to be called
info := &cbInfo{hObj: &MQObject{qMgr: object, Name: ""},
callbackFunction: gocbd.CallbackFunction,
connectionArea: nil,
callbackArea: gocbd.CallbackArea}
mapLock()
cbMap[key] = info
mapUnlock()
default: // Other values leave the map alone
}

Expand All @@ -259,11 +275,13 @@ func (x *MQQueueManager) Ctl(goOperation int32, goctlo *MQCTLO) error {
// Need to make sure control information is available before the callback
// is enabled. So this gets setup even if the MQCTL fails.
key := makePartialKey(x.hConn)
mapLock()
for k, info := range cbMap {
if strings.HasPrefix(k, key) {
info.connectionArea = goctlo.ConnectionArea
}
}
mapUnlock()

C.MQCTL(x.hConn, mqOperation, (C.PMQVOID)(unsafe.Pointer(&mqctlo)), &mqcc, &mqrc)

Expand Down Expand Up @@ -295,14 +313,25 @@ func makePartialKey(hConn C.MQHCONN) string {
func cbRemoveConnection(hConn C.MQHCONN) {
// Remove all of the hObj values for this hconn
key := makePartialKey(hConn)
mapLock()
for k, _ := range cbMap {
if strings.HasPrefix(k, key) {
delete(cbMap, k)
}
}
mapUnlock()
}

func cbRemoveHandle(hConn C.MQHCONN, hObj C.MQHOBJ) {
key := makeKey(hConn, hObj)
mapLock()
delete(cbMap, key)
mapUnlock()
}

func mapLock() {
mutex.Lock()
}
func mapUnlock() {
mutex.Unlock()
}
2 changes: 0 additions & 2 deletions ibmmq/mqistr.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build go1.13

package ibmmq

/*
Expand Down
56 changes: 54 additions & 2 deletions mqmetric/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ storage mechanisms including Prometheus and InfluxDB.
package mqmetric

/*
Copyright (c) IBM Corporation 2016, 2019
Copyright (c) IBM Corporation 2016, 2020
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,10 @@ const (
ATTR_CHL_RQMNAME = "rqmname"

ATTR_CHL_MESSAGES = "messages"
ATTR_CHL_BYTES_SENT = "bytes_sent"
ATTR_CHL_BYTES_RCVD = "bytes_rcvd"
ATTR_CHL_BUFFERS_SENT = "buffers_sent"
ATTR_CHL_BUFFERS_RCVD = "buffers_rcvd"
ATTR_CHL_BATCHES = "batches"
ATTR_CHL_STATUS = "status"
ATTR_CHL_STATUS_SQUASH = ATTR_CHL_STATUS + "_squash"
Expand Down Expand Up @@ -81,7 +85,9 @@ text. The elements can be expanded later; just trying to give a starting point
for now.
*/
func ChannelInitAttributes() {
traceEntry("ChannelInitAttributes")
if chlAttrsInit {
traceExit("ChannelInitAttributes", 1)
return
}
ChannelStatus.Attributes = make(map[string]*StatusAttribute)
Expand All @@ -101,6 +107,18 @@ func ChannelInitAttributes() {
attr = ATTR_CHL_MESSAGES
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "Messages (API Calls for SVRCONN)", ibmmq.MQIACH_MSGS)
ChannelStatus.Attributes[attr].delta = true // We have to manage the differences as MQ reports cumulative values
attr = ATTR_CHL_BYTES_SENT
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "Bytes sent", ibmmq.MQIACH_BYTES_SENT)
ChannelStatus.Attributes[attr].delta = true // We have to manage the differences as MQ reports cumulative values
attr = ATTR_CHL_BYTES_RCVD
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "Bytes rcvd", ibmmq.MQIACH_BYTES_RCVD)
ChannelStatus.Attributes[attr].delta = true // We have to manage the differences as MQ reports cumulative values
attr = ATTR_CHL_BUFFERS_SENT
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "Buffers sent", ibmmq.MQIACH_BUFFERS_SENT)
ChannelStatus.Attributes[attr].delta = true // We have to manage the differences as MQ reports cumulative values
attr = ATTR_CHL_BUFFERS_RCVD
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "Buffers rcvd", ibmmq.MQIACH_BUFFERS_RCVD)
ChannelStatus.Attributes[attr].delta = true // We have to manage the differences as MQ reports cumulative values
attr = ATTR_CHL_BATCHES
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "Completed Batches", ibmmq.MQIACH_BATCHES)
ChannelStatus.Attributes[attr].delta = true // We have to manage the differences as MQ reports cumulative values
Expand Down Expand Up @@ -156,18 +174,26 @@ func ChannelInitAttributes() {
attr = ATTR_CHL_MAX_INSTC
ChannelStatus.Attributes[attr] = newStatusAttribute(attr, "MaxInstC", -1)

traceExit("ChannelInitAttributes", 0)
}

// If we need to list the channels that match a pattern. Not needed for
// the status queries as they (unlike the pub/sub resource stats) accept
// patterns in the PCF command
func InquireChannels(patterns string) ([]string, error) {
traceEntry("InquireChannels")
ChannelInitAttributes()
return inquireObjects(patterns, ibmmq.MQOT_CHANNEL)
rc, err := inquireObjects(patterns, ibmmq.MQOT_CHANNEL)

traceExitErr("InquireChannels", 0, err)
return rc, err
}

func CollectChannelStatus(patterns string) error {
var err error

traceEntry("CollectChannelStatus")

channelsSeen = make(map[string]bool) // Record which channels have been seen in this period
ChannelInitAttributes()

Expand All @@ -178,6 +204,7 @@ func CollectChannelStatus(patterns string) error {

channelPatterns := strings.Split(patterns, ",")
if len(channelPatterns) == 0 {
traceExit("CollectChannelStatus", 1)
return nil
}

Expand Down Expand Up @@ -245,6 +272,7 @@ func CollectChannelStatus(patterns string) error {
}
}
}
traceExitErr("CollectChannelStatus", 0, err)
return err

}
Expand All @@ -253,6 +281,9 @@ func CollectChannelStatus(patterns string) error {
// Collect the responses and build up the statistics
func collectChannelStatus(pattern string, instanceType int32) error {
var err error

traceEntryF("collectChannelStatus", "Pattern: %s", pattern)

statusClearReplyQ()

putmqmd, pmo, cfh, buf := statusSetCommandHeaders()
Expand Down Expand Up @@ -283,6 +314,7 @@ func collectChannelStatus(pattern string, instanceType int32) error {
// And now put the command to the queue
err = cmdQObj.Put(putmqmd, pmo, buf)
if err != nil {
traceExitErr("collectChannelStatus", 1, err)
return err
}

Expand All @@ -298,13 +330,16 @@ func collectChannelStatus(pattern string, instanceType int32) error {
}
}

traceExitErr("collectChannelStatus", 0, err)
return err
}

// Given a PCF response message, parse it to extract the desired statistics
func parseChlData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
var elem *ibmmq.PCFParameter

traceEntry("parseChlData")

chlName := ""
connName := ""
jobName := ""
Expand All @@ -321,6 +356,7 @@ func parseChlData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
offset := 0
datalen := len(buf)
if cfh == nil || cfh.ParameterCount == 0 {
traceExit("parseChlData", 1)
return ""
}

Expand Down Expand Up @@ -379,6 +415,7 @@ func parseChlData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
for k, _ := range channelsSeen {
re := regexp.MustCompile(subKey)
if re.MatchString(k) {
traceExit("parseChlData", 2)
return ""
}
}
Expand Down Expand Up @@ -421,6 +458,7 @@ func parseChlData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
ChannelStatus.Attributes[ATTR_CHL_MAX_INST].Values[key] = newStatusValueInt64(maxInst)
}

traceExitF("parseChlData", 0, "Key: %s", key)
return key
}

Expand All @@ -430,6 +468,7 @@ func parseChlData(instanceType int32, cfh *ibmmq.MQCFH, buf []byte) string {
func ChannelNormalise(attr *StatusAttribute, v int64) float64 {
var f float64

traceEntry("ChannelNormalise")
if attr.squash {
switch attr.pcfAttr {

Expand Down Expand Up @@ -467,6 +506,9 @@ func ChannelNormalise(attr *StatusAttribute, v int64) float64 {
f = 0
}
}

traceExit("ChannelNormalise", 0)

return f
}

Expand All @@ -476,9 +518,12 @@ func ChannelNormalise(attr *StatusAttribute, v int64) float64 {
func inquireChannelAttributes(objectPatternsList string, infoMap map[string]*ObjInfo) error {
var err error

traceEntry("inquireChannelAttributes")

statusClearReplyQ()

if objectPatternsList == "" {
traceExitErr("inquireChannelAttributes", 1, err)
return err
}

Expand Down Expand Up @@ -528,6 +573,7 @@ func inquireChannelAttributes(objectPatternsList string, infoMap map[string]*Obj
// And now put the command to the queue
err = cmdQObj.Put(putmqmd, pmo, buf)
if err != nil {
traceExitErr("inquireChannelAttributes", 2, err)
return err
}

Expand All @@ -538,6 +584,8 @@ func inquireChannelAttributes(objectPatternsList string, infoMap map[string]*Obj
}
}
}
traceExit("inquireChannelAttributes", 0)

return nil
}

Expand All @@ -546,13 +594,16 @@ func parseChannelAttrData(cfh *ibmmq.MQCFH, buf []byte, infoMap map[string]*ObjI
var ci *ObjInfo
var ok bool

traceEntry("parseChannelAttrData")

chlName := ""

parmAvail := true
bytesRead := 0
offset := 0
datalen := len(buf)
if cfh.ParameterCount == 0 {
traceExit("parseChannelAttrData", 1)
return
}
// Parse it once to extract the fields that are needed for the map key
Expand Down Expand Up @@ -631,6 +682,7 @@ func parseChannelAttrData(cfh *ibmmq.MQCFH, buf []byte, infoMap map[string]*ObjI
}
}

traceExit("parseChannelAttrData", 0)
return
}

Expand Down
Loading

0 comments on commit 0d58d13

Please sign in to comment.