Merge remote-tracking branch 'origin/main' into main

This commit is contained in:
Stephane Bourque
2020-12-02 08:24:06 -08:00
6 changed files with 223 additions and 99 deletions

View File

@@ -27,8 +27,8 @@
-ifdef(debug).
-define(DBGTRC(Msg), io:format("~s:~s (~p) => ~s~n",[?MODULE,?FUNCTION_NAME,?LINE,Msg])).
-define(DBGTRC(FMsg,Args), io:format("~s:~s (~B) => " FMsg "~n",[?MODULE,?FUNCTION_NAME,?LINE] ++ Args)).
-define(DBGSTR(Msg), io_lib:format("~s:~s (~B) => ~s~n",[?MODULE,?FUNCTION_NAME,?LINE,Msg])).
-define(DBGSTR(FMsg,Args), io_lib:format("~s:~s (~B) => " FMsg "~n",[?MODULE,?FUNCTION_NAME,?LINE] ++ Args)).
-define(DBGSTR(Msg), io_lib:format("~s:~s (~B) => ~s",[?MODULE,?FUNCTION_NAME,?LINE,Msg])).
-define(DBGSTR(FMsg,Args), io_lib:format("~s:~s (~B) => " FMsg,[?MODULE,?FUNCTION_NAME,?LINE] ++ Args)).
-else.
-define(DBGTRC(Msg), true).
-define(DBGTRC(FMsg,Args), true).

View File

@@ -109,7 +109,7 @@ rpc_cmd (Node,Rpc) ->
-spec reset_comm (Node :: pid()) -> ok.
reset_comm (Node) ->
gen_server:call(Node,reset_comm).
gen_server:cast(Node,reset_comm).
@@ -175,6 +175,12 @@ handle_cast (ap_cancel, State) ->
_ = cancel_simulation(State),
{stop, normal, State};
handle_cast (reset_comm, State) ->
{noreply,ctrl_connect(State)};
handle_cast (ctlr_start_comm, State) ->
{noreply, ctlr_start_comm(State)};
handle_cast (R,State) ->
?L_E(?DBGSTR("got unknown request: ~p",[R])),
{noreply, State}.
@@ -186,10 +192,29 @@ handle_cast (R,State) ->
Request :: term(),
From :: {pid(),Tag::term()},
State :: #ap_state{},
Reply :: term(),
Reply :: ok | invalid | ignored,
Reason :: term(),
NewState :: #ap_state{}.
handle_call ({exec_rpc, RPC}, _From, #ap_state{status=paused}=State) when is_map(RPC) andalso
is_map_key(<<"method">>,RPC) andalso
is_map_key(<<"id">>,RPC) ->
case maps:get(<<"method">>,RPC) of
<<"echo">> ->
case ovsdb_ap_rpc:eval_req(<<"echo">>, maps:get(<<"id">>,RPC), RPC, State#ap_state.store) of
{ok, Result} when is_map(Result) andalso is_map_key(<<"result">>,Result) ->
ok = ovsdb_ap_comm:send_term(State#ap_state.comm,Result),
{reply,ok,State};
_ ->
{reply, ignored, State}
end;
_ ->
{reply, ignored, State}
end;
handle_call ({exec_rpc, _RPC}, _From, #ap_state{status=paused}=State) ->
{reply, ignored, State};
handle_call ({exec_rpc, RPC}, _From, State) when is_map(RPC) andalso
is_map_key(<<"method">>,RPC) andalso
is_map_key(<<"id">>,RPC) ->
@@ -225,7 +250,7 @@ handle_call ({exec_rpc, RPC}, _From, State) when is_map(RPC) andalso
end;
handle_call ({exec_rpc, RPC}, _From, State) ->
io:format("invalid RPC: ~p~n",[RPC]),
?L_E(?DBGSTR("invalid RPC: ~p~n",[RPC])),
{reply,invalid, State};
handle_call (Request, From, State) ->
@@ -242,9 +267,11 @@ handle_call (Request, From, State) ->
NewState :: #ap_state{}.
handle_info({'EXIT', _Pid, normal}, State) ->
{noreply, State};
handle_info({'EXIT', Pid, Reason}, State) ->
?L_E(?DBGSTR("Abnormal exit from ~p with reason: ~p",[Pid,Reason])),
%% @TODO: implement proper restart strategy for linked processes
{noreply, State};
@@ -313,8 +340,9 @@ prepare_state (ID, SimMan) ->
State :: #ap_state{},
NewState :: #ap_state{}.
set_status (Status, #ap_state{config=Cfg}=State) ->
set_status (Status, #ap_state{status=OldStatus, config=Cfg}=State) ->
ovsdb_client_handler:ap_status(Status,ovsdb_ap_config:id(Cfg)),
?L_I(?DBGSTR("AP ~p : status change := ~p -> ~p",[self(),OldStatus,Status])),
State#ap_state{status=Status}.
@@ -335,26 +363,13 @@ startup_ap (#ap_state{status=init, config=Cfg, sim_manager=Man}=State) ->
-spec run_simulation (State :: #ap_state{}) -> NewState :: #ap_state{}.
run_simulation (#ap_state{status=ready,config=Cfg}=State) ->
Opts = [
{host, ovsdb_ap_config:tip_redirector(host,Cfg)},
{port, ovsdb_ap_config:tip_redirector(port,Cfg)},
{ca, ovsdb_ap_config:ca_certs(Cfg)},
{cert, ovsdb_ap_config:client_cert(Cfg)}
],
{ok, Comm} = ovsdb_ap_comm:start_link(Opts),
set_status(running, State#ap_state{comm=Comm});
run_simulation (#ap_state{status=paused,config=Cfg}=State) ->
Opts = [
{host, ovsdb_ap_config:tip_manager(host,Cfg)},
{port, ovsdb_ap_config:tip_manager(port,Cfg)},
{ca, ovsdb_ap_config:ca_certs(Cfg)},
{cert, ovsdb_ap_config:client_cert(Cfg)}
],
{ok, Comm} = ovsdb_ap_comm:start_link(Opts),
set_status(running, State#ap_state{comm=Comm}).
run_simulation (#ap_state{status=ready}=State) ->
NewState = ctrl_connect(State),
set_status(running,NewState);
run_simulation (#ap_state{status=paused}=State) ->
set_status(running,State).
@@ -363,7 +378,8 @@ run_simulation (#ap_state{status=paused,config=Cfg}=State) ->
-spec stop_simulation (State :: #ap_state{}) -> NewState :: #ap_state{}.
stop_simulation (State) ->
set_status(ready, State).
NewState = ctrl_disconnect(State),
set_status(ready, NewState).
@@ -383,4 +399,66 @@ pause_simulation (State) ->
-spec cancel_simulation (State :: #ap_state{}) -> NewState :: #ap_state{}.
cancel_simulation (State) ->
State.
State.
%--------ctrl_connect/1------------------connect to either the tip redirector or manager based on state / old connections are closed if open
-spec ctrl_connect (State :: #ap_state{}) -> NewState :: #ap_state{}.
ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg}=State) ->
Opts = [
{host, ovsdb_ap_config:tip_redirector(host,Cfg)},
{port, ovsdb_ap_config:tip_redirector(port,Cfg)},
{ca, ovsdb_ap_config:ca_certs(Cfg)},
{cert, ovsdb_ap_config:client_cert(Cfg)}
],
{ok, Comm} = ovsdb_ap_comm:start_link(Opts),
gen_server:cast(self(),ctlr_start_comm),
State#ap_state{comm=Comm};
ctrl_connect (#ap_state{comm=none, status=running, config=Cfg}=State) ->
O = case ovsdb_ap_config:tip_manager(host,Cfg) of
[] ->
[{host, ovsdb_ap_config:tip_redirector(host,Cfg)},
{port, ovsdb_ap_config:tip_redirector(port,Cfg)}];
_ ->
[{host, ovsdb_ap_config:tip_manager(host,Cfg)},
{port, ovsdb_ap_config:tip_manager(port,Cfg)}]
end,
Opts = [{ca, ovsdb_ap_config:ca_certs(Cfg)},{cert, ovsdb_ap_config:client_cert(Cfg)}|O],
{ok, Comm} = ovsdb_ap_comm:start_link(Opts),
gen_server:cast(self(),ctlr_start_comm),
State#ap_state{comm=Comm};
ctrl_connect (#ap_state{comm=Comm}=State) ->
ovsdb_ap_comm:end_comm(Comm),
ctrl_connect (State#ap_state{comm=none}).
%--------ctrl_disconnect/1---------------disconnect and closes communication port but otherwise does not chenge status
-spec ctrl_disconnect (State :: #ap_state{}) -> NewState :: #ap_state{}.
ctrl_disconnect (#ap_state{comm=none}=State) ->
State;
ctrl_disconnect (#ap_state{comm=Comm}=State) ->
ovsdb_ap_comm:end_comm(Comm),
State#ap_state{comm=none}.
%--------ctlr_start_comm/1---------------asychrounously starts the connection after comm is created
-spec ctlr_start_comm (State :: #ap_state{}) -> NewState :: #ap_state{}.
ctlr_start_comm (#ap_state{comm=Comm}=State) ->
ovsdb_ap_comm:start_comm(Comm),
State.

View File

@@ -15,14 +15,6 @@
%%-----------------------------------------------------------------------------
%% types and specifications
-record (c_state, {
socket :: ssl:sslsocket(),
ap :: pid(),
status :: active | idle,
rxb = <<"">> :: binary()
}).
-type options() :: [
{host, string()} | % tip controller host name
{port, integer()} | % port to connect
@@ -30,15 +22,21 @@
{cert, binary()} %
].
-export_type([options/0]).
-record (c_state, {
options :: options(),
socket :: none | ssl:sslsocket(),
ap :: pid(),
status :: active | idle,
rxb = <<"">> :: binary()
}).
%%------------------------------------------------------------------------------
%% API
-export ([start_link/1,create_comm/2,send_term/2,end_comm/1]).
-export ([start_link/1,create_comm/2,send_term/2,end_comm/1,start_comm/1]).
-spec start_link (Options :: options()) -> {ok, pid()}.
@@ -51,7 +49,7 @@ start_link (Options) ->
-spec send_term(Comm :: pid(), Data :: term()) -> ok | {error, Reason :: string()}.
send_term (Comm, Data) when is_map(Data) ->
send_term (Comm, Data) when is_pid(Comm) andalso is_map(Data) ->
Comm ! {send, self(), Data},
ok;
@@ -67,6 +65,13 @@ end_comm (Comm) ->
ok.
-spec start_comm (Comm :: pid()) -> ok.
start_comm (Comm) ->
Comm ! {start, self()},
ok.
%%------------------------------------------------------------------------------
%% internals
@@ -74,15 +79,11 @@ end_comm (Comm) ->
-spec create_comm (Options :: options(), AP :: pid()) -> ok.
create_comm (Opts, AP) ->
H = proplists:get_value(host,Opts),
P = proplists:get_value(port,Opts),
CAs = [X || {'Certificate',X,not_encrypted} <- public_key:pem_decode(proplists:get_value(ca,Opts))],
[{'Certificate',Cert,not_encrypted},
{KeyType,Key,not_encrypted}] = public_key:pem_decode(proplists:get_value(cert,Opts)),
State = #c_state{
socket = connect_to_server(H,P,CAs,Cert,{KeyType,Key}),
options = Opts,
socket = none,
ap = AP,
status = active
status = idle
},
comm_loop(State).
@@ -113,12 +114,16 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) ->
{send, AP, Data} ->
ToSend = jiffy:encode(Data),
io:format("Sending: ~s~n",[ToSend]),
?L_I(?DBGSTR("Sending: ~s",[ToSend])),
ok = ssl:send(S,ToSend),
comm_loop(State#c_state{status=active});
{down, AP} ->
ssl:close(S)
ssl:close(S);
{start, AP} ->
NewState = start_connection(State),
comm_loop(NewState)
after
10000 ->
@@ -127,6 +132,22 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) ->
-spec start_connection (State :: #c_state{}) -> NewState :: #c_state{}.
start_connection (#c_state{options=Opts}=State) ->
H = proplists:get_value(host,Opts),
P = proplists:get_value(port,Opts),
CAs = [X || {'Certificate',X,not_encrypted} <- public_key:pem_decode(proplists:get_value(ca,Opts))],
[{'Certificate',Cert,not_encrypted},
{KeyType,Key,not_encrypted}] = public_key:pem_decode(proplists:get_value(cert,Opts)),
State#c_state{
socket = connect_to_server(H,P,CAs,Cert,{KeyType,Key}),
status = active
}.
-spec connect_to_server (Host, Port, CAs, Cert, Key) -> Socket when
Host :: string(), %% host name to connect to (can be IP address in string format)
Port :: integer(), %% port to connect to
@@ -144,6 +165,7 @@ connect_to_server (Host, Port, CAs, Cert, Key) ->
{session_tickets,auto},
{mode,binary},
{active,once}],
?L_I(?DBGSTR("AP connecting to ~s:~B",[Host,Port])),
case ssl:connect(Host, Port, Opts) of
{ok, Socket} -> Socket;
{error, Reason} ->

View File

@@ -98,11 +98,6 @@ client_cert (Cfg) -> Cfg#cfg.client_cert.
-spec tip_redirector (Part :: host | port, Config :: cfg()) -> string() | integer().
% tip_redirector (Part,#cfg{store_ref=Store}) ->
% [#'AWLAN_Node'{data=D}|_] = ets:lookup(Store,'AWLAN_Node'),
% #{redirector_addr:=R} = D,
% get_host_or_port(Part,R).
tip_redirector (Part,#cfg{store_ref=Store}) ->
[#'AWLAN_Node'{redirector_addr=R}|_] = ets:lookup(Store,'AWLAN_Node'),
get_host_or_port(Part,R).
@@ -110,25 +105,26 @@ tip_redirector (Part,#cfg{store_ref=Store}) ->
-spec tip_manager (Part :: host | port, Config :: cfg()) -> string() | integer().
% tip_manager (Part,#cfg{store_ref=Store}) ->
% [#'AWLAN_Node'{data=D}|_] = ets:lookup(Store,'AWLAN_Node'),
% #{manager_addr:=R} = D,
% get_host_or_port(Part,R).
tip_manager (Part,#cfg{store_ref=Store}) ->
[#'AWLAN_Node'{manager_addr=R}|_] = ets:lookup(Store,'AWLAN_Node'),
get_host_or_port(Part,R).
-spec get_host_or_port (Part :: host | port, Addr :: string() | binary()) -> string() | integer().
-spec get_host_or_port (Part :: host | port, Addr :: binary()) -> string() | integer().
get_host_or_port (Part, Addr) when is_binary(Addr) ->
[H,P] = string:split(Addr,":",trailing),
Parts = string:split(Addr,":",all),
case Part of
host when is_binary(H) -> binary_to_list(H);
host -> H;
port when is_binary(P) -> binary_to_integer(P);
port -> list_to_integer(P)
host -> case Parts of
[_,H,_] -> binary_to_list(H);
[H,_] -> binary_to_list(H);
_ -> ""
end;
port -> case Parts of
[_,_,P] -> binary_to_integer(P);
[_,P] -> binary_to_integer(P);
_ -> 0
end
end.

View File

@@ -28,20 +28,20 @@
Reason :: term().
eval_req(<<"echo">>, Id, _Data, _Store) ->
?DBGTRC("RPC request: ~s (~s)~n",[<<"echo">>,Id]),
?L_I(?DBGSTR("RPC request: ~s (~s)",[<<"echo">>,Id])),
{ok, make_result(Id,#{})};
eval_req(<<"transact">>,Id,#{<<"params">>:=P},Store) ->
?DBGTRC("RPC request: ~s (~s)~n",[<<"transact">>,Id]),
?L_I(?DBGSTR("RPC request: ~s (~s)",[<<"transact">>,Id])),
Qr = table_query(P,Store),
{ok, make_result(Id,Qr)};
eval_req(<<"get_schema">>,Id,_,_Store) ->
?DBGTRC("RPC request: ~s (~s)~n",[<<"get_schema">>,Id]),
?L_I(?DBGSTR("RPC request: ~s (~s)",[<<"get_schema">>,Id])),
{ok, ignore};
eval_req (Method, Id, _Data, _Store) ->
io:format("RPC request: ~s (~s)~n",[Method,Id]),
?L_I(?DBGSTR("RPC request: ~s (~s)",[Method,Id])),
{error,io_lib:format("~s not recognized",[Method])}.
@@ -96,12 +96,34 @@ table_query ([_,#{<<"table">>:=T, <<"op">>:= <<"select">>, <<"columns">>:=C, <<"
#{ <<"rows">> => make_res_rows(Res,C,[])};
table_query ([_,#{<<"table">>:=T, <<"op">>:= <<"update">>, <<"row">>:=C}],S) ->
#{}.
table_query ([_,#{<<"table">>:=T, <<"op">>:= <<"update">>, <<"row">>:=C, <<"where">>:=W}],S) ->
M = create_match_spec(T,[],W),
Res = ets:select(S,M),
D = ets:select_delete(S,[setelement(3,hd(M),[true])]),
ets:insert(S,update_records(T,C,Res,[])),
case network_update(C) of
true ->
ovsdb_ap:reset_comm(self()),
#{<<"count">> => D};
_ ->
#{<<"count">> => D}
end.
%--------network_update/1----------------special handling if any network address hase changed
-spec network_update (Updates :: #{binary()=>any()}) -> true | false.
network_update (#{<<"manager_addr">>:=_}) -> true;
network_update (#{<<"redirector_addr">>:=_}) -> true;
network_update (_) -> false.
-spec make_res_rows (Res :: [tuple()], Cols :: [binary()], Acc :: [#{}]) -> [#{}].
%--------make_res_rows/3-----------------formats query results into proper rows map
-spec make_res_rows (Res :: [[any()]], Cols :: [binary()], Acc :: [#{}]) -> [#{}].
make_res_rows ([],_,Acc) ->
lists:reverse(Acc);
@@ -116,20 +138,50 @@ make_res_rows ([_|T],C,Acc) ->
%--------create_match_spec/3-------------creates proper match specification from RPC command for ETS search
-spec create_match_spec (TableName :: binary(), Columns :: [binary()], Where :: []) -> [{tuple(),list(),list()}].
create_match_spec (T,[],W) -> %% an empty list of desired columns means all columns
create_match_spec(T,rec_fields(T),W);
create_match_spec (T,C,_W) ->
create_match_spec (T,C,W) ->
Fxy = fun(X,true) -> list_to_atom(lists:flatten([$$,integer_to_list(X)])); (_,false) -> list_to_atom("_") end,
Fields = rec_fields(T),
MP = [Fxy(I,Y) || {I,X} <- lists:zip(lists:seq(1,length(Fields)),Fields), case lists:member(X,C) of true -> Y=true; _ -> Y=false, true end],
[{list_to_tuple([binary_to_atom(T)|MP]),[],['$$']}].
Clauses = [make_match_clause(T,list_to_tuple(X))||X<-W],
[{list_to_tuple([binary_to_atom(T)|MP]),Clauses,['$$']}].
-spec make_match_clause (TableName :: binary(), {Arg1 :: binary(), Op :: binary(), Arg2 :: binary()}) -> {atom(),any(),any()}.
make_match_clause (T,{A1,Op,A2}) ->
Fields = rec_fields(T),
FieldMap = lists:zip(lists:seq(1,length(Fields)),Fields),
OpMap = #{<<"==">>=>'==', <<"!=">>=>'/=', <<"<=">>=>'<=', <<"<">>=>'<', <<">=">>=>'>=', <<">">>=>'>'},
{maps:get(Op,OpMap,'=='),mk_fp(A1,FieldMap),mk_fp(A2,FieldMap)}.
mk_fp (Val,IndexFields) ->
case lists:keyfind(Val,2,IndexFields) of
false -> Val;
{N,_} -> list_to_atom(lists:flatten([$$,integer_to_list(N)]))
end.
%--------update_recors/4-----------------create an updated record to eb inserted into ETS from RCP call
-spec update_records (TableName :: binary(), NewValues :: #{binary():=any()}, Records :: [[any()]], Acc :: [[any()]]) -> [tuple()].
update_records (T,_,[],Acc) ->
[list_to_tuple([binary_to_atom(T)|X]) || X <- Acc];
update_records (T,V,[R|Rest],Acc) ->
Cand = lists:zip(rec_fields(T),R),
Updt = [Uv || {F,Ov} <- Cand, case maps:is_key(F,V) of true -> Uv=maps:get(F,V), true; _ -> Uv=Ov, true end],
update_records(T,V,Rest,[Updt|Acc]).
%%------------------------------------------------------------------------------
@@ -141,9 +193,3 @@ create_match_spec (T,C,_W) ->
rec_fields (<<"AWLAN_Node">>) ->
[atom_to_binary(X)||X<-record_info(fields,'AWLAN_Node')].

View File

@@ -36,28 +36,10 @@
%%------------------------------------------------------------------------------
%% the tables
-type awlan_node_map() :: #{
mqtt_settings => #{},
redirector_addr => binary(),
manager_addr => binary(),
sku_number => [set|[proplists:proplist()]],
serial_number => binary(),
model => binary(),
firmware_version => binary(),
platform_version => binary(),
revision => binary(),
version_matrix => [map|[proplists:proplist()]]
}.
% -record ('AWLAN_Node',{
% row_idx = 0 :: integer(),
% data = #{} :: awlan_node_map()
% }).
-record ('AWLAN_Node',{
row_idx = 0 :: integer(),
mqtt_settings = #{} :: #{},
mqtt_settings = [map,[]] :: [map|[proplists:proplist()]],
redirector_addr = <<>> :: binary(),
manager_addr = <<>> :: binary(),
sku_number = [set,[]] :: [set|[proplists:proplist()]],