src/main/Main.java
Стартовый класс. Создаются экземпляры системы сообщений, фронтенда, и сервиса аккаунтов. Фронтенд и аккаунт сервисов являются потоками. Оба принимают в конструкторе ссылку на систему обмена сообщениями. Во фронтенде также есть обработчик запросов для Jetty сервера.
package main; import org.eclipse.jetty.server.Server; public class Main { public static void main(String[] args) throws Exception { MessageSystem ms = new MessageSystem(); Frontend frontend = new Frontend(ms); AccountService accountService = new AccountService(ms); (new Thread(frontend)).start(); (new Thread(accountService)).start(); Server server = new Server(8080); server.setHandler(frontend); server.start(); server.join(); } }
src/main/MessageSystem.java
Система обмена сообщениями. Содержит в себе карту потокобезопасных очередей сообщений для каждого имеющегося адреса и адресный сервис. Метод addService добавляет абонента в адресный сервис и карту очередей сообщений. Метод sendMessage отправляет сообщение. Метод execForAbonent получает очередь сообщений для заданного абонента и выполняет их в потоке абонента, т.к. вызывается из потока абонента.
package main; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class MessageSystem { private Map<Address, ConcurrentLinkedQueue<Msg>> messages = new HashMap<Address, ConcurrentLinkedQueue<Msg>>(); private AddressService addressService = new AddressService(); public void addService(Abonent abonent){ addressService.setAddress(abonent); messages.put(abonent.getAddress(), new ConcurrentLinkedQueue<Msg>()); } public void sendMessage(Msg message){ Queue<Msg> messageQueue = messages.get(message.getTo()); messageQueue.add(message); } public void execForAbonent(Abonent abonent) { Queue<Msg> messageQueue = messages.get(abonent.getAddress()); if(messageQueue == null){ return; } while(!messageQueue.isEmpty()){ Msg message = messageQueue.poll(); message.exec(abonent); } } public AddressService getAddressService(){ return addressService; } }
src/main/AddressService.java
Этот сервис хранит адреса абонентов. Например, есть класс, который реализует интерфейс абонента, т.е. обладает адресом. Можно этот класс записать в этот сервис вызвав функцию setAddress, а потом получать адрес этого класса через getAddress. У нас в системе обмена сообщениями участвуют только два потока/класса фронтенд и сервсис аккаунтов, поэтому запись адреса с именем класса в качестве ключа вполне подходит.
package main; import java.util.HashMap; import java.util.Map; public class AddressService { private Map<Class<?>, Address> addresses = new HashMap<Class<?>, Address>(); public Address getAddress(Class<?> abonentClass) { return addresses.get(abonentClass); } public void setAddress(Abonent abonent) { addresses.put(abonent.getClass(), abonent.getAddress()); } }
src/main/Address.java
Адрес по сути представляет собой уникальный целочисленный идентификатор. Т.к. получение идентификатора происходит путем потокобезопасного инкремента, то он всегда будет уникальным даже если создание экземпляров происходит параллельно в нескольких потоках.
package main; import java.util.concurrent.atomic.AtomicInteger; public class Address { static private AtomicInteger abonentIdCreator = new AtomicInteger(); final private int abonentId; public Address(){ this.abonentId = abonentIdCreator.incrementAndGet(); } public int hashCode(){ return abonentId; } }
src/main/Abonent.java
Абонент это интерфейс. Нечто что может обладать адресом должно реализовывать этот интерфейс.
package main; public interface Abonent { Address getAddress(); }
src/main/Msg.java
Сообщение содержит в себе адрес отправителя и адрес получателя. Сообщение содержит в себе абстрактный метод exec, который должен проводить обработку этого сообщения или другими словами выполнять это сообщение.
package main; public abstract class Msg { private Address from; private Address to; public Msg(Address from, Address to){ this.from = from; this.to = to; } protected Address getFrom(){ return from; } protected Address getTo(){ return to; } abstract void exec(Abonent abonent); }
src/main/Frontend.java
Каждый 10 миллисекунд фронтенд просит систему обмена сообщениями выполнить сообщения пришедшие для фронтенда. Выполнение происходит в потоке фронтенда.package main; import java.io.IOException; import java.util.HashMap; import java.util.Map; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; public class Frontend extends AbstractHandler implements Abonent, Runnable{ private static String GAME_NAME = "/test/"; private Address address; private MessageSystem ms; private Map<String, Integer> nameToId = new HashMap<String, Integer>(); public Frontend(MessageSystem ms){ this.ms = ms; this.address = new Address(); ms.addService(this); } public void run(){ while(true){ ms.execForAbonent(this); TimeHelper.sleep(10); } } public Address getAddress() { return address; } public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { response.setContentType("text/html;charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); baseRequest.setHandled(true); if(!target.equals(GAME_NAME)) return; String name = "Tully"; Integer id = nameToId.get(name); if(id != null){ response.getWriter().println("<h1>User name: " + name + " Id: " + id +"</h1>"); } else { response.getWriter().println("<h1>Wait for authorization</h1>"); Address addressAS = ms.getAddressService().getAddress(AccountService.class); ms.sendMessage(new MsgGetUserId(getAddress(), addressAS, name)); } } public void setId(String name, Integer id){ nameToId.put(name, id); } }
src/main/AccountService.java
Каждые 10 миллисекунд сервис аккаунтов просит систему обмена сообщениями обработать пришедшие для него сообщения.
package main; import java.util.HashMap; import java.util.Map; public class AccountService implements Abonent, Runnable{ private Address address; private MessageSystem ms; private Map<String, Integer> fakeAccounter = new HashMap<String, Integer>(); public AccountService(MessageSystem ms){ this.ms = ms; this.address = new Address(); ms.addService(this); this.fakeAccounter.put("Tully", 1); this.fakeAccounter.put("Sully", 2); } public void run(){ while(true){ ms.execForAbonent(this); TimeHelper.sleep(10); } } public Integer getUserId(String name){ TimeHelper.sleep(5000); return fakeAccounter.get(name); } public Address getAddress() { return address; } public MessageSystem getMessageSystem(){ return ms; } }
src/main/TimeHelper.java
Приостанавливает поток на заданное время в миллисекундах.package main; public class TimeHelper { public static void sleep(int period){ try{ Thread.sleep(period); } catch (InterruptedException e) { e.printStackTrace(); } } public static void sleep(){ try{ Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }
src/main/MsgGetUserId.java
Получает у сервиса аккаунтов идентификатор пользователя по его имени. Затем полученным значением обновляет идентификатор пользователя на фронтенде.
package main; public class MsgGetUserId extends MsgToAS { private String name; public MsgGetUserId(Address from, Address to, String name) { super(from, to); this.name= name; } void exec(AccountService accountService) { Integer id = accountService.getUserId(name); accountService.getMessageSystem().sendMessage(new MsgUpdateUserId(getTo(), getFrom(), name, id)); } }
src/main/MsgToAS.java
Абстрактное сообщение для сервиса аккаунтов.package main; public abstract class MsgToAS extends Msg{ public MsgToAS(Address from, Address to) { super(from, to); } void exec(Abonent abonent) { if(abonent instanceof AccountService){ exec((AccountService) abonent); } } abstract void exec(AccountService accountService); }
src/main/MsgToFrontend.java
Абстрактно сообщение для фронтенда.package main; public abstract class MsgToFrontend extends Msg{ public MsgToFrontend(Address from, Address to) { super(from, to); } public void exec(Abonent abonent) { if(abonent instanceof Frontend){ exec((Frontend)abonent); } } abstract void exec(Frontend frontend); }
src/main/MsgUpdateUserId.java
Это сообщение предназначено для обновления идентификатора пользователя на фронтенде.
package main; public class MsgUpdateUserId extends MsgToFrontend { private String name; private Integer id; public MsgUpdateUserId(Address from, Address to, String name, Integer id) { super(from, to); this.name = name; this.id = id; } void exec(Frontend frontend) { frontend.setId(name, id); } }
--
https://github.com/vitaly-chibrikov/tp_java_2013_02/tree/master/lecture3