Commit 63cdd93a authored by Maxwell Salzberg's avatar Maxwell Salzberg

the websocket has been completely removed, and replaced for a stub until

we find a better way to reimpliment it in a  more sane way
parent 9b05e80e
web: bundle exec rails s thin -p $PORT
redis: redis-server
websocket: ruby script/websocket_server.rb
worker: QUEUE=* bundle exec rake resque:work
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
class SocketsController < ApplicationController
include ApplicationHelper
include SocketsHelper
include Rails.application.routes.url_helpers
helper_method :all_aspects
helper_method :current_user
helper_method 'all_comments?'
def incoming(msg)
Rails.logger.info("Socket received connection to: #{msg}")
end
def outgoing(user_or_id, object, opts={})
#this should be the actual params of the controller
@params = {:user_or_id => user_or_id, :object => object}.merge(opts)
return unless Diaspora::WebSocket.is_connected?(user_id)
@_request = SocketRequest.new({})
Diaspora::WebSocket.queue_to_user(user_id, action_hash(user, object, opts))
end
def user_id
if @params[:user_or_id].instance_of?(Fixnum)
@user_id ||= @params[:user_or_id]
else
@user_id ||= @params[:user_or_id].id
end
end
def user
@user ||= ((@params[:user_or_id].instance_of? User )? @params[:user_or_id] : User.find(user_id))
end
def current_user
user
end
def url_options
{:host => "#{AppConfig[:pod_uri].host}:#{AppConfig[:pod_uri].port}"}
end
def all_aspects
@all_aspects ||= user.aspects
end
# Override of CommentsHelper#all_comments? .
def all_comments?
false
end
class SocketRequest < ActionDispatch::Request
def format
'socket'
end
end
end
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module SocketsHelper
include ApplicationHelper
include NotificationsHelper
def obj_id(object)
if object.respond_to?(:post_id)
object.post_id
elsif object.respond_to?(:post_guid)
object.post_guid
else
object.id
end
end
def action_hash(user, object, opts={})
uid = user.id
begin
unless user.nil?
old_locale = I18n.locale
I18n.locale = user.language.to_s
end
if object.is_a? StatusMessage
post_hash = {:post => object,
:author => object.author,
:photos => object.photos,
:reshare => nil,
:comments => object.comments.map{|c|
{:comment => c,
:author => c.author
}
}
}
v = render_to_string(:partial => 'shared/stream_element', :locals => post_hash)
elsif object.is_a? Person
person_hash = {
:single_aspect_form => opts["single_aspect_form"],
:person => object,
:contact => user.contact_for(object)
}
v = render_to_string(:partial => 'people/person', :locals => person_hash)
elsif object.is_a? Comment
v = render_to_string(:partial => 'comments/comment', :locals => {:post => object.post, :comment => object, :person => object.author})
elsif object.is_a? Like
v = render_to_string(:partial => 'likes/likes', :locals => {:likes => object.target.likes.includes(:author => :profile)})
elsif object.is_a? Notification
v = render_to_string(:partial => 'notifications/popup', :locals => {:note => object, :person => opts[:actor]})
else
raise "#{object.inspect} with class #{object.class} is not actionhashable." unless object.is_a? Retraction
end
rescue Exception => e
Rails.logger.error(":event => :socket_render, :status => :fail, :user => #{user.diaspora_handle}, :object=> #{object.id}, :object_class => #{object.class}, :error_message => #{e.message}")
raise e
end
action_hash = {:class =>object.class.to_s.underscore.pluralize, :html => v, :post_id => obj_id(object)}
action_hash.merge! opts
if object.is_a? Photo
action_hash[:photo_hash] = object.thumb_hash
end
if object.is_a? Comment
post = object.post
action_hash[:comment_guid] = object.guid
action_hash[:my_post?] = (post.author.owner_id == uid)
action_hash[:post_guid] = post.guid
end
if object.is_a? Like
action_hash[:post_guid] = object.post.guid
end
action_hash[:mine?] = object.author && (object.author.owner_id == uid) if object.respond_to?(:author)
I18n.locale = old_locale unless user.nil?
action_hash.to_json
end
end
......@@ -3,7 +3,6 @@
# the COPYRIGHT file.
class ActivityStreams::Photo < Post
include Diaspora::Socketable
xml_name self.name.underscore.gsub!('/', '-')
xml_attr :image_url
......@@ -19,18 +18,6 @@ class ActivityStreams::Photo < Post
:actor_url,
:objectId
# This wrapper around {Diaspora::Socketable#socket_to_user} adds aspect_ids to opts if they are not there.
def socket_to_user(user_or_id, opts={})
unless opts[:aspect_ids]
user_id = user_or_id.instance_of?(Fixnum) ? user_or_id : user_or_id.id
aspect_ids = AspectMembership.connection.select_values(
AspectMembership.joins(:contact).where(:contacts => {:user_id => user_id, :person_id => self.author_id}).select('aspect_memberships.aspect_id').to_sql
)
opts.merge!(:aspect_ids => aspect_ids)
end
super(user_or_id, opts)
end
# This creates a new ActivityStreams::Photo from a json hash.
# Right now, it is only used by Cubbi.es, but there will be objects for all the AS types.
# @param [Hash] json An {http://www.activitystrea.ms ActivityStreams} compliant (we hope!) json hash.
......
......@@ -3,14 +3,12 @@
# the COPYRIGHT file.
class Comment < ActiveRecord::Base
require File.join(Rails.root, 'lib/diaspora/web_socket')
include ROXML
include Diaspora::Webhooks
include Diaspora::Guid
include Diaspora::Relayable
include Diaspora::Socketable
include Diaspora::Taggable
include Diaspora::Likeable
......
......@@ -69,7 +69,7 @@ class Contact < ActiveRecord::Base
def receive_shareable(shareable)
ShareVisibility.create!(:shareable_id => shareable.id, :shareable_type => shareable.class.base_class.to_s, :contact_id => self.id)
shareable.socket_to_user(self.user, :aspect_ids => self.aspect_ids) if shareable.respond_to? :socket_to_user
Diaspora::Websocket.to(self.user, :aspect_ids => self.aspect_ids).socket(shareable)
end
def contacts
......
......@@ -12,13 +12,13 @@ module Jobs
finger = Webfinger.new(account)
begin
result = finger.fetch
result.socket_to_user(user_id, opts)
Diaspora::Websocket.to(user_id).socket(opts)
rescue
Diaspora::WebSocket.queue_to_user(user_id,
Diaspora::Websocket.to(user_id).socket(
{:class => 'people',
:status => 'fail',
:query => account,
:response => I18n.t('people.webfinger.fail', :handle => account)}.to_json)
:response => I18n.t('people.webfinger.fail', :handle => account)})
end
end
end
......
......@@ -3,7 +3,6 @@
# the COPYRIGHT file.
class Like < ActiveRecord::Base
require File.join(Rails.root, 'lib/diaspora/web_socket')
include ROXML
include Diaspora::Webhooks
......@@ -11,7 +10,6 @@ class Like < ActiveRecord::Base
xml_attr :target_type
include Diaspora::Relayable
include Diaspora::Socketable
xml_attr :positive
xml_attr :diaspora_handle
......
......@@ -3,9 +3,6 @@
# the COPYRIGHT file.
#
class Notification < ActiveRecord::Base
require File.join(Rails.root, 'lib/diaspora/web_socket')
include Diaspora::Socketable
belongs_to :recipient, :class_name => 'User'
has_many :notification_actors, :dependent => :destroy
has_many :actors, :class_name => 'Person', :through => :notification_actors, :source => :person
......@@ -26,8 +23,8 @@ class Notification < ActiveRecord::Base
n = note_type.make_notification(recipient, target, actor, note_type)
end
if n
n.email_the_user(target, actor) if n
n.socket_to_user(recipient, :actor => actor) if n
n.email_the_user(target, actor)
Diaspora::Websocket.to(recipient, :actor => actor).socket(n)
n
else
nil
......
......@@ -8,8 +8,6 @@ require File.join(Rails.root, 'lib/hcard')
class Person < ActiveRecord::Base
include ROXML
include Encryptor::Public
require File.join(Rails.root, 'lib/diaspora/web_socket')
include Diaspora::Socketable
include Diaspora::Guid
xml_attr :diaspora_handle
......
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# t
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
......
......@@ -43,7 +43,8 @@ class Retraction
def perform receiving_user
Rails.logger.debug "Performing retraction for #{post_guid}"
self.target.unsocket_from_user receiving_user if target.respond_to? :unsocket_from_user
Diaspora::Websocket.to(receiving_user).retract(self.target)
self.target.destroy if self.target
Rails.logger.info("event=retraction status=complete type=#{self.type} guid=#{self.post_guid}")
end
......
......@@ -79,7 +79,7 @@ class SignedRetraction
Postzord::Dispatcher.build(receiving_user, onward_retraction).post
end
if target
self.target.unsocket_from_user receiving_user if target.respond_to? :unsocket_from_user
Diaspora::Websocket.to(receiving_user).retract(self.target)
self.target.destroy
end
Rails.logger.info("event=retraction status =complete target_type=#{self.target_type} guid =#{self.target_guid}")
......
......@@ -3,7 +3,6 @@
# the COPYRIGHT file.
class StatusMessage < Post
include Diaspora::Socketable
include Diaspora::Taggable
include ActionView::Helpers::TextHelper
......@@ -143,17 +142,6 @@ class StatusMessage < Post
XML
end
def socket_to_user(user_or_id, opts={})
unless opts[:aspect_ids]
user_id = user_or_id.instance_of?(Fixnum) ? user_or_id : user_or_id.id
aspect_ids = AspectMembership.connection.select_values(
AspectMembership.joins(:contact).where(:contacts => {:user_id => user_id, :person_id => self.author_id}).select('aspect_memberships.aspect_id').to_sql
)
opts.merge!(:aspect_ids => aspect_ids)
end
super(user_or_id, opts)
end
def after_dispatch sender
unless self.photos.empty?
self.photos.update_all(:pending => false, :public => self.public)
......
......@@ -214,7 +214,10 @@ class User < ActiveRecord::Base
end
def add_to_streams(post, aspects_to_insert)
post.socket_to_user(self, :aspect_ids => aspects_to_insert.map{|x| x.id}) if post.respond_to? :socket_to_user
inserted_aspect_ids = aspects_to_insert.map{|x| x.id}
Diaspora::Websocket.to(self, :aspect_ids => inserted_aspect_ids ).socket(post)
aspects_to_insert.each do |aspect|
aspect << post
end
......
module Diaspora
class Websocket
def initialize(*args)
end
def self.to(*args)
w = Websocket.new(*args)
w
end
def send(object)
end
def retract(object)
end
alias :socket :send
alias :unsocket :retract
end
end
......@@ -46,10 +46,6 @@
:javascript
Diaspora.I18n.loadLocale(#{get_javascript_strings_for(I18n.locale).to_json}, "#{I18n.locale}");
Diaspora.Page = "#{params[:controller].camelcase}#{params[:action].camelcase}";
- if current_user
= include_javascripts :flash_socket #unless modern_browser?
= javascript_include_tag 'web-socket-receiver'
= render 'js/websocket_js'
= yield(:head)
......
......@@ -6,5 +6,6 @@ module Diaspora
autoload :Parser
autoload :Webhooks
autoload :Websocket
end
......@@ -67,7 +67,7 @@ module Diaspora
Postzord::Dispatcher.build(user, comment_or_like).post
end
comment_or_like.socket_to_user(user) if comment_or_like.respond_to? :socket_to_user
Diaspora::Websocket.to(user).socket(comment_or_like)
if comment_or_like.after_receive(user, person)
comment_or_like
......
......@@ -4,7 +4,6 @@
module Diaspora
module Shareable
require File.join(Rails.root, 'lib/diaspora/web_socket')
include Diaspora::Webhooks
def self.included(model)
......
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Diaspora
module WebSocket
REDIS_CONNECTION_SET = 'ws-uids'
def self.redis
@redis ||= Resque.redis
end
def self.length
redis.llen :websocket
end
def self.queue_to_user(uid, data)
redis.lpush(:websocket, {:uid => uid, :data => data}.to_json)
end
def self.initialize_channels
@channels = {}
end
def self.next
redis.rpop(:websocket)
end
def self.push_to_user(uid, data)
Rails.logger.debug "event=socket-push uid=#{uid}"
@channels[uid][0].push(data) if @channels[uid]
end
def self.subscribe(uid, ws)
Rails.logger.info "event=socket-subscribe uid=#{uid} channels=#{self.length}"
self.ensure_channel(uid)
@channels[uid][0].subscribe{ |msg| ws.send msg }
@channels[uid][1] += 1
redis.sadd(REDIS_CONNECTION_SET, uid)
end
def self.ensure_channel(uid)
@channels[uid] ||= [EM::Channel.new, 0 ]
end
def self.unsubscribe(uid,sid)
Rails.logger.info "event=socket-unsubscribe sid=#{sid} uid=#{uid} channels=#{self.length}"
@channels[uid][0].unsubscribe(sid) if @channels[uid]
@channels[uid][1] -= 1
if @channels[uid][1] <= 0
@channels.delete(uid)
redis.srem(REDIS_CONNECTION_SET, uid)
end
end
def self.is_connected?(uid)
redis.sismember(REDIS_CONNECTION_SET, uid)
end
end
module Socketable
def socket_to_user(user_or_id, opts={})
begin
SocketsController.new.outgoing(user_or_id, self, opts)
rescue
nil
end
end
def unsocket_from_user(user_or_id, opts={})
begin
SocketsController.new.outgoing(user_or_id, Retraction.for(self), opts)
rescue
nil
end
end
end
end
......@@ -152,10 +152,8 @@ class Postzord::Dispatcher
# @param services [Array<User>]
def socket_to_users(users)
return unless users.present? && @object.respond_to?(:socket_to_user)
users.each do |user|
@object.socket_to_user(user)
end
return unless users.present?
Diaspora::Websocket.to(users).socket(@object)
end
end
......@@ -21,7 +21,7 @@ class Postzord::Receiver::LocalBatch < Postzord::Receiver
notify_mentioned_users if @object.respond_to?(:mentions)
# 09/27/11 this is slow
#socket_to_users if @object.respond_to?(:socket_to_user)
socket_to_users
notify_users
true
......@@ -69,9 +69,7 @@ class Postzord::Receiver::LocalBatch < Postzord::Receiver
# Issue websocket requests to all specified recipients
# @return [void]
def socket_to_users
@users.each do |user|
@object.socket_to_user(user)
end
Diaspora::Websocket.to(@users).socket(@object)
end
# Notify users of the new object
......
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
require 'spec_helper'
SocketsController.class_eval <<-EOT
def url_options
{:host => ""}
end
EOT
describe SocketsController do
before do
@user = alice
@controller = SocketsController.new
@aspect = @user.aspects.first
@message = @user.post :status_message, :text => "post through user for victory", :to => @aspect.id
end
describe 'actionhash' do
it 'actionhashes posts' do
@controller.instance_variable_set(:@params, {:user_or_id => @user, :object => @message})
json = @controller.action_hash(@user, @message)
json.include?(@message.text).should be_true
json.include?('status_message').should be_true
end
it 'actionhashes retractions' do
retraction = Retraction.for @message
json = @controller.action_hash(@user, retraction)
json.include?('retraction').should be_true
json.include?("html\":null").should be_true
end
end
describe '#outgoing' do
it 'calls queue_to_user' do
Diaspora::WebSocket.should_receive(:is_connected?).with(@user.id).and_return(true)
Diaspora::WebSocket.should_receive(:queue_to_user).with(@user.id, anything)
@controller.outgoing(@user.id, @message)
end
it 'does not call queue_to_user if the user is not connected' do
Diaspora::WebSocket.should_receive(:is_connected?).with(@user.id).and_return(false)
Diaspora::WebSocket.should_not_receive(:queue_to_user)
@controller.outgoing(@user.id, @message)
end
it 'takes a user or an id' do
Diaspora::WebSocket.should_receive(:is_connected?).with(@user.id).and_return(false)
Diaspora::WebSocket.should_not_receive(:queue_to_user)
@controller.outgoing(@user, @message)
end
end
end
......@@ -23,8 +23,8 @@ describe 'a user receives a post' do
contact = alice.contact_for(bob.person)
alice.add_contact_to_aspect(contact, alice.aspects.create(:name => "villains"))
status = bob.build_post(:status_message, :text => "Users do things", :to => @bobs_aspect.id)
Diaspora::WebSocket.stub!(:is_connected?).and_return(true)
Diaspora::WebSocket.should_receive(:queue_to_user).exactly(:once)
Diaspora::Websocket.stub!(:is_connected?).and_return(true)
Diaspora::Websocket.should_receive(:to).exactly(:once).and_return(stub.as_null_object)
zord = Postzord::Receiver::Private.new(alice, :object => status, :person => bob.person)
zord.receive_object
end
......
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
require 'spec_helper'
require File.join(Rails.root, 'lib/diaspora/web_socket')
describe Diaspora::WebSocket do
before do
@mock_redis = mock()
Diaspora::WebSocket.stub(:redis).and_return @mock_redis
end
describe '.next' do
it 'pops the data from redis' do
@mock_redis.should_receive(:rpop).with(:websocket)
Diaspora::WebSocket.next
end
end
describe '.queue_to_user' do
it 'push the data into redis' do
@mock_redis.should_receive(:lpush).with(:websocket, {:uid => "me", :data => "Socket!"}.to_json)
Diaspora::WebSocket.queue_to_user("me", "Socket!")
end
end
describe '.subscribe' do
it 'adds the uid to the uid redis set' do
Diaspora::WebSocket.stub!(:length)
Diaspora::WebSocket.initialize_channels
@mock_redis.should_receive(:sadd).with(Diaspora::WebSocket::REDIS_CONNECTION_SET, alice.id)
Diaspora::WebSocket.subscribe(alice.id, mock())
end
end
describe '.unsubscribe' do
it 'removes the uid to the uid redis set' do
Diaspora::WebSocket.stub!(:length)
Diaspora::WebSocket.initialize_channels
@mock_redis.stub!(:sadd)
Diaspora::WebSocket.subscribe(alice.id, mock())
@mock_redis.should_receive(:srem).with(Diaspora::WebSocket::REDIS_CONNECTION_SET, alice.id)
Diaspora::WebSocket.unsubscribe(alice.id, mock())
end
end
describe '.is_connected?' do
it 'calls sismember' do
@mock_redis.should_receive(:sismember).with(Diaspora::WebSocket::REDIS_CONNECTION_SET, alice.id)
Diaspora::WebSocket.is_connected?(alice.id)
end
end
end
describe Diaspora::Socketable do
before do
@user = alice
@aspect = @user.aspects.first
@post = @user.build_post(:status_message, :text => "hey", :to => @aspect.id)
@post.save
end
it 'sockets to a user' do
Diaspora::WebSocket.should_receive(:is_connected?).with(@user.id).and_return(true)
Diaspora::WebSocket.should_receive(:queue_to_user)
@post.socket_to_user(@user, :aspect_ids => @aspect.id)
end
it 'no-ops if redis isnt present' do
Diaspora::WebSocket.stub(:redis).and_return(nil)
lambda {
@post.socket_to_user(@user, :aspect_ids => @aspect.id)
}.should_not raise_error
end
end
......@@ -349,7 +349,7 @@ describe Postzord::Dispatcher do
describe '#socket_to_users' do
it 'calls socket_to_user given users' do
@zord.object.should_receive(:socket_to_user).with(bob)
Diaspora::Websocket.should_receive(:to).and_return(stub.as_null_object)
@zord.send(:socket_to_users, [bob])
end
end
......
......@@ -43,9 +43,7 @@ describe Postzord::Receiver::LocalBatch do
describe '#socket_to_users' do
it 'sockets to users' do
receiver.users.each do |user|
@object.should_receive(:socket_to_user).with(user)
end
Diaspora::Websocket.should_receive(:to).with(receiver.users).and_return(stub.as_null_object)
receiver.socket_to_users
end
end
......
......@@ -22,7 +22,7 @@ describe Jobs::SocketWebfinger do
person = Factory.create(:person)
finger.stub(:fetch).and_return(person)
person.should_receive(:socket_to_user).with(@user.id, {})
Diaspora::Websocket.should_receive(:to).with(@user.id).and_return(stub.as_null_object)
Jobs::SocketWebfinger.perform(@user.id, @account)
end
it 'Passes opts through on success' do
......@@ -32,7 +32,8 @@ describe Jobs::SocketWebfinger do
finger.stub(:fetch).and_return(person)
opts = {:symbol => true}
person.should_receive(:socket_to_user).with(@user.id, opts)
Diaspora::Websocket.should_receive(:to).with(@user.id).and_return(stub.as_null_object)
Jobs::SocketWebfinger.perform(@user.id, @account, opts)
end
it 'sockets failure message on failure' do
......@@ -41,7 +42,8 @@ describe Jobs::SocketWebfinger do
finger.stub(:fetch).and_raise(Webfinger::WebfingerFailedError)
opts = {:class => 'people', :status => 'fail', :query => @account, :response => I18n.t('people.webfinger.fail', :handle => @account )}.to_json
Diaspora::WebSocket.should_receive(:queue_to_user).with(@user.id, opts)
Diaspora::Websocket.should_receive(:to).with(@user.id).and_return(stub.as_null_object)
Jobs::SocketWebfinger.perform(@user.id, @account)
end
......
......@@ -81,7 +81,7 @@ describe Notification do
n = @request.notification_type(@user, @person).create(opts)
Notification.stub!(:make_notification).and_return n
n.should_receive(:socket_to_user).once
Diaspora::Websocket.should_receive(:to).