Skip to content

Commit

Permalink
Merge pull request #21 from anycable/chore/upgrade-anycable-redis
Browse files Browse the repository at this point in the history
AnyCable Rails 1.3.0 and  Redis 5.0 compatibility
  • Loading branch information
palkan committed Mar 11, 2022
2 parents 495e550 + 8d61c28 commit 042a8d0
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 76 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ jobs:
graphql: '~> 1.12.0'
client_id: 'false'
interpreter: yes
anycable_rails: '~> 1.2'
- ruby: 2.7
graphql: '~> 1.12.0'
client_id: 'false'
interpreter: yes
anycable_rails: '~> 1.2.0'
- ruby: 2.6
graphql: '~> 1.11.0'
client_id: 'true'
interpreter: no
anycable_rails: '~> 1.0'
container:
image: ruby:${{ matrix.ruby }}
env:
CI: true
GRAPHQL_RUBY_VERSION: ${{ matrix.graphql }}
ANYCABLE_RAILS_VERSION: ${{ matrix.anycable_rails }}
GRAPHQL_ANYCABLE_USE_CLIENT_PROVIDED_UNIQ_ID: ${{ matrix.client_id }}
steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ gemspec

gem "graphql", ENV.fetch("GRAPHQL_RUBY_VERSION", "~> 1.12")
gem "anycable", ENV.fetch("ANYCABLE_VERSION", "~> 1.0")
gem "anycable-rails", github: "anycable/anycable-rails"
gem "anycable-rails", ENV.fetch("ANYCABLE_RAILS_VERSION", "~> 1.2")

group :development, :test do
gem "pry"
Expand Down
32 changes: 16 additions & 16 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ def execute_all(event, object)
return if fingerprints.empty?

fingerprint_subscription_ids = Hash[fingerprints.zip(
redis.pipelined do
redis.pipelined do |pipeline|
fingerprints.map do |fingerprint|
redis.smembers(SUBSCRIPTIONS_PREFIX + fingerprint)
pipeline.smembers(SUBSCRIPTIONS_PREFIX + fingerprint)
end
end
)]
Expand Down Expand Up @@ -160,16 +160,16 @@ def write_subscription(query, events)
events: events.map { |e| [e.topic, e.fingerprint] }.to_h.to_json,
}

redis.multi do
redis.sadd(CHANNEL_PREFIX + channel_uniq_id, subscription_id)
redis.mapped_hmset(SUBSCRIPTION_PREFIX + subscription_id, data)
redis.multi do |pipeline|
pipeline.sadd(CHANNEL_PREFIX + channel_uniq_id, subscription_id)
pipeline.mapped_hmset(SUBSCRIPTION_PREFIX + subscription_id, data)
events.each do |event|
redis.zincrby(FINGERPRINTS_PREFIX + event.topic, 1, event.fingerprint)
redis.sadd(SUBSCRIPTIONS_PREFIX + event.fingerprint, subscription_id)
pipeline.zincrby(FINGERPRINTS_PREFIX + event.topic, 1, event.fingerprint)
pipeline.sadd(SUBSCRIPTIONS_PREFIX + event.fingerprint, subscription_id)
end
next unless config.subscription_expiration_seconds
redis.expire(CHANNEL_PREFIX + channel_uniq_id, config.subscription_expiration_seconds)
redis.expire(SUBSCRIPTION_PREFIX + subscription_id, config.subscription_expiration_seconds)
pipeline.expire(CHANNEL_PREFIX + channel_uniq_id, config.subscription_expiration_seconds)
pipeline.expire(SUBSCRIPTION_PREFIX + subscription_id, config.subscription_expiration_seconds)
end
end

Expand All @@ -191,19 +191,19 @@ def delete_subscription(subscription_id)
events = redis.hget(SUBSCRIPTION_PREFIX + subscription_id, :events)
events = events ? JSON.parse(events) : {}
fingerprint_subscriptions = {}
redis.pipelined do
redis.pipelined do |pipeline|
events.each do |topic, fingerprint|
redis.srem(SUBSCRIPTIONS_PREFIX + fingerprint, subscription_id)
score = redis.zincrby(FINGERPRINTS_PREFIX + topic, -1, fingerprint)
pipeline.srem(SUBSCRIPTIONS_PREFIX + fingerprint, subscription_id)
score = pipeline.zincrby(FINGERPRINTS_PREFIX + topic, -1, fingerprint)
fingerprint_subscriptions[FINGERPRINTS_PREFIX + topic] = score
end
# Delete subscription itself
redis.del(SUBSCRIPTION_PREFIX + subscription_id)
pipeline.del(SUBSCRIPTION_PREFIX + subscription_id)
end
# Clean up fingerprints that doesn't have any subscriptions left
redis.pipelined do
redis.pipelined do |pipeline|
fingerprint_subscriptions.each do |key, score|
redis.zremrangebyscore(key, '-inf', '0') if score.value.zero?
pipeline.zremrangebyscore(key, '-inf', '0') if score.value.zero?
end
end
delete_legacy_subscription(subscription_id)
Expand Down Expand Up @@ -252,7 +252,7 @@ def read_subscription_id(channel)
end

def write_subscription_id(channel, val)
channel.connection.socket.istate["sid"] = val
channel.connection.anycable_socket.istate["sid"] = val
channel.instance_variable_set(:@__sid__, val)
end

Expand Down
2 changes: 1 addition & 1 deletion spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

let(:channel) do
socket = double("Socket", istate: AnyCable::Socket::State.new({}))
connection = double("Connection", socket: socket)
connection = double("Connection", anycable_socket: socket)
double("Channel", id: "legacy_id", params: { "channelId" => "legacy_id" }, stream_from: nil, connection: connection)
end

Expand Down
2 changes: 1 addition & 1 deletion spec/graphql/broadcast_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def subscribe(query)

let(:channel) do
socket = double("Socket", istate: AnyCable::Socket::State.new({}))
connection = double("Connection", socket: socket)
connection = double("Connection", anycable_socket: socket)
double("Channel", connection: connection)
end

Expand Down
2 changes: 2 additions & 0 deletions spec/integration_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def stream_from(broadcasting)
attr_reader :request, :socket, :identifiers, :subscriptions,
:schema

alias anycable_socket socket

def initialize(socket, identifiers: nil, subscriptions: nil)
@socket = socket
@identifiers = identifiers ? JSON.parse(identifiers) : {}
Expand Down
21 changes: 16 additions & 5 deletions spec/integrations/rails_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

require "integration_helper"
require "rails"
require "action_cable/engine"

# Stub Rails.root for Anyway Config
module Rails
Expand All @@ -13,12 +14,15 @@ def self.root
end

require "anycable-rails"
require "anycable/rails/channel_state"
require "anycable/rails/actioncable/connection"

# Load server to trigger load hooks
require "action_cable/server"
require "action_cable/server/base"
# Only for anycable-rails <1.3.0
unless defined?(::AnyCable::Rails::Connection)
require "anycable/rails/channel_state"
require "anycable/rails/actioncable/connection"
end

module ApplicationCable
class Connection < ActionCable::Connection::Base
Expand Down Expand Up @@ -68,9 +72,16 @@ def context
let(:schema) { BroadcastSchema }
let(:channel_class) { "ApplicationCable::GraphqlChannel" }

before do
allow(AnyCable).to receive(:connection_factory)
.and_return(->(socket, **options) { ApplicationCable::Connection.call(socket, **options) })
if defined?(::AnyCable::Rails::Connection)
before do
allow(AnyCable).to receive(:connection_factory)
.and_return(->(socket, **options) { ::AnyCable::Rails::Connection.new(ApplicationCable::Connection, socket, **options) })
end
else
before do
allow(AnyCable).to receive(:connection_factory)
.and_return(->(socket, **options) { ApplicationCable::Connection.call(socket, **options) })
end
end

let(:variables) { {id: "a"} }
Expand Down
1 change: 0 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
end

require_relative "support/graphql_schema"
require_relative "support/graphql_schema_broadcast"

RSpec.configure do |config|
# Enable flags like --only-failures and --next-failure
Expand Down
52 changes: 46 additions & 6 deletions spec/support/graphql_schema.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# frozen_string_literal: true

class Product < GraphQL::Schema::Object
field :id, ID, null: false, hash_key: :id
field :title, String, null: true, hash_key: :title
end


POSTS = [
{ id: "a", title: "GraphQL is good?", actions: %w[yes no] },
{ id: "b", title: "Is there life after GraphQL?", actions: %w[no still-no] }
].freeze

class Product < GraphQL::Schema::Object
field :id, ID, null: false, hash_key: :id
field :title, String, null: true, hash_key: :title
end

class Post < GraphQL::Schema::Object
field :id, ID, null: false, broadcastable: true
Expand Down Expand Up @@ -54,3 +52,45 @@ class AnycableSchema < GraphQL::Schema

subscription SubscriptionType
end

return unless TESTING_GRAPHQL_RUBY_INTERPRETER # Broadcast requires interpreter

module Interpreted
class Post < GraphQL::Schema::Object
field :id, ID, null: false, broadcastable: true
field :title, String, null: true
field :actions, [String], null: false, broadcastable: false
end

class PostCreated < GraphQL::Schema::Subscription
payload_type Post
end


class PostUpdated < GraphQL::Schema::Subscription
argument :id, ID, required: true

field :post, Post, null: false

def subscribe(id:)
{post: POSTS.find { |post| post[:id] == id }}
end

def update(*)
{post: object}
end
end

class BroadcastSubscriptionType < GraphQL::Schema::Object
field :post_created, subscription: PostCreated
field :post_updated, subscription: PostUpdated
end
end

class BroadcastSchema < GraphQL::Schema
use GraphQL::Execution::Interpreter
use GraphQL::Analysis::AST
use GraphQL::AnyCable, broadcast: true, default_broadcastable: true

subscription Interpreted::BroadcastSubscriptionType
end
45 changes: 0 additions & 45 deletions spec/support/graphql_schema_broadcast.rb

This file was deleted.

0 comments on commit 042a8d0

Please sign in to comment.