Skip to content

Commit

Permalink
Submsg and replies (cosmos#441)
Browse files Browse the repository at this point in the history
* Add dispatchSubmessages and reply to Keeper

* Update all mock types

* Dispatch submessages in all entry points

* Rename mask -> reflect in all tests (that was cosmwasm 0.8...)

* Basic submessage dispatch test;

* Simplify messanger interface again

* Start table tests

* Added table tests

* Debuging handling out of gas and panics

* Properly handle gas limits and out of gas panics

* Test parsing return values from WasmMsg::Instantiate

* PR feedback

* Add test to trigger 0 len data panic

* Safely handle 0 sdk msg submsg responses

* Charge gas on reply
  • Loading branch information
ethanfrey committed Mar 10, 2021
1 parent fd81a66 commit 78d5581
Show file tree
Hide file tree
Showing 10 changed files with 836 additions and 188 deletions.
45 changes: 22 additions & 23 deletions x/wasm/internal/keeper/handler_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,50 +320,49 @@ func convertWasmIBCTimeoutTimestampToCosmosTimestamp(timestamp *uint64) uint64 {
return *timestamp
}

func (h DefaultMessageHandler) Dispatch(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msgs ...wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
sdkMsgs, err := h.encoders.Encode(ctx, contractAddr, contractIBCPortID, msg)
func (h DefaultMessageHandler) DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error) {
sdkMsgs, err := h.encoders.Encode(ctx, contractAddr, contractIBCPortID, msg)
if err != nil {
return nil, nil, err
}
for _, sdkMsg := range sdkMsgs {
res, err := h.handleSdkMessage(ctx, contractAddr, sdkMsg)
if err != nil {
return err
return nil, nil, err
}
for _, sdkMsg := range sdkMsgs {
if err := h.handleSdkMessage(ctx, contractAddr, sdkMsg); err != nil {
return err
}
// append data
data = append(data, res.Data)
// append events
sdkEvents := make([]sdk.Event, len(res.Events))
for i := range res.Events {
sdkEvents[i] = sdk.Event(res.Events[i])
}
events = append(events, sdkEvents...)
}
return nil
return
}

func (h DefaultMessageHandler) handleSdkMessage(ctx sdk.Context, contractAddr sdk.Address, msg sdk.Msg) error {
func (h DefaultMessageHandler) handleSdkMessage(ctx sdk.Context, contractAddr sdk.Address, msg sdk.Msg) (*sdk.Result, error) {
if err := msg.ValidateBasic(); err != nil {
return err
return nil, err
}
// make sure this account can send it
for _, acct := range msg.GetSigners() {
if !acct.Equals(contractAddr) {
return sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "contract doesn't have permission")
return nil, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "contract doesn't have permission")
}
}

// find the handler and execute it
handler := h.router.Route(ctx, msg.Route())
if handler == nil {
return sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, msg.Route())
return nil, sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, msg.Route())
}
res, err := handler(ctx, msg)
if err != nil {
return err
}

events := make(sdk.Events, len(res.Events))
for i := range res.Events {
events[i] = sdk.Event(res.Events[i])
return nil, err
}
// redispatch all events, (type sdk.EventTypeMessage will be filtered out in the handler)
ctx.EventManager().EmitEvents(events)

return nil
return res, nil
}

func convertWasmCoinsToSdkCoins(coins []wasmvmtypes.Coin) (sdk.Coins, error) {
Expand Down
209 changes: 190 additions & 19 deletions x/wasm/internal/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
abci "github.com/tendermint/tendermint/abci/types"
"path/filepath"

"github.com/CosmWasm/wasmd/x/wasm/internal/types"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Option interface {
}

type messenger interface {
Dispatch(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msgs ...wasmvmtypes.CosmosMsg) error
DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error)
}

// Keeper will have a reference to Wasmer with it's own data directory.
Expand Down Expand Up @@ -220,7 +221,7 @@ func (k Keeper) Instantiate(ctx sdk.Context, codeID uint64, creator, admin sdk.A

func (k Keeper) instantiate(ctx sdk.Context, codeID uint64, creator, admin sdk.AccAddress, initMsg []byte, label string, deposit sdk.Coins, authZ AuthorizationPolicy) (sdk.AccAddress, []byte, error) {
if !k.IsPinnedCode(ctx, codeID) {
ctx.GasMeter().ConsumeGas(InstanceCost, "Loading CosmWasm module: init")
ctx.GasMeter().ConsumeGas(InstanceCost, "Loading CosmWasm module: instantiate")
}

// create contract address
Expand Down Expand Up @@ -301,13 +302,14 @@ func (k Keeper) instantiate(ctx sdk.Context, codeID uint64, creator, admin sdk.A
contractInfo.IBCPortID = ibcPort
}

// store contract before dispatch so that contract could be called back
k.storeContractInfo(ctx, contractAddress, &contractInfo)
k.appendToContractHistory(ctx, contractAddress, contractInfo.InitialHistory(initMsg))

// then dispatch so that contract could be called back
err = k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages)
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, nil, err
return nil, nil, sdkerrors.Wrap(err, "dispatch")
}

return contractAddress, res.Data, nil
Expand Down Expand Up @@ -352,9 +354,10 @@ func (k Keeper) Execute(ctx sdk.Context, contractAddress sdk.AccAddress, caller
events := types.ParseEvents(res.Attributes, contractAddress)
ctx.EventManager().EmitEvents(events)

err = k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages)
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, err
return nil, sdkerrors.Wrap(err, "dispatch")
}

return &sdk.Result{
Expand Down Expand Up @@ -426,8 +429,9 @@ func (k Keeper) migrate(ctx sdk.Context, contractAddress sdk.AccAddress, caller
k.appendToContractHistory(ctx, contractAddress, historyEntry)
k.storeContractInfo(ctx, contractAddress, contractInfo)

// then dispatch
if err := k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages); err != nil {
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, sdkerrors.Wrap(err, "dispatch")
}

Expand Down Expand Up @@ -464,7 +468,50 @@ func (k Keeper) Sudo(ctx sdk.Context, contractAddress sdk.AccAddress, msg []byte
events := types.ParseEvents(res.Attributes, contractAddress)
ctx.EventManager().EmitEvents(events)

err = k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages)
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, sdkerrors.Wrap(err, "dispatch")
}

return &sdk.Result{
Data: res.Data,
}, nil
}

// reply is only called from keeper internal functions (dispatchSubmessages) after processing the submessage
// it
func (k Keeper) reply(ctx sdk.Context, contractAddress sdk.AccAddress, reply wasmvmtypes.Reply) (*sdk.Result, error) {
contractInfo, codeInfo, prefixStore, err := k.contractInstance(ctx, contractAddress)
if err != nil {
return nil, err
}

// current thought is to charge gas like a fresh run, we can revisit whether to give it a discount later
if !k.IsPinnedCode(ctx, contractInfo.CodeID) {
ctx.GasMeter().ConsumeGas(InstanceCost, "Loading CosmWasm module: reply")
}

env := types.NewEnv(ctx, contractAddress)

// prepare querier
querier := QueryHandler{
Ctx: ctx,
Plugins: k.queryPlugins,
}
gas := gasForContract(ctx)
res, gasUsed, execErr := k.wasmer.Reply(codeInfo.CodeHash, env, reply, prefixStore, cosmwasmAPI, querier, gasMeter(ctx), gas)
consumeGas(ctx, gasUsed)
if execErr != nil {
return nil, sdkerrors.Wrap(types.ErrExecuteFailed, execErr.Error())
}

// emit all events from this contract itself
events := types.ParseEvents(res.Attributes, contractAddress)
ctx.EventManager().EmitEvents(events)

// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, sdkerrors.Wrap(err, "dispatch")
}
Expand Down Expand Up @@ -678,15 +725,6 @@ func (k Keeper) GetByteCode(ctx sdk.Context, codeID uint64) ([]byte, error) {
return k.wasmer.GetCode(codeInfo.CodeHash)
}

func (k Keeper) dispatchMessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
if err := k.messenger.Dispatch(ctx, contractAddr, ibcPort, msg); err != nil {
return err
}
}
return nil
}

// PinCode pins the wasm contract in wasmvm cache
func (k Keeper) PinCode(ctx sdk.Context, codeID uint64) error {
codeInfo := k.GetCodeInfo(ctx, codeID)
Expand Down Expand Up @@ -740,6 +778,139 @@ func (k Keeper) InitializePinnedCodes(ctx sdk.Context) error {
return nil
}

func (k Keeper) dispatchAll(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, subMsgs []wasmvmtypes.SubMsg, msgs []wasmvmtypes.CosmosMsg) error {
// first dispatch all submessages (and the replies).
err := k.dispatchSubmessages(ctx, contractAddr, ibcPort, subMsgs)
if err != nil {
return err
}
// then dispatch all the normal messages
return k.dispatchMessages(ctx, contractAddr, ibcPort, msgs)
}

func (k Keeper) dispatchMessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
events, _, err := k.messenger.DispatchMsg(ctx, contractAddr, ibcPort, msg)
if err != nil {
return err
}
// redispatch all events, (type sdk.EventTypeMessage will be filtered out in the handler)
ctx.EventManager().EmitEvents(events)
}
return nil
}

func (k Keeper) dispatchMsgWithGasLimit(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msg wasmvmtypes.CosmosMsg, gasLimit uint64) (events []sdk.Event, data [][]byte, err error) {
limitedMeter := sdk.NewGasMeter(gasLimit)
subCtx := ctx.WithGasMeter(limitedMeter)

// catch out of gas panic and just charge the entire gas limit
defer func() {
if r := recover(); r != nil {
// if it's not an OutOfGas error, raise it again
if _, ok := r.(sdk.ErrorOutOfGas); !ok {
// log it to get the original stack trace somewhere (as panic(r) keeps message but stacktrace to here
k.Logger(ctx).Info("SubMsg rethrowing panic: %#v", r)
panic(r)
}
ctx.GasMeter().ConsumeGas(gasLimit, "Sub-Message OutOfGas panic")
err = sdkerrors.Wrap(sdkerrors.ErrOutOfGas, "SubMsg hit gas limit")
}
}()
events, data, err = k.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg)

// make sure we charge the parent what was spent
spent := subCtx.GasMeter().GasConsumed()
ctx.GasMeter().ConsumeGas(spent, "From limited Sub-Message")

return events, data, err
}

// dispatchSubmessages builds a sandbox to execute these messages and returns the execution result to the contract
// that dispatched them, both on success as well as failure
func (k Keeper) dispatchSubmessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.SubMsg) error {
for _, msg := range msgs {
// first, we build a sub-context which we can use inside the submessages
subCtx, commit := ctx.CacheContext()

// check how much gas left locally, optionally wrap the gas meter
gasRemaining := ctx.GasMeter().Limit() - ctx.GasMeter().GasConsumed()
limitGas := msg.GasLimit != nil && (*msg.GasLimit < gasRemaining)

var err error
var events []sdk.Event
var data [][]byte
if limitGas {
events, data, err = k.dispatchMsgWithGasLimit(subCtx, contractAddr, ibcPort, msg.Msg, *msg.GasLimit)
} else {
events, data, err = k.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg.Msg)
}

// if it succeeds, commit state changes from submessage, and pass on events to Event Manager
if err == nil {
commit()
ctx.EventManager().EmitEvents(events)
}
// on failure, revert state from sandbox, and ignore events (just skip doing the above)

var result wasmvmtypes.SubcallResult
if err == nil {
// just take the first one for now if there are multiple sub-sdk messages
// and safely return nothing if no data
var responseData []byte
if len(data) > 0 {
responseData = data[0]
}
result = wasmvmtypes.SubcallResult{
Ok: &wasmvmtypes.SubcallResponse{
Events: sdkEventsToWasmVmEvents(events),
Data: responseData,
},
}
} else {
result = wasmvmtypes.SubcallResult{
Err: err.Error(),
}
}

// now handle the reply, we use the parent context, and abort on error
reply := wasmvmtypes.Reply{
ID: msg.ID,
Result: result,
}

// we can ignore any result returned as there is nothing to do with the data
// and the events are already in the ctx.EventManager()
_, err = k.reply(ctx, contractAddr, reply)
if err != nil {
return err
}
}
return nil
}

func sdkEventsToWasmVmEvents(events []sdk.Event) []wasmvmtypes.Event {
res := make([]wasmvmtypes.Event, len(events))
for i, ev := range events {
res[i] = wasmvmtypes.Event{
Type: ev.Type,
Attributes: sdkAttributesToWasmVmAttributes(ev.Attributes),
}
}
return res
}

func sdkAttributesToWasmVmAttributes(attrs []abci.EventAttribute) []wasmvmtypes.EventAttribute {
res := make([]wasmvmtypes.EventAttribute, len(attrs))
for i, attr := range attrs {
res[i] = wasmvmtypes.EventAttribute{
Key: string(attr.Key),
Value: string(attr.Value),
}
}
return res
}

func gasForContract(ctx sdk.Context) uint64 {
meter := ctx.GasMeter()
if meter.IsOutOfGas() {
Expand Down
Loading

0 comments on commit 78d5581

Please sign in to comment.