使用cowboy實現websocket主要實現以下回調函數 下面的函數返回值要具體弄清楚原因參考 websocket具體協議 主要就是兩個部分 握手和數據傳輸
-export([init/3]). 常見所有處理程序回調。建立WebSocket連接,這個函數必須返回 upgrade 的元組。
-export([websocket_init/3]). 初始socket狀態,也可以用於注冊的過程,啟動一個定時器,等返回參考doc
.
然后和WebSocket握手。
-export([websocket_handle/3]).處理客戶端發送的對應的消息。返回格式 參考cow對應doc
-export([websocket_info/3]). 處理erlang之間派發的消息
-export([websocket_terminate/3]). websocket 斷開 用於掃尾工作處理
我在這里還是采用otp方式構建整個項目 主要代碼 以及效果
-module(websocket_app). -behaviour(application). %% Application callbacks -export([start/2, stop/1]). %% =================================================================== %% Application callbacks %% =================================================================== start(_StartType, _StartArgs) -> %%啟動存儲pid的樹據 可以采用 ets 表格處理 但是為了方便集群處理 我采用的mnesia ok = websocket_store:init(), %% 配置路由 Dispatch = cowboy_router:compile([ {'_', [ {"/", cowboy_static, {priv_file, websocket, "index.html"}}, {"/websocket", websocket_hander, []}, {"/static/[...]", cowboy_static, {priv_dir, websocket, "static"}} ]} ]), {ok, _} = cowboy:start_http(http, 100, [{port, 8080}], [{env, [{dispatch, Dispatch}]}]), %%啟動監督樹 websocket_sup:start_link(). stop(_State) -> ok.

-module(websocket_sup). -behaviour(supervisor). %% API -export([start_link/0]). %% Supervisor callbacks -export([init/1]). %% Helper macro for declaring children of supervisor -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 2000, Type, [I]}). %% =================================================================== %% API functions %% =================================================================== start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% =================================================================== %% Supervisor callbacks %% =================================================================== init([]) -> Children = [], {ok, { {one_for_one, 5, 10}, Children} }.
-module(websocket_hander). -behaviour(cowboy_websocket_handler). %% ------------------------------------------------------------------ %% API Function Exports %% ------------------------------------------------------------------ %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ -export([init/3]). -export([websocket_init/3]). -export([websocket_handle/3]). -export([websocket_info/3]). -export([websocket_terminate/3]). %% ------------------------------------------------------------------ %% API Function Definitions %% ------------------------------------------------------------------ %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ %%websocket 握手 init({tcp, http}, _Req, _Opts) -> %%插入數據 websocket_store:dirty_insert(self(),web), {upgrade, protocol, cowboy_websocket}. %%連接初始 websocket_init(_TransportName, Req, _Opts) -> %%連接一秒后 發送消息 erlang:start_timer(1000, self(), <<"Hello!">>), {ok, Req, undefined_state}. %%處理客戶端發送投遞的消息 websocket_handle({text, Msg}, Req, State) -> %% 把消息投遞到其余的進程 這里可以 投遞到 tcp socket 也可以投遞的 websocket 這樣就可以實現 服務支持多協議處理 lists:foreach(fun([Pid,SocketType]) -> case SocketType of tcp -> ok; web -> case Pid == self() of true -> ok; false -> Pid!{chat,Msg} end end end,websocket_store:dirty_lookall()), {reply, {text, << "That's what she said! ", Msg/binary >>}, Req, State}; websocket_handle(_Data, Req, State) -> {ok, Req, State}. %% 處理erlang 發送的消息 這里是接收 46 行發送的數據 websocket_info({chat,Msg},_Req,State)-> {reply,{text,Msg},_Req,State}; %% 處理 erlang:start_timer(1000, self(), <<"Hello!">>) 發送的消息 websocket_info({timeout, _Ref, Msg}, Req, State) -> {reply, {text, Msg}, Req, State}; websocket_info(_Info, Req, State) -> {ok, Req, State}. %%斷開socket websocket_terminate(_Reason, _Req, _State) -> websocket_store:dirty_delete(self()), ok. %% ------------------------------------------------------------------ %% Internal Function Definitions %% ------------------------------------------------------------------
%%%------------------------------------------------------------------- %%% @author thinkpad <> %%% @copyright (C) 2014, thinkpad %%% @doc %%% %%% @end %%% Created : 27 Jun 2014 by thinkpad <> %%%------------------------------------------------------------------- -module(websocket_store). -include_lib("stdlib/include/qlc.hrl" ). %% API -export([dirty_insert/2,dirty_lookall/0,dirty_lookall_record/0,dirty_delete/1]). -export([init/0,insert/2,delete/1,lookup/1,lookall/0]). -record(socket_to_pid, {pid,socket_type}). -define(WAIT_FOR_TABLES, 10000). %%%=================================================================== %%% API %%%=================================================================== init()-> dynamic_db_init(). %-------------------------------------------------------------------- %% @doc Insert a key and pid. %% @spec insert(Key, Pid) -> void() %% @end %%-------------------------------------------------------------------- insert(Pid,SocketType) when is_pid(Pid) -> Fun = fun() -> mnesia:write(#socket_to_pid{pid = Pid, socket_type = SocketType}) end, {atomic, _} = mnesia:transaction(Fun). %-------------------------------------------------------------------- %% @doc dirty insert pid and socket %% @spec dirty_insert(Pid socket) %% @end %%-------------------------------------------------------------------- dirty_insert(Pid,SocketType) when is_pid(Pid) -> mnesia:dirty_write(#socket_to_pid{pid = Pid, socket_type = SocketType}). %-------------------------------------------------------------------- %% @doc dirty_read data %% @spec dirty_lookall() %% @end %%-------------------------------------------------------------------- dirty_lookall()-> mnesia:dirty_select(socket_to_pid,[{#socket_to_pid{pid='$1',socket_type = '$2'},[],['$$']}]). dirty_delete(Pid)-> mnesia:dirty_delete(socket_to_pid,Pid). %-------------------------------------------------------------------- %% @doc look all record info %% @spec %% @end %%-------------------------------------------------------------------- dirty_lookall_record()-> mnesia:dirty_select(socket_to_pid,[{#socket_to_pid{pid='$1',socket_type = '$2'},[],['$_']}]). %%-------------------------------------------------------------------- %% @doc Find a pid given a key. %% @spec lookup(Key) -> {ok, Pid} | {error, not_found} %% @end %%-------------------------------------------------------------------- lookup(Pid) -> do(qlc:q([{X#socket_to_pid.pid,X#socket_to_pid.socket_type} || X <- mnesia:table(socket_to_pid),X#socket_to_pid.pid==Pid])). %%-------------------------------------------------------------------- %% @doc Find all list %% @spec lookall() -> {List} | {error, not_found} %% @end %%-------------------------------------------------------------------- lookall() -> do(qlc:q([[X#socket_to_pid.pid,X#socket_to_pid.socket_type] || X <- mnesia:table(socket_to_pid)])). %%-------------------------------------------------------------------- %% @doc Delete an element by pid from the registrar. %% @spec delete(Pid) -> void() %% @end %%-------------------------------------------------------------------- delete(Pid) -> try [#socket_to_pid{} = Record] = mnesia:dirty_read(socket_to_pid, Pid, #socket_to_pid.pid), mnesia:dirty_delete_object(Record) catch _C:_E -> ok end. %%-------------------------------------------------------------------- %% @doc %% @spec %% @end %%-------------------------------------------------------------------- %%%=================================================================== %%% Internal functions %%%=================================================================== dynamic_db_init() -> delete_schema(), {atomic, ok} = mnesia:create_table(socket_to_pid, [{attributes, record_info(fields, socket_to_pid)}]), ok. %% deletes a local schema. delete_schema() -> mnesia:stop(), mnesia:delete_schema([node()]), mnesia:start(). do(Query) -> F = fun() -> qlc:e(Query) end, {atomic, Value} = mnesia:transaction(F), Value.