diff --git a/src/mqtt_server.erl b/src/mqtt_server.erl index 793d38f..1bcd5d7 100644 --- a/src/mqtt_server.erl +++ b/src/mqtt_server.erl @@ -22,17 +22,18 @@ code_change/3]). %% internal exports for spawn --export([mqttserver_worker/2,mqttserver_processor/2]). +-export([mqttserver_worker/2,mqttserver_worker_secure/2,mqttserver_processor_init/2,mqttserver_processor_secure_init/2]). -define(SERVER, ?MODULE). +-define(DBG, io:format("~s: ~p~n",[?FUNCTION_NAME,?LINE])). --record(mqtt_server_state, { server_port, num_servers, server_pids, main_listen_socket, session_count = maps:new() }). +-record(mqtt_server_state, { server_port, num_servers, num_listeners, server_pids, main_listen_socket, session_count = maps:new() , secure }). %%%=================================================================== %%% API %%%=================================================================== creation_info() -> - [ #{ id => ?MODULE , + [ #{ id => ?MODULE , start => { ?MODULE , start_link, [] }, restart => permanent, shutdown => 100, @@ -61,13 +62,28 @@ start_link() -> {stop, Reason :: term()} | ignore). init([]) -> ServerPort = application:get_env(?MQTT_APP,server_port,?MQTT_DEFAULT_SERVER_PORT), + NumListeners = application:get_env(?MQTT_APP,num_listeners,10), NumServers = application:get_env(?MQTT_APP,num_servers,10), - - { ok , ListenSocket } = gen_tcp:listen(ServerPort,[{active,false},{packet,2}]), - - Pids = start_servers(NumServers,ListenSocket,self()), - - {ok, #mqtt_server_state{ server_port = ServerPort, num_servers = NumServers, main_listen_socket = ListenSocket, server_pids = Pids}}. + Secure = application:get_env(?MQTT_APP,secure,false), + CertFile = application:get_env(?MQTT_APP,certfile,""), + CaCert = application:get_env(?MQTT_APP,cacertfile,""), + KeyFile = application:get_env(?MQTT_APP,keyfile,""), + { ok , ListenSocket } = case Secure of + false -> + gen_tcp:listen(ServerPort,[{active,false},{reuseaddr,true}]); + true -> + ssl:listen(ServerPort,[{active,false},{reuseaddr,true},{password,"mypassword"},{certfile,CertFile},{keyfile,KeyFile},{cacertfile,CaCert}]) + end, + Pids = case Secure of + false -> start_listeners(NumListeners,ListenSocket,self()); + true -> start_listeners_secure(NumListeners,ListenSocket,self()) + end, + {ok, #mqtt_server_state{ server_port = ServerPort, + num_servers = NumServers, + num_listeners = NumListeners, + main_listen_socket = ListenSocket, + server_pids = Pids, + secure = Secure }}. %% @private %% @doc Handling call messages @@ -127,21 +143,30 @@ code_change(_OldVsn, State = #mqtt_server_state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -start_servers(NumServers,ListenSocket,ParentPid)-> - start_servers(NumServers,ListenSocket,[],ParentPid). +start_listeners(NumListeners,ListenSocket,ParentPid)-> + start_listeners(NumListeners,ListenSocket,[],ParentPid). -start_servers(0,_,Pids,_)-> +start_listeners(0,_,Pids,_)-> Pids; -start_servers(NumServers,ListenSock,Pids,ParentPid)-> +start_listeners(NumListeners,ListenSock,Pids,ParentPid)-> Pid = spawn(?MODULE,mqttserver_worker,[ListenSock,ParentPid]), - start_servers(NumServers-1,ListenSock,[Pid|Pids],ParentPid). + start_listeners(NumListeners-1,ListenSock,[Pid|Pids],ParentPid). + +start_listeners_secure(NumListeners,ListenSocket,ParentPid)-> + start_listeners_secure(NumListeners,ListenSocket,[],ParentPid). + +start_listeners_secure(0,_,Pids,_)-> + Pids; +start_listeners_secure(NumListeners,ListenSock,Pids,ParentPid)-> + Pid = spawn(?MODULE,mqttserver_worker_secure,[ListenSock,ParentPid]), + start_listeners_secure(NumListeners-1,ListenSock,[Pid|Pids],ParentPid). mqttserver_worker(ListenSock,ParentPid)-> lager:info("Server ~p starting to listen.",[self()]), case gen_tcp:accept(ListenSock) of {ok,Socket} -> mqtt_server:increase_session(ParentPid,ListenSock), - Pid = spawn(?MODULE,mqttserver_processor,[Socket,#mqtt_processor_state{listener_pid = self(), parent_pid = ParentPid, peer_ip = inet:peername(Socket)}]), + Pid = spawn(?MODULE,mqttserver_processor_init,[Socket,#mqtt_processor_state{listener_pid = self(), parent_pid = ParentPid, peer_ip = inet:peername(Socket)}]), gen_tcp:controlling_process(Socket,Pid), mqttserver_worker(ListenSock,ParentPid); Error -> @@ -149,15 +174,57 @@ mqttserver_worker(ListenSock,ParentPid)-> ok end. +mqttserver_worker_secure(ListenSock,ParentPid)-> + lager:info("Server ~p starting to listen.",[self()]), + case ssl:transport_accept(ListenSock) of + {ok,Socket} -> + mqtt_server:increase_session(ParentPid,ListenSock), + Pid = spawn(?MODULE,mqttserver_processor_secure_init,[Socket,#mqtt_processor_state{listener_pid = self(), parent_pid = ParentPid, peer_ip = ssl:peername(Socket)}]), + ssl:controlling_process(Socket,Pid), + mqttserver_worker_secure(ListenSock,ParentPid); + Error -> + + lager:info("accept failed - server shutting down: ~p~n",[Error]), + ok + end. + +mqttserver_processor_init(Socket,State)-> + inet:setopts(Socket,[{active,true}]), + mqttserver_processor(Socket,State). + mqttserver_processor(Socket,State)-> - inet:setopts(Socket,[{active,once}]), receive {tcp,Socket,Data} -> - Answer = mqttserver_process:process(Data,State), % Not implemented in this example + { Answer, NewState } = mqttserver_process:process(Data,State), % Not implemented in this example gen_tcp:send(Socket,Answer), - mqttserver_processor(Socket,State); + mqttserver_processor(Socket,NewState); {tcp_closed,Socket} -> mqtt_server:increase_session(State#mqtt_processor_state.parent_pid,State#mqtt_processor_state.listener_pid), lager:info("Socket ~w closed [~w]~n",[Socket,self()]), - ok + ok; + Anything -> + io:format("Anything(not secure): ~p~n",[Anything]), + mqttserver_processor(Socket,State) + end. + +mqttserver_processor_secure_init(Socket,State)-> + ssl:setopts(Socket,[{active,true}]), + mqttserver_processor_secure(Socket,State). + +mqttserver_processor_secure(Socket,State)-> + receive + {ssl,Socket,Data} -> + { Answer,NewState} = mqttserver_process:process(Data,State), % Not implemented in this example + ssl:send(Socket,Answer), + mqttserver_processor_secure(Socket,NewState); + {ssl_closed,Socket} -> + mqtt_server:increase_session(State#mqtt_processor_state.parent_pid,State#mqtt_processor_state.listener_pid), + lager:info("Socket ~w closed [~w]~n",[Socket,self()]), + ok; + Anything -> + io:format("Anything ->~p~n",[Anything]), + mqttserver_processor_secure(Socket,State) + end. + + diff --git a/src/mqttserver_process.erl b/src/mqttserver_process.erl index 9ea97b1..3833646 100644 --- a/src/mqttserver_process.erl +++ b/src/mqttserver_process.erl @@ -15,6 +15,7 @@ %% API -export([process/2]). -process(_Data,_State)-> - { <<>>, {} }. +process(Data,State)-> + io:format("Data >>> ~p~n",[Data]), + { <<"Hello from MQTT">>, State }. diff --git a/src/mqttsim_app.erl b/src/mqttsim_app.erl index d1debce..bfaa404 100644 --- a/src/mqttsim_app.erl +++ b/src/mqttsim_app.erl @@ -6,6 +6,7 @@ start(_Type, _Args) -> lager:start(), + application:ensure_all_started(ssl), mqttsim_sup:start_link(). stop(_State) -> @@ -13,4 +14,5 @@ stop(_State) -> start() -> lager:start(), + application:ensure_all_started(ssl), application:ensure_all_started(mqttsim). \ No newline at end of file