Fixing some SSL stuff

This commit is contained in:
Stephane Bourque
2020-10-16 10:33:10 -07:00
parent b52d8063a1
commit 8ea431cc65
3 changed files with 91 additions and 21 deletions

View File

@@ -22,17 +22,18 @@
code_change/3]).
%% internal exports for spawn
-export([mqttserver_worker/2,mqttserver_processor/2]).
-export([mqttserver_worker/2,mqttserver_worker_secure/2,mqttserver_processor_init/2,mqttserver_processor_secure_init/2]).
-define(SERVER, ?MODULE).
-define(DBG, io:format("~s: ~p~n",[?FUNCTION_NAME,?LINE])).
-record(mqtt_server_state, { server_port, num_servers, server_pids, main_listen_socket, session_count = maps:new() }).
-record(mqtt_server_state, { server_port, num_servers, num_listeners, server_pids, main_listen_socket, session_count = maps:new() , secure }).
%%%===================================================================
%%% API
%%%===================================================================
creation_info() ->
[ #{ id => ?MODULE ,
[ #{ id => ?MODULE ,
start => { ?MODULE , start_link, [] },
restart => permanent,
shutdown => 100,
@@ -61,13 +62,28 @@ start_link() ->
{stop, Reason :: term()} | ignore).
init([]) ->
ServerPort = application:get_env(?MQTT_APP,server_port,?MQTT_DEFAULT_SERVER_PORT),
NumListeners = application:get_env(?MQTT_APP,num_listeners,10),
NumServers = application:get_env(?MQTT_APP,num_servers,10),
{ ok , ListenSocket } = gen_tcp:listen(ServerPort,[{active,false},{packet,2}]),
Pids = start_servers(NumServers,ListenSocket,self()),
{ok, #mqtt_server_state{ server_port = ServerPort, num_servers = NumServers, main_listen_socket = ListenSocket, server_pids = Pids}}.
Secure = application:get_env(?MQTT_APP,secure,false),
CertFile = application:get_env(?MQTT_APP,certfile,""),
CaCert = application:get_env(?MQTT_APP,cacertfile,""),
KeyFile = application:get_env(?MQTT_APP,keyfile,""),
{ ok , ListenSocket } = case Secure of
false ->
gen_tcp:listen(ServerPort,[{active,false},{reuseaddr,true}]);
true ->
ssl:listen(ServerPort,[{active,false},{reuseaddr,true},{password,"mypassword"},{certfile,CertFile},{keyfile,KeyFile},{cacertfile,CaCert}])
end,
Pids = case Secure of
false -> start_listeners(NumListeners,ListenSocket,self());
true -> start_listeners_secure(NumListeners,ListenSocket,self())
end,
{ok, #mqtt_server_state{ server_port = ServerPort,
num_servers = NumServers,
num_listeners = NumListeners,
main_listen_socket = ListenSocket,
server_pids = Pids,
secure = Secure }}.
%% @private
%% @doc Handling call messages
@@ -127,21 +143,30 @@ code_change(_OldVsn, State = #mqtt_server_state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
start_servers(NumServers,ListenSocket,ParentPid)->
start_servers(NumServers,ListenSocket,[],ParentPid).
start_listeners(NumListeners,ListenSocket,ParentPid)->
start_listeners(NumListeners,ListenSocket,[],ParentPid).
start_servers(0,_,Pids,_)->
start_listeners(0,_,Pids,_)->
Pids;
start_servers(NumServers,ListenSock,Pids,ParentPid)->
start_listeners(NumListeners,ListenSock,Pids,ParentPid)->
Pid = spawn(?MODULE,mqttserver_worker,[ListenSock,ParentPid]),
start_servers(NumServers-1,ListenSock,[Pid|Pids],ParentPid).
start_listeners(NumListeners-1,ListenSock,[Pid|Pids],ParentPid).
start_listeners_secure(NumListeners,ListenSocket,ParentPid)->
start_listeners_secure(NumListeners,ListenSocket,[],ParentPid).
start_listeners_secure(0,_,Pids,_)->
Pids;
start_listeners_secure(NumListeners,ListenSock,Pids,ParentPid)->
Pid = spawn(?MODULE,mqttserver_worker_secure,[ListenSock,ParentPid]),
start_listeners_secure(NumListeners-1,ListenSock,[Pid|Pids],ParentPid).
mqttserver_worker(ListenSock,ParentPid)->
lager:info("Server ~p starting to listen.",[self()]),
case gen_tcp:accept(ListenSock) of
{ok,Socket} ->
mqtt_server:increase_session(ParentPid,ListenSock),
Pid = spawn(?MODULE,mqttserver_processor,[Socket,#mqtt_processor_state{listener_pid = self(), parent_pid = ParentPid, peer_ip = inet:peername(Socket)}]),
Pid = spawn(?MODULE,mqttserver_processor_init,[Socket,#mqtt_processor_state{listener_pid = self(), parent_pid = ParentPid, peer_ip = inet:peername(Socket)}]),
gen_tcp:controlling_process(Socket,Pid),
mqttserver_worker(ListenSock,ParentPid);
Error ->
@@ -149,15 +174,57 @@ mqttserver_worker(ListenSock,ParentPid)->
ok
end.
mqttserver_worker_secure(ListenSock,ParentPid)->
lager:info("Server ~p starting to listen.",[self()]),
case ssl:transport_accept(ListenSock) of
{ok,Socket} ->
mqtt_server:increase_session(ParentPid,ListenSock),
Pid = spawn(?MODULE,mqttserver_processor_secure_init,[Socket,#mqtt_processor_state{listener_pid = self(), parent_pid = ParentPid, peer_ip = ssl:peername(Socket)}]),
ssl:controlling_process(Socket,Pid),
mqttserver_worker_secure(ListenSock,ParentPid);
Error ->
lager:info("accept failed - server shutting down: ~p~n",[Error]),
ok
end.
mqttserver_processor_init(Socket,State)->
inet:setopts(Socket,[{active,true}]),
mqttserver_processor(Socket,State).
mqttserver_processor(Socket,State)->
inet:setopts(Socket,[{active,once}]),
receive
{tcp,Socket,Data} ->
Answer = mqttserver_process:process(Data,State), % Not implemented in this example
{ Answer, NewState } = mqttserver_process:process(Data,State), % Not implemented in this example
gen_tcp:send(Socket,Answer),
mqttserver_processor(Socket,State);
mqttserver_processor(Socket,NewState);
{tcp_closed,Socket} ->
mqtt_server:increase_session(State#mqtt_processor_state.parent_pid,State#mqtt_processor_state.listener_pid),
lager:info("Socket ~w closed [~w]~n",[Socket,self()]),
ok
ok;
Anything ->
io:format("Anything(not secure): ~p~n",[Anything]),
mqttserver_processor(Socket,State)
end.
mqttserver_processor_secure_init(Socket,State)->
ssl:setopts(Socket,[{active,true}]),
mqttserver_processor_secure(Socket,State).
mqttserver_processor_secure(Socket,State)->
receive
{ssl,Socket,Data} ->
{ Answer,NewState} = mqttserver_process:process(Data,State), % Not implemented in this example
ssl:send(Socket,Answer),
mqttserver_processor_secure(Socket,NewState);
{ssl_closed,Socket} ->
mqtt_server:increase_session(State#mqtt_processor_state.parent_pid,State#mqtt_processor_state.listener_pid),
lager:info("Socket ~w closed [~w]~n",[Socket,self()]),
ok;
Anything ->
io:format("Anything ->~p~n",[Anything]),
mqttserver_processor_secure(Socket,State)
end.

View File

@@ -15,6 +15,7 @@
%% API
-export([process/2]).
process(_Data,_State)->
{ <<>>, {} }.
process(Data,State)->
io:format("Data >>> ~p~n",[Data]),
{ <<"Hello from MQTT">>, State }.

View File

@@ -6,6 +6,7 @@
start(_Type, _Args) ->
lager:start(),
application:ensure_all_started(ssl),
mqttsim_sup:start_link().
stop(_State) ->
@@ -13,4 +14,5 @@ stop(_State) ->
start() ->
lager:start(),
application:ensure_all_started(ssl),
application:ensure_all_started(mqttsim).