Erlangを使って非常に簡単なリング状分散データ構造(Chordっぽいだけだよ!)を実装してみた

まぁ大層なもんじゃなくて、200行ちょいぐらいのものなんですが。

-module(simple_chord).
-compile(export_all).

-record(peer,{id,predecessor,successor,data}).

start() ->
    crypto:start(),
    Node0 = spawn(?MODULE,init,[]),
    put(0,true),
    register(node0,Node0).

join() ->
    node0 ! {join_rand,self()},
    receive
	NodeID ->
	    io:fwrite("This node is ~w\n",[NodeID]),
	    register(get_node(NodeID),spawn(?MODULE,init,[NodeID])),
	    timer:apply_after(100,?MODULE,find_successor,[NodeID])
    end.

join(NodeID) ->
    node0 ! {join_direct,NodeID,self()},
    receive
	ok ->
	    register(get_node(NodeID),spawn(?MODULE,init,[NodeID])),
	    timer:apply_after(100,?MODULE,find_successor,[NodeID]);
	fail ->
	    io:fwrite("NodeID ~w is already used!\n",[NodeID])
    end.

init() ->
    State = init_state(0),
    process_flag(trap_exit,true),
    loop(State).

init(Id) ->
    State = init_state(Id,0,0),
    process_flag(trap_exit,true),
    loop(State).

init_state(Node_ID) ->
    State = #peer{id = Node_ID,predecessor = Node_ID,successor = Node_ID,data = dict:new()},
    State.

init_state(Self,Pre,Suc) ->
    State = #peer{id = Self,predecessor = Pre,successor = Suc,data = dict:new()},
    State.

loop(State) ->
    ID = State#peer.id,
    Predecessor = State#peer.predecessor,
    Successor = State#peer.successor,
    receive
	{join_rand,From} ->
	    From ! gen_pid(),
	    loop(State);
	
	{join_direct,WanteID,From} ->
	    case get(WanteID) of
		true ->
		    From ! fail;
		undefined ->
		    put(WanteID,true),
		    From ! ok
	    end,
	    loop(State);

	{find_successor} ->
	    get_node(Successor) ! {request_find_successor,ID},
	    loop(State);

	{request_find_successor,ParentID} ->
	    case ID == Successor of
		true ->
		    get_node(ParentID) ! {response_find_successor,Successor,ID};
		false ->
		    case is_element(ParentID,ID,Successor) of
			true -> 
			    get_node(ParentID) ! {response_find_successor,Successor,ID};
			false -> 
			    get_node(Successor) ! {request_find_successor,ParentID}
		    end
	    end,
	    loop(State);

	{response_find_successor,AnotherID1,AnotherID2} ->
	    NewSuccessor = AnotherID1,
	    NewPredecessor = AnotherID2,
	    NewState = State#peer{predecessor = NewPredecessor,
				  successor = NewSuccessor},
	    get_node(NewSuccessor) ! {change_predecessor,ID},
	    get_node(NewPredecessor) ! {change_successor,ID},
	    loop(NewState);

	{change_predecessor,NewPredecessor} ->
	    NewState = State#peer{predecessor = NewPredecessor},
	    loop(NewState);

	{change_successor,NewSuccessor} ->
	    NewState = State#peer{successor = NewSuccessor},
	    loop(NewState);

	{store,Key,Value} ->
	    case ID == Successor of
		true ->
		    NewDict = dict:store(Key,Value,State#peer.data),
		    NewState = State#peer{data = NewDict};
		false ->
		    case (Key == ID) or (is_element(Key,Predecessor,ID)) of
			true ->
			    NewDict = dict:store(Key,Value,State#peer.data),
			    NewState = State#peer{data = NewDict};
			false ->
			    get_node(Successor) ! {store,Key,Value},
			    NewState = State
		    end
	    end,
	    loop(NewState);

	{lookup,Key,ParentID} ->
	    case ID == Successor of
		true ->
		    case dict:is_key(Key,State#peer.data) of
			true ->
			    ParentID ! dict:fetch(Key,State#peer.data);
			false ->
			    ParentID ! none1
		    end;
		false ->
		    case (Key == ID) or (is_element(Key,Predecessor,ID)) of
			true ->
			    case dict:is_key(Key,State#peer.data) of
				true ->
				    ParentID ! dict:fetch(Key,State#peer.data);
				false ->
				    ParentID ! none2
			    end;
			false ->
			    get_node(Successor) ! {lookup,Key,ParentID}
		    end
	    end,
	    loop(State);

	{get_dict,ParentID} ->
	    ParentID ! State#peer.data,
	    loop(State);

	{exit} ->
	    get_node(Successor) ! {delegate_data,State#peer.data},
	    get_node(Successor) ! {change_predecessor,Predecessor},
	    get_node(Predecessor) ! {change_successor,Successor};
	
	{delegate_data,Delegate_Data} ->
	    NewState = State#peer{data = dict:from_list( (State#peer.data):to_list() ++ Delegate_Data:to_list() )},
	    loop(NewState);

	{get_pair,ParentID} ->
	    ParentID ! {Successor,Predecessor},
	    loop(State);
	
	Other ->
	    io:fwrite("~w other ~w\n",[ID,[Other]]),
	    loop(State)
    end.

find_successor(NodeID) ->
    get_node(NodeID) ! {find_successor}.

gen_pid() ->
    <<Pid:160>> = crypto:rand_bytes(20),
    case get(Pid) =:= undefined of
	true -> put(Pid,true),
		Pid;
	false  -> gen_pid()
    end.

get_node(NodeID) ->
    list_to_atom("node" ++ integer_to_list(NodeID)).

%KeyとValueの組でstoreを行う.
store(Key,Value) ->
    node0 ! {store,Key,Value}.

%対応するKeyに対するValueを取って来る。
lookup(Key) ->
    node0 ! {lookup,Key,self()},
    receive
	Value ->
	    io:fwrite("get ~w\n",[Value])
    end.

%Head < Id < Tailの関係を満たすかどうかを調べる。
is_element(Id,Head,Tail) ->
    if
	(Head < Id) and (Id < Tail) -> %% HeadとTailが0をまたがない時。-- 0 -- Head -- Tail -- 0 --
	    true;
	(Head > Tail) and ((Id > Head) or (Id < Tail)) -> %% -- Head -- 0 -- Tail -- の時
	    true;
	true ->
	    false
    end.


%指定したIDのノードを脱落させる。
exit(NodeID) ->
    get_node(NodeID) ! {exit},
    exit(get_node(NodeID),"user_exit!").

%補助関数
get_pair(NodeID) ->
    get_node(NodeID) ! {get_pair,self()},
    receive
	{Successor,Predecessor} ->
	    io:fwrite("{Successor = ~w, Predecessor = ~w}\n",[Successor,Predecessor])
    end.

get_dict(NodeID) ->
    get_node(NodeID) ! {get_dict,self()},
    receive
	Dict ->
	    io:fwrite("Dict ~w\n",[dict:to_list(Dict)])
    end.
erlang% erl -sname chord
Erlang (BEAM) emulator version 5.6.4 [source] [async-threads:0] [kernel-poll:false]

Eshell V5.6.4  (abort with ^G)
(chord@localhost)1> c(simple_chord).
{ok,simple_chord}
(chord@localhost)2> simple_chord:start().
true
(chord@localhost)3> simple_chord:join().
This node is 381978395276524628522107710167126825120972439435
{ok,{1224004145871988,#Ref<0.0.0.155>}}
(chord@localhost)4> simple_chord:join(10).
{ok,{1224004150210755,#Ref<0.0.0.160>}}
(chord@localhost)5> simple_chord:join(10).
NodeID 10 is already used!
ok
(chord@localhost)6> simple_chord:join().  
This node is 1247270923697235632412026269261998826960864476584
{ok,{1224004159687014,#Ref<0.0.0.170>}}
(chord@localhost)7> simple_chord:join().
This node is 1337922920366611218524069067930968016679729604000
{ok,{1224004160262937,#Ref<0.0.0.176>}}
(chord@localhost)8> simple_chord:join().
This node is 596705167923494375861517011386421960885518260396
{ok,{1224004160806612,#Ref<0.0.0.182>}}
(chord@localhost)9> simple_chord:join().
This node is 469390892228075407961803392604538394107634820265
{ok,{1224004161414830,#Ref<0.0.0.188>}}
(chord@localhost)10> simple_chord:store(10,"test"). 
{store,10,"test"}
(chord@localhost)11> simple_chord:lookup(10).
get [116,101,115,116]
ok
(chord@localhost)12> simple_chord:get_dict(10).
Dict [{10,[116,101,115,116]}]
ok
(chord@localhost)13> simple_chord:exit(10).
** exception error: bad argument
     in function  exit/2
        called as exit(node10,"user_exit!")
     in call from simple_chord:exit/1
(chord@localhost)14> simple_chord:get_dict(10).
** exception error: bad argument
     in function  simple_chord:get_dict/1
(chord@localhost)15> simple_chord:lookup(10).  
get [116,101,115,116]
ok
(chord@localhost)16> simple_chord:get_pair(0).
{Successor = 381978395276524628522107710167126825120972439435, Predecessor = 1337922920366611218524069067930968016679729604000}
ok
(chord@localhost)17> simple_chord:get_dict(381978395276524628522107710167126825120972439435).
Dict [{10,[116,101,115,116]}]
ok

どんな事をしているのか

何も考えずに書き始めたので、1つのシステムプロセスの中に、Erlangのプロセスを作って行く形に成ってしまいましたwww


ちなみに、通信は全てErlangのMessage Passingを使って行っています。


ErlangのMessage Passingは何も1つのプロセスの中のErlangプロセス間のやりとりだけでは無くて、LAN内におけるやりとり、さらにはglobalなやりとりも行う事が出来ます。そしてそれらを十分に隠蔽しているので、上のコードを核にちょいちょい書き換えるだけでグローバルで動く様に成ります。



流れとしては簡単で、

  1. simple_chord:start()で、node0を作る。このnode0はこの後の全てのnodeの一次接続先となって、適切なnodeIDを返したり、一時的なsuccessorとなる。
  2. simple_chord:join()では、ネットワーク上のnodeIDを新しく割り当ててもらってそれを使います。このnodeIDはネットワーク全体でuniqueになるようにしています。160bitsなのは、なんかこの後SHA1とかと対応付ける時にそれっぽくて良いからとかそんな感じでやりました。
  3. simple_chord:join(ID)では、ID決め打ちでネットワークに参加しようとします。
  4. その後はstoreしてlookupです。

ほんのちょっとの工夫としては、Key以上で一番近いIDにstoreしたり、あるノードが落ちる時はそのSuccessorにデータを渡したりしています。


まぁノードが抜け落ちた後にnode0に報告してないとかぽろぽろと抜け落ちてますが・・・。


こんなのクソ過ぎて何にもならんという感じではありますが、Erlangを使うとこんな感じで書けるんだーとかいう自分用。