File size: 7,099 Bytes
1997c01
 
 
 
 
d189e4c
1997c01
edc370b
 
 
 
 
1997c01
 
 
 
 
 
 
6866b1f
a4d5793
83cd13d
d8e3d53
6400777
dc4b91d
 
a4d5793
 
d8e3d53
0b6419d
d8e3d53
0b6419d
d8e3d53
c024d74
a4d5793
e517d5e
a4d5793
f83432c
 
10608aa
 
 
 
 
 
 
0391643
 
 
10608aa
 
 
 
 
4dcad40
10608aa
 
 
 
0391643
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
850974d
 
 
0391643
850974d
0391643
 
 
 
 
 
 
 
9c17797
 
 
0391643
e517d5e
68e8a9a
d8e3d53
cba8adc
83cd13d
68e8a9a
3b7be5c
68e8a9a
 
 
 
 
 
 
cba8adc
c024d74
0391643
f83432c
 
a07c0ed
f83432c
 
 
 
 
 
 
0391643
f83432c
cd11506
 
 
 
 
0391643
cd11506
b65f10f
f83432c
cd11506
f83432c
cd11506
 
34833f4
 
 
 
 
cd11506
 
 
25d3bcd
 
4281a78
 
34833f4
196214b
cd11506
f83432c
a07c0ed
f83432c
 
0391643
0c12c76
f83432c
24becb3
c6a5618
 
1997c01
 
c6a5618
9c17797
 
1997c01
 
83cd13d
19ae57e
1997c01
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import gradio as gr
import pandas as pd
import numpy as np
import json
from io import StringIO
from collections import OrderedDict






def test(input_json):
    print("Received input")
    # Parse the input JSON string
    try:
        inputs = json.loads(input_json)
    except json.JSONDecodeError:
        inputs = json.loads(input_json.replace("'", '"'))

    # Accessing input data
    
    matrix = inputs['input']["matrix"]
    landuses = inputs['input']["landuse_areas"]
    
    attributeMapperDict = inputs['input']["attributeMapperDict"]
    landuseMapperDict = inputs['input']["landuseMapperDict"]
    
    alpha = inputs['input']["alpha"]
    alpha = float(alpha)
    threshold = inputs['input']["threshold"]
    threshold = float(threshold)
    
    df_matrix = pd.DataFrame(matrix).T
    df_landuses = pd.DataFrame(landuses).T
    df_matrix = df_matrix.round(0).astype(int)
    df_landuses = df_landuses.round(0).astype(int)


    """
    if len(A) == len(B):
        B.index = A
    else:
        print("The lengths do not match.")
    """
    
    # create a mask based on the matrix size and ids, crop activity nodes to the mask

    mask_connected = df_matrix.index.tolist()

    valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index]
    # Identify and report missing indexes
    missing_indexes = set(mask_connected) - set(valid_indexes)
    if missing_indexes:
        print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}")
    
    # Apply the filtered mask
    df_landuses_filtered = df_landuses.loc[valid_indexes]


    # find a set of unique domains, to which subdomains are aggregated
    
    temp = []

    for key, values in attributeMapperDict.items():
      domain = attributeMapperDict[key]['domain']
      for item in domain:
        if ',' in item:
          domain_list = item.split(',')
          attributeMapperDict[key]['domain'] = domain_list
          for domain in domain_list:
            temp.append(domain) 
        else:
          if item != 0: 
              temp.append(item)  
    
    domainsUnique = list(set(temp))


    # find a list of unique subdomains, to which land uses are aggregated

    temp = []
    
    for key, values in landuseMapperDict.items():
      subdomain = str(landuseMapperDict[key])
      if subdomain != 0: 
        temp.append(subdomain) 
        
    subdomainsUnique = list(set(temp))
    

    
    
    def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList):
        df_LivabilitySubdomainsArea = pd.DataFrame(0, index=DistanceMatrix.index, columns=UniqueSubdomainsList)
    
        for subdomain in UniqueSubdomainsList:
            for lu, lu_subdomain in LanduseToSubdomainDict.items():
                if lu_subdomain == subdomain:
                    if lu in LanduseDf.columns:
                        df_LivabilitySubdomainsArea[subdomain] = df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0)
                    else:
                        print(f"Warning: Column '{lu}' not found in landuse database")
    
        return df_LivabilitySubdomainsArea

    

    LivabilitySubdomainsWeights = landusesToSubdomains(df_matrix,df_landuses_filtered,landuseMapperDict,subdomainsUnique)
    # make a dictionary to output in grasshopper / etc
    LivabilitySubdomainsWeights_dictionary = LivabilitySubdomainsWeights.to_dict('index')


    
    def computeAccessibility (DistanceMatrix,weightsNames, destinationWeights=None,alpha = 0.0038, threshold = 600):
    
        decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)
        subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=weightsNames) #destinationWeights.columns)
        # for weighted accessibility (e. g. areas)
        if not destinationWeights.empty:
            for col,columnName in zip(destinationWeights.columns, weightsNames):
                subdomainsAccessibility[columnName] = (decay_factors * destinationWeights[col].values).sum(axis=1)
        # for unweighted accessibility (e. g. points of interest)
        else:
            for columnName in weightsNames:
                subdomainsAccessibility[columnName] = (decay_factors * 1).sum(axis=1)
        
        return subdomainsAccessibility
    
    subdomainsAccessibility = computeAccessibility(df_matrix,subdomainsUnique,LivabilitySubdomainsWeights,alpha,threshold)

    # make a dictionary to output in grasshopper / etc
    subdomainsAccessibility_dictionary = subdomainsAccessibility.to_dict('index')


    def remap(value, B_min, B_max, C_min, C_max):
        return C_min + (((value - B_min) / (B_max - B_min))* (C_max - C_min))    


    
    def accessibilityToLivability (DistanceMatrix,subdomainsAccessibility, SubdomainAttributeDict,UniqueDomainsList):
    
        livability = pd.DataFrame(index=DistanceMatrix.index, columns=subdomainsAccessibility.columns)
        livability.fillna(0, inplace=True)
        
        
             
        for domain in UniqueDomainsList:
            livability[domain] = 0
    
    
        # remap accessibility to livability points
        
        for key, values in SubdomainAttributeDict.items():
            if key in subdomainsAccessibility.columns:
                domain = [str(item) for item in SubdomainAttributeDict[key]['domain']]
                threshold = float(SubdomainAttributeDict[key]['thresholds'])
                max_livability = float(SubdomainAttributeDict[key]['max_points'])
                sqm_per_employee = SubdomainAttributeDict[key]['sqmPerEmpl']
                
                livability_score = remap(subdomainsAccessibility[key], 0, threshold, 0, max_livability)
                livability.loc[subdomainsAccessibility[key] >= threshold, key] = max_livability
                livability.loc[subdomainsAccessibility[key] < threshold, key] = livability_score
                if any(domain):
                    for item in domain:
                        livability.loc[subdomainsAccessibility[key] >= threshold, item] += max_livability
                        livability.loc[subdomainsAccessibility[key] < threshold, item] += livability_score


        return livability
    
    
    

    livability = accessibilityToLivability(df_matrix,subdomainsAccessibility,attributeMapperDict,domainsUnique)
    livability_dictionary = livability.to_dict('index')

    #columnList = domainsUnique+subdomainsUnique

    
    # Prepare the output
    output = {
        "subdomainsAccessibility_dictionary": subdomainsAccessibility_dictionary,
        "livability_dictionary": livability_dictionary,
        "subdomainsArea_dictionary": LivabilitySubdomainsWeights_dictionary
    }


    
    return json.dumps(output)

    # Define the Gradio interface with a single JSON input
iface = gr.Interface(
    fn=test,
    inputs=gr.Textbox(label="Input JSON", lines=20, placeholder="Enter JSON with all parameters here..."),
    outputs=gr.JSON(label="Output JSON"),
    title="testspace"
)

iface.launch()