diff --git a/include/common.hrl b/include/common.hrl index 44eeba2..256ac7a 100644 --- a/include/common.hrl +++ b/include/common.hrl @@ -27,8 +27,8 @@ -ifdef(debug). -define(DBGTRC(Msg), io:format("~s:~s (~p) => ~s~n",[?MODULE,?FUNCTION_NAME,?LINE,Msg])). -define(DBGTRC(FMsg,Args), io:format("~s:~s (~B) => " FMsg "~n",[?MODULE,?FUNCTION_NAME,?LINE] ++ Args)). - -define(DBGSTR(Msg), io_lib:format("~s:~s (~B) => ~s~n",[?MODULE,?FUNCTION_NAME,?LINE,Msg])). - -define(DBGSTR(FMsg,Args), io_lib:format("~s:~s (~B) => " FMsg "~n",[?MODULE,?FUNCTION_NAME,?LINE] ++ Args)). + -define(DBGSTR(Msg), io_lib:format("~s:~s (~B) => ~s",[?MODULE,?FUNCTION_NAME,?LINE,Msg])). + -define(DBGSTR(FMsg,Args), io_lib:format("~s:~s (~B) => " FMsg,[?MODULE,?FUNCTION_NAME,?LINE] ++ Args)). -else. -define(DBGTRC(Msg), true). -define(DBGTRC(FMsg,Args), true). diff --git a/src/ovsdb_ap.erl b/src/ovsdb_ap.erl index c97af93..94a6c83 100644 --- a/src/ovsdb_ap.erl +++ b/src/ovsdb_ap.erl @@ -109,7 +109,7 @@ rpc_cmd (Node,Rpc) -> -spec reset_comm (Node :: pid()) -> ok. reset_comm (Node) -> - gen_server:call(Node,reset_comm). + gen_server:cast(Node,reset_comm). @@ -175,6 +175,12 @@ handle_cast (ap_cancel, State) -> _ = cancel_simulation(State), {stop, normal, State}; +handle_cast (reset_comm, State) -> + {noreply,ctrl_connect(State)}; + +handle_cast (ctlr_start_comm, State) -> + {noreply, ctlr_start_comm(State)}; + handle_cast (R,State) -> ?L_E(?DBGSTR("got unknown request: ~p",[R])), {noreply, State}. @@ -186,10 +192,29 @@ handle_cast (R,State) -> Request :: term(), From :: {pid(),Tag::term()}, State :: #ap_state{}, - Reply :: term(), + Reply :: ok | invalid | ignored, Reason :: term(), NewState :: #ap_state{}. +handle_call ({exec_rpc, RPC}, _From, #ap_state{status=paused}=State) when is_map(RPC) andalso + is_map_key(<<"method">>,RPC) andalso + is_map_key(<<"id">>,RPC) -> + case maps:get(<<"method">>,RPC) of + <<"echo">> -> + case ovsdb_ap_rpc:eval_req(<<"echo">>, maps:get(<<"id">>,RPC), RPC, State#ap_state.store) of + {ok, Result} when is_map(Result) andalso is_map_key(<<"result">>,Result) -> + ok = ovsdb_ap_comm:send_term(State#ap_state.comm,Result), + {reply,ok,State}; + _ -> + {reply, ignored, State} + end; + _ -> + {reply, ignored, State} + end; + +handle_call ({exec_rpc, _RPC}, _From, #ap_state{status=paused}=State) -> + {reply, ignored, State}; + handle_call ({exec_rpc, RPC}, _From, State) when is_map(RPC) andalso is_map_key(<<"method">>,RPC) andalso is_map_key(<<"id">>,RPC) -> @@ -225,7 +250,7 @@ handle_call ({exec_rpc, RPC}, _From, State) when is_map(RPC) andalso end; handle_call ({exec_rpc, RPC}, _From, State) -> - io:format("invalid RPC: ~p~n",[RPC]), + ?L_E(?DBGSTR("invalid RPC: ~p~n",[RPC])), {reply,invalid, State}; handle_call (Request, From, State) -> @@ -242,9 +267,11 @@ handle_call (Request, From, State) -> NewState :: #ap_state{}. +handle_info({'EXIT', _Pid, normal}, State) -> + {noreply, State}; + handle_info({'EXIT', Pid, Reason}, State) -> ?L_E(?DBGSTR("Abnormal exit from ~p with reason: ~p",[Pid,Reason])), - %% @TODO: implement proper restart strategy for linked processes {noreply, State}; @@ -313,8 +340,9 @@ prepare_state (ID, SimMan) -> State :: #ap_state{}, NewState :: #ap_state{}. -set_status (Status, #ap_state{config=Cfg}=State) -> +set_status (Status, #ap_state{status=OldStatus, config=Cfg}=State) -> ovsdb_client_handler:ap_status(Status,ovsdb_ap_config:id(Cfg)), + ?L_I(?DBGSTR("AP ~p : status change := ~p -> ~p",[self(),OldStatus,Status])), State#ap_state{status=Status}. @@ -335,26 +363,13 @@ startup_ap (#ap_state{status=init, config=Cfg, sim_manager=Man}=State) -> -spec run_simulation (State :: #ap_state{}) -> NewState :: #ap_state{}. -run_simulation (#ap_state{status=ready,config=Cfg}=State) -> - Opts = [ - {host, ovsdb_ap_config:tip_redirector(host,Cfg)}, - {port, ovsdb_ap_config:tip_redirector(port,Cfg)}, - {ca, ovsdb_ap_config:ca_certs(Cfg)}, - {cert, ovsdb_ap_config:client_cert(Cfg)} - ], - {ok, Comm} = ovsdb_ap_comm:start_link(Opts), - set_status(running, State#ap_state{comm=Comm}); - -run_simulation (#ap_state{status=paused,config=Cfg}=State) -> - Opts = [ - {host, ovsdb_ap_config:tip_manager(host,Cfg)}, - {port, ovsdb_ap_config:tip_manager(port,Cfg)}, - {ca, ovsdb_ap_config:ca_certs(Cfg)}, - {cert, ovsdb_ap_config:client_cert(Cfg)} - ], - {ok, Comm} = ovsdb_ap_comm:start_link(Opts), - set_status(running, State#ap_state{comm=Comm}). +run_simulation (#ap_state{status=ready}=State) -> + NewState = ctrl_connect(State), + set_status(running,NewState); +run_simulation (#ap_state{status=paused}=State) -> + set_status(running,State). + @@ -363,7 +378,8 @@ run_simulation (#ap_state{status=paused,config=Cfg}=State) -> -spec stop_simulation (State :: #ap_state{}) -> NewState :: #ap_state{}. stop_simulation (State) -> - set_status(ready, State). + NewState = ctrl_disconnect(State), + set_status(ready, NewState). @@ -383,4 +399,66 @@ pause_simulation (State) -> -spec cancel_simulation (State :: #ap_state{}) -> NewState :: #ap_state{}. cancel_simulation (State) -> - State. \ No newline at end of file + State. + + + + +%--------ctrl_connect/1------------------connect to either the tip redirector or manager based on state / old connections are closed if open + +-spec ctrl_connect (State :: #ap_state{}) -> NewState :: #ap_state{}. + +ctrl_connect (#ap_state{comm=none, status=ready, config=Cfg}=State) -> + Opts = [ + {host, ovsdb_ap_config:tip_redirector(host,Cfg)}, + {port, ovsdb_ap_config:tip_redirector(port,Cfg)}, + {ca, ovsdb_ap_config:ca_certs(Cfg)}, + {cert, ovsdb_ap_config:client_cert(Cfg)} + ], + {ok, Comm} = ovsdb_ap_comm:start_link(Opts), + gen_server:cast(self(),ctlr_start_comm), + State#ap_state{comm=Comm}; + +ctrl_connect (#ap_state{comm=none, status=running, config=Cfg}=State) -> + O = case ovsdb_ap_config:tip_manager(host,Cfg) of + [] -> + [{host, ovsdb_ap_config:tip_redirector(host,Cfg)}, + {port, ovsdb_ap_config:tip_redirector(port,Cfg)}]; + _ -> + [{host, ovsdb_ap_config:tip_manager(host,Cfg)}, + {port, ovsdb_ap_config:tip_manager(port,Cfg)}] + end, + Opts = [{ca, ovsdb_ap_config:ca_certs(Cfg)},{cert, ovsdb_ap_config:client_cert(Cfg)}|O], + {ok, Comm} = ovsdb_ap_comm:start_link(Opts), + gen_server:cast(self(),ctlr_start_comm), + State#ap_state{comm=Comm}; + +ctrl_connect (#ap_state{comm=Comm}=State) -> + ovsdb_ap_comm:end_comm(Comm), + ctrl_connect (State#ap_state{comm=none}). + + + + +%--------ctrl_disconnect/1---------------disconnect and closes communication port but otherwise does not chenge status + +-spec ctrl_disconnect (State :: #ap_state{}) -> NewState :: #ap_state{}. + +ctrl_disconnect (#ap_state{comm=none}=State) -> + State; + +ctrl_disconnect (#ap_state{comm=Comm}=State) -> + ovsdb_ap_comm:end_comm(Comm), + State#ap_state{comm=none}. + + + +%--------ctlr_start_comm/1---------------asychrounously starts the connection after comm is created + +-spec ctlr_start_comm (State :: #ap_state{}) -> NewState :: #ap_state{}. + +ctlr_start_comm (#ap_state{comm=Comm}=State) -> + ovsdb_ap_comm:start_comm(Comm), + State. + + diff --git a/src/ovsdb_ap_comm.erl b/src/ovsdb_ap_comm.erl index 3f68187..d773698 100644 --- a/src/ovsdb_ap_comm.erl +++ b/src/ovsdb_ap_comm.erl @@ -15,14 +15,6 @@ %%----------------------------------------------------------------------------- %% types and specifications --record (c_state, { - socket :: ssl:sslsocket(), - ap :: pid(), - status :: active | idle, - rxb = <<"">> :: binary() -}). - - -type options() :: [ {host, string()} | % tip controller host name {port, integer()} | % port to connect @@ -30,15 +22,21 @@ {cert, binary()} % ]. - - -export_type([options/0]). +-record (c_state, { + options :: options(), + socket :: none | ssl:sslsocket(), + ap :: pid(), + status :: active | idle, + rxb = <<"">> :: binary() +}). + %%------------------------------------------------------------------------------ %% API --export ([start_link/1,create_comm/2,send_term/2,end_comm/1]). +-export ([start_link/1,create_comm/2,send_term/2,end_comm/1,start_comm/1]). -spec start_link (Options :: options()) -> {ok, pid()}. @@ -51,7 +49,7 @@ start_link (Options) -> -spec send_term(Comm :: pid(), Data :: term()) -> ok | {error, Reason :: string()}. -send_term (Comm, Data) when is_map(Data) -> +send_term (Comm, Data) when is_pid(Comm) andalso is_map(Data) -> Comm ! {send, self(), Data}, ok; @@ -67,6 +65,13 @@ end_comm (Comm) -> ok. +-spec start_comm (Comm :: pid()) -> ok. + +start_comm (Comm) -> + Comm ! {start, self()}, + ok. + + %%------------------------------------------------------------------------------ %% internals @@ -74,15 +79,11 @@ end_comm (Comm) -> -spec create_comm (Options :: options(), AP :: pid()) -> ok. create_comm (Opts, AP) -> - H = proplists:get_value(host,Opts), - P = proplists:get_value(port,Opts), - CAs = [X || {'Certificate',X,not_encrypted} <- public_key:pem_decode(proplists:get_value(ca,Opts))], - [{'Certificate',Cert,not_encrypted}, - {KeyType,Key,not_encrypted}] = public_key:pem_decode(proplists:get_value(cert,Opts)), State = #c_state{ - socket = connect_to_server(H,P,CAs,Cert,{KeyType,Key}), + options = Opts, + socket = none, ap = AP, - status = active + status = idle }, comm_loop(State). @@ -113,12 +114,16 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) -> {send, AP, Data} -> ToSend = jiffy:encode(Data), - io:format("Sending: ~s~n",[ToSend]), + ?L_I(?DBGSTR("Sending: ~s",[ToSend])), ok = ssl:send(S,ToSend), comm_loop(State#c_state{status=active}); {down, AP} -> - ssl:close(S) + ssl:close(S); + + {start, AP} -> + NewState = start_connection(State), + comm_loop(NewState) after 10000 -> @@ -127,6 +132,22 @@ comm_loop (#c_state{socket=S, rxb=Rx, ap=AP}=State) -> +-spec start_connection (State :: #c_state{}) -> NewState :: #c_state{}. + +start_connection (#c_state{options=Opts}=State) -> + H = proplists:get_value(host,Opts), + P = proplists:get_value(port,Opts), + CAs = [X || {'Certificate',X,not_encrypted} <- public_key:pem_decode(proplists:get_value(ca,Opts))], + [{'Certificate',Cert,not_encrypted}, + {KeyType,Key,not_encrypted}] = public_key:pem_decode(proplists:get_value(cert,Opts)), + State#c_state{ + socket = connect_to_server(H,P,CAs,Cert,{KeyType,Key}), + status = active + }. + + + + -spec connect_to_server (Host, Port, CAs, Cert, Key) -> Socket when Host :: string(), %% host name to connect to (can be IP address in string format) Port :: integer(), %% port to connect to @@ -144,6 +165,7 @@ connect_to_server (Host, Port, CAs, Cert, Key) -> {session_tickets,auto}, {mode,binary}, {active,once}], + ?L_I(?DBGSTR("AP connecting to ~s:~B",[Host,Port])), case ssl:connect(Host, Port, Opts) of {ok, Socket} -> Socket; {error, Reason} -> diff --git a/src/ovsdb_ap_config.erl b/src/ovsdb_ap_config.erl index b152d50..3bebb35 100644 --- a/src/ovsdb_ap_config.erl +++ b/src/ovsdb_ap_config.erl @@ -98,11 +98,6 @@ client_cert (Cfg) -> Cfg#cfg.client_cert. -spec tip_redirector (Part :: host | port, Config :: cfg()) -> string() | integer(). -% tip_redirector (Part,#cfg{store_ref=Store}) -> -% [#'AWLAN_Node'{data=D}|_] = ets:lookup(Store,'AWLAN_Node'), -% #{redirector_addr:=R} = D, -% get_host_or_port(Part,R). - tip_redirector (Part,#cfg{store_ref=Store}) -> [#'AWLAN_Node'{redirector_addr=R}|_] = ets:lookup(Store,'AWLAN_Node'), get_host_or_port(Part,R). @@ -110,25 +105,26 @@ tip_redirector (Part,#cfg{store_ref=Store}) -> -spec tip_manager (Part :: host | port, Config :: cfg()) -> string() | integer(). -% tip_manager (Part,#cfg{store_ref=Store}) -> -% [#'AWLAN_Node'{data=D}|_] = ets:lookup(Store,'AWLAN_Node'), -% #{manager_addr:=R} = D, -% get_host_or_port(Part,R). - tip_manager (Part,#cfg{store_ref=Store}) -> [#'AWLAN_Node'{manager_addr=R}|_] = ets:lookup(Store,'AWLAN_Node'), get_host_or_port(Part,R). --spec get_host_or_port (Part :: host | port, Addr :: string() | binary()) -> string() | integer(). +-spec get_host_or_port (Part :: host | port, Addr :: binary()) -> string() | integer(). get_host_or_port (Part, Addr) when is_binary(Addr) -> - [H,P] = string:split(Addr,":",trailing), + Parts = string:split(Addr,":",all), case Part of - host when is_binary(H) -> binary_to_list(H); - host -> H; - port when is_binary(P) -> binary_to_integer(P); - port -> list_to_integer(P) + host -> case Parts of + [_,H,_] -> binary_to_list(H); + [H,_] -> binary_to_list(H); + _ -> "" + end; + port -> case Parts of + [_,_,P] -> binary_to_integer(P); + [_,P] -> binary_to_integer(P); + _ -> 0 + end end. diff --git a/src/ovsdb_ap_rpc.erl b/src/ovsdb_ap_rpc.erl index a03de4b..236a96b 100644 --- a/src/ovsdb_ap_rpc.erl +++ b/src/ovsdb_ap_rpc.erl @@ -28,20 +28,20 @@ Reason :: term(). eval_req(<<"echo">>, Id, _Data, _Store) -> - ?DBGTRC("RPC request: ~s (~s)~n",[<<"echo">>,Id]), + ?L_I(?DBGSTR("RPC request: ~s (~s)",[<<"echo">>,Id])), {ok, make_result(Id,#{})}; eval_req(<<"transact">>,Id,#{<<"params">>:=P},Store) -> - ?DBGTRC("RPC request: ~s (~s)~n",[<<"transact">>,Id]), + ?L_I(?DBGSTR("RPC request: ~s (~s)",[<<"transact">>,Id])), Qr = table_query(P,Store), {ok, make_result(Id,Qr)}; eval_req(<<"get_schema">>,Id,_,_Store) -> - ?DBGTRC("RPC request: ~s (~s)~n",[<<"get_schema">>,Id]), + ?L_I(?DBGSTR("RPC request: ~s (~s)",[<<"get_schema">>,Id])), {ok, ignore}; eval_req (Method, Id, _Data, _Store) -> - io:format("RPC request: ~s (~s)~n",[Method,Id]), + ?L_I(?DBGSTR("RPC request: ~s (~s)",[Method,Id])), {error,io_lib:format("~s not recognized",[Method])}. @@ -96,12 +96,34 @@ table_query ([_,#{<<"table">>:=T, <<"op">>:= <<"select">>, <<"columns">>:=C, <<" #{ <<"rows">> => make_res_rows(Res,C,[])}; -table_query ([_,#{<<"table">>:=T, <<"op">>:= <<"update">>, <<"row">>:=C}],S) -> - #{}. +table_query ([_,#{<<"table">>:=T, <<"op">>:= <<"update">>, <<"row">>:=C, <<"where">>:=W}],S) -> + M = create_match_spec(T,[],W), + Res = ets:select(S,M), + D = ets:select_delete(S,[setelement(3,hd(M),[true])]), + ets:insert(S,update_records(T,C,Res,[])), + case network_update(C) of + true -> + ovsdb_ap:reset_comm(self()), + #{<<"count">> => D}; + _ -> + #{<<"count">> => D} + end. + + + +%--------network_update/1----------------special handling if any network address hase changed + +-spec network_update (Updates :: #{binary()=>any()}) -> true | false. + +network_update (#{<<"manager_addr">>:=_}) -> true; +network_update (#{<<"redirector_addr">>:=_}) -> true; +network_update (_) -> false. --spec make_res_rows (Res :: [tuple()], Cols :: [binary()], Acc :: [#{}]) -> [#{}]. +%--------make_res_rows/3-----------------formats query results into proper rows map + +-spec make_res_rows (Res :: [[any()]], Cols :: [binary()], Acc :: [#{}]) -> [#{}]. make_res_rows ([],_,Acc) -> lists:reverse(Acc); @@ -116,20 +138,50 @@ make_res_rows ([_|T],C,Acc) -> +%--------create_match_spec/3-------------creates proper match specification from RPC command for ETS search + -spec create_match_spec (TableName :: binary(), Columns :: [binary()], Where :: []) -> [{tuple(),list(),list()}]. create_match_spec (T,[],W) -> %% an empty list of desired columns means all columns create_match_spec(T,rec_fields(T),W); -create_match_spec (T,C,_W) -> +create_match_spec (T,C,W) -> Fxy = fun(X,true) -> list_to_atom(lists:flatten([$$,integer_to_list(X)])); (_,false) -> list_to_atom("_") end, Fields = rec_fields(T), MP = [Fxy(I,Y) || {I,X} <- lists:zip(lists:seq(1,length(Fields)),Fields), case lists:member(X,C) of true -> Y=true; _ -> Y=false, true end], - [{list_to_tuple([binary_to_atom(T)|MP]),[],['$$']}]. + Clauses = [make_match_clause(T,list_to_tuple(X))||X<-W], + [{list_to_tuple([binary_to_atom(T)|MP]),Clauses,['$$']}]. +-spec make_match_clause (TableName :: binary(), {Arg1 :: binary(), Op :: binary(), Arg2 :: binary()}) -> {atom(),any(),any()}. +make_match_clause (T,{A1,Op,A2}) -> + Fields = rec_fields(T), + FieldMap = lists:zip(lists:seq(1,length(Fields)),Fields), + OpMap = #{<<"==">>=>'==', <<"!=">>=>'/=', <<"<=">>=>'<=', <<"<">>=>'<', <<">=">>=>'>=', <<">">>=>'>'}, + {maps:get(Op,OpMap,'=='),mk_fp(A1,FieldMap),mk_fp(A2,FieldMap)}. + + +mk_fp (Val,IndexFields) -> + case lists:keyfind(Val,2,IndexFields) of + false -> Val; + {N,_} -> list_to_atom(lists:flatten([$$,integer_to_list(N)])) + end. + + + +%--------update_recors/4-----------------create an updated record to eb inserted into ETS from RCP call + +-spec update_records (TableName :: binary(), NewValues :: #{binary():=any()}, Records :: [[any()]], Acc :: [[any()]]) -> [tuple()]. + +update_records (T,_,[],Acc) -> + [list_to_tuple([binary_to_atom(T)|X]) || X <- Acc]; + +update_records (T,V,[R|Rest],Acc) -> + Cand = lists:zip(rec_fields(T),R), + Updt = [Uv || {F,Ov} <- Cand, case maps:is_key(F,V) of true -> Uv=maps:get(F,V), true; _ -> Uv=Ov, true end], + update_records(T,V,Rest,[Updt|Acc]). %%------------------------------------------------------------------------------ @@ -141,9 +193,3 @@ create_match_spec (T,C,_W) -> rec_fields (<<"AWLAN_Node">>) -> [atom_to_binary(X)||X<-record_info(fields,'AWLAN_Node')]. - - - - - - diff --git a/src/ovsdb_ap_tables.hrl b/src/ovsdb_ap_tables.hrl index 5d02ee5..e53a106 100644 --- a/src/ovsdb_ap_tables.hrl +++ b/src/ovsdb_ap_tables.hrl @@ -36,28 +36,10 @@ %%------------------------------------------------------------------------------ %% the tables --type awlan_node_map() :: #{ - mqtt_settings => #{}, - redirector_addr => binary(), - manager_addr => binary(), - sku_number => [set|[proplists:proplist()]], - serial_number => binary(), - model => binary(), - firmware_version => binary(), - platform_version => binary(), - revision => binary(), - version_matrix => [map|[proplists:proplist()]] -}. - -% -record ('AWLAN_Node',{ -% row_idx = 0 :: integer(), -% data = #{} :: awlan_node_map() -% }). - -record ('AWLAN_Node',{ row_idx = 0 :: integer(), - mqtt_settings = #{} :: #{}, + mqtt_settings = [map,[]] :: [map|[proplists:proplist()]], redirector_addr = <<>> :: binary(), manager_addr = <<>> :: binary(), sku_number = [set,[]] :: [set|[proplists:proplist()]],