diff --git a/src/ovsdb_ap.erl b/src/ovsdb_ap.erl index 85fe386..ce512ff 100644 --- a/src/ovsdb_ap.erl +++ b/src/ovsdb_ap.erl @@ -25,7 +25,7 @@ -export([start_ap/1,stop_ap/1,pause_ap/1,cancel_ap/1]). %% comm API --export([rpc_cmd/2,reset_comm/1,mqtt_conf/2,sock_recon/2,post_event/4,post_event/3]). +-export([rpc_cmd/2,reset_comm/1,mqtt_conf/2,post_event/4,post_event/3]). %% gen_server callbacks @@ -135,10 +135,6 @@ reset_comm (Node) -> mqtt_conf (Node, Conf) -> gen_server:cast(Node, {mqtt_conf,Conf}). --spec sock_recon (Node :: pid(), Delay :: integer()) -> ok. -sock_recon (Node, Delay) -> - gen_server:cast(Node, {stats_update, {sock_recon, {Delay}, io_lib:format("socket abnormally closed -> reconnect attempt in ~.2fsec",[Delay/1000])}}). - -spec post_event (Node :: pid(), Event :: atom(), Args :: tuple(), Comment :: binary() | string()) -> ok. post_event (Node, Event, Args, Comment) -> gen_server:cast(Node,{stats_update, {Event, Args, Comment}}). @@ -469,12 +465,13 @@ cancel_simulation (State) -> -spec ctrl_connect (State :: #ap_state{}) -> NewState :: #ap_state{}. -ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg}=State) -> +ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg, id=ID}=State) -> Opts = [ {host, ovsdb_ap_config:tip_redirector(host,Cfg)}, {port, ovsdb_ap_config:tip_redirector(port,Cfg)}, {ca, ovsdb_ap_config:ca_certs(Cfg)}, {cert, ovsdb_ap_config:client_cert(Cfg)}, + {id, ID}, {key, ovsdb_ap_config:client_key(Cfg)} ], {ok, Comm} = ovsdb_ap_comm:start_link(Opts), @@ -482,7 +479,7 @@ ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg}=State) -> post_event(tip_connect,{<<"redirector">>},<<"connecting to the TIP controller (redirector)">>), State#ap_state{comm=Comm}; -ctrl_connect (#ap_state{comm=none, status=running, config=Cfg}=State) -> +ctrl_connect (#ap_state{comm=none, status=running, config=Cfg, id=ID}=State) -> O = case ovsdb_ap_config:tip_manager(host,Cfg) of [] -> post_event(tip_connect,{<<"redirector">>},<<"connecting to the TIP controller (redirector)">>), @@ -495,7 +492,8 @@ ctrl_connect (#ap_state{comm=none, status=running, config=Cfg}=State) -> end, Opts = [{cacert, ovsdb_ap_config:ca_certs(Cfg)}, {cert, ovsdb_ap_config:client_cert(Cfg)}, - {key,ovsdb_ap_config:client_key(Cfg)}|O], + {key,ovsdb_ap_config:client_key(Cfg)}, + {id, ID} |O], {ok, Comm} = ovsdb_ap_comm:start_link(Opts), gen_server:cast(self(),ctlr_start_comm), State#ap_state{comm=Comm}; diff --git a/src/ovsdb_ap_comm.erl b/src/ovsdb_ap_comm.erl index f30ab1a..471bd0b 100644 --- a/src/ovsdb_ap_comm.erl +++ b/src/ovsdb_ap_comm.erl @@ -17,6 +17,8 @@ -type options() :: [ {host, string()} | % tip controller host name {port, integer()} | % port to connect + {ca, binary()} | % in memory PEM file of the server CA chain + {id, binary()} | % for tagging messages the ID of the AP node {cacert, binary()} | % in memory PEM file of the server CA chain {cert, binary()} | {key, {atom(),binary() } } @@ -26,6 +28,7 @@ -record (c_state, { options :: options(), + id :: binary(), socket :: none | ssl:sslsocket(), ap :: pid(), status :: active | idle | error, @@ -70,15 +73,17 @@ create_comm (Opts, AP) -> options = Opts, socket = none, ap = AP, + id = proplists:get_value(id,Opts,<<"unknown AP">>), status = idle }, comm_loop(State). -spec comm_loop (State :: #c_state{}) -> ok. -comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) -> +comm_loop (#c_state{socket=S, rxb=Rx, ap=AP, id=ID, options=Opts}=State) -> receive {ssl, S, Data} -> - Buffer = process_rx_data(<>,AP), + {Buffer, Pretty} = process_rx_data(<>,AP), + ?L_I(?DBGSTR("GOT REQUEST: from ~s:~B to ~s~n~s",[proplists:get_value(host,Opts,"unknown"),proplists:get_value(port,Opts,0),ID,Pretty])), ovsdb_ap:post_event(AP,comm_event,{<<"RX">>,size(Data)},io_lib:format("receive ~Bbytes",[size(Data)])), case ssl:setopts(S,[{active,once}]) of ok -> @@ -89,6 +94,7 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) -> {ssl_closed, S} -> ovsdb_ap:post_event(AP,comm_error,{<<"socket_closed">>},<<>>), + ?L_E(?DBGSTR("Socket closed by server")), comm_loop(try_reconnect(State)); {ssl_error, S, Reason} -> @@ -105,7 +111,9 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) -> comm_loop(State#c_state{status=error}); _ -> ToSend = jiffy:encode(Data), + ToSendP = jiffy:encode(Data,[pretty]), %?L_I(?DBGSTR("Sending: ~B bytes of date",[size(ToSend)])), + ?L_I(?DBGSTR("SENDING RESPONSE: from ~s to ~s:~B~n~s",[ID,proplists:get_value(host,Opts,"unknown"),proplists:get_value(port,Opts,0),ToSendP])), case ssl:send(S,ToSend) of ok -> ovsdb_ap:post_event(AP,comm_event,{<<"TX">>,size(ToSend)},io_lib:format("sending ~Bbytes",[size(ToSend)])), @@ -153,7 +161,7 @@ try_reconnect (#c_state{restart=R, ap=AP}=State) -> Rj = R + rand:uniform(250) - 125, ?L_I(?DBGSTR("socket closed by server, trying to reconnect in ~.2fsec",[Rj/1000])), _ = timer:send_after(Rj,{start, AP}), - ovsdb_ap:sock_recon(AP,Rj), + ovsdb_ap:post_event(AP,sock_recon,{Rj},io_lib:format("socket abnormally closed -> reconnect attempt in ~.2fsec",[Rj/1000])), State#c_state{socket=none, status=error, restart=min(R*2,32000)}. @@ -185,15 +193,15 @@ connect_to_server (Host, Port, CAs, Cert, Key) -> end. %--------process_rx_data/2---------------process data in buffer and ensures only valid decoded JSON is sent to AP --spec process_rx_data (Data :: binary(), AP :: pid()) -> Buffer :: binary(). +-spec process_rx_data (Data :: binary(), AP :: pid()) -> {Buffer :: binary(), PrettyJSON :: binary()}. process_rx_data (Data, AP) -> try jiffy:decode(Data,[return_maps,copy_strings,return_trailer]) of {has_trailer,Map,Tail} -> ovsdb_ap:rpc_cmd(AP,Map), - iolist_to_binary(Tail); + {iolist_to_binary(Tail),jiffy:encode(Map,[pretty])}; Map -> ovsdb_ap:rpc_cmd(AP,Map), - <<"">> + {<<"">>,jiffy:encode(Map,[pretty])} catch error:{_,truncated_json} -> Data; diff --git a/src/ovsdb_client_handler.erl b/src/ovsdb_client_handler.erl index 1690484..a61e5d2 100644 --- a/src/ovsdb_client_handler.erl +++ b/src/ovsdb_client_handler.erl @@ -338,10 +338,6 @@ code_change (_,OldState,_) -> trigger_execute (0, State) -> gen_server:cast(self(),execute), - State; - -trigger_execute (D, State) -> - {ok,_} = timer:apply_after(D,gen_server,cast,[self(),execute]), State.