From c0d5fb5d1fd2ff0f482e702974f8c5117c7948ff Mon Sep 17 00:00:00 2001 From: kirillius Date: Thu, 14 May 2026 19:22:38 +0300 Subject: [PATCH] =?UTF-8?q?WIP=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=B0=20?= =?UTF-8?q?=D0=BD=D0=B0=20resolver?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/ru/kirillius/pf/sdn/core/Config.java | 32 ++- .../core/Networking/NetworkingService.java | 211 ++++++++++++------ .../core/Networking/ResolverCacheEntry.java | 22 ++ .../sdn/core/Networking/ResolverService.java | 39 ++++ .../Networking/ResolverServiceObsolete.java | 120 ++++++++++ 5 files changed, 357 insertions(+), 67 deletions(-) create mode 100644 core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverCacheEntry.java create mode 100644 core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverService.java create mode 100644 core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverServiceObsolete.java diff --git a/core/src/main/java/ru/kirillius/pf/sdn/core/Config.java b/core/src/main/java/ru/kirillius/pf/sdn/core/Config.java index c4af32a..5ca15fc 100644 --- a/core/src/main/java/ru/kirillius/pf/sdn/core/Config.java +++ b/core/src/main/java/ru/kirillius/pf/sdn/core/Config.java @@ -40,6 +40,37 @@ public class Config { @JSONProperty private volatile boolean cachingAS = true; + @Getter + @Setter + @JSONProperty(required = false) + private volatile boolean cachingDomains = true; + + @Getter + @Setter + @JSONArrayProperty(type = String.class, required = false) + private volatile List domainResolvers = List.of("8.8.8.8", "77.88.8.8"); + + /** + * Time in minutes + */ + @Getter + @Setter + @JSONProperty(required = false) + private volatile int domainLookupInterval = 5; + + @Getter + @Setter + @JSONProperty(required = false) + private volatile int autoLookupPrefixLength = 24; + + /** + * Time in hours + */ + @Getter + @Setter + @JSONProperty(required = false) + private volatile int domainsTimeToLive = 48; + /** * Update ASN prefixes every N hours */ @@ -62,7 +93,6 @@ public class Config { * Path where to store temporary data */ - @Setter @Getter @JSONProperty diff --git a/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/NetworkingService.java b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/NetworkingService.java index 0af3ea2..b06a1d3 100644 --- a/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/NetworkingService.java +++ b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/NetworkingService.java @@ -12,7 +12,12 @@ import ru.kirillius.pf.sdn.core.Subscription.SubscriptionService; import ru.kirillius.pf.sdn.core.Util.IPv4Util; import ru.kirillius.utils.logging.SystemLogger; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -22,19 +27,21 @@ import java.util.concurrent.atomic.AtomicReference; * Builds the effective set of network resources by combining subscriptions, caches, and filters. */ public class NetworkingService extends AppService { - private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final static String CTX = NetworkingService.class.getSimpleName(); - - private final File cacheFile; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final File domainCacheFile; + private final File asCacheFile; private final EventListener resourceUpdateSubscription; private final EventListener configChangeSubscription; - - private void rebuildInputs() { - inputResources.clear(); - inputResources.add(context.getConfig().getCustomResources()); - inputResources.add(context.getServiceManager().getService(SubscriptionService.class).getOutputResources()); - triggerUpdate(false); - } + private final AtomicReference> updateProcess = new AtomicReference<>(); + @Getter + private final NetworkResourceBundle inputResources = new NetworkResourceBundle(); + @Getter + private final NetworkResourceBundle outputResources = new NetworkResourceBundle(); + @Getter + private final List autoResolveDomains = new ArrayList<>(); + private final Map> prefixCache = new ConcurrentHashMap<>(); + private final Map domainCache = new ConcurrentHashMap<>(); /** * Creates the networking service, wiring subscriptions and restoring cached state. @@ -51,26 +58,40 @@ public class NetworkingService extends AppService { NetworkingService.this.rebuildInputs(); } }); - cacheFile = new File(context.getConfig().getCacheDirectory(), "as-cache.json"); - if (cacheFile.exists() && context.getConfig().isCachingAS()) { + domainCacheFile = new File(context.getConfig().getCacheDirectory(), "domain-cache.json"); + asCacheFile = new File(context.getConfig().getCacheDirectory(), "as-cache.json"); + if (asCacheFile.exists() && context.getConfig().isCachingAS()) { SystemLogger.message("Loading as cache file", CTX); - try (var is = new FileInputStream(cacheFile)) { + try (var is = new FileInputStream(asCacheFile)) { var json = new JSONObject(new JSONTokener(is)); json.keySet().forEach(key -> { var as = Integer.parseInt(key); prefixCache.put(as, JSONUtility.deserializeCollection(json.getJSONArray(key), IPv4Subnet.class, null).toList()); }); } catch (Exception e) { - SystemLogger.error("Failed to load as cache file " + cacheFile.getPath(), CTX, e); + SystemLogger.error("Failed to load as cache file " + asCacheFile.getPath(), CTX, e); + } + } + + if (domainCacheFile.exists() && context.getConfig().isCachingAS()) { + SystemLogger.message("Loading domain cache file", CTX); + try (var is = new FileInputStream(domainCacheFile)) { + var json = new JSONObject(new JSONTokener(is)); + json.keySet().forEach(host -> { + domainCache.put(host, JSONUtility.deserializeStructure(json.getJSONObject(host), ResolverCacheEntry.class)); + }); + } catch (Exception e) { + SystemLogger.error("Failed to load domain cache file " + asCacheFile.getPath(), CTX, e); } } } - private final AtomicReference> updateProcess = new AtomicReference<>(); - @Getter - private final NetworkResourceBundle inputResources = new NetworkResourceBundle(); - @Getter - private final NetworkResourceBundle outputResources = new NetworkResourceBundle(); + private void rebuildInputs() { + inputResources.clear(); + inputResources.add(context.getConfig().getCustomResources()); + inputResources.add(context.getServiceManager().getService(SubscriptionService.class).getOutputResources()); + triggerUpdate(false); + } /** * Indicates whether an update job is currently executing. @@ -80,8 +101,6 @@ public class NetworkingService extends AppService { return future != null && !future.isDone() && !future.isCancelled(); } - private final Map> prefixCache = new ConcurrentHashMap<>(); - /** * Schedules an update of network resources, optionally ignoring cached prefixes. */ @@ -96,50 +115,6 @@ public class NetworkingService extends AppService { SystemLogger.message("Update is started", CTX); var config = context.getConfig(); var filteredResources = config.getFilteredResources(); - var asn = new ArrayList<>(inputResources.getASN()); - asn.removeAll(filteredResources.getASN()); - - var asnToFetch = new ArrayList<>(asn); - if (!ignoreCache) { - asnToFetch.removeAll(prefixCache.keySet()); - } - - fetchPrefixes(asnToFetch); - - if (config.isCachingAS()) { - try (var os = new FileOutputStream(cacheFile)) { - var json = new JSONObject(); - prefixCache.forEach((key, asnList) -> { - json.put(String.valueOf(key), JSONUtility.serializeCollection(asnList, IPv4Subnet.class, null)); - }); - os.write(json.toString().getBytes()); - } catch (IOException e) { - SystemLogger.error("Unable to write file " + cacheFile.getPath(), CTX, e); - } - } - - var subnets = new HashSet<>(inputResources.getSubnets()); - asn.forEach(n -> { - var cached = prefixCache.get(n); - if (cached == null) { - return; - } - subnets.addAll(cached); - SystemLogger.message("Using " + cached.size() + " subnets from AS" + n, CTX); - }); - filteredResources.getSubnets().forEach(subnets::remove); - - SystemLogger.message("Trying to summary " + subnets.size() + " subnets...", CTX); - - var merged = IPv4Util.summarySubnets(subnets, config.getMergeSubnetsWithUsage()); - var unmerged = new AtomicInteger(); - subnets.forEach(subnet -> { - if (!merged.getMergedSubnets().contains(subnet)) { - unmerged.getAndIncrement(); - } - }); - - SystemLogger.message(subnets.size() + " subnets has been summarized and merged to " + merged.getResult().size() + " new subnets. Unmerged: " + unmerged.get(), CTX); var domains = new HashSet<>(inputResources.getDomains()); filteredResources.getDomains().forEach(domains::remove); @@ -157,6 +132,74 @@ public class NetworkingService extends AppService { domains.removeAll(domainsToRemove); + var asn = new ArrayList<>(inputResources.getASN()); + asn.removeAll(filteredResources.getASN()); + + var asnToFetch = new ArrayList<>(asn); + if (!ignoreCache) { + asnToFetch.removeAll(prefixCache.keySet()); + } + + fetchPrefixes(asnToFetch); + + if (config.isCachingAS()) { + try (var os = new FileOutputStream(asCacheFile)) { + var json = new JSONObject(); + prefixCache.forEach((key, asnList) -> { + json.put(String.valueOf(key), JSONUtility.serializeCollection(asnList, IPv4Subnet.class, null)); + }); + os.write(json.toString().getBytes()); + } catch (IOException e) { + SystemLogger.error("Unable to write file " + asCacheFile.getPath(), CTX, e); + } + } + + resolveDomains(autoResolveDomains); + + if (config.isCachingDomains()) { + try (var os = new FileOutputStream(domainCacheFile)) { + var json = new JSONObject(); + domainCache.forEach((key, entry) -> { + json.put(String.valueOf(key), JSONUtility.serializeStructure(domainCache.get(key))); + }); + os.write(json.toString().getBytes()); + } catch (IOException e) { + SystemLogger.error("Unable to write file " + domainCacheFile.getPath(), CTX, e); + } + } + + var subnets = new HashSet<>(inputResources.getSubnets()); + asn.forEach(n -> { + var cached = prefixCache.get(n); + if (cached == null) { + return; + } + subnets.addAll(cached); + SystemLogger.message("Using " + cached.size() + " subnets from AS" + n, CTX); + }); + + //добавляем отрезолвенные домены + domains.forEach(domain -> { + var entry = domainCache.get(domain); + if (entry != null) { + subnets.addAll(entry.getAddresses().keySet()); + } + }); + + filteredResources.getSubnets().forEach(subnets::remove); + + SystemLogger.message("Trying to summary " + subnets.size() + " subnets...", CTX); + + var merged = IPv4Util.summarySubnets(subnets, config.getMergeSubnetsWithUsage()); + var unmerged = new AtomicInteger(); + subnets.forEach(subnet -> { + if (!merged.getMergedSubnets().contains(subnet)) { + unmerged.getAndIncrement(); + } + }); + + SystemLogger.message(subnets.size() + " subnets has been summarized and merged to " + merged.getResult().size() + " new subnets. Unmerged: " + unmerged.get(), CTX); + outputResources.setASN(Collections.unmodifiableList(asn)); outputResources.setSubnets(merged.getResult()); outputResources.setDomains(domains.stream().toList()); @@ -174,6 +217,42 @@ public class NetworkingService extends AppService { })); } + private void resolveDomains(List domains) { + var resolvedSubnets = new ArrayList(); + var resolver = context.getServiceManager().getService(ResolverService.class); + for (var domain : domains) { + var task = resolver.resolve(domain); + while (!task.isDone() && !task.isCancelled()) { + Thread.yield(); + } + try { + var subnets = task.get(); + var entry = domainCache.get(domain); + var addresses = entry.getAddresses(); + entry.setLastUpdate(Instant.now()); + subnets.forEach(subnet -> addresses.put(subnet, Instant.now())); + resolvedSubnets.addAll(domainCache.get(domain).getAddresses().keySet()); + } catch (InterruptedException | ExecutionException e) { + SystemLogger.error("Error happened while resolving domain " + domain, CTX, e); + } + } + + //remove old entries + for (var domain : domainCache.keySet()) { + var entry = domainCache.get(domain); + var addresses = entry.getAddresses(); + for (var subnet : addresses.keySet()) { + var time = addresses.get(subnet); + if (time.isBefore(Instant.now().minus(context.getConfig().getDomainsTimeToLive(), ChronoUnit.HOURS))) { + addresses.remove(subnet); + } + } + if (addresses.isEmpty()) { + domainCache.remove(domain); + } + } + } + /** * Fetches prefixes for the given autonomous systems and stores them in the cache. */ diff --git a/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverCacheEntry.java b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverCacheEntry.java new file mode 100644 index 0000000..60bfd21 --- /dev/null +++ b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverCacheEntry.java @@ -0,0 +1,22 @@ +package ru.kirillius.pf.sdn.core.Networking; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import ru.kirillius.json.JSONMapProperty; +import ru.kirillius.json.JSONProperty; +import ru.kirillius.json.JSONSerializable; + +import java.time.Instant; +import java.util.Map; + +@JSONSerializable +@Getter +@Setter +@NoArgsConstructor +public class ResolverCacheEntry { + @JSONProperty + private Instant lastUpdate; + @JSONMapProperty(keyType = IPv4Subnet.class, valueType = Instant.class) + private Map addresses; +} diff --git a/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverService.java b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverService.java new file mode 100644 index 0000000..e38d192 --- /dev/null +++ b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverService.java @@ -0,0 +1,39 @@ +package ru.kirillius.pf.sdn.core.Networking; + +import ru.kirillius.pf.sdn.core.AppService; +import ru.kirillius.pf.sdn.core.Context; +import ru.kirillius.pf.sdn.core.Util.DomainUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class ResolverService extends AppService { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final static String CTX = ResolverService.class.getSimpleName(); + + public Future> resolve(String host) { + return executor.submit(() -> { + var resolved = new ArrayList(); + for (var domainResolver : context.getConfig().getDomainResolvers()) { + DomainUtil.lookup(host, domainResolver).stream().map(addr -> new IPv4Subnet(addr, 32)).forEach(resolved::add); + } + return resolved; + }); + } + + public ResolverService(Context context) { + super(context); + } + + /** + * Removes event subscriptions and shuts down the executor. + */ + @Override + public void close() throws IOException { + executor.shutdown(); + } +} diff --git a/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverServiceObsolete.java b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverServiceObsolete.java new file mode 100644 index 0000000..74b0e6f --- /dev/null +++ b/core/src/main/java/ru/kirillius/pf/sdn/core/Networking/ResolverServiceObsolete.java @@ -0,0 +1,120 @@ +package ru.kirillius.pf.sdn.core.Networking; + +import lombok.Getter; +import org.json.JSONObject; +import org.json.JSONTokener; +import ru.kirillius.json.JSONUtility; +import ru.kirillius.pf.sdn.core.AppService; +import ru.kirillius.pf.sdn.core.Context; +import ru.kirillius.pf.sdn.core.Util.DomainUtil; +import ru.kirillius.utils.logging.SystemLogger; + +import java.io.*; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + + +public class ResolverServiceObsolete extends AppService { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private final static String CTX = ResolverServiceObsolete.class.getSimpleName(); + + private final File cacheFile; + + private final Map cacheEntries = new ConcurrentHashMap<>(); + private final Future backgroundWorker; + + public void saveCache() throws IOException { + try (var writer = new BufferedWriter(new FileWriter(cacheFile))) { + writer.write(JSONUtility.serializeMap(cacheEntries, String.class, ResolverCacheEntry.class, null, null).toString()); + } + } + + public void maintainCache() { + cacheEntries.keySet().forEach(host -> { + var entry = cacheEntries.get(host); + var lastSeen = entry.getLastSeen(); + for (var subnet : lastSeen.keySet()) { + if (lastSeen.get(subnet).isBefore(Instant.now().minus(context.getConfig().getDomainsTimeToLive(), ChronoUnit.HOURS))) { + lastSeen.remove(subnet); + } + } + }); + } + + public Future> resolve(String host, boolean useCache) { + return executor.submit(() -> { + if (useCache && cacheEntries.containsKey(host)) { + var cached = cacheEntries.get(host); + if (cached.getLastUpdate().isAfter(Instant.now().minus(context.getConfig().getDomainLookupInterval(), ChronoUnit.MINUTES))) { + return cached.getAddresses(); + } + } + var resolved = new HashSet(); + for (var domainResolver : context.getConfig().getDomainResolvers()) { + DomainUtil.lookup(host, domainResolver).stream().map(addr -> new IPv4Subnet(addr, 32)).forEach(resolved::add); + } + + if (useCache) { + if (!cacheEntries.containsKey(host)) { + cacheEntries.put(host, new ResolverCacheEntry()); + } + var cached = cacheEntries.get(host); + cached.setLastUpdate(Instant.now()); + cached.getAddresses().addAll(resolved); + var lastSeen = cached.getLastSeen(); + resolved.forEach(net -> lastSeen.put(net, Instant.now())); + resolved.addAll(cached.getAddresses()); + } + + return resolved.stream().toList(); + }); + } + + /** + * Creates the networking service, wiring subscriptions and restoring cached state. + */ + public ResolverServiceObsolete(Context context) { + super(context); + + cacheFile = new File(context.getConfig().getCacheDirectory(), "resolver-cache.json"); + if (cacheFile.exists() && context.getConfig().isCachingAS()) { + SystemLogger.message("Loading resolver cache file", CTX); + try (var is = new FileInputStream(cacheFile)) { + var json = new JSONObject(new JSONTokener(is)); + var deserialized = JSONUtility.deserializeMap(json, String.class, ResolverCacheEntry.class, null, null); + cacheEntries.putAll(deserialized); + } catch (Exception e) { + SystemLogger.error("Failed to load resolver cache file " + cacheFile.getPath(), CTX, e); + } + } + + backgroundWorker = executor.submit(this::autoResolve); + } + + @Getter + private final List autoLookupHosts = new CopyOnWriteArrayList<>(); + + private void autoResolve() { + try { + Thread.sleep(context.getConfig().getDomainLookupInterval()); + autoLookupHosts.stream().toList().forEach(host -> { + //TODO продумать обновление + }); + } catch (InterruptedException e) { + return; + } + } + + /** + * Removes event subscriptions and shuts down the executor. + */ + @Override + public void close() throws IOException { + backgroundWorker.cancel(true); + executor.shutdown(); + } +}