WIP работа на resolver

This commit is contained in:
kirillius 2026-05-14 19:22:38 +03:00
parent c1d68c0bfa
commit c0d5fb5d1f
5 changed files with 357 additions and 67 deletions

View File

@ -40,6 +40,37 @@ public class Config {
@JSONProperty @JSONProperty
private volatile boolean cachingAS = true; private volatile boolean cachingAS = true;
@Getter
@Setter
@JSONProperty(required = false)
private volatile boolean cachingDomains = true;
@Getter
@Setter
@JSONArrayProperty(type = String.class, required = false)
private volatile List<String> domainResolvers = List.of("8.8.8.8", "77.88.8.8");
/**
* Time in minutes
*/
@Getter
@Setter
@JSONProperty(required = false)
private volatile int domainLookupInterval = 5;
@Getter
@Setter
@JSONProperty(required = false)
private volatile int autoLookupPrefixLength = 24;
/**
* Time in hours
*/
@Getter
@Setter
@JSONProperty(required = false)
private volatile int domainsTimeToLive = 48;
/** /**
* Update ASN prefixes every N hours * Update ASN prefixes every N hours
*/ */
@ -62,7 +93,6 @@ public class Config {
* Path where to store temporary data * Path where to store temporary data
*/ */
@Setter @Setter
@Getter @Getter
@JSONProperty @JSONProperty

View File

@ -12,7 +12,12 @@ import ru.kirillius.pf.sdn.core.Subscription.SubscriptionService;
import ru.kirillius.pf.sdn.core.Util.IPv4Util; import ru.kirillius.pf.sdn.core.Util.IPv4Util;
import ru.kirillius.utils.logging.SystemLogger; import ru.kirillius.utils.logging.SystemLogger;
import java.io.*; import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -22,19 +27,21 @@ import java.util.concurrent.atomic.AtomicReference;
* Builds the effective set of network resources by combining subscriptions, caches, and filters. * Builds the effective set of network resources by combining subscriptions, caches, and filters.
*/ */
public class NetworkingService extends AppService { public class NetworkingService extends AppService {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final static String CTX = NetworkingService.class.getSimpleName(); private final static String CTX = NetworkingService.class.getSimpleName();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final File cacheFile; private final File domainCacheFile;
private final File asCacheFile;
private final EventListener<NetworkResourceBundle> resourceUpdateSubscription; private final EventListener<NetworkResourceBundle> resourceUpdateSubscription;
private final EventListener<ContextEventsHandler.ConfigChangeContext> configChangeSubscription; private final EventListener<ContextEventsHandler.ConfigChangeContext> configChangeSubscription;
private final AtomicReference<Future<?>> updateProcess = new AtomicReference<>();
private void rebuildInputs() { @Getter
inputResources.clear(); private final NetworkResourceBundle inputResources = new NetworkResourceBundle();
inputResources.add(context.getConfig().getCustomResources()); @Getter
inputResources.add(context.getServiceManager().getService(SubscriptionService.class).getOutputResources()); private final NetworkResourceBundle outputResources = new NetworkResourceBundle();
triggerUpdate(false); @Getter
} private final List<String> autoResolveDomains = new ArrayList<>();
private final Map<Integer, List<IPv4Subnet>> prefixCache = new ConcurrentHashMap<>();
private final Map<String, ResolverCacheEntry> domainCache = new ConcurrentHashMap<>();
/** /**
* Creates the networking service, wiring subscriptions and restoring cached state. * Creates the networking service, wiring subscriptions and restoring cached state.
@ -51,26 +58,40 @@ public class NetworkingService extends AppService {
NetworkingService.this.rebuildInputs(); NetworkingService.this.rebuildInputs();
} }
}); });
cacheFile = new File(context.getConfig().getCacheDirectory(), "as-cache.json"); domainCacheFile = new File(context.getConfig().getCacheDirectory(), "domain-cache.json");
if (cacheFile.exists() && context.getConfig().isCachingAS()) { asCacheFile = new File(context.getConfig().getCacheDirectory(), "as-cache.json");
if (asCacheFile.exists() && context.getConfig().isCachingAS()) {
SystemLogger.message("Loading as cache file", CTX); SystemLogger.message("Loading as cache file", CTX);
try (var is = new FileInputStream(cacheFile)) { try (var is = new FileInputStream(asCacheFile)) {
var json = new JSONObject(new JSONTokener(is)); var json = new JSONObject(new JSONTokener(is));
json.keySet().forEach(key -> { json.keySet().forEach(key -> {
var as = Integer.parseInt(key); var as = Integer.parseInt(key);
prefixCache.put(as, JSONUtility.deserializeCollection(json.getJSONArray(key), IPv4Subnet.class, null).toList()); prefixCache.put(as, JSONUtility.deserializeCollection(json.getJSONArray(key), IPv4Subnet.class, null).toList());
}); });
} catch (Exception e) { } catch (Exception e) {
SystemLogger.error("Failed to load as cache file " + cacheFile.getPath(), CTX, e); SystemLogger.error("Failed to load as cache file " + asCacheFile.getPath(), CTX, e);
}
}
if (domainCacheFile.exists() && context.getConfig().isCachingAS()) {
SystemLogger.message("Loading domain cache file", CTX);
try (var is = new FileInputStream(domainCacheFile)) {
var json = new JSONObject(new JSONTokener(is));
json.keySet().forEach(host -> {
domainCache.put(host, JSONUtility.deserializeStructure(json.getJSONObject(host), ResolverCacheEntry.class));
});
} catch (Exception e) {
SystemLogger.error("Failed to load domain cache file " + asCacheFile.getPath(), CTX, e);
} }
} }
} }
private final AtomicReference<Future<?>> updateProcess = new AtomicReference<>(); private void rebuildInputs() {
@Getter inputResources.clear();
private final NetworkResourceBundle inputResources = new NetworkResourceBundle(); inputResources.add(context.getConfig().getCustomResources());
@Getter inputResources.add(context.getServiceManager().getService(SubscriptionService.class).getOutputResources());
private final NetworkResourceBundle outputResources = new NetworkResourceBundle(); triggerUpdate(false);
}
/** /**
* Indicates whether an update job is currently executing. * Indicates whether an update job is currently executing.
@ -80,8 +101,6 @@ public class NetworkingService extends AppService {
return future != null && !future.isDone() && !future.isCancelled(); return future != null && !future.isDone() && !future.isCancelled();
} }
private final Map<Integer, List<IPv4Subnet>> prefixCache = new ConcurrentHashMap<>();
/** /**
* Schedules an update of network resources, optionally ignoring cached prefixes. * Schedules an update of network resources, optionally ignoring cached prefixes.
*/ */
@ -96,50 +115,6 @@ public class NetworkingService extends AppService {
SystemLogger.message("Update is started", CTX); SystemLogger.message("Update is started", CTX);
var config = context.getConfig(); var config = context.getConfig();
var filteredResources = config.getFilteredResources(); var filteredResources = config.getFilteredResources();
var asn = new ArrayList<>(inputResources.getASN());
asn.removeAll(filteredResources.getASN());
var asnToFetch = new ArrayList<>(asn);
if (!ignoreCache) {
asnToFetch.removeAll(prefixCache.keySet());
}
fetchPrefixes(asnToFetch);
if (config.isCachingAS()) {
try (var os = new FileOutputStream(cacheFile)) {
var json = new JSONObject();
prefixCache.forEach((key, asnList) -> {
json.put(String.valueOf(key), JSONUtility.serializeCollection(asnList, IPv4Subnet.class, null));
});
os.write(json.toString().getBytes());
} catch (IOException e) {
SystemLogger.error("Unable to write file " + cacheFile.getPath(), CTX, e);
}
}
var subnets = new HashSet<>(inputResources.getSubnets());
asn.forEach(n -> {
var cached = prefixCache.get(n);
if (cached == null) {
return;
}
subnets.addAll(cached);
SystemLogger.message("Using " + cached.size() + " subnets from AS" + n, CTX);
});
filteredResources.getSubnets().forEach(subnets::remove);
SystemLogger.message("Trying to summary " + subnets.size() + " subnets...", CTX);
var merged = IPv4Util.summarySubnets(subnets, config.getMergeSubnetsWithUsage());
var unmerged = new AtomicInteger();
subnets.forEach(subnet -> {
if (!merged.getMergedSubnets().contains(subnet)) {
unmerged.getAndIncrement();
}
});
SystemLogger.message(subnets.size() + " subnets has been summarized and merged to " + merged.getResult().size() + " new subnets. Unmerged: " + unmerged.get(), CTX);
var domains = new HashSet<>(inputResources.getDomains()); var domains = new HashSet<>(inputResources.getDomains());
filteredResources.getDomains().forEach(domains::remove); filteredResources.getDomains().forEach(domains::remove);
@ -157,6 +132,74 @@ public class NetworkingService extends AppService {
domains.removeAll(domainsToRemove); domains.removeAll(domainsToRemove);
var asn = new ArrayList<>(inputResources.getASN());
asn.removeAll(filteredResources.getASN());
var asnToFetch = new ArrayList<>(asn);
if (!ignoreCache) {
asnToFetch.removeAll(prefixCache.keySet());
}
fetchPrefixes(asnToFetch);
if (config.isCachingAS()) {
try (var os = new FileOutputStream(asCacheFile)) {
var json = new JSONObject();
prefixCache.forEach((key, asnList) -> {
json.put(String.valueOf(key), JSONUtility.serializeCollection(asnList, IPv4Subnet.class, null));
});
os.write(json.toString().getBytes());
} catch (IOException e) {
SystemLogger.error("Unable to write file " + asCacheFile.getPath(), CTX, e);
}
}
resolveDomains(autoResolveDomains);
if (config.isCachingDomains()) {
try (var os = new FileOutputStream(domainCacheFile)) {
var json = new JSONObject();
domainCache.forEach((key, entry) -> {
json.put(String.valueOf(key), JSONUtility.serializeStructure(domainCache.get(key)));
});
os.write(json.toString().getBytes());
} catch (IOException e) {
SystemLogger.error("Unable to write file " + domainCacheFile.getPath(), CTX, e);
}
}
var subnets = new HashSet<>(inputResources.getSubnets());
asn.forEach(n -> {
var cached = prefixCache.get(n);
if (cached == null) {
return;
}
subnets.addAll(cached);
SystemLogger.message("Using " + cached.size() + " subnets from AS" + n, CTX);
});
//добавляем отрезолвенные домены
domains.forEach(domain -> {
var entry = domainCache.get(domain);
if (entry != null) {
subnets.addAll(entry.getAddresses().keySet());
}
});
filteredResources.getSubnets().forEach(subnets::remove);
SystemLogger.message("Trying to summary " + subnets.size() + " subnets...", CTX);
var merged = IPv4Util.summarySubnets(subnets, config.getMergeSubnetsWithUsage());
var unmerged = new AtomicInteger();
subnets.forEach(subnet -> {
if (!merged.getMergedSubnets().contains(subnet)) {
unmerged.getAndIncrement();
}
});
SystemLogger.message(subnets.size() + " subnets has been summarized and merged to " + merged.getResult().size() + " new subnets. Unmerged: " + unmerged.get(), CTX);
outputResources.setASN(Collections.unmodifiableList(asn)); outputResources.setASN(Collections.unmodifiableList(asn));
outputResources.setSubnets(merged.getResult()); outputResources.setSubnets(merged.getResult());
outputResources.setDomains(domains.stream().toList()); outputResources.setDomains(domains.stream().toList());
@ -174,6 +217,42 @@ public class NetworkingService extends AppService {
})); }));
} }
private void resolveDomains(List<String> domains) {
var resolvedSubnets = new ArrayList<IPv4Subnet>();
var resolver = context.getServiceManager().getService(ResolverService.class);
for (var domain : domains) {
var task = resolver.resolve(domain);
while (!task.isDone() && !task.isCancelled()) {
Thread.yield();
}
try {
var subnets = task.get();
var entry = domainCache.get(domain);
var addresses = entry.getAddresses();
entry.setLastUpdate(Instant.now());
subnets.forEach(subnet -> addresses.put(subnet, Instant.now()));
resolvedSubnets.addAll(domainCache.get(domain).getAddresses().keySet());
} catch (InterruptedException | ExecutionException e) {
SystemLogger.error("Error happened while resolving domain " + domain, CTX, e);
}
}
//remove old entries
for (var domain : domainCache.keySet()) {
var entry = domainCache.get(domain);
var addresses = entry.getAddresses();
for (var subnet : addresses.keySet()) {
var time = addresses.get(subnet);
if (time.isBefore(Instant.now().minus(context.getConfig().getDomainsTimeToLive(), ChronoUnit.HOURS))) {
addresses.remove(subnet);
}
}
if (addresses.isEmpty()) {
domainCache.remove(domain);
}
}
}
/** /**
* Fetches prefixes for the given autonomous systems and stores them in the cache. * Fetches prefixes for the given autonomous systems and stores them in the cache.
*/ */

View File

@ -0,0 +1,22 @@
package ru.kirillius.pf.sdn.core.Networking;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import ru.kirillius.json.JSONMapProperty;
import ru.kirillius.json.JSONProperty;
import ru.kirillius.json.JSONSerializable;
import java.time.Instant;
import java.util.Map;
@JSONSerializable
@Getter
@Setter
@NoArgsConstructor
public class ResolverCacheEntry {
@JSONProperty
private Instant lastUpdate;
@JSONMapProperty(keyType = IPv4Subnet.class, valueType = Instant.class)
private Map<IPv4Subnet, Instant> addresses;
}

View File

@ -0,0 +1,39 @@
package ru.kirillius.pf.sdn.core.Networking;
import ru.kirillius.pf.sdn.core.AppService;
import ru.kirillius.pf.sdn.core.Context;
import ru.kirillius.pf.sdn.core.Util.DomainUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ResolverService extends AppService {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final static String CTX = ResolverService.class.getSimpleName();
public Future<List<IPv4Subnet>> resolve(String host) {
return executor.submit(() -> {
var resolved = new ArrayList<IPv4Subnet>();
for (var domainResolver : context.getConfig().getDomainResolvers()) {
DomainUtil.lookup(host, domainResolver).stream().map(addr -> new IPv4Subnet(addr, 32)).forEach(resolved::add);
}
return resolved;
});
}
public ResolverService(Context context) {
super(context);
}
/**
* Removes event subscriptions and shuts down the executor.
*/
@Override
public void close() throws IOException {
executor.shutdown();
}
}

View File

@ -0,0 +1,120 @@
package ru.kirillius.pf.sdn.core.Networking;
import lombok.Getter;
import org.json.JSONObject;
import org.json.JSONTokener;
import ru.kirillius.json.JSONUtility;
import ru.kirillius.pf.sdn.core.AppService;
import ru.kirillius.pf.sdn.core.Context;
import ru.kirillius.pf.sdn.core.Util.DomainUtil;
import ru.kirillius.utils.logging.SystemLogger;
import java.io.*;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
public class ResolverServiceObsolete extends AppService {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final static String CTX = ResolverServiceObsolete.class.getSimpleName();
private final File cacheFile;
private final Map<String, ResolverCacheEntry> cacheEntries = new ConcurrentHashMap<>();
private final Future<?> backgroundWorker;
public void saveCache() throws IOException {
try (var writer = new BufferedWriter(new FileWriter(cacheFile))) {
writer.write(JSONUtility.serializeMap(cacheEntries, String.class, ResolverCacheEntry.class, null, null).toString());
}
}
public void maintainCache() {
cacheEntries.keySet().forEach(host -> {
var entry = cacheEntries.get(host);
var lastSeen = entry.getLastSeen();
for (var subnet : lastSeen.keySet()) {
if (lastSeen.get(subnet).isBefore(Instant.now().minus(context.getConfig().getDomainsTimeToLive(), ChronoUnit.HOURS))) {
lastSeen.remove(subnet);
}
}
});
}
public Future<List<IPv4Subnet>> resolve(String host, boolean useCache) {
return executor.submit(() -> {
if (useCache && cacheEntries.containsKey(host)) {
var cached = cacheEntries.get(host);
if (cached.getLastUpdate().isAfter(Instant.now().minus(context.getConfig().getDomainLookupInterval(), ChronoUnit.MINUTES))) {
return cached.getAddresses();
}
}
var resolved = new HashSet<IPv4Subnet>();
for (var domainResolver : context.getConfig().getDomainResolvers()) {
DomainUtil.lookup(host, domainResolver).stream().map(addr -> new IPv4Subnet(addr, 32)).forEach(resolved::add);
}
if (useCache) {
if (!cacheEntries.containsKey(host)) {
cacheEntries.put(host, new ResolverCacheEntry());
}
var cached = cacheEntries.get(host);
cached.setLastUpdate(Instant.now());
cached.getAddresses().addAll(resolved);
var lastSeen = cached.getLastSeen();
resolved.forEach(net -> lastSeen.put(net, Instant.now()));
resolved.addAll(cached.getAddresses());
}
return resolved.stream().toList();
});
}
/**
* Creates the networking service, wiring subscriptions and restoring cached state.
*/
public ResolverServiceObsolete(Context context) {
super(context);
cacheFile = new File(context.getConfig().getCacheDirectory(), "resolver-cache.json");
if (cacheFile.exists() && context.getConfig().isCachingAS()) {
SystemLogger.message("Loading resolver cache file", CTX);
try (var is = new FileInputStream(cacheFile)) {
var json = new JSONObject(new JSONTokener(is));
var deserialized = JSONUtility.deserializeMap(json, String.class, ResolverCacheEntry.class, null, null);
cacheEntries.putAll(deserialized);
} catch (Exception e) {
SystemLogger.error("Failed to load resolver cache file " + cacheFile.getPath(), CTX, e);
}
}
backgroundWorker = executor.submit(this::autoResolve);
}
@Getter
private final List<String> autoLookupHosts = new CopyOnWriteArrayList<>();
private void autoResolve() {
try {
Thread.sleep(context.getConfig().getDomainLookupInterval());
autoLookupHosts.stream().toList().forEach(host -> {
//TODO продумать обновление
});
} catch (InterruptedException e) {
return;
}
}
/**
* Removes event subscriptions and shuts down the executor.
*/
@Override
public void close() throws IOException {
backgroundWorker.cancel(true);
executor.shutdown();
}
}