Compare commits

...

4 Commits

5 changed files with 243 additions and 88 deletions

View File

@ -8,6 +8,7 @@ import ru.kirillius.pf.sdn.External.API.Components.TDNS;
import ru.kirillius.pf.sdn.External.API.GitSubscription;
import ru.kirillius.pf.sdn.External.API.HEInfoProvider;
import ru.kirillius.pf.sdn.External.API.LocalFilesystemSubscription;
import ru.kirillius.pf.sdn.External.API.RIPEInfoProvider;
import ru.kirillius.pf.sdn.core.*;
import ru.kirillius.pf.sdn.core.Auth.AuthManager;
import ru.kirillius.pf.sdn.core.Auth.TokenService;
@ -80,7 +81,10 @@ public class App implements Context, Closeable {
*/
private ServiceManager loadServiceManager() {
var manager = new ServiceManager(this, List.of(AuthManager.class, ComponentHandlerService.class, TokenService.class, AppUpdateService.class, BGPInfoService.class, NetworkingService.class, SubscriptionService.class, ResourceUpdateService.class, WebService.class));
manager.getService(BGPInfoService.class).setProvider(new HEInfoProvider());
var infoService = manager.getService(BGPInfoService.class);
infoService.addProvider(new HEInfoProvider());
infoService.addProvider(new RIPEInfoProvider());
return manager;
}
@ -123,7 +127,6 @@ public class App implements Context, Closeable {
return;
}
var versions = Arrays.stream(listedFiles)
.filter(file -> file.getName().endsWith(AppUpdateService.EXTENSION))
.collect(Collectors.toMap(file -> file.getName().replace(Pattern.quote(AppUpdateService.EXTENSION), ""), file -> file));
@ -169,7 +172,6 @@ public class App implements Context, Closeable {
}
}
/**
* Requests the application to exit, optionally restarting.
*/

View File

@ -13,6 +13,7 @@ import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
@ -33,7 +34,9 @@ public class HEInfoProvider implements ASInfoProvider {
@SneakyThrows
public List<IPv4Subnet> getPrefixes(int as) {
try (var client = HttpClient.newHttpClient()) {
var request = HttpRequest.newBuilder().uri(URI.create("https://bgp.he.net/super-lg/report/api/v1/prefixes/originated/" + as)).header("Accept", "application/json").GET().build();
var request = HttpRequest.newBuilder()
.uri(URI.create("https://bgp.he.net/super-lg/report/api/v1/prefixes/originated/" + as))
.header("Accept", "application/json").timeout(Duration.ofMinutes(2)).GET().build();
var response = client.send(request, HttpResponse.BodyHandlers.ofInputStream());
if (response.statusCode() == 200) {
try (var inputStream = response.body()) {

View File

@ -0,0 +1,107 @@
package ru.kirillius.pf.sdn.External.API;
import lombok.SneakyThrows;
import org.jetbrains.annotations.NotNull;
import org.json.JSONObject;
import org.json.JSONTokener;
import ru.kirillius.pf.sdn.core.Networking.ASInfoProvider;
import ru.kirillius.pf.sdn.core.Networking.IPv4Subnet;
import ru.kirillius.utils.logging.SystemLogger;
import java.io.InputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Retrieves ASN prefix information from RIPE public API.
*/
public class RIPEInfoProvider implements ASInfoProvider {
/**
* Fetches IPv4 prefixes announced by the specified autonomous system.
*/
@Override
@SneakyThrows
public List<IPv4Subnet> getPrefixes(int as) {
try (var client = HttpClient.newHttpClient()) {
var request = HttpRequest.newBuilder()
.uri(URI.create("https://stat.ripe.net/data/announced-prefixes/data.json?resource=AS" + as))
.header("Accept", "application/json").timeout(Duration.ofMinutes(2)).GET().build();
var response = client.send(request, HttpResponse.BodyHandlers.ofInputStream());
if (response.statusCode() == 200) {
try (var inputStream = response.body()) {
return getIPv4Subnets(inputStream);
}
} else {
SystemLogger.error("Unable to get info about AS" + as, CTX);
return Collections.emptyList();
}
}
}
@Override
public IPQueryInfo queryAddress(String address) {
var request = HttpRequest.newBuilder()
.uri(URI.create("https://stat.ripe.net/data/network-info/data.json?resource=" + address))
.header("Accept", "application/json")
.GET()
.build();
try (var client = HttpClient.newHttpClient()) {
var response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
SystemLogger.error("Unable to get info about IP " + address + ", status " + response.statusCode(), CTX);
return emptyIPInfo();
}
var json = new JSONObject(new JSONTokener(response.body()));
var data = json.getJSONObject("data");
var asns = data.getJSONArray("asns");
var prefix = data.getString("prefix");
var asnList = new ArrayList<Integer>();
asns.forEach(obj -> asnList.add(Integer.parseInt(obj.toString())));
return IPQueryInfo.builder().prefixes(Collections.singletonList(new IPv4Subnet(prefix))).ASN(asnList).build();
} catch (Exception e) {
SystemLogger.error("Failed to query info about IP " + address + ": " + e.getMessage(), CTX);
return emptyIPInfo();
}
}
/**
* Parses IPv4 prefix entries from the Hurricane Electric API response.
*/
private static @NotNull ArrayList<IPv4Subnet> getIPv4Subnets(InputStream inputStream) {
var json = new JSONObject(new JSONTokener(inputStream));
var data = json.getJSONObject("data");
var array = data.getJSONArray("prefixes");
var list = new ArrayList<IPv4Subnet>();
array.forEach(obj -> {
var jo = (JSONObject) obj;
var prefix = jo.getString("prefix");
if (prefix.indexOf('.') == -1) {
return;
}
list.add(new IPv4Subnet(prefix));
});
return list;
}
private final static String CTX = RIPEInfoProvider.class.getSimpleName();
private static IPQueryInfo emptyIPInfo() {
return IPQueryInfo.builder()
.ASN(Collections.emptyList())
.prefixes(Collections.emptyList())
.build();
}
}

View File

@ -1,26 +1,59 @@
package ru.kirillius.pf.sdn.core.Networking;
import lombok.Getter;
import lombok.Setter;
import ru.kirillius.pf.sdn.core.AppService;
import ru.kirillius.pf.sdn.core.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* Delegates asynchronous retrieval of BGP prefix data through a configurable provider.
*/
public class BGPInfoService extends AppService {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void addProvider(ASInfoProvider provider) {
synchronized (providers) {
providers.add(provider);
}
}
public void fallbackNextProvider() {
synchronized (providers) {
if (provider == null) {
provider = providers.getFirst();
return;
}
for (var i = 0; i < providers.size(); i++) {
var provider = providers.get(i);
if (provider == this.provider) {
var nextIndex = i + 1;
if (nextIndex >= providers.size()) {
nextIndex = 0;
}
this.provider = providers.get(nextIndex);
return;
}
}
}
}
private final List<ASInfoProvider> providers = new ArrayList<>();
public ASInfoProvider getProvider() {
if (provider == null) {
if (providers.isEmpty()) {
return null;
}
fallbackNextProvider();
}
return provider;
}
@Getter
@Setter
private ASInfoProvider provider;
/**
@ -30,16 +63,15 @@ public class BGPInfoService extends AppService {
super(context);
}
/**
* Submits a request to obtain prefixes for the provided autonomous system number.
*/
public Future<List<IPv4Subnet>> getPrefixes(int as) {
return executor.submit(() -> provider.getPrefixes(as));
return executor.submit(() -> getProvider().getPrefixes(as));
}
public Future<ASInfoProvider.IPQueryInfo> getAddressInfo(String address) {
return executor.submit(() -> provider.queryAddress(address));
return executor.submit(() -> getProvider().queryAddress(address));
}
/**
@ -50,5 +82,4 @@ public class BGPInfoService extends AppService {
executor.shutdown();
}
}

View File

@ -92,108 +92,120 @@ public class NetworkingService extends AppService {
SystemLogger.message("Updating network manager", CTX);
updateProcess.set(executor.submit(() -> {
SystemLogger.message("Update is started", CTX);
var config = context.getConfig();
var filteredResources = config.getFilteredResources();
var asn = new ArrayList<>(inputResources.getASN());
asn.removeAll(filteredResources.getASN());
try {
SystemLogger.message("Update is started", CTX);
var config = context.getConfig();
var filteredResources = config.getFilteredResources();
var asn = new ArrayList<>(inputResources.getASN());
asn.removeAll(filteredResources.getASN());
var asnToFetch = new ArrayList<>(asn);
if (!ignoreCache) {
asnToFetch.removeAll(prefixCache.keySet());
}
fetchPrefixes(asnToFetch);
if (config.isCachingAS()) {
try (var os = new FileOutputStream(cacheFile)) {
var json = new JSONObject();
prefixCache.forEach((key, asnList) -> {
json.put(String.valueOf(key), JSONUtility.serializeCollection(asnList, IPv4Subnet.class, null));
});
os.write(json.toString().getBytes());
} catch (IOException e) {
SystemLogger.error("Unable to write file " + cacheFile.getPath(), CTX, e);
var asnToFetch = new ArrayList<>(asn);
if (!ignoreCache) {
asnToFetch.removeAll(prefixCache.keySet());
}
}
var subnets = new HashSet<>(inputResources.getSubnets());
asn.forEach(n -> {
var cached = prefixCache.get(n);
if (cached == null) {
return;
}
subnets.addAll(cached);
SystemLogger.message("Using " + cached.size() + " subnets from AS" + n, CTX);
});
filteredResources.getSubnets().forEach(subnets::remove);
fetchPrefixes(asnToFetch);
SystemLogger.message("Trying to summary " + subnets.size() + " subnets...", CTX);
var merged = IPv4Util.summarySubnets(subnets, config.getMergeSubnetsWithUsage());
var unmerged = new AtomicInteger();
subnets.forEach(subnet -> {
if (!merged.getMergedSubnets().contains(subnet)) {
unmerged.getAndIncrement();
}
});
SystemLogger.message(subnets.size() + " subnets has been summarized and merged to " + merged.getResult().size() + " new subnets. Unmerged: " + unmerged.get(), CTX);
var domains = new HashSet<>(inputResources.getDomains());
filteredResources.getDomains().forEach(domains::remove);
//check domain overlaps
var domainsToRemove = new HashSet<String>();
for (var domainToMatch : domains) {
var pattern = "." + domainToMatch;
for (var domain : domains) {
if (domain.endsWith(pattern)) {
domainsToRemove.add(domain);
if (config.isCachingAS()) {
try (var os = new FileOutputStream(cacheFile)) {
var json = new JSONObject();
prefixCache.forEach((key, asnList) -> {
json.put(String.valueOf(key), JSONUtility.serializeCollection(asnList, IPv4Subnet.class, null));
});
os.write(json.toString().getBytes());
} catch (IOException e) {
SystemLogger.error("Unable to write file " + cacheFile.getPath(), CTX, e);
}
}
}
domains.removeAll(domainsToRemove);
var subnets = new HashSet<>(inputResources.getSubnets());
asn.forEach(n -> {
var cached = prefixCache.get(n);
if (cached == null) {
return;
}
subnets.addAll(cached);
SystemLogger.message("Using " + cached.size() + " subnets from AS" + n, CTX);
});
filteredResources.getSubnets().forEach(subnets::remove);
outputResources.setASN(Collections.unmodifiableList(asn));
outputResources.setSubnets(merged.getResult());
outputResources.setDomains(domains.stream().toList());
SystemLogger.message("Trying to summary " + subnets.size() + " subnets...", CTX);
SystemLogger.message("Update is complete", CTX);
var merged = IPv4Util.summarySubnets(subnets, config.getMergeSubnetsWithUsage());
var unmerged = new AtomicInteger();
subnets.forEach(subnet -> {
if (!merged.getMergedSubnets().contains(subnet)) {
unmerged.getAndIncrement();
}
});
try {
context.getEventsHandler().getNetworkManagerUpdateEvent().invoke(outputResources);
SystemLogger.message(subnets.size() + " subnets has been summarized and merged to " + merged.getResult().size() + " new subnets. Unmerged: " + unmerged.get(), CTX);
var domains = new HashSet<>(inputResources.getDomains());
filteredResources.getDomains().forEach(domains::remove);
//check domain overlaps
var domainsToRemove = new HashSet<String>();
for (var domainToMatch : domains) {
var pattern = "." + domainToMatch;
for (var domain : domains) {
if (domain.endsWith(pattern)) {
domainsToRemove.add(domain);
}
}
}
domains.removeAll(domainsToRemove);
outputResources.setASN(Collections.unmodifiableList(asn));
outputResources.setSubnets(merged.getResult());
outputResources.setDomains(domains.stream().toList());
SystemLogger.message("Update is complete", CTX);
try {
context.getEventsHandler().getNetworkManagerUpdateEvent().invoke(outputResources);
} catch (Exception e) {
SystemLogger.error("Unable to invoke update event", CTX, e);
}
} catch (Exception e) {
SystemLogger.error("Unable to invoke update event", CTX, e);
SystemLogger.error("Something went wrong on update", CTX, e);
}
}));
}
/**
* Fetches prefixes for the given autonomous systems and stores them in the cache.
*/
private void fetchPrefixes(List<Integer> systems) {
var service = context.getServiceManager().getService(BGPInfoService.class);
systems.forEach(as -> {
SystemLogger.message("Fetching AS" + as + " prefixes...", CTX);
var future = service.getPrefixes(as);
var currentProvider = service.getProvider();
var done = false;
do {
SystemLogger.message("Fetching AS" + as + " prefixes...", CTX);
var future = service.getPrefixes(as);
while (!future.isDone() && !future.isCancelled()) {
Thread.yield();
}
while (!future.isDone() && !future.isCancelled()) {
Thread.yield();
}
try {
var iPv4Subnets = future.get();
prefixCache.put(as, iPv4Subnets);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
try {
var iPv4Subnets = future.get();
prefixCache.put(as, iPv4Subnets);
done = true;
break;
} catch (InterruptedException | ExecutionException e) {
service.fallbackNextProvider();
SystemLogger.error("Error happened while fetching AS" + as + " prefixes. Trying to use fallback BGP info provider:" + service.getProvider().getClass().getSimpleName(), CTX, e);
}
} while (service.getProvider() != currentProvider);
if (!done) {
SystemLogger.error("Unable to fetch AS" + as + " prefixes from all providers. Trying to use cache...", CTX);
}
});
}
/**
* Removes event subscriptions and shuts down the executor.
*/