Merge remote-tracking branch 'origin/main' into main

This commit is contained in:
Stephane Bourque
2020-12-08 11:08:27 -08:00
3 changed files with 20 additions and 18 deletions

View File

@@ -25,7 +25,7 @@
-export([start_ap/1,stop_ap/1,pause_ap/1,cancel_ap/1]).
%% comm API
-export([rpc_cmd/2,reset_comm/1,mqtt_conf/2,sock_recon/2,post_event/4,post_event/3]).
-export([rpc_cmd/2,reset_comm/1,mqtt_conf/2,post_event/4,post_event/3]).
%% gen_server callbacks
@@ -135,10 +135,6 @@ reset_comm (Node) ->
mqtt_conf (Node, Conf) ->
gen_server:cast(Node, {mqtt_conf,Conf}).
-spec sock_recon (Node :: pid(), Delay :: integer()) -> ok.
sock_recon (Node, Delay) ->
gen_server:cast(Node, {stats_update, {sock_recon, {Delay}, io_lib:format("socket abnormally closed -> reconnect attempt in ~.2fsec",[Delay/1000])}}).
-spec post_event (Node :: pid(), Event :: atom(), Args :: tuple(), Comment :: binary() | string()) -> ok.
post_event (Node, Event, Args, Comment) ->
gen_server:cast(Node,{stats_update, {Event, Args, Comment}}).
@@ -469,12 +465,13 @@ cancel_simulation (State) ->
-spec ctrl_connect (State :: #ap_state{}) -> NewState :: #ap_state{}.
ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg}=State) ->
ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg, id=ID}=State) ->
Opts = [
{host, ovsdb_ap_config:tip_redirector(host,Cfg)},
{port, ovsdb_ap_config:tip_redirector(port,Cfg)},
{ca, ovsdb_ap_config:ca_certs(Cfg)},
{cert, ovsdb_ap_config:client_cert(Cfg)},
{id, ID},
{key, ovsdb_ap_config:client_key(Cfg)}
],
{ok, Comm} = ovsdb_ap_comm:start_link(Opts),
@@ -482,7 +479,7 @@ ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg}=State) ->
post_event(tip_connect,{<<"redirector">>},<<"connecting to the TIP controller (redirector)">>),
State#ap_state{comm=Comm};
ctrl_connect (#ap_state{comm=none, status=running, config=Cfg}=State) ->
ctrl_connect (#ap_state{comm=none, status=running, config=Cfg, id=ID}=State) ->
O = case ovsdb_ap_config:tip_manager(host,Cfg) of
[] ->
post_event(tip_connect,{<<"redirector">>},<<"connecting to the TIP controller (redirector)">>),
@@ -495,7 +492,8 @@ ctrl_connect (#ap_state{comm=none, status=running, config=Cfg}=State) ->
end,
Opts = [{cacert, ovsdb_ap_config:ca_certs(Cfg)},
{cert, ovsdb_ap_config:client_cert(Cfg)},
{key,ovsdb_ap_config:client_key(Cfg)}|O],
{key,ovsdb_ap_config:client_key(Cfg)},
{id, ID} |O],
{ok, Comm} = ovsdb_ap_comm:start_link(Opts),
gen_server:cast(self(),ctlr_start_comm),
State#ap_state{comm=Comm};

View File

@@ -17,6 +17,8 @@
-type options() :: [
{host, string()} | % tip controller host name
{port, integer()} | % port to connect
{ca, binary()} | % in memory PEM file of the server CA chain
{id, binary()} | % for tagging messages the ID of the AP node
{cacert, binary()} | % in memory PEM file of the server CA chain
{cert, binary()} |
{key, {atom(),binary() } }
@@ -26,6 +28,7 @@
-record (c_state, {
options :: options(),
id :: binary(),
socket :: none | ssl:sslsocket(),
ap :: pid(),
status :: active | idle | error,
@@ -70,15 +73,17 @@ create_comm (Opts, AP) ->
options = Opts,
socket = none,
ap = AP,
id = proplists:get_value(id,Opts,<<"unknown AP">>),
status = idle
},
comm_loop(State).
-spec comm_loop (State :: #c_state{}) -> ok.
comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) ->
comm_loop (#c_state{socket=S, rxb=Rx, ap=AP, id=ID, options=Opts}=State) ->
receive
{ssl, S, Data} ->
Buffer = process_rx_data(<<Rx/binary,Data/binary>>,AP),
{Buffer, Pretty} = process_rx_data(<<Rx/binary,Data/binary>>,AP),
?L_I(?DBGSTR("GOT REQUEST: from ~s:~B to ~s~n~s",[proplists:get_value(host,Opts,"unknown"),proplists:get_value(port,Opts,0),ID,Pretty])),
ovsdb_ap:post_event(AP,comm_event,{<<"RX">>,size(Data)},io_lib:format("receive ~Bbytes",[size(Data)])),
case ssl:setopts(S,[{active,once}]) of
ok ->
@@ -89,6 +94,7 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) ->
{ssl_closed, S} ->
ovsdb_ap:post_event(AP,comm_error,{<<"socket_closed">>},<<>>),
?L_E(?DBGSTR("Socket closed by server")),
comm_loop(try_reconnect(State));
{ssl_error, S, Reason} ->
@@ -105,7 +111,9 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) ->
comm_loop(State#c_state{status=error});
_ ->
ToSend = jiffy:encode(Data),
ToSendP = jiffy:encode(Data,[pretty]),
%?L_I(?DBGSTR("Sending: ~B bytes of date",[size(ToSend)])),
?L_I(?DBGSTR("SENDING RESPONSE: from ~s to ~s:~B~n~s",[ID,proplists:get_value(host,Opts,"unknown"),proplists:get_value(port,Opts,0),ToSendP])),
case ssl:send(S,ToSend) of
ok ->
ovsdb_ap:post_event(AP,comm_event,{<<"TX">>,size(ToSend)},io_lib:format("sending ~Bbytes",[size(ToSend)])),
@@ -153,7 +161,7 @@ try_reconnect (#c_state{restart=R, ap=AP}=State) ->
Rj = R + rand:uniform(250) - 125,
?L_I(?DBGSTR("socket closed by server, trying to reconnect in ~.2fsec",[Rj/1000])),
_ = timer:send_after(Rj,{start, AP}),
ovsdb_ap:sock_recon(AP,Rj),
ovsdb_ap:post_event(AP,sock_recon,{Rj},io_lib:format("socket abnormally closed -> reconnect attempt in ~.2fsec",[Rj/1000])),
State#c_state{socket=none, status=error, restart=min(R*2,32000)}.
@@ -185,15 +193,15 @@ connect_to_server (Host, Port, CAs, Cert, Key) ->
end.
%--------process_rx_data/2---------------process data in buffer and ensures only valid decoded JSON is sent to AP
-spec process_rx_data (Data :: binary(), AP :: pid()) -> Buffer :: binary().
-spec process_rx_data (Data :: binary(), AP :: pid()) -> {Buffer :: binary(), PrettyJSON :: binary()}.
process_rx_data (Data, AP) ->
try jiffy:decode(Data,[return_maps,copy_strings,return_trailer]) of
{has_trailer,Map,Tail} ->
ovsdb_ap:rpc_cmd(AP,Map),
iolist_to_binary(Tail);
{iolist_to_binary(Tail),jiffy:encode(Map,[pretty])};
Map ->
ovsdb_ap:rpc_cmd(AP,Map),
<<"">>
{<<"">>,jiffy:encode(Map,[pretty])}
catch
error:{_,truncated_json} ->
Data;

View File

@@ -338,10 +338,6 @@ code_change (_,OldState,_) ->
trigger_execute (0, State) ->
gen_server:cast(self(),execute),
State;
trigger_execute (D, State) ->
{ok,_} = timer:apply_after(D,gen_server,cast,[self(),execute]),
State.