Fixing API

This commit is contained in:
Stephane Bourque
2020-12-18 10:04:05 -08:00
parent d421ce44f1
commit abc85aa5b5
10 changed files with 61 additions and 3825 deletions

View File

@@ -154,7 +154,7 @@
-record( mqtt_connect_variable_header, {
protocol_name = <<0,4,$M:8,$Q:8,$T:8,$T:8>> :: binary(),
protocol_version = 0 :: integer(),
protocol_version = ?MQTT_PROTOCOL_VERSION_3_11 :: ?MQTT_PROTOCOL_VERSION_3_11 | ?MQTT_PROTOCOL_VERSION_5 ,
username_flag = 0 :: integer(),
password_flag = 0 :: integer(),
will_retain_flag = 0 :: integer(),
@@ -325,43 +325,30 @@
-type mqtt_connack_variable_header_v4() :: #mqtt_connack_variable_header_v4{}.
-type mqtt_connack_variable_header_v5() :: #mqtt_connack_variable_header_v5{}.
-type mqtt_publish_variable_header_v4() :: #mqtt_publish_variable_header_v4{}.
-type mqtt_publish_variable_header_v5() :: #mqtt_publish_variable_header_v5{}.
-type mqtt_puback_variable_header_v5() :: #mqtt_puback_variable_header_v5{}.
-type mqtt_puback_variable_header_v4() :: #mqtt_puback_variable_header_v4{}.
-type mqtt_pubrec_variable_header_v4() :: #mqtt_pubrec_variable_header_v4{}.
-type mqtt_pubrec_variable_header_v5() :: #mqtt_pubrec_variable_header_v5{}.
-type mqtt_pubrel_variable_header_v4() :: #mqtt_pubrel_variable_header_v4{}.
-type mqtt_pubrel_variable_header_v5() :: #mqtt_pubrel_variable_header_v5{}.
-type mqtt_pubcomp_variable_header_v4() :: #mqtt_pubcomp_variable_header_v4{}.
-type mqtt_pubcomp_variable_header_v5() :: #mqtt_pubcomp_variable_header_v5{}.
-type mqtt_subscribe_variable_header_v4() :: #mqtt_subscribe_variable_header_v4{}.
-type mqtt_subscribe_variable_header_v5() :: #mqtt_subscribe_variable_header_v5{}.
-type mqtt_suback_variable_header_v4() :: #mqtt_suback_variable_header_v4{}.
-type mqtt_suback_variable_header_v5() :: #mqtt_suback_variable_header_v5{}.
-type mqtt_unsubscribe_variable_header_v4() :: #mqtt_unsubscribe_variable_header_v4{}.
-type mqtt_unsubscribe_variable_header_v5() :: #mqtt_unsubscribe_variable_header_v5{}.
-type mqtt_unsuback_variable_header_v4() :: #mqtt_unsuback_variable_header_v4{}.
-type mqtt_unsuback_variable_header_v5() :: #mqtt_unsuback_variable_header_v5{}.
-type mqtt_disconnect_variable_header_v4() :: #mqtt_disconnect_variable_header_v4{}.
-type mqtt_disconnect_variable_header_v5() :: #mqtt_disconnect_variable_header_v5{}.
-type mqtt_auth_variable_header_v4() :: #mqtt_auth_variable_header_v4{}.
-type mqtt_auth_variable_header_v5() :: #mqtt_auth_variable_header_v5{}.
-type mqtt_pingreq_variable_header_v4() :: #mqtt_pingreq_variable_header_v4{}.
-type mqtt_pingreq_variable_header_v5() :: #mqtt_pingreq_variable_header_v5{}.
-type mqtt_pingresp_variable_header_v4() :: #mqtt_pingresp_variable_header_v4{}.
-type mqtt_pingresp_variable_header_v5() :: #mqtt_pingresp_variable_header_v5{}.

View File

@@ -247,7 +247,6 @@ do( ?HTTP_POST , Req , #request_state{ resource = <<"simulations">> } = State )
nodes = utils:to_atom_list(Nodes) },
_=simengine:update(NewSim),
URI = << <<"/api/v1/simulations/">>/binary, (State#request_state.id)/binary >>,
io:format("URI: ~p~n",[URI]),
Sim = #{ name => NewSim#simulation.name, caname => NewSim#simulation.ca, num_devices => NewSim#simulation.num_devices, nodes => NewSim#simulation.nodes,
server => NewSim#simulation.opensync_server_name,
port=> NewSim#simulation.opensync_server_port ,

View File

@@ -150,7 +150,7 @@ gen('Survey',TimeStamp)->
undefined,undefined,undefined,14007260,undefined,154}],
[],'RAW'}].
-spec gen(atom(),string(),[string()],[{atom(),[string()]}],integer(),integer(), #{ MAC::binary() => #'Client.Stats'{} })->any().
-spec gen(atom(),binary(),[binary()],[{atom(),[binary()]}],integer(),integer(), #{ MAC::binary() => #'Client.Stats'{} })->any().
gen('ClientReport',_MAC,_LANClients,MACSSIDList,TimeStamp,StartTime,MacStats)->
WanClients = lists:foldl(fun({Band,SSID,WiFiMACs},A) ->
[gen_client_report_for_band(TimeStamp,Band,WiFiMACs,SSID,StartTime,MacStats)|A]

View File

@@ -55,28 +55,29 @@ process(#mqtt_processor_state{}=State) ->
{ok,State}.
-spec answer_msg( Msg :: mqtt_answerable(), State :: mqtt_processor_state()) -> {ok,NewState::mqtt_processor_state()}.
answer_msg( #mqtt_connect_variable_header{ protocol_version = ?MQTT_PROTOCOL_VERSION_3_11 }=Msg, State )->
VariableHeader = #mqtt_connack_variable_header_v4{ connect_acknowledge_flag = 0,connect_reason_code = ?MQTT_RC_CONNECTION_ACCEPTED },
Response = #mqtt_msg{ packet_type = ?MQTT_CONNACK , variable_header = VariableHeader },
Blob = mqtt_message:encode(Response),
_Result = (State#mqtt_processor_state.module):send(State#mqtt_processor_state.socket,Blob),
%% io:format("Sending CONNECT response(~p): ~p~n",[Result,Blob]),
Stats1 = State#mqtt_processor_state.stats#mqtt_connection_stats{ client_identifier = Msg#mqtt_connect_variable_header.client_identifier },
Stats2 = ?INCREMENT_STATS2(Stats1,msg_connect,msg_connack),
{ok,State#mqtt_processor_state{ version = Msg#mqtt_connect_variable_header.protocol_version,
stats = Stats2 }};
answer_msg( #mqtt_connect_variable_header{ protocol_version = ?MQTT_PROTOCOL_VERSION_5 }=Msg, State )->
VariableHeader = #mqtt_connack_variable_header_v5{ connect_acknowledge_flag = 0,connect_reason_code = ?MQTT_RC_CONNECTION_ACCEPTED },
Response = #mqtt_msg{ packet_type = ?MQTT_CONNACK , variable_header = VariableHeader },
Blob = mqtt_message:encode(Response),
_Result = (State#mqtt_processor_state.module):send(State#mqtt_processor_state.socket,Blob),
%% io:format("Sending CONNECT response(~p): ~p~n",[Result,Blob]),
Stats1 = State#mqtt_processor_state.stats#mqtt_connection_stats{ client_identifier = Msg#mqtt_connect_variable_header.client_identifier },
Stats2 = ?INCREMENT_STATS2(Stats1,msg_connect,msg_connack),
{ok,State#mqtt_processor_state{ version = Msg#mqtt_connect_variable_header.protocol_version,
stats = Stats2 }};
answer_msg( #mqtt_connect_variable_header{}=Msg, State )->
case Msg#mqtt_connect_variable_header.protocol_version of
?MQTT_PROTOCOL_VERSION_3_11 ->
VariableHeader = #mqtt_connack_variable_header_v4{ connect_acknowledge_flag = 0,connect_reason_code = ?MQTT_RC_CONNECTION_ACCEPTED },
Response = #mqtt_msg{ packet_type = ?MQTT_CONNACK , variable_header = VariableHeader },
Blob = mqtt_message:encode(Response),
_Result = (State#mqtt_processor_state.module):send(State#mqtt_processor_state.socket,Blob),
%% io:format("Sending CONNECT response(~p): ~p~n",[Result,Blob]),
Stats1 = State#mqtt_processor_state.stats#mqtt_connection_stats{ client_identifier = Msg#mqtt_connect_variable_header.client_identifier },
Stats2 = ?INCREMENT_STATS2(Stats1,msg_connect,msg_connack),
{ok,State#mqtt_processor_state{ version = Msg#mqtt_connect_variable_header.protocol_version, stats = Stats2 }};
?MQTT_PROTOCOL_VERSION_5 ->
VariableHeader = #mqtt_connack_variable_header_v5{ connect_acknowledge_flag = 0,connect_reason_code = ?MQTT_RC_CONNECTION_ACCEPTED },
Response = #mqtt_msg{ packet_type = ?MQTT_CONNACK , variable_header = VariableHeader },
Blob = mqtt_message:encode(Response),
_Result = (State#mqtt_processor_state.module):send(State#mqtt_processor_state.socket,Blob),
%% io:format("Sending CONNECT response(~p): ~p~n",[Result,Blob]),
Stats1 = State#mqtt_processor_state.stats#mqtt_connection_stats{ client_identifier = Msg#mqtt_connect_variable_header.client_identifier },
Stats2 = ?INCREMENT_STATS2(Stats1,msg_connect,msg_connack),
{ok,State#mqtt_processor_state{ version = Msg#mqtt_connect_variable_header.protocol_version, stats = Stats2 }};
0 ->
{ok,State}
end;
answer_msg( #mqtt_publish_variable_header_v4{}=Msg, State )->
case Msg#mqtt_publish_variable_header_v4.qos_level_flag of
0 ->

View File

@@ -66,11 +66,11 @@ listen_loop(Id,ListenSock,ParentPid)->
socket = Socket,
version = undefined
}]),
ssl:controlling_process(Socket,Pid),
listen_loop_secure(Id,ListenSock,ParentPid);
_=gen_tcp:controlling_process(Socket,Pid),
listen_loop(Id,ListenSock,ParentPid);
Error ->
io:format("SSL Handshake Error: ~p~n",[Error]),
listen_loop_secure(Id,ListenSock,ParentPid)
listen_loop(Id,ListenSock,ParentPid)
end.
listen_loop_secure(Id,ListenSock,ParentPid)->
@@ -91,7 +91,7 @@ listen_loop_secure(Id,ListenSock,ParentPid)->
socket = SslSocket,
version = undefined
}]),
ssl:controlling_process(SslSocket,Pid),
_=ssl:controlling_process(SslSocket,Pid),
listen_loop_secure(Id,ListenSock,ParentPid);
Error ->
io:format("SSL Handshake Error: ~p~n",[Error]),

View File

@@ -59,7 +59,7 @@ connack_packet_encoding_decoding_test()->
publish_packet_encoding_decoding_test()->
?DBGTRC("Starting"),
PacketVariableHeader = #mqtt_publish_variable_header_v5{
topic_name = "topic/b/c",
topic_name = <<"topic/b/c">>,
packet_identifier = 12345,
properties = [
{ payload_format_indicator, 0 },

View File

@@ -11,6 +11,8 @@
-behaviour(gen_server).
-include("../include/common.hrl").
%% API
-export([start_link/0,creation_info/0,update_stats/0,node_type/0,find_manager/2,connect/1,disconnect/0,
connected/0]).
@@ -25,7 +27,13 @@
-define(SERVER, ?MODULE).
-define(START_SERVER,{local,?MODULE}).
-record(node_state, { node_type, updater, nodeid, node_finder_timer, manager }).
-record(node_state, {
node_type,
nodeid,
updater_timer = undefined,
node_finder_timer = undefined,
manager }).
%%%===================================================================
%%% API
%%%===================================================================
@@ -80,7 +88,7 @@ init([]) ->
{ok,NodeFinder} = timer:apply_interval(7500,?MODULE,find_manager,[self(),NodeId])
end,
{ok,TRef} = timer:apply_interval(2000,?MODULE,update_stats,[]),
{ok, #node_state{ node_type = NodeType, updater = TRef, node_finder_timer = NodeFinder, nodeid = NodeId }}.
{ok, #node_state{ node_type = NodeType, updater_timer = TRef, node_finder_timer = NodeFinder, nodeid = NodeId }}.
%% @private
%% @doc Handling call messages
@@ -152,11 +160,13 @@ handle_info(_Info, State = #node_state{}) ->
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #node_state{}) -> term()).
terminate(_Reason, State = #node_state{}) ->
_ = timer:cancel(State#node_state.updater),
_= case State#node_state.node_finder_timer of
undefined -> ok;
_ -> timer:cancel(State#node_state.node_finder_timer)
end,
_ = timer:cancel(State#node_state.updater_timer),
_=case State#node_state.node_type of
manager ->
ok;
_ ->
timer:cancel(State#node_state.node_finder_timer)
end,
ok.
%% @private
@@ -239,10 +249,10 @@ try_connecting(NodeName,State)->
_=global:sync(),
manager:connect(State#node_state.node_type),
erlang:monitor_node(NodeName,true),
_=lager:info("Adding new manager ~p node.",[NodeName]),
?L_IA("Adding new manager ~p node.",[NodeName]),
State#node_state{ manager = NodeName };
pang ->
_=lager:info("Manager node ~p unresponsive.",[NodeName]),
?L_IA("Manager node ~p unresponsive.",[NodeName]),
State
end
end.

View File

@@ -4127,8 +4127,8 @@ t1(FileName)->
{ok,Msg} = mqtt_message:decode(apply(?MODULE,X,[]),4),
Decompressed = zlib:uncompress(Msg#mqtt_msg.variable_header#mqtt_publish_variable_header_v4.payload),
Report=opensync_stats:decode_msg(Decompressed,'Report'),
dump_header(FileName,E),
dump_data(FileName,Report),A
_=dump_header(FileName,E),
_=dump_data(FileName,Report),A
end,[],L).
dump_header(FileName,E)->
@@ -4140,30 +4140,3 @@ dump_data(FileName,Data)->
{ok,IoDev}=file:open(FileName,[append]),
io:fwrite(IoDev,"~p~n~n",[Data]),
_=file:close(IoDev).
t2()->
test("SIM11000001000",
[ {'BAND5GU',animals:get_an_animal(),["00:01:00:00:20:00","00:01:00:00:21:00","00:01:00:00:22:00"]},
{'BAND2G',animals:get_an_animal(),["00:01:01:00:20:00","00:01:01:00:21:00","00:01:01:00:22:00"]}]).
test(Serial,MACSSIDList)->
TimeStamp = os:system_time(),
TR = #'Report'{ nodeID = Serial,
device = [ #'Device'{
timestamp_ms = TimeStamp,
uptime = 12,
load = mqtt_os_gen:gen('Device.LoadAvg'),
mem_util = mqtt_os_gen:gen('Device.MemUtil'),
fs_util = mqtt_os_gen:gen('Device.FsUtil'),
cpuUtil = mqtt_os_gen:gen('Device.CpuUtil'),
thermal_stats = mqtt_os_gen:gen('Device.Thermal'),
radio_temp = mqtt_os_gen:gen('Device.RadioTemp'),
ps_cpu_util = mqtt_os_gen:gen('Device.PerProcessUtil',ps_cpu_util),
ps_mem_util = mqtt_os_gen:gen('Device.PerProcessUtil',ps_mem_util)
}],
neighbors = mqtt_os_gen:gen('Neighbor'),
clients = mqtt_os_gen:gen('ClientReport',MACSSIDList)
},
opensync_stats:encode_msg(TR,'Report').

File diff suppressed because it is too large Load Diff

View File

@@ -104,23 +104,22 @@ update_stats(Client,Role,Stats)->
%%% gen_server callbacks
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Config::term()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
-spec start_link(Config::term()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link(Config) ->
gen_server:start_link(?START_SERVER, ?MODULE, [Config], []).
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #simnode_state{}} | {ok, State :: #simnode_state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
-spec init(Args :: proplists:proplist()) -> {ok, State :: #simnode_state{}}.
%%| {ok, State :: #simnode_state{}, timeout() | hibernate} |
%%{stop, Reason :: term()} | ignore).
init([Config]) ->
NodeId = utils:app_env(node_id,1),
{ok,#simnode_state{ node_id = NodeId,
ap_client_handler = proplists:get_value(ap_client,Config,undefined),
mqtt_server_handler = proplists:get_value(mqtt_server,Config,undefined),
ovsdb_server_handler = proplists:get_value(ovsdb_server,Config,undefined),
manager = none }}.
_=utils:priv_dir(),
{ ok, #simnode_state{ node_id = utils:app_env(node_id,1),
ap_client_handler = proplists:get_value(ap_client,Config,undefined),
mqtt_server_handler = proplists:get_value(mqtt_server,Config,undefined),
ovsdb_server_handler = proplists:get_value(ovsdb_server,Config,undefined),
manager = none }}.
%% @private
%% @doc Handling call messages